[Assorted-commits] SF.net SVN: assorted:[1202] ydb/trunk/src
Brought to you by:
yangzhang
From: <yan...@us...> - 2009-02-19 08:30:23
|
Revision: 1202 http://assorted.svn.sourceforge.net/assorted/?rev=1202&view=rev Author: yangzhang Date: 2009-02-19 08:30:19 +0000 (Thu, 19 Feb 2009) Log Message: ----------- - simplified bcast-switching logic - removed REUSE_SER logic; pointless to keep orig - tried adding aspectc++, no work - removed outstream, replaced with lambdas (and moved into ser.cc) Modified Paths: -------------- ydb/trunk/src/Makefile ydb/trunk/src/main.lzz.clamp ydb/trunk/src/ser.cc ydb/trunk/src/ser.h Modified: ydb/trunk/src/Makefile =================================================================== --- ydb/trunk/src/Makefile 2009-02-19 08:15:53 UTC (rev 1201) +++ ydb/trunk/src/Makefile 2009-02-19 08:30:19 UTC (rev 1202) @@ -36,6 +36,7 @@ ifneq ($(PB),) PB := -DUSE_PB endif +# CXX := $(WTF) ag++ -k --Xcompiler # $(CXX) CXX := $(WTF) $(CXX) LDFLAGS := -pthread $(GPROF) LDLIBS := -lstx -lst -lresolv -lprotobuf -lgtest \ Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-02-19 08:15:53 UTC (rev 1201) +++ ydb/trunk/src/main.lzz.clamp 2009-02-19 08:30:19 UTC (rev 1202) @@ -48,7 +48,6 @@ using ydb::msg::reader; using ydb::msg::writer; using ydb::msg::stream; -using ydb::msg::outstream; using ydb::pb::ResponseBatch; using ydb::pb::Response; using ydb::pb::Recovery; @@ -383,7 +382,7 @@ */ template<typename T> void -bcastmsg(const vector<st_netfd_t> &dsts, const T & msg) +bcastmsg_sync(const vector<st_netfd_t> &dsts, const T &msg) { ser_t s; ser(s, msg); @@ -395,22 +394,26 @@ } /** - * Send a message to a single recipient. + * Send a message to some destinations, using whichever method of network IO + * was chosen (sync or async). */ template<typename T> void -sendmsg(st_netfd_t dst, const T &msg) +bcastmsg(const vector<st_netfd_t> &dsts, const T &msg) { - vector<st_netfd_t> dsts(1, dst); - bcastmsg(dsts, msg); + if (use_bcast_async) bcastmsg_async(dsts, msg); + else bcastmsg_sync(dsts, msg); } +/** + * Send a message to a single recipient. + */ template<typename T> void -sendmsg_async(st_netfd_t dst, const T &msg) +sendmsg(st_netfd_t dst, const T &msg) { vector<st_netfd_t> dsts(1, dst); - bcastmsg_async(dsts, msg); + bcastmsg(dsts, msg); } /** @@ -528,15 +531,6 @@ mii g_map; wal *g_wal; -// Function pointer types. -typedef void (*bcasttxn_t)(const vector<st_netfd_t> &dsts, const TxnBatch &msg); -bcasttxn_t bcasttxn_async = bcastmsg_async<TxnBatch>; -bcasttxn_t bcasttxn_sync = bcastmsg<TxnBatch>; - -typedef void (*sendres_t)(st_netfd_t dst, const ResponseBatch &msg); -sendres_t sendres_async = sendmsg_async<ResponseBatch>; -sendres_t sendres_sync = sendmsg<ResponseBatch>; - /** * Keep issuing transactions to the replicas. */ @@ -544,46 +538,47 @@ issue_txns(st_channel<replica_info> &newreps, int &seqno, st_bool &accept_joiner) { - bcasttxn_t bcast = use_bcast_async ? bcasttxn_async : bcasttxn_sync; - st_thread_t bcaster_thread = bcast == bcasttxn_async ? - my_spawn(bcaster, "bcaster") : nullptr; Op_OpType types[] = {Op::read, Op::write, Op::del}; vector<st_netfd_t> fds; long long start_time = current_time_millis(); finally f(lambda () { - if (__ref(bcaster_thread) != nullptr) st_join(__ref(bcaster_thread)); showtput("issued", current_time_millis(), __ref(start_time), __ref(seqno), 0); }); reader r(nullptr); - outstream os(fds); - function<void(const void*, size_t)> fn; - if (use_wal) fn = os; - else fn = lambda(const void *buf, size_t len) { g_wal->logbuf(buf, len); }; + //function<void(const void*, size_t)> fn = use_wal ? + // lambda(const void *buf, size_t len) { g_wal->logbuf(buf, len); } : + // lambda(const void *buf, size_t len) { + // }; + //if (use_wal) fn = lambda(const void *buf, size_t len) {}; + //else fn = lambda(const void *buf, size_t len) { g_wal->logbuf(buf, len); }; // TODO why doesn't this work? // else fn = boost::bind(&wal::logbuf, g_wal); - writer w(fn, buf_size); + + writer w(lambda(const void *buf, size_t len) { + if (__ref(use_wal)) + g_wal->logbuf(buf, len); + else + foreach (st_netfd_t dst, __ref(fds)) + checkeqnneg(st_write(dst, buf, len, ST_UTIME_NO_TIMEOUT), ssize_t(len)); + }, buf_size); stream s(r,w); TxnBatch batch NPBONLY((s)); for (int t = 0; t < batch_size; ++t) batch.add_txn(); while (!stop_hub) { w.mark(); -#ifdef REUSE_SER batch.Clear(); -#else - TxnBatch batch; -#endif // Did we get a new member? If so, notify an arbitrary member (the first // one) to prepare to send recovery information (by sending an // empty/default Txn). if (!newreps.empty() && seqno > 0) { if (multirecover) { - bcast(fds, batch); + bcastmsg(fds, batch); } else { sendmsg(fds[0], batch); } @@ -654,7 +649,7 @@ // Broadcast. #ifdef USE_PB if (!fds.empty() && !suppress_txn_msgs) { - bcast(fds, batch); + bcastmsg(fds, batch); } else if (use_wal) { g_wal->log(batch); } else if (force_ser) { @@ -663,6 +658,9 @@ } #endif + if (fds.empty()) + w.reset(); + // Pause? if (do_pause) do_pause.waitreset(); @@ -677,12 +675,9 @@ NPBONLY(txn.start_op()); NPBONLY(txn.fin_op()); NPBONLY(batch.fin_txn()); - PBONLY(bcast(fds, batch)); + PBONLY(bcastmsg(fds, batch)); w.mark(); w.flush(); - if (bcaster_thread != nullptr) { - msgs.push(make_pair(nullptr, shared_ptr<string>())); - } } /** @@ -815,16 +810,7 @@ // issued more since the Init message). int first_seqno = -1; - st_thread_t bcaster_thread = use_bcast_async ? - my_spawn(bcaster, "bcaster") : nullptr; - sendres_t sendmsg = use_bcast_async ? sendres_async : sendres_sync; - finally f(lambda () { - if (__ref(bcaster_thread) != nullptr) { - msgs.push(make_pair(nullptr, shared_ptr<string>())); - st_join(__ref(bcaster_thread)); - } - long long now = current_time_millis(); showtput("processed", now, __ref(start_time), __ref(seqno), __ref(init_seqno)); @@ -839,8 +825,7 @@ st_reader reader(leader); vector<st_netfd_t> leader_v(1, leader); - outstream os(leader_v); - writer w(os, buf_size); + writer w(lambda(const void*, size_t) { throw std::exception(); }, buf_size); stream s(reader, w); try { @@ -864,11 +849,7 @@ } } if (batch.txn_size() > 0) { -#ifdef REUSE_SER resbatch.Clear(); -#else - ResponseBatch resbatch; -#endif for (int t = 0; t < batch.txn_size(); ++t) { const Txn &txn = batch.txn(t); // Regular transaction. @@ -1614,9 +1595,20 @@ my_spawn(memmon, "memmon"); } + // Start the message broadcaster thread, if requested. + st_thread_t bcaster_thread = use_bcast_async ? + my_spawn(bcaster, "bcaster") : nullptr; + long long start = thread_start_time = current_time_millis(); - // At the end, print thread profiling information. + + // At the end, cleanly stop the bcaster thread and print thread profiling + // information. finally f(lambda() { + if (use_bcast_async) { + msgs.push(make_pair(nullptr, shared_ptr<string>())); + st_join(__ref(bcaster_thread)); + } + if (profile_threads) { long long end = current_time_millis(); long long all = end - __ref(start); @@ -1657,7 +1649,6 @@ /* * Compile-time options: * - * - REUSE_SER * - map, unordered_map, dense_hash_map * - SERIALIZATION METHOD */ Modified: ydb/trunk/src/ser.cc =================================================================== --- ydb/trunk/src/ser.cc 2009-02-19 08:15:53 UTC (rev 1201) +++ ydb/trunk/src/ser.cc 2009-02-19 08:30:19 UTC (rev 1202) @@ -5,7 +5,6 @@ using ydb::msg::reader; using ydb::msg::writer; using ydb::msg::stream; -using ydb::msg::outstream; using namespace commons; using namespace std; #ifdef USE_PB @@ -16,6 +15,18 @@ const int nreps = 2; +class outstream +{ + private: + const vector<st_netfd_t> &dsts; + public: + outstream(const vector<st_netfd_t> &dsts) : dsts(dsts) {} + void operator()(const void *buf, size_t len) { + foreach (st_netfd_t dst, dsts) + checkeqnneg(st_write(dst, buf, len, ST_UTIME_NO_TIMEOUT), ssize_t(len)); + } +}; + void producer(st_netfd_t dst) { vector<st_netfd_t> dsts(1, dst); Modified: ydb/trunk/src/ser.h =================================================================== --- ydb/trunk/src/ser.h 2009-02-19 08:15:53 UTC (rev 1201) +++ ydb/trunk/src/ser.h 2009-02-19 08:30:19 UTC (rev 1202) @@ -33,18 +33,6 @@ // TODO try to make all of the following conform to the std interfaces, if // amenable -class outstream -{ - private: - const vector<st_netfd_t> &dsts; - public: - outstream(const vector<st_netfd_t> &dsts) : dsts(dsts) {} - void operator()(const void *buf, size_t len) { - foreach (st_netfd_t dst, dsts) - checkeqnneg(st_write(dst, buf, len, ST_UTIME_NO_TIMEOUT), ssize_t(len)); - } -}; - class writer { private: This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |