[Assorted-commits] SF.net SVN: assorted:[1181] ydb/trunk/src
Brought to you by:
yangzhang
From: <yan...@us...> - 2009-02-15 03:20:54
|
Revision: 1181 http://assorted.svn.sourceforge.net/assorted/?rev=1181&view=rev Author: yangzhang Date: 2009-02-15 03:20:46 +0000 (Sun, 15 Feb 2009) Log Message: ----------- - rename array_view to managed_array - tried adding google dense_hash_map - added REUSE_SER macro to control whether serialized stuff is reused - refactored serialization/network code: added ser(), st_timed_write() - added --force-ser - added --fake-exec - changed wal to use same txn serialization as net rather than its own in-line serialization - added st_reader for txnbatches Modified Paths: -------------- ydb/trunk/src/main.lzz.clamp ydb/trunk/src/p2.cc Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-02-13 20:57:48 UTC (rev 1180) +++ ydb/trunk/src/main.lzz.clamp 2009-02-15 03:20:46 UTC (rev 1181) @@ -7,7 +7,6 @@ #include <boost/range/iterator_range.hpp> #include <boost/scoped_array.hpp> #include <boost/shared_ptr.hpp> -//#include <boost/thread.hpp> #include <commons/nullptr.h> #include <commons/rand.h> #include <commons/st/st.h> @@ -17,6 +16,7 @@ #include <cstring> // strsignal #include <iostream> #include <fstream> // ofstream +#include <google/dense_hash_map> #include <gtest/gtest.h> #include <malloc.h> #include <map> @@ -31,9 +31,11 @@ #define foreach BOOST_FOREACH #define shared_ptr boost::shared_ptr #define ref boost::ref +#define REUSE_SER using namespace boost; using namespace boost::archive; using namespace commons; +using namespace google; using namespace std; using namespace testing; using namespace tr1; @@ -46,8 +48,11 @@ #end #define map_t unordered_map +//#define map_t map +//#define map_t dense_hash_map typedef pair<int, int> pii; typedef map_t<int, int> mii; +typedef string ser_t; // Configuration. st_utime_t timeout; @@ -57,7 +62,7 @@ bool verbose, yield_during_build_up, yield_during_catch_up, dump, show_updates, count_updates, stop_on_recovery, general_txns, profile_threads, debug_threads, multirecover, disk, debug_memory, use_wal, - suppress_txn_msgs, use_bcast_async, fake_bcast; + suppress_txn_msgs, use_bcast_async, fake_bcast, force_ser, fake_exec; long long timelim, read_thresh, write_thresh; // Control. @@ -247,12 +252,59 @@ st_channel<pair<st_netfd_t, shared_ptr<string> > > msgs; /** + * Serialization. + * + * TODO: experiment with which method is the fastest: using a string as shown + * here or computing the bytesize then allocating (or grabbing/reserving) the + * array. + */ +template<typename T> +void +ser(string &s, const T &msg) +{ + // Serialize message to a buffer. + uint32_t len; + s.append(sizeof len, '\0'); + check(msg.AppendToString(&s)); + + // Warn if the message is large. + if (s.size() > 1000000) + cout << "serializing large message of " << s.size() << " bytes" << endl; + + // Prefix the message with a four-byte length. + len = htonl(static_cast<uint32_t>(s.size() - sizeof len)); + char *plen = reinterpret_cast<char*>(&len); + copy(plen, plen + sizeof len, s.begin()); +} + +/** + * Helper for getting the cached ByteSize of a message. + */ +template <typename T> +int +pb_size(const T &msg) { + int len = msg.GetCachedSize(); + return len == 0 ? msg.ByteSize() : len; +} + +/** + * Serialization. + */ +template<typename T> +void +ser(ostream &s, const T &msg) +{ + uint32_t len = htonl(pb_size(msg)); + s.write(reinterpret_cast<const char*>(&len), sizeof len); + check(msg.SerializeToOstream(&s)); +} + +/** * The worker that performs the actual broadcasting. */ void bcaster() { - int counter = 0; while (!kill_hub) { pair<st_netfd_t, shared_ptr<string> > pr; { @@ -264,32 +316,8 @@ if (p.get() == nullptr) break; string &s = *p.get(); - int dstno = 0; - // XXX - // foreach (st_netfd_t dst, *gdsts) { - if (!fake_bcast) { - long long before_write = -1; - if (write_thresh > 0) { - before_write = current_time_millis(); - } - - checksize(st_write(dst, s.data(), s.size(), ST_UTIME_NO_TIMEOUT), - s.size()); - - if (write_thresh > 0) { - long long write_time = current_time_millis() - before_write; - if (write_time > write_thresh) { - cout << "thread " << threadname() - << ": write #" << counter - << " of size " << s.size() - //<< " bytes to dst #" << dstno - << " bytes" - << " took " << write_time << " ms" << endl; - } - } - ++dstno; - } - ++counter; + if (!fake_bcast) + st_timed_write(dst, s.data(), s.size()); } } @@ -298,24 +326,34 @@ */ template<typename T> void -bcastmsg_async(const vector<st_netfd_t> &dsts, const T & msg) +bcastmsg_async(const vector<st_netfd_t> &dsts, const T &msg) { - // Serialize message to a buffer. - uint32_t len; - shared_ptr<string> p(new string(sizeof len, '\0')); - string &s = *p.get(); - check(msg.AppendToString(&s)); + shared_ptr<string> p(new string); + ser(*p.get(), msg); + foreach (st_netfd_t dst, dsts) msgs.push(make_pair(dst, p)); +} - if (s.size() > 1000000) - cout << "sending large message to " << dsts.size() << " dsts, size = " - << s.size() << " bytes" << endl; +/** + * Perform an st_write but warn if it took over write_thresh ms. + */ +void +st_timed_write(st_netfd_t dst, const void *buf, size_t len) +{ + long long before_write = -1; + if (write_thresh > 0) { + before_write = current_time_millis(); + } - // Prefix the message with a four-byte length. - len = htonl(static_cast<uint32_t>(s.size() - sizeof len)); - char *plen = reinterpret_cast<char*>(&len); - copy(plen, plen + sizeof len, s.begin()); + checkeqnneg(st_write(dst, buf, len, ST_UTIME_NO_TIMEOUT), + static_cast<ssize_t>(len)); - foreach (st_netfd_t dst, dsts) msgs.push(make_pair(dst, p)); + if (write_thresh > 0) { + long long write_time = current_time_millis() - before_write; + if (write_time > write_thresh) { + cout << "thread " << threadname() << " write of " << len + << " bytes took " << write_time << " ms" << endl; + } + } } /** @@ -325,48 +363,12 @@ void bcastmsg(const vector<st_netfd_t> &dsts, const T & msg) { - // Serialize message to a buffer. - string s; - check(msg.SerializeToString(&s)); - const char *buf = s.c_str(); - - if (s.size() > 1000000) - cout << "sending large message to " << dsts.size() << " dsts, size = " - << s.size() << " bytes" << endl; - - // Prefix the message with a four-byte length. - uint32_t len = htonl(static_cast<uint32_t>(s.size())); - - // Broadcast the length-prefixed message to replicas. - int dstno = 0; - foreach (st_netfd_t dst, dsts) { - size_t resid = sizeof len; -#define checksize(x,y) checkeqnneg(x, static_cast<ssize_t>(y)) - int res = st_write_resid(dst, static_cast<void*>(&len), &resid, timeout); - long long before_write = -1; - if (write_thresh > 0) { - before_write = current_time_millis(); + ser_t s; + ser(s, msg); + if (!fake_bcast) { + foreach (st_netfd_t dst, dsts) { + st_timed_write(dst, s.data(), s.size()); } - if (res == -1 && errno == ETIME) { - checksize(st_write(dst, - reinterpret_cast<char*>(&len) + sizeof len - resid, - resid, - ST_UTIME_NO_TIMEOUT), - resid); - } else { - check0x(res); - } - if (write_thresh > 0) { - long long write_time = current_time_millis() - before_write; - if (write_time > write_thresh) { - cout << "thread " << threadname() - << ": write to dst #" << dstno - << " took " << write_time << " ms" << endl; - } - } - checksize(st_write(dst, buf, s.size(), ST_UTIME_NO_TIMEOUT), - s.size()); - ++dstno; } } @@ -463,7 +465,7 @@ void readmsg(st_reader &src, T & msg) { - array_view<char> a = src.read(sizeof(uint32_t)); + managed_array<char> a = src.read(sizeof(uint32_t)); uint32_t len = ntohl(*reinterpret_cast<const uint32_t*>(a.get())); check(msg.ParseFromArray(src.read(len), len)); } @@ -475,6 +477,9 @@ { public: wal() : of("wal"), out(of) {} + template <typename T> + void log(const T &msg) { ser(of, msg); } +#if 0 void del(int key) { int op = op_del; // TODO: is this really necessary? out & op & key; @@ -487,6 +492,7 @@ int op = op_commit; out & op; } +#endif private: enum { op_del, op_write, op_commit }; ofstream of; @@ -527,15 +533,24 @@ 0); }); + TxnBatch batch; + for (int t = 0; t < batch_size; ++t) batch.add_txn(); + while (!stop_hub) { +#ifdef REUSE_SER + batch.Clear(); +#else + TxnBatch batch; +#endif + // Did we get a new member? If so, notify an arbitrary member (the first // one) to prepare to send recovery information (by sending an // empty/default Txn). if (!newreps.empty() && seqno > 0) { if (multirecover) { - bcast(fds, TxnBatch()); + bcast(fds, batch); } else { - sendmsg(fds[0], TxnBatch()); + sendmsg(fds[0], batch); } } // Bring in any new members. @@ -544,7 +559,6 @@ } // Generate some random transactions. - TxnBatch batch; for (int t = 0; t < batch_size; ++t) { Txn &txn = *batch.add_txn(); txn.set_seqno(seqno); @@ -597,8 +611,14 @@ } // Broadcast. - if (!fds.empty() && !suppress_txn_msgs) + if (!fds.empty() && !suppress_txn_msgs) { bcast(fds, batch); + } else if (use_wal) { + g_wal->log(batch); + } else if (force_ser) { + string s; + ser(s, batch); + } // Pause? if (do_pause) @@ -606,7 +626,7 @@ } // This means "The End." - TxnBatch batch; + batch.Clear(); Txn &txn = *batch.add_txn(); txn.set_seqno(-1); bcast(fds, batch); @@ -622,44 +642,46 @@ void process_txn(mii &map, const Txn &txn, int &seqno, Response *res) { - wal &wal = *g_wal; + //wal &wal = *g_wal; checkeq(txn.seqno(), seqno + 1); seqno = txn.seqno(); if (res != nullptr) { res->set_seqno(seqno); res->set_caught_up(true); } - for (int o = 0; o < txn.op_size(); ++o) { - const Op &op = txn.op(o); - const int key = op.key(); - mii::iterator it = map.find(key); - if (show_updates || count_updates) { - if (it != map.end()) { - if (show_updates) cout << "existing key: " << key << endl; - if (count_updates) ++updates; - } - } - switch (op.type()) { - case Op::read: - if (res != nullptr) { - if (it == map.end()) res->add_result(0); - else res->add_result(it->second); - } - break; - case Op::write: - if (use_wal) wal.write(key, op.value()); - if (it == map.end()) map[key] = op.value(); - else it->second = op.value(); - break; - case Op::del: + if (!fake_exec) { + for (int o = 0; o < txn.op_size(); ++o) { + const Op &op = txn.op(o); + const int key = op.key(); + mii::iterator it = map.find(key); + if (show_updates || count_updates) { if (it != map.end()) { - if (use_wal) wal.del(key); - map.erase(it); + if (show_updates) cout << "existing key: " << key << endl; + if (count_updates) ++updates; } - break; + } + switch (op.type()) { + case Op::read: + if (res != nullptr) { + if (it == map.end()) res->add_result(0); + else res->add_result(it->second); + } + break; + case Op::write: + //if (use_wal) wal.write(key, op.value()); + if (it == map.end()) map[key] = op.value(); + else it->second = op.value(); + break; + case Op::del: + if (it != map.end()) { + //if (use_wal) wal.del(key); + map.erase(it); + } + break; + } } } - if (use_wal) wal.commit(); + //if (use_wal) wal.commit(); } void @@ -761,16 +783,19 @@ __ref(send_states).push(shared_ptr<Recovery>()); }); + st_reader reader(leader); + try { + TxnBatch batch; + ResponseBatch resbatch; while (true) { - TxnBatch batch; long long before_read = -1; if (read_thresh > 0) { before_read = current_time_millis(); } { st_intr intr(stop_hub); - readmsg(leader, batch); + readmsg(reader, batch); } if (read_thresh > 0) { long long read_time = current_time_millis() - before_read; @@ -780,7 +805,11 @@ } } if (batch.txn_size() > 0) { +#ifdef REUSE_SER + resbatch.Clear(); +#else ResponseBatch resbatch; +#endif for (int t = 0; t < batch.txn_size(); ++t) { const Txn &txn = batch.txn(t); // Regular transaction. @@ -876,11 +905,11 @@ finally f(boost::bind(&response_handler::cleanup, this)); st_reader reader(replica); + ResponseBatch batch; while (true) { finally f(boost::bind(&response_handler::loop_cleanup, this)); - ResponseBatch batch; // Read the message, but correctly respond to interrupts so that we can // cleanly exit (slightly tricky). if (last_seqno + 1 == seqno) { @@ -1095,7 +1124,6 @@ foreach (const replica_info &r, replicas) newreps.push(r); st_joining join_swallower(swallower); - // XXX finally fin(lambda () { cout << "LEADER SUMMARY" << endl; cout << "- total updates = " << updates << endl; @@ -1408,6 +1436,8 @@ "inspection/diffing") ("suppress-txn-msgs", po::bool_switch(&suppress_txn_msgs), "suppress txn msgs") + ("fake-exec", po::bool_switch(&fake_exec), + "don't actually execute txns") ("fake-bcast", po::bool_switch(&fake_bcast), "when using --bcast-async, don't actually perform the socket write") ("show-updates,U", po::bool_switch(&show_updates), @@ -1420,6 +1450,8 @@ "issue read and delete transactions as well as the default of (only) insertion/update transactions (for leader only)") ("wal", po::bool_switch(&use_wal), "enable ARIES write-ahead logging") + ("force-ser", po::bool_switch(&force_ser), + "force issue_txns to serialize its Txns") ("leader,l", po::bool_switch(&is_leader), "run the leader (run replica by default)") ("exit-on-recovery,x", po::bool_switch(&stop_on_recovery), Modified: ydb/trunk/src/p2.cc =================================================================== --- ydb/trunk/src/p2.cc 2009-02-13 20:57:48 UTC (rev 1180) +++ ydb/trunk/src/p2.cc 2009-02-15 03:20:46 UTC (rev 1181) @@ -29,7 +29,7 @@ long long start = 0, seltime = 0, readtime = 0, writetime = 0; int selcnt = 0, readcnt = 0, writecnt = 0; -typedef array_view<char> arr; +typedef managed_array<char> arr; arr mkarr(char *p = nullptr) { return arr(p, false); } typedef unordered_map<int, int> map_t; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |