[Assorted-commits] SF.net SVN: assorted:[1277] ydb/trunk
Brought to you by:
yangzhang
From: <yan...@us...> - 2009-03-09 23:00:13
|
Revision: 1277 http://assorted.svn.sourceforge.net/assorted/?rev=1277&view=rev Author: yangzhang Date: 2009-03-09 22:59:57 +0000 (Mon, 09 Mar 2009) Log Message: ----------- - fast memcpy recovery serialization with direct access to fast_map table - using recovery_t = array<entry> instead of Recovery - using c++0x unique_ptr/move instead of c++03/boost - using raw_reader/raw_writer - added line-counts to test.bash - added/updated notes/todos Modified Paths: -------------- ydb/trunk/README ydb/trunk/src/main.lzz.clamp ydb/trunk/tools/test.bash Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-03-09 22:59:33 UTC (rev 1276) +++ ydb/trunk/README 2009-03-09 22:59:57 UTC (rev 1277) @@ -518,18 +518,16 @@ containers - built something really fast, faster than even google dense_hash_map -- TODO experiment with large pages +- DONE use rb instead of pb for recovery state -- TODO use rb instead of pb for recovery state - - TODO test out recovery mode more thoroughly, make sure progress is being made, see how fast it is -- TODO fix multi-recovery if necessary +- DONE fix multi-recovery if necessary -- TODO speed up map dump; don't use range partitioning, but hash partitioning +- DONE speed up map dump; don't use range partitioning, but hash partitioning -- TODO refactor st_reader, etc. to be generic opportunistic buffered readers +- DONE refactor st_reader, etc. 
to be generic opportunistic buffered readers - TODO see how streambuf read/write is actually implemented (whether it's too slow) - TODO try making a streambuf for st_write, then try it in conj with @@ -538,12 +536,12 @@ - TODO async (threaded) wal - TODO 0-node 0-copy (don't need to use threads, just process each batch immed) -- TODO batch up the responses until they make large-enough buffer in pb mode - - TODO show aries-write - TODO checkpointing + replaying log from replicas (not from disk) - TODO scale-up on multicore +- TODO experiment with libhugetlbfs + - TODO remove extraneous copies; use custom buffer-backed data structures designed for serialization/deserialization - TODO flushing Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-03-09 22:59:33 UTC (rev 1276) +++ ydb/trunk/src/main.lzz.clamp 2009-03-09 22:59:57 UTC (rev 1277) @@ -8,12 +8,13 @@ #include <boost/scoped_array.hpp> #include <boost/shared_ptr.hpp> #include <boost/tuple/tuple.hpp> -#include <boost/unique_ptr.hpp> #include <commons/fast_map.h> +#include <commons/memory.h> #include <commons/nullptr.h> #include <commons/rand.h> #include <commons/st/st.h> #include <commons/time.h> +#include <commons/unique_ptr.h> #include <csignal> // sigaction etc. #include <cstdio> #include <cstring> // strsignal @@ -57,10 +58,13 @@ //#define map_t dense_hash_map #define map_t fast_map typedef map_t<int, int> mii; -typedef pair<int, int> pii; +typedef mii::value_type entry; typedef tuple<sized_array<char>, char*, char*> chunk; +//typedef unique_ptr<Recovery> recovery_t; +typedef commons::array<char> recovery_t; + template<typename T> void init_map(T &map) {} template<> void init_map(dense_hash_map<int, int> &map) { map.set_empty_key(-1); @@ -637,7 +641,7 @@ // Generate some random transactions. 
start_txn(batch); - for (int t = 0; t < batch_size; ++t) { + for (int t = 0; t < batch_size && !stop_hub; ++t) { char *txn_start = w.cur(); Txn &txn = *batch.add_txn(); txn.set_seqno(seqno); @@ -856,7 +860,7 @@ template<typename Types, typename RTypes> void process_txns(st_netfd_t leader, mii &map, int &seqno, - st_channel<unique_ptr<Recovery> > &send_states, + st_channel<recovery_t> &send_states, /* XXX st_channel<shared_ptr<pb::Txn> > &backlog */ st_channel<chunk> &backlog, int init_seqno, int mypos, int nnodes) @@ -896,7 +900,7 @@ showtput("live-processed", now, __ref(time_caught_up), __ref(seqno), __ref(seqno_caught_up)); } - __ref(send_states).push(unique_ptr<Recovery>()); + __ref(send_states).push(recovery_t()); __ref(w).mark_and_flush(); st_sleep(1); }); @@ -971,9 +975,10 @@ if (reader.unread() + reader.rem() < prefix + sizeof(uint32_t) - headerlen) { // Move current partial message to new buffer. sized_array<char> tmp(new char[read_buf_size], read_buf_size); - *reinterpret_cast<uint32_t*>(tmp.get()) = prefix; - *reinterpret_cast<short*>(tmp.get() + sizeof(uint32_t)) = short(batch.txn_size()); - *reinterpret_cast<int*>(tmp.get() + sizeof(uint32_t) + sizeof(short)) = first_txn.seqno(); + raw_writer ser(tmp.get()); + ser.write(prefix); + ser.write(short(batch.txn_size())); + ser.write(first_txn.seqno()); memcpy(tmp.get() + headerlen, reader.start(), reader.unread()); // Swap the buffers. @@ -1028,9 +1033,7 @@ } } else if (multirecover || mypos == 0) { // Empty (default) TxnBatch means "generate a snapshot." 
- unique_ptr<Recovery> recovery(make_recovery(map, mypos, nnodes)); - recovery->set_seqno(seqno); - send_states.push(boost::move(recovery)); + send_states.push(make_recovery(map, mypos, nnodes, seqno)); } } } catch (break_exception &ex) { @@ -1038,8 +1041,11 @@ } +#if 0 template<typename mii> -unique_ptr<Recovery> make_recovery(const mii &map, int mypos, int nnodes) { +unique_ptr<Recovery> +make_recovery(const mii &map, int mypos, int nnodes, int &seqno) +{ // TODO make this faster cout << "generating recovery..." << endl; unique_ptr<Recovery> recovery(new Recovery); @@ -1062,9 +1068,55 @@ } cout << "generating recovery took " << current_time_millis() - start_snap << " ms" << endl; - return boost::move(recovery); + recovery->set_seqno(seqno); + return move(recovery); } +#endif +template<typename mii> +recovery_t +make_recovery(const mii &map, int mypos, int nnodes, int &seqno) +{ + return recovery_t(); +} + +struct recovery_header +{ + int seqno; + size_t count; + size_t total; + size_t size; +}; + +pair<size_t, size_t> +recovery_range(size_t size, int mypos, int nnodes) +{ + return make_pair(multirecover ? size * mypos / size_t(nnodes) : 0, + multirecover ? size * (mypos + 1) / size_t(nnodes) : size); +} + +template<> +recovery_t +make_recovery(const fast_map<int, int> &map, int mypos, int nnodes, int &seqno) +{ + const commons::array<entry> &src = map.get_table(); + pair<size_t, size_t> range = recovery_range(src.size(), mypos, nnodes); + size_t begin = range.first, end = range.second; + assert(end > begin); + recovery_header hdr = { seqno, end - begin, src.size(), map.size() }; + size_t bodylen = sizeof(entry) * hdr.count; + cout << "generating recovery of " << hdr.size << " records in " + << hdr.count << " slots (" + << bodylen << " bytes); range is [" + << begin << ".." 
<< end << "]; seqno is " << hdr.seqno << endl; + commons::array<char> recovery(sizeof(size_t) + sizeof hdr + bodylen); + raw_writer ser(recovery.begin()); + ser.write(recovery.size()); + ser.write(hdr); + memcpy(ser.ptr(), src.begin() + begin, bodylen); + return recovery; +} + class response_handler { public: @@ -1254,10 +1306,10 @@ */ void recover_joiner(st_netfd_t listener, - st_channel<unique_ptr<Recovery> > &send_states) + st_channel<recovery_t> &send_states) { st_netfd_t joiner; - unique_ptr<Recovery> recovery; + recovery_t recovery; { st_intr intr(stop_hub); // Wait for the snapshot. @@ -1272,9 +1324,16 @@ st_closing closing(joiner); cout << "got joiner's connection, sending recovery of " - << recovery->pair_size() << " records" << endl; + << recovery.size() << " bytes" << endl; + long long start_time = current_time_millis(); + checkeqnneg(st_write(joiner, recovery.get(), recovery.size(), + ST_UTIME_NO_TIMEOUT), + ssize_t(recovery.size())); + long long diff = current_time_millis() - start_time; +#if 0 sendmsg(joiner, *recovery); - cout << "sent recovery" << endl; +#endif + cout << "sent recovery in " << diff << " ms" << endl; } void @@ -1353,7 +1412,7 @@ cout << "- dumping to " << fname << endl; ofstream of(fname.c_str()); of << "seqno: " << __ref(seqno) << endl; - foreach (const pii &p, g_map) { + foreach (const entry &p, g_map) { of << p.first << ": " << p.second << endl; } } @@ -1432,12 +1491,12 @@ cout << "- dumping to " << fname << endl; ofstream of(fname.c_str()); of << "seqno: " << __ref(seqno) << endl; - foreach (const pii &p, __ref(map)) { + foreach (const entry &p, __ref(map)) { of << p.first << ": " << p.second << endl; } } }); - st_channel<unique_ptr<Recovery> > send_states; + st_channel<recovery_t> send_states; cout << "starting as replica on port " << listen_port << endl; @@ -1502,9 +1561,61 @@ vector<st_thread_t> recovery_builders; assert(seqno == -1); + bool first = true; for (int i = 0; i < (multirecover ? 
init.node_size() : 1); ++i) { recovery_builders.push_back(my_spawn(lambda() { - // Read the recovery message. + // Read the recovery message length and header. + size_t len; + recovery_header hdr; + char buf[sizeof len + sizeof hdr]; + //try { + checkeqnneg(st_read_fully(__ref(replicas[i]), + buf, sizeof len + sizeof hdr, + ST_UTIME_NO_TIMEOUT), + ssize_t(sizeof len + sizeof hdr)); + //} catch (...) { // TODO just catch "Connection reset by peer" + //return; + //} + raw_reader rdr(buf); + rdr.read(len); + rdr.read(hdr); + check(hdr.seqno >= 0); + + // Resize the table if necessary. + commons::array<entry> &table = __ref(map).get_table(); + if (!__ref(first)) { + checkeq(table.size(), hdr.total); + checkeq(__ref(map).size(), hdr.size); + } else { + __ref(map).set_size(hdr.size); + if (table.size() != hdr.total) { + table.reset(new entry[hdr.total], hdr.total); + } + } + + // Receive straight into the table. + pair<size_t, size_t> range = + recovery_range(table.size(), __ctx(i), __ref(init).node_size()); + // Check that we agree on the number of entries. + checkeq(range.second - range.first, hdr.count); + // Check that the count is a power of two. + checkeq(hdr.count & (hdr.count - 1), size_t(0)); + size_t rangelen = sizeof(entry) * hdr.count; + // Read an extra char to ensure that we're at the EOF. 
+ checkeqnneg(st_read_fully(__ref(replicas[i]), + table.begin() + range.first, rangelen + 1, + ST_UTIME_NO_TIMEOUT), + ssize_t(rangelen)); + + long long tm = current_time_millis(); + if (__ref(seqno) != -1) + checkeq(__ref(seqno), hdr.seqno); + __ref(seqno) = hdr.seqno; + cout << "got recovery message of " << len << " bytes (" + << hdr.size << " records in " << hdr.count << " slots) in " + << tm - __ref(before_recv) << " ms; now at seqno " + << hdr.seqno << endl; +#if 0 Recovery recovery; long long receive_start = 0, receive_end = 0; size_t len = 0; @@ -1533,6 +1644,7 @@ << " ms; built up map of " << recovery.pair_size() << " records in " << build_end - build_start << " ms; now at seqno " << seqno << endl; +#endif }, "recovery_builder" + lexical_cast<string>(i))); } foreach (st_thread_t t, recovery_builders) { Modified: ydb/trunk/tools/test.bash =================================================================== --- ydb/trunk/tools/test.bash 2009-03-09 22:59:33 UTC (rev 1276) +++ ydb/trunk/tools/test.bash 2009-03-09 22:59:57 UTC (rev 1277) @@ -527,6 +527,11 @@ hostargs p2-helper } +line-counts() { + wc -l "$(dirname "$0")/../src/"{main.lzz.clamp,ser.{h,cc},p2.cc,ydb.proto,Makefile,serperf.cc} \ + ~/ccom/src/{commons/{,st/}*.h,test/{*.*,Makefile}} +} + # # Main # This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |