[Assorted-commits] SF.net SVN: assorted:[1211] ydb/trunk
Brought to you by:
yangzhang
From: <yan...@us...> - 2009-02-20 19:44:26
|
Revision: 1211 http://assorted.svn.sourceforge.net/assorted/?rev=1211&view=rev Author: yangzhang Date: 2009-02-20 19:44:17 +0000 (Fri, 20 Feb 2009) Log Message: ----------- - falling back to pb for 0-node cases - removed swallower - using google dense_hash_map - optimized/simplified the function pointers - fixed the accidental total exclusion of local (0-node) processing - using operation_not_supported exceptions Modified Paths: -------------- ydb/trunk/README ydb/trunk/src/main.lzz.clamp ydb/trunk/src/ser.h Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-02-20 19:37:33 UTC (rev 1210) +++ ydb/trunk/README 2009-02-20 19:44:17 UTC (rev 1211) @@ -388,10 +388,17 @@ - 3: 122K/122K/97K - DONE commit - DONE add zero-copy structs/(de-)serialization + - -1: 245K (same as before; actually using pb's) + - 0: 320K (same as before; actually using pb's) + - 1: 300K + - 2: 300K + - 3: 300K Period 2/17- - DONE removed class outstream +- TODO get raw-buffer working in wal, 0-node +- TODO add raw-buffer versions of the response message classes as well - TODO refactor st_reader, etc. to be generic opportunistic buffered readers - TODO see how streambuf read/write is actually implemented (whether it's too slow) @@ -402,8 +409,15 @@ - TODO implement new recovery (add buffer swapping, add buffers to a list) - TODO async (threaded) wal - TODO 0-node 0-copy (don't need to use threads, just process each batch immed) -- TODO google dense hash map +- DONE google dense hash map + - big improvement, again not in the direction we'd like + - 0: 550K + - 1: 490K + - 2: 485K + - 3: 475K +- TODO reuse the serialization buffer in the pb path of ydb + - TODO show aries-write - TODO checkpointing + replaying log from replicas (not from disk) - TODO scale-up on multicore Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-02-20 19:37:33 UTC (rev 1210) +++ ydb/trunk/src/main.lzz.clamp 2009-02-20 19:44:17 UTC (rev 1211) @@ -63,13 +63,16 @@ check(msg.ParseFromArray(buf, len)); #end -#define map_t unordered_map +//#define map_t unordered_map //#define map_t map -//#define map_t dense_hash_map +#define map_t dense_hash_map typedef pair<int, int> pii; typedef map_t<int, int> mii; typedef string ser_t; +template<typename T> void init_map(T &map) {} +template<> void init_map(dense_hash_map<int, int> &map) { map.set_empty_key(-1); map.set_deleted_key(-2); } + // Configuration. st_utime_t timeout; int chkpt, accept_joiner_seqno, issuing_interval, min_ops, max_ops, @@ -550,23 +553,17 @@ }); reader r(nullptr); - //function<void(const void*, size_t)> fn = use_wal ? - // lambda(const void *buf, size_t len) { g_wal->logbuf(buf, len); } : - // lambda(const void *buf, size_t len) { - // }; - //if (use_wal) fn = lambda(const void *buf, size_t len) {}; - //else fn = lambda(const void *buf, size_t len) { g_wal->logbuf(buf, len); }; - // TODO why doesn't this work? - // else fn = boost::bind(&wal::logbuf, g_wal); + function<void(const void*, size_t)> fn; + if (use_wal) + fn = boost::bind(&wal::logbuf, g_wal, _1, _2); + else + fn = lambda(const void *buf, size_t len) { + foreach (st_netfd_t dst, __ref(fds)) + checkeqnneg(st_write(dst, buf, len, ST_UTIME_NO_TIMEOUT), + static_cast<ssize_t>(len)); + }; - writer w(lambda(const void *buf, size_t len) { - if (__ref(use_wal)) - g_wal->logbuf(buf, len); - else - foreach (st_netfd_t dst, __ref(fds)) - checkeqnneg(st_write(dst, buf, len, ST_UTIME_NO_TIMEOUT), - static_cast<ssize_t>(len)); - }, buf_size); + writer w(fn, buf_size); stream s(r,w); scoped_ptr<TxnBatch> pbatch(new_TxnBatch<TxnBatch>(s)); TxnBatch batch = *pbatch; @@ -651,7 +648,7 @@ if (batch.txn_size() == 0) w.reset(); // Broadcast. - if (!fds.empty() && !suppress_txn_msgs) { + if (Types::is_pb() && !fds.empty() && !suppress_txn_msgs) { bcastmsg(fds, batch); } else if (use_wal) { g_wal->log(batch); @@ -846,7 +843,9 @@ st_reader reader(leader); vector<st_netfd_t> leader_v(1, leader); - writer w(lambda(const void*, size_t) { throw std::exception(); }, buf_size); + writer w(lambda(const void*, size_t) { + throw operation_not_supported("process_txns should not be writing"); + }, buf_size); stream s(reader, w); try { @@ -1184,9 +1183,9 @@ const function<void()> f = use_pb ? bind(issue_txns<pb_types>, ref(newreps), ref(seqno), ref(accept_joiner)) : bind(issue_txns<rb_types>, ref(newreps), ref(seqno), ref(accept_joiner)); - st_thread_t swallower = my_spawn(bind(swallow, f), "issue_txns"); + st_thread_t issue_txns_thread = my_spawn(f, "issue_txns"); foreach (const replica_info &r, replicas) newreps.push(r); - st_joining join_swallower(swallower); + st_joining join_issue_txns(issue_txns_thread); finally fin(lambda () { cout << "LEADER SUMMARY" << endl; @@ -1264,7 +1263,7 @@ } // Initialize database state. - mii map; + mii &map = g_map; int seqno = -1; finally f(lambda () { cout << "REPLICA SUMMARY" << endl; @@ -1586,7 +1585,7 @@ check(max_ops > 0); check(max_ops >= min_ops); - if (minreps == 0) use_pb = true; // XXX + if (minreps == 0 && !use_wal) use_pb = true; // XXX } catch (std::exception &ex) { cerr << ex.what() << endl << endl << desc << endl; return 1; @@ -1663,6 +1662,9 @@ } }); + // Initialize the map. + init_map(g_map); + // Which role are we? if (is_leader) { run_leader(minreps, leader_port); @@ -1682,5 +1684,4 @@ * Compile-time options: * * - map, unordered_map, dense_hash_map - * - SERIALIZATION METHOD */ Modified: ydb/trunk/src/ser.h =================================================================== --- ydb/trunk/src/ser.h 2009-02-20 19:37:33 UTC (rev 1210) +++ ydb/trunk/src/ser.h 2009-02-20 19:44:17 UTC (rev 1211) @@ -2,6 +2,7 @@ #define YDB_MSG_H #include <commons/array.h> +#include <commons/exceptions.h> #include <commons/st/st.h> #include <iomanip> #include <iostream> @@ -36,7 +37,7 @@ assert(size_t(p - mark_ + n) <= a_.size()); flush(); size_t diff = mark_ - a_.get(); - memmove(a_.get(), mark_, diff); + memmove(a_.get(), mark_, p_ - mark_); unsent_ = mark_ = a_.get(); p_ -= diff; p -= diff; @@ -154,12 +155,12 @@ return ntxn_; } const Txn &txn(int t) const { txn_.Clear(); return txn_; } - bool AppendToString(string *s) const { throw std::exception(); } - bool SerializeToString(string *s) const { throw std::exception(); } - bool SerializeToOstream(ostream *s) const { throw std::exception(); } - bool ParseFromArray(void *p, size_t len) { throw std::exception(); } - size_t GetCachedSize() const { throw std::exception(); } - size_t ByteSize() const { throw std::exception(); } + bool AppendToString(string *s) const { throw_operation_not_supported(); } + bool SerializeToString(string *s) const { throw_operation_not_supported(); } + bool SerializeToOstream(ostream *s) const { throw_operation_not_supported(); } + bool ParseFromArray(void *p, size_t len) { throw_operation_not_supported(); } + size_t GetCachedSize() const { throw_operation_not_supported(); } + size_t ByteSize() const { throw_operation_not_supported(); } }; template<typename T> void start_txn(T &batch); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |