[Assorted-commits] SF.net SVN: assorted:[1269] ydb/trunk
From: <yan...@us...> - 2009-03-08 09:39:18
Revision: 1269
          http://assorted.svn.sourceforge.net/assorted/?rev=1269&view=rev
Author:   yangzhang
Date:     2009-03-08 09:39:11 +0000 (Sun, 08 Mar 2009)

Log Message:
-----------
- cleanup
- using fast_map
- added -DNDEBUG to optimized build
- specialized recovery message generation
- using unique_ptr instead of shared_ptr for Recovery channel
- added some more notes/todos
- added ghash to setup

Modified Paths:
--------------
    ydb/trunk/README
    ydb/trunk/src/Makefile
    ydb/trunk/src/main.lzz.clamp
    ydb/trunk/src/ser.h
    ydb/trunk/tools/test.bash

Modified: ydb/trunk/README
===================================================================
--- ydb/trunk/README    2009-03-07 21:30:30 UTC (rev 1268)
+++ ydb/trunk/README    2009-03-08 09:39:11 UTC (rev 1269)
@@ -473,8 +473,58 @@
 - abysmal perf; long wait at the map dump
   + almost never catch up, but at least it works
-- TODO speed up backlogging; don't create pb objects, just take buffers
+- report for sam
+  - got the speed up to as fast as it'll go before 1000
+  - added disk logging for workers; still need to grab numbers for the
+    single-node ('no replica') case
+  - added physical logging: slower
+  - adding log transfer vs. state transfer
+- DONE added byte length prefixes for faster backlogging
+- DONE speed up backlogging; don't create pb objects, just take buffers
+  + pseudocode (out of date/buggy)
+    r.setanchor
+    first_start = r.start
+    while true
+      start = r.start
+      headerlen = sizeof([prefix, ntxns, seqno])
+      if r.unread + r.rem < headerlen
+        buf = new buf
+        buf.write([r.start..r.end])
+        swap(r.buf, buf)
+        backlog.push(buf, first_start, start)
+        r.reset
+        first_start = r.start
+      prefix = r.read
+      ntxns = r.read
+      seqno = r.read
+      if ...seqno...
+      if r.rem < prefix - headerlen
+        buf = new buf
+        buf.write([prefix, ntxns, seqno] + [r.start..r.end])
+        swap(r.buf, buf)
+        backlog.push(buf, first_start, start)
+        r.reset
+        first_start = r.start
+      assert r.rem >= prefix - headerlen
+      check0 r.accum(prefix - headerlen)
+
+- DONE notify process_txns to "flush" to backlog (caught up)
+
+- DONE pb_types -> pb_traits, etc.
+
+- DONE try building and using your own map type; compare against other
+  containers
+  - built something really fast, faster than even google dense_hash_map
+
+- TODO experiment with large pages
+
+- TODO use rb instead of pb for recovery state
+
+- TODO test out recovery mode more thoroughly, make sure progress is being
+  made, see how fast it is
+
 - TODO fix multi-recovery if necessary
 - TODO speed up map dump; don't use range partitioning, but hash partitioning
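The backlogging pseudocode above is flagged as out of date/buggy, so here is a
minimal compilable C++ sketch of the same byte-length-prefix idea, assuming a
flat input buffer and a deque-backed backlog; Chunk, Backlog, and backlog_raw
are invented names for illustration and do not match ydb's actual reader or
st_channel types.

    // Sketch only: length-prefixed backlogging without deserializing messages.
    // Chunk/Backlog are stand-ins for ydb's real buffer and channel types.
    #include <cstdint>
    #include <cstring>
    #include <deque>
    #include <utility>
    #include <vector>

    struct Chunk { std::vector<char> bytes; };   // one raw, still-serialized record
    typedef std::deque<Chunk> Backlog;

    // Consume as many complete [len][payload] records as the input holds,
    // copying each record's raw bytes into the backlog.  Returns the number
    // of bytes consumed; a trailing partial record is left for the next call.
    size_t backlog_raw(const char *buf, size_t n, Backlog &backlog) {
      size_t off = 0;
      while (n - off >= sizeof(std::uint32_t)) {
        std::uint32_t len;
        std::memcpy(&len, buf + off, sizeof len);      // byte-length prefix
        if (n - off - sizeof len < len) break;         // record incomplete
        Chunk c;
        c.bytes.assign(buf + off, buf + off + sizeof len + len);
        backlog.push_back(std::move(c));               // no protobuf parse here
        off += sizeof len + len;
      }
      return off;
    }

The point of the change is visible in the sketch: each record's raw bytes go
into the backlog verbatim, and protobuf parsing is deferred until the record
is actually replayed.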
Modified: ydb/trunk/src/Makefile
===================================================================
--- ydb/trunk/src/Makefile    2009-03-07 21:30:30 UTC (rev 1268)
+++ ydb/trunk/src/Makefile    2009-03-08 09:39:11 UTC (rev 1269)
@@ -29,7 +29,7 @@
   PPROF := -lprofiler
 endif
 ifneq ($(OPT),)
-  OPT := -O3 -Wdisabled-optimization
+  OPT := -O3 -Wdisabled-optimization -DNDEBUG
 else
   OPT := -g3
 endif

Modified: ydb/trunk/src/main.lzz.clamp
===================================================================
--- ydb/trunk/src/main.lzz.clamp    2009-03-07 21:30:30 UTC (rev 1268)
+++ ydb/trunk/src/main.lzz.clamp    2009-03-08 09:39:11 UTC (rev 1269)
@@ -8,6 +8,8 @@
 #include <boost/scoped_array.hpp>
 #include <boost/shared_ptr.hpp>
 #include <boost/tuple/tuple.hpp>
+#include <boost/unique_ptr.hpp>
+#include <commons/fast_map.h>
 #include <commons/nullptr.h>
 #include <commons/rand.h>
 #include <commons/st/st.h>
@@ -52,9 +54,10 @@
 //#define map_t unordered_map
 //#define map_t map
-#define map_t dense_hash_map
-typedef pair<int, int> pii;
+//#define map_t dense_hash_map
+#define map_t fast_map
 typedef map_t<int, int> mii;
+typedef pair<int, int> pii;
 
 typedef tuple<sized_array<char>, char*, char*> chunk;
 
@@ -63,6 +66,10 @@
   map.set_empty_key(-1);
   map.set_deleted_key(-2);
 }
+template<> void init_map(fast_map<int, int> &map) {
+  map.set_empty_key(-1);
+  map.set_deleted_key(-2);
+}
 
 // Configuration.
 st_utime_t timeout;
@@ -731,7 +738,7 @@
  */
 template<typename Types, typename RTypes>
 void
-process_txn(mii&map, const typename Types::Txn &txn, int &seqno,
+process_txn(mii &map, const typename Types::Txn &txn, int &seqno,
             typename RTypes::Response *res)
 {
   typedef typename Types::Txn Txn;
@@ -821,23 +828,6 @@
 }
 #end
 
-template<typename Txn> inline shared_ptr<pb::Txn> to_pb_Txn(Txn txn);
-template<> inline shared_ptr<pb::Txn> to_pb_Txn(pb::Txn txn) {
-  return shared_ptr<pb::Txn>(new pb::Txn(txn));
-}
-template<> inline shared_ptr<pb::Txn> to_pb_Txn(msg::Txn txn) {
-  shared_ptr<pb::Txn> ptxn(new pb::Txn());
-  ptxn->set_seqno(txn.seqno());
-  for (int o = 0; o < txn.op_size(); ++o) {
-    pb::Op *pop = ptxn->add_op();
-    const msg::Op &op = txn.op(o);
-    pop->set_type(static_cast<Op_OpType>(op.type()));
-    pop->set_key(op.key());
-    pop->set_value(op.value());
-  }
-  return ptxn;
-}
-
 /**
  * Actually do the work of executing a transaction and sending back the reply.
  *
@@ -866,7 +856,7 @@
 template<typename Types, typename RTypes>
 void
 process_txns(st_netfd_t leader, mii &map, int &seqno,
-             st_channel<shared_ptr<Recovery> > &send_states,
+             st_channel<unique_ptr<Recovery> > &send_states,
              /* XXX st_channel<shared_ptr<pb::Txn> > &backlog */
              st_channel<chunk> &backlog,
              int init_seqno, int mypos, int nnodes)
@@ -906,7 +896,7 @@
           showtput("live-processed", now, __ref(time_caught_up),
                    __ref(seqno), __ref(seqno_caught_up));
       }
-      __ref(send_states).push(shared_ptr<Recovery>());
+      __ref(send_states).push(unique_ptr<Recovery>());
      __ref(w).mark_and_flush();
      st_sleep(1);
    });
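For reference, a rough sketch of the map_t / init_map pattern the hunks above
extend: the container is chosen with a #define, and open-addressing tables get
their reserved sentinel keys through a specialization. fast_map_like below is
a stand-in with an assumed interface, not the real commons/fast_map.h
container.

    #include <map>
    #include <unordered_map>

    // Container is chosen at compile time, as in main.lzz.clamp.
    //#define map_t std::map
    //#define map_t std::unordered_map
    #define map_t fast_map_like

    // Stand-in for an open-addressing table such as fast_map or
    // dense_hash_map; the real storage and hashing are omitted.
    template<typename K, typename V>
    struct fast_map_like {
      void set_empty_key(const K &) {}     // key reserved for empty slots
      void set_deleted_key(const K &) {}   // key reserved for tombstones
    };

    // Generic setup hook: tree/chained maps need nothing after construction.
    template<typename Map> void init_map(Map &) {}

    // Open-addressing tables need sentinel keys, so they get a specialization.
    template<> void init_map(fast_map_like<int, int> &map) {
      map.set_empty_key(-1);
      map.set_deleted_key(-2);
    }

    typedef map_t<int, int> mii;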
@@ -1038,30 +1028,9 @@
         }
       } else if (multirecover || mypos == 0) {
         // Empty (default) TxnBatch means "generate a snapshot."
-        // TODO make this faster
-        cout << "generating recovery..." << endl;
-        shared_ptr<Recovery> recovery(new Recovery);
-        typedef ::map<int, int> mii_;
-        mii_ map_(map.begin(), map.end());
-        mii_::const_iterator begin =
-          map_.lower_bound(multirecover ? interp(RAND_MAX, mypos, nnodes) : 0);
-        mii_::const_iterator end = multirecover && mypos < nnodes - 1 ?
-          map_.lower_bound(interp(RAND_MAX, mypos + 1, nnodes)) : map_.end();
-        cout << "generating recovery over " << begin->first << ".."
-             << (end == map_.end() ? "end" : lexical_cast<string>(end->first));
-        if (multirecover)
-          cout << " (node " << mypos << " of " << nnodes << ")";
-        cout << endl;
-        long long start_snap = current_time_millis();
-        foreach (const pii &p, make_iterator_range(begin, end)) {
-          Recovery_Pair *pair = recovery->add_pair();
-          pair->set_key(p.first);
-          pair->set_value(p.second);
-        }
-        cout << "generating recovery took "
-             << current_time_millis() - start_snap << " ms" << endl;
+        unique_ptr<Recovery> recovery(make_recovery(map, mypos, nnodes));
         recovery->set_seqno(seqno);
-        send_states.push(recovery);
+        send_states.push(boost::move(recovery));
       }
     }
   } catch (break_exception &ex) {
   }
@@ -1069,6 +1038,33 @@
 }
 
+template<typename mii>
+unique_ptr<Recovery> make_recovery(const mii &map, int mypos, int nnodes) {
+  // TODO make this faster
+  cout << "generating recovery..." << endl;
+  unique_ptr<Recovery> recovery(new Recovery);
+  typedef ::map<int, int> mii_;
+  mii_ map_(map.begin(), map.end());
+  mii_::const_iterator begin =
+    map_.lower_bound(multirecover ? interp(RAND_MAX, mypos, nnodes) : 0);
+  mii_::const_iterator end = multirecover && mypos < nnodes - 1 ?
+    map_.lower_bound(interp(RAND_MAX, mypos + 1, nnodes)) : map_.end();
+  cout << "generating recovery over " << begin->first << ".."
+       << (end == map_.end() ? "end" : lexical_cast<string>(end->first));
+  if (multirecover)
+    cout << " (node " << mypos << " of " << nnodes << ")";
+  cout << endl;
+  long long start_snap = current_time_millis();
+  foreach (const pii &p, make_iterator_range(begin, end)) {
+    Recovery_Pair *pair = recovery->add_pair();
+    pair->set_key(p.first);
+    pair->set_value(p.second);
+  }
+  cout << "generating recovery took "
+       << current_time_millis() - start_snap << " ms" << endl;
+  return boost::move(recovery);
+}
+
 class response_handler
 {
 public:
@@ -1100,7 +1096,7 @@
   commons::array<char> rbuf(read_buf_size), wbuf(buf_size);
   st_reader reader(replica, rbuf.get(), rbuf.size());
   writer w(lambda(const void*, size_t) {
-    throw operation_not_supported("response handler should not be writing");
+    throw not_supported_exception("response handler should not be writing");
   }, wbuf.get(), wbuf.size());
   stream s(reader,w);
@@ -1258,10 +1254,10 @@
  */
 void
 recover_joiner(st_netfd_t listener,
-               st_channel<shared_ptr<Recovery> > &send_states)
+               st_channel<unique_ptr<Recovery> > &send_states)
 {
   st_netfd_t joiner;
-  shared_ptr<Recovery> recovery;
+  unique_ptr<Recovery> recovery;
   {
     st_intr intr(stop_hub);
     // Wait for the snapshot.
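The Recovery snapshot now has a single consumer, which is why the channel can
carry unique_ptr and move values instead of reference-counting them. A
self-contained sketch of that ownership handoff, using std::unique_ptr and a
plain deque in place of boost::unique_ptr and st_channel (Recovery,
SnapshotQueue, and publish_snapshot are illustrative names only):

    #include <deque>
    #include <map>
    #include <memory>

    // Dummy stand-in for the generated Recovery message.
    struct Recovery { std::map<int, int> pairs; int seqno; };

    // Stand-in for an st_channel of move-only snapshot messages.
    typedef std::deque<std::unique_ptr<Recovery> > SnapshotQueue;

    // Build a snapshot of the node's state (key-range partitioning elided).
    std::unique_ptr<Recovery> make_recovery(const std::map<int, int> &db,
                                            int seqno) {
      std::unique_ptr<Recovery> rec(new Recovery());
      rec->pairs = db;   // the real code copies a key range into protobuf pairs
      rec->seqno = seqno;
      return rec;        // ownership leaves the function by move
    }

    void publish_snapshot(const std::map<int, int> &db, int seqno,
                          SnapshotQueue &q) {
      q.push_back(make_recovery(db, seqno));   // ownership moves into the queue
    }

The commit spells the transfer out with boost::move because implicit C++11
moves were not available in 2009; the sketch relies on the C++11 behavior.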
@@ -1441,7 +1437,7 @@
       }
     }
   });
-  st_channel<shared_ptr<Recovery> > send_states;
+  st_channel<unique_ptr<Recovery> > send_states;
 
   cout << "starting as replica on port " << listen_port << endl;
 
@@ -1551,7 +1547,7 @@
   commons::array<char> rbuf(0), wbuf(buf_size);
   reader reader(nullptr, rbuf.get(), rbuf.size());
   writer writer(lambda(const void*, size_t) {
-    throw operation_not_supported("should not be writing responses during catch-up phase");
+    throw not_supported_exception("should not be writing responses during catch-up phase");
   }, wbuf.get(), wbuf.size());
   stream s(reader, writer);
   TxnBatch batch(s);

Modified: ydb/trunk/src/ser.h
===================================================================
--- ydb/trunk/src/ser.h    2009-03-07 21:30:30 UTC (rev 1268)
+++ ydb/trunk/src/ser.h    2009-03-08 09:39:11 UTC (rev 1269)
@@ -34,13 +34,13 @@
 }
 
 #define EXPAND_PB \
-  bool AppendToString(string*) const { throw_operation_not_supported(); } \
-  bool SerializeToArray(void*, int) const { throw_operation_not_supported(); } \
-  bool SerializeToString(string*) const { throw_operation_not_supported(); } \
-  bool SerializeToOstream(ostream*) const { throw_operation_not_supported(); } \
-  bool ParseFromArray(void*, int) { throw_operation_not_supported(); } \
-  int GetCachedSize() const { throw_operation_not_supported(); } \
-  int ByteSize() const { throw_operation_not_supported(); } \
+  bool AppendToString(string*) const { throw_not_supported(); } \
+  bool SerializeToArray(void*, int) const { throw_not_supported(); } \
+  bool SerializeToString(string*) const { throw_not_supported(); } \
+  bool SerializeToOstream(ostream*) const { throw_not_supported(); } \
+  bool ParseFromArray(void*, int) { throw_not_supported(); } \
+  int GetCachedSize() const { throw_not_supported(); } \
+  int ByteSize() const { throw_not_supported(); } \
 
 #define MAKE_TYPE_BATCH(name, ns, b) \
   struct name##_traits { \

Modified: ydb/trunk/tools/test.bash
===================================================================
--- ydb/trunk/tools/test.bash    2009-03-07 21:30:30 UTC (rev 1268)
+++ ydb/trunk/tools/test.bash    2009-03-08 09:39:11 UTC (rev 1269)
@@ -225,6 +225,7 @@
   parremote node-setup-bison
   parremote node-setup-clamp
   parremote node-setup-gtest
+  parremote node-setup-ghash
 }
 
 setup-ydb() {
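As context for the throw_not_supported rename in ser.h, a self-contained
sketch of the stub pattern EXPAND_PB implements: hand-rolled message types
expose the protobuf-style serialization surface but throw if any of it is
ever called. The macro, exception, and struct names below are stand-ins, not
the project's exact definitions.

    #include <ostream>
    #include <stdexcept>
    #include <string>

    // Stand-in for the project's not_supported_exception.
    struct not_supported_exception : std::logic_error {
      not_supported_exception() : std::logic_error("operation not supported") {}
    };

    #define THROW_NOT_SUPPORTED() throw not_supported_exception()

    // Expands inside a hand-rolled message type to stub out the protobuf
    // surface, so a stray serialization call fails loudly instead of
    // silently doing nothing.
    #define EXPAND_PB_STUBS \
      bool AppendToString(std::string*) const { THROW_NOT_SUPPORTED(); } \
      bool SerializeToString(std::string*) const { THROW_NOT_SUPPORTED(); } \
      bool SerializeToOstream(std::ostream*) const { THROW_NOT_SUPPORTED(); } \
      int ByteSize() const { THROW_NOT_SUPPORTED(); }

    struct RawTxn {
      EXPAND_PB_STUBS
      // fast, non-protobuf accessors would live here
    };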