[Assorted-commits] SF.net SVN: assorted:[1178] ydb/trunk/src
Brought to you by:
yangzhang
From: <yan...@us...> - 2009-02-13 20:57:10
|
Revision: 1178 http://assorted.svn.sourceforge.net/assorted/?rev=1178&view=rev Author: yangzhang Date: 2009-02-13 20:57:05 +0000 (Fri, 13 Feb 2009) Log Message: ----------- - added p2 - added ResponseBatch - added gch rules (but not yet incorporated into main build) - moved to g++0x - lifted break_exception - removed bcastmsg_fake - changed bcastmsg_async to queue up (dst,msg) pairs (rather than rely on a cached version of the dsts vector) - changed the core readmsg calls to use st_reader - replaced bcast_async macros with fn ptrs - reworked process_txn interface - added leader summary as well - tolerate EINTR on joiner accept - added --fake-bcast, --bcast-async Modified Paths: -------------- ydb/trunk/src/Makefile ydb/trunk/src/main.lzz.clamp ydb/trunk/src/ydb.proto Added Paths: ----------- ydb/trunk/src/p2.cc Modified: ydb/trunk/src/Makefile =================================================================== --- ydb/trunk/src/Makefile 2009-02-13 20:54:46 UTC (rev 1177) +++ ydb/trunk/src/Makefile 2009-02-13 20:57:05 UTC (rev 1178) @@ -44,7 +44,7 @@ -Wpointer-arith -Wcast-qual -Wcast-align -Wwrite-strings \ -Winit-self -Wsign-promo -Wno-unused-parameter -Wc++0x-compat \ -Wparentheses -Wmissing-format-attribute -Wfloat-equal \ - -Wno-inline -Wsynth $(CXXFLAGS) + -Wno-inline -Wsynth -std=gnu++0x $(CXXFLAGS) PBCXXFLAGS := $(OPT) -Wall -Werror $(GPROF) all: $(TARGET) @@ -73,6 +73,12 @@ %.lzz: %.lzz.clamp clamp < $< | sed "`echo -e '1i#src\n1a#end'`" > $@ +all.h: + fgrep '#include' main.lzz.clamp > all.h + +all.h.gch: all.h + $(COMPILE.cc) $(PBHDRS) $(OUTPUT_OPTION) $< + clean: rm -f $(GENSRCS) $(GENHDRS) $(OBJS) $(TARGET) main.lzz *.clamp_h serperf @@ -89,6 +95,9 @@ ### serperf: serperf.o ydb.o - $(LINK.o) $^ $(LOADLIBES) $(LDLIBS) -o $@ + $(LINK.o) $^ $(LOADLIBES) $(LDLIBS) $(OUTPUT_OPTION) # serperf.cc ydb.pb.h + +p2: p2.cc + $(LINK.cc) $^ $(LOADLIBES) $(LDLIBS) $(OUTPUT_OPTION) Modified: ydb/trunk/src/main.lzz.clamp 
=================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-02-13 20:54:46 UTC (rev 1177) +++ ydb/trunk/src/main.lzz.clamp 2009-02-13 20:57:05 UTC (rev 1178) @@ -29,6 +29,8 @@ #include <vector> #include "ydb.pb.h" #define foreach BOOST_FOREACH +#define shared_ptr boost::shared_ptr +#define ref boost::ref using namespace boost; using namespace boost::archive; using namespace commons; @@ -55,7 +57,7 @@ bool verbose, yield_during_build_up, yield_during_catch_up, dump, show_updates, count_updates, stop_on_recovery, general_txns, profile_threads, debug_threads, multirecover, disk, debug_memory, use_wal, - suppress_txn_msgs; + suppress_txn_msgs, use_bcast_async, fake_bcast; long long timelim, read_thresh, write_thresh; // Control. @@ -68,13 +70,14 @@ /** * Convenience function for calculating percentages. */ -double -pct(double sub, double tot) -{ - return 100 * sub / tot; -} +double pct(double sub, double tot) { return 100 * sub / tot; } /** + * Convenience class for performing long-jumping break. + */ +class break_exception : public std::exception {}; + +/** * The list of all threads. Keep track of these so that we may cleanly shut * down all threads. */ @@ -241,81 +244,30 @@ const vector<st_netfd_t> &rs_; }; -/** - * XXX - */ -template<typename T> -void -bcastmsg_fake(const vector<st_netfd_t> &dsts, const T & msg) -{ - // Serialize message to a buffer. - string s; - check(msg.SerializeToString(&s)); - const char *buf = s.c_str(); +st_channel<pair<st_netfd_t, shared_ptr<string> > > msgs; - if (s.size() > 1000000) - cout << "sending large message to " << dsts.size() << " dsts, size = " - << s.size() << " bytes" << endl; - - // Prefix the message with a four-byte length. - uint32_t len = htonl(static_cast<uint32_t>(s.size())); - - // Broadcast the length-prefixed message to replicas. 
- int dstno = 0; - foreach (st_netfd_t dst, dsts) { - size_t resid = sizeof len; -#define checksize(x,y) checkeqnneg(x, static_cast<ssize_t>(y)) - int res = true ? 0 : st_write_resid(dst, static_cast<void*>(&len), &resid, timeout); - long long before_write = -1; - if (write_thresh > 0) { - before_write = current_time_millis(); - } - if (res == -1 && errno == ETIME) { - checksize(st_write(dst, - reinterpret_cast<char*>(&len) + sizeof len - resid, - resid, - ST_UTIME_NO_TIMEOUT), - resid); - } else { - check0x(res); - } - if (write_thresh > 0) { - long long write_time = current_time_millis() - before_write; - if (write_time > write_thresh) { - cout << "thread " << threadname() - << ": write to dst #" << dstno - << " took " << write_time << " ms" << endl; - } - } - if (false) - checksize(st_write(dst, buf, s.size(), ST_UTIME_NO_TIMEOUT), - s.size()); - ++dstno; - } -} - -st_channel<shared_ptr<string> > msgs; - -const vector<st_netfd_t> *gdsts; - /** - * XXX + * The worker that performs the actual broadcasting. */ void bcaster() { int counter = 0; while (!kill_hub) { - shared_ptr<string> p; + pair<st_netfd_t, shared_ptr<string> > pr; { st_intr intr(kill_hub); - p = msgs.take(); + pr = msgs.take(); } + st_netfd_t dst = pr.first; + shared_ptr<string> &p = pr.second; if (p.get() == nullptr) break; string &s = *p.get(); int dstno = 0; - foreach (st_netfd_t dst, *gdsts) { + // XXX + // foreach (st_netfd_t dst, *gdsts) { + if (!fake_bcast) { long long before_write = -1; if (write_thresh > 0) { before_write = current_time_millis(); @@ -330,7 +282,8 @@ cout << "thread " << threadname() << ": write #" << counter << " of size " << s.size() - << " bytes to dst #" << dstno + //<< " bytes to dst #" << dstno + << " bytes" << " took " << write_time << " ms" << endl; } } @@ -341,14 +294,12 @@ } /** - * XXX + * Asynchronous version of the broadcaster. 
*/ template<typename T> void bcastmsg_async(const vector<st_netfd_t> &dsts, const T & msg) { - gdsts = &dsts; - // Serialize message to a buffer. uint32_t len; shared_ptr<string> p(new string(sizeof len, '\0')); @@ -362,13 +313,11 @@ // Prefix the message with a four-byte length. len = htonl(static_cast<uint32_t>(s.size() - sizeof len)); char *plen = reinterpret_cast<char*>(&len); - for (size_t i = 0; i < sizeof len; ++i) - s[i] = plen[i]; + copy(plen, plen + sizeof len, s.begin()); - msgs.push(p); + foreach (st_netfd_t dst, dsts) msgs.push(make_pair(dst, p)); } - /** * Send a message to some destinations (sequentially). */ @@ -432,6 +381,14 @@ bcastmsg(dsts, msg); } +template<typename T> +void +sendmsg_async(st_netfd_t dst, const T &msg) +{ + vector<st_netfd_t> dsts(1, dst); + bcastmsg_async(dsts, msg); +} + /** * Read a message. This is done in two steps: first by reading the length * prefix, then by reading the actual body. This function also provides a way @@ -476,7 +433,7 @@ char buf[len]; GETMSG(buf); } else { - cout << "receiving large msg; heap-allocating " << len << " bytes" << endl; + //cout << "receiving large msg; heap-allocating " << len << " bytes" << endl; scoped_array<char> buf(new char[len]); GETMSG(buf.get()); } @@ -499,6 +456,19 @@ } /** + * Same as the above readmsg() but uses an st_reader instead of a raw + * st_netfd_t. + */ +template <typename T> +void +readmsg(st_reader &src, T & msg) +{ + array_view<char> a = src.read(sizeof(uint32_t)); + uint32_t len = ntohl(*reinterpret_cast<const uint32_t*>(a.get())); + check(msg.ParseFromArray(src.read(len), len)); +} + +/** * ARIES write-ahead log. No undo logging necessary (no steal). */ class wal @@ -527,6 +497,15 @@ mii g_map; wal *g_wal; +// Function pointer types. 
+typedef void (*bcasttxn_t)(const vector<st_netfd_t> &dsts, const TxnBatch &msg); +bcasttxn_t bcasttxn_async = bcastmsg_async<TxnBatch>; +bcasttxn_t bcasttxn_sync = bcastmsg<TxnBatch>; + +typedef void (*sendres_t)(st_netfd_t dst, const ResponseBatch &msg); +sendres_t sendres_async = sendmsg_async<ResponseBatch>; +sendres_t sendres_sync = sendmsg<ResponseBatch>; + /** * Keep issuing transactions to the replicas. */ @@ -534,16 +513,16 @@ issue_txns(st_channel<replica_info> &newreps, int &seqno, st_bool &accept_joiner) { -#define bcastmsg bcastmsg_async + bcasttxn_t bcast = use_bcast_async ? bcasttxn_async : bcasttxn_sync; + st_thread_t bcaster_thread = bcast == bcasttxn_async ? + my_spawn(bcaster, "bcaster") : nullptr; + Op_OpType types[] = {Op::read, Op::write, Op::del}; vector<st_netfd_t> fds; long long start_time = current_time_millis(); -#if bcastmsg == bcastmsg_async - st_joining join_bcaster(my_spawn(bcaster, "bcaster")); -#endif - finally f(lambda () { + if (__ref(bcaster_thread) != nullptr) st_join(__ref(bcaster_thread)); showtput("issued", current_time_millis(), __ref(start_time), __ref(seqno), 0); }); @@ -554,7 +533,7 @@ // empty/default Txn). if (!newreps.empty() && seqno > 0) { if (multirecover) { - bcastmsg(fds, TxnBatch()); + bcast(fds, TxnBatch()); } else { sendmsg(fds[0], TxnBatch()); } @@ -583,7 +562,7 @@ // Process immediately if not bcasting. if (fds.empty()) { --seqno; - process_txn(nullptr, g_map, txn, seqno, true); + process_txn(g_map, txn, seqno, nullptr); } ++seqno; @@ -619,7 +598,7 @@ // Broadcast. if (!fds.empty() && !suppress_txn_msgs) - bcastmsg(fds, batch); + bcast(fds, batch); // Pause? 
if (do_pause) @@ -630,11 +609,10 @@ TxnBatch batch; Txn &txn = *batch.add_txn(); txn.set_seqno(-1); - bcastmsg(fds, batch); -#if bcastmsg == bcastmsg_any - msgs.push(shared_ptr<string>()); -#endif -#undef bcastmsg + bcast(fds, batch); + if (bcaster_thread != nullptr) { + msgs.push(make_pair(nullptr, shared_ptr<string>())); + } } /** @@ -642,15 +620,15 @@ * leader. */ void -process_txn(st_netfd_t leader, mii &map, const Txn &txn, int &seqno, - bool caught_up) +process_txn(mii &map, const Txn &txn, int &seqno, Response *res) { wal &wal = *g_wal; checkeq(txn.seqno(), seqno + 1); - Response res; - res.set_seqno(txn.seqno()); - res.set_caught_up(caught_up); seqno = txn.seqno(); + if (res != nullptr) { + res->set_seqno(seqno); + res->set_caught_up(true); + } for (int o = 0; o < txn.op_size(); ++o) { const Op &op = txn.op(o); const int key = op.key(); @@ -663,8 +641,10 @@ } switch (op.type()) { case Op::read: - if (it == map.end()) res.add_result(0); - else res.add_result(it->second); + if (res != nullptr) { + if (it == map.end()) res->add_result(0); + else res->add_result(it->second); + } break; case Op::write: if (use_wal) wal.write(key, op.value()); @@ -680,7 +660,6 @@ } } if (use_wal) wal.commit(); - if (caught_up && leader != nullptr) sendmsg(leader, res); } void @@ -760,7 +739,16 @@ // issued more since the Init message). int first_seqno = -1; + st_thread_t bcaster_thread = use_bcast_async ? + my_spawn(bcaster, "bcaster") : nullptr; + sendres_t sendmsg = use_bcast_async ? 
sendres_async : sendres_sync; + finally f(lambda () { + if (__ref(bcaster_thread) != nullptr) { + msgs.push(make_pair(nullptr, shared_ptr<string>())); + st_join(__ref(bcaster_thread)); + } + long long now = current_time_millis(); showtput("processed", now, __ref(start_time), __ref(seqno), __ref(init_seqno)); @@ -773,8 +761,6 @@ __ref(send_states).push(shared_ptr<Recovery>()); }); - class break_exception : public std::exception {}; - try { while (true) { TxnBatch batch; @@ -794,6 +780,7 @@ } } if (batch.txn_size() > 0) { + ResponseBatch resbatch; for (int t = 0; t < batch.txn_size(); ++t) { const Txn &txn = batch.txn(t); // Regular transaction. @@ -809,7 +796,8 @@ first_seqno == -1 ? init_seqno - 1 : first_seqno); caught_up = true; } - process_txn(leader, map, txn, seqno, true); + Response *res = resbatch.add_res(); + process_txn(map, txn, seqno, res); action = "processed"; } else { if (first_seqno == -1) @@ -829,6 +817,8 @@ st_sleep(0); } } + if (resbatch.res_size() > 0) + sendmsg(leader, resbatch); } else { // Empty (default) Txn means "generate a snapshot." // TODO make this faster @@ -876,39 +866,28 @@ start_time(current_time_millis()), recovery_start_time(caught_up ? -1 : start_time), recovery_end_time(-1), + start_seqno(seqno), recovery_start_seqno(caught_up ? -1 : seqno), recovery_end_seqno(-1), last_seqno(-1) {} void run() { - //start_time = current_time_millis(); - //recovery_start_time = caught_up ? -1 : start_time; - //recovery_end_time = -1; - //recovery_start_seqno = caught_up ? 
-1 : seqno; - //recovery_end_seqno = -1; - //last_seqno = -1; + finally f(boost::bind(&response_handler::cleanup, this)); - finally f(lambda () { - long long end_time = current_time_millis(); - if (__ref(recovery_end_time) > -1) { - cout << __ref(rid) << ": "; - showtput("after recovery, finished", end_time, __ref(recovery_end_time), - __ref(seqno), __ref(recovery_end_seqno)); - } - }); + st_reader reader(replica); while (true) { finally f(boost::bind(&response_handler::loop_cleanup, this)); - Response res; + ResponseBatch batch; // Read the message, but correctly respond to interrupts so that we can // cleanly exit (slightly tricky). if (last_seqno + 1 == seqno) { // Stop-interruptible in case we're already caught up. try { st_intr intr(stop_hub); - readmsg(replica, res); + readmsg(reader, batch); } catch (...) { // TODO: only catch interruptions // This check on seqnos is OK for termination since the seqno will // never grow again if stop_hub is set. @@ -925,34 +904,38 @@ // Only kill-interruptible because we want a clean termination (want // to get all the acks back). st_intr intr(kill_hub); - readmsg(replica, res); + readmsg(reader, batch); } - // Determine if this response handler's host (the only joiner) has finished - // catching up. If it has, then broadcast a signal so that all response - // handlers will know about this event. - if (!caught_up && res.caught_up()) { - long long now = current_time_millis(), timediff = now - start_time; - caught_up = true; - recover_signals.push(now); - cout << rid << ": "; - cout << "recovering node caught up; took " - << timediff << " ms" << endl; - // This will cause the program to exit eventually, but cleanly, such that - // the recovery time will be set first, before the eventual exit (which - // may not even happen in the current iteration). 
- if (stop_on_recovery) { - cout << "stopping on recovery" << endl; - stop_hub.set(); - } - } - if (res.seqno() % chkpt == 0) { - if (verbose) { + + for (int i = 0; i < batch.res_size(); ++i) { + const Response &res = batch.res(i); + // Determine if this response handler's host (the only joiner) has finished + // catching up. If it has, then broadcast a signal so that all response + // handlers will know about this event. + if (!caught_up && res.caught_up()) { + long long now = current_time_millis(), timediff = now - start_time; + caught_up = true; + recover_signals.push(now); cout << rid << ": "; - cout << "got response " << res.seqno() << " from " << replica << endl; + cout << "recovering node caught up; took " + << timediff << " ms" << endl; + // This will cause the program to exit eventually, but cleanly, such that + // the recovery time will be set first, before the eventual exit (which + // may not even happen in the current iteration). + if (stop_on_recovery) { + cout << "stopping on recovery" << endl; + stop_hub.set(); + } } - st_sleep(0); + if (res.seqno() % chkpt == 0) { + if (verbose) { + cout << rid << ": "; + cout << "got response " << res.seqno() << " from " << replica << endl; + } + st_sleep(0); + } + last_seqno = res.seqno(); } - last_seqno = res.seqno(); } } @@ -977,6 +960,17 @@ } } + void cleanup() { + long long end_time = current_time_millis(); + cout << rid << ": "; + showtput("handled", end_time, start_time, seqno, start_seqno); + if (recovery_end_time > -1) { + cout << rid << ": "; + showtput("after recovery, finished", end_time, recovery_end_time, + seqno, recovery_end_seqno); + } + } + st_netfd_t replica; const int &seqno; int rid; @@ -984,7 +978,7 @@ bool caught_up; st_channel<long long> &sub; long long start_time, recovery_start_time, recovery_end_time; - int recovery_start_seqno, recovery_end_seqno, last_seqno; + int start_seqno, recovery_start_seqno, recovery_end_seqno, last_seqno; }; /** @@ -1101,6 +1095,23 @@ foreach (const replica_info 
&r, replicas) newreps.push(r); st_joining join_swallower(swallower); + // XXX + finally fin(lambda () { + cout << "LEADER SUMMARY" << endl; + cout << "- total updates = " << updates << endl; + cout << "- final DB state: seqno = " << __ref(seqno) << ", size = " + << g_map.size() << endl; + string fname = string("/tmp/ydb") + lexical_cast<string>(getpid()); + if (dump) { + cout << "- dumping to " << fname << endl; + ofstream of(fname.c_str()); + of << "seqno: " << __ref(seqno) << endl; + foreach (const pii &p, g_map) { + of << p.first << ": " << p.second << endl; + } + } + }); + try { // Start handling responses. st_thread_group handlers; @@ -1113,11 +1124,17 @@ // Accept the recovering node, and tell it about the online replicas. st_netfd_t joiner; - { + try { st_intr intr(stop_hub); joiner = checkerr(st_accept(listener, nullptr, nullptr, ST_UTIME_NO_TIMEOUT)); accept_joiner.waitset(); + } catch (std::exception &ex) { + string s(ex.what()); + if (s.find("Interrupted system call") == s.npos) + throw; + else + throw break_exception(); } Join join = readmsg<Join>(joiner); replicas.push_back(replica_info(joiner, static_cast<uint16_t>(join.port()))); @@ -1133,6 +1150,7 @@ handlers.insert(my_spawn(bind(handle_responses, joiner, ref(seqno), rid++, ref(recover_signals), false), "handle_responses_joiner")); + } catch (break_exception &ex) { } catch (std::exception &ex) { // TODO: maybe there's a cleaner way to do this final step before waiting with the join cerr_thread_ex(ex) << endl; @@ -1276,7 +1294,7 @@ int mid_seqno = seqno; while (!backlog.empty()) { shared_ptr<Txn> p = backlog.take(); - process_txn(leader, map, *p, seqno, false); + process_txn(map, *p, seqno, nullptr); if (p->seqno() % chkpt == 0) { if (verbose) cout << "processed txn " << p->seqno() << " off the backlog; " @@ -1390,8 +1408,12 @@ "inspection/diffing") ("suppress-txn-msgs", po::bool_switch(&suppress_txn_msgs), "suppress txn msgs") + ("fake-bcast", po::bool_switch(&fake_bcast), + "when using 
--bcast-async, don't actually perform the socket write") ("show-updates,U", po::bool_switch(&show_updates), "log operations that touch (update/read/delete) an existing key") + ("bcast-async", po::bool_switch(&use_bcast_async), + "broadcast messages asynchronously") ("count-updates,u",po::bool_switch(&count_updates), "count operations that touch (update/read/delete) an existing key") ("general-txns,g", po::bool_switch(&general_txns), Added: ydb/trunk/src/p2.cc =================================================================== --- ydb/trunk/src/p2.cc (rev 0) +++ ydb/trunk/src/p2.cc 2009-02-13 20:57:05 UTC (rev 1178) @@ -0,0 +1,353 @@ +#include <algorithm> +#include <boost/foreach.hpp> +#include <boost/program_options.hpp> +#include <commons/array.h> +#include <commons/nullptr.h> +#include <commons/rand.h> +#include <commons/sockets.h> +#include <commons/time.h> +#include <exception> +#include <iostream> +#include <set> +#include <string> +#include <sys/select.h> +#include <tr1/unordered_map> +#include <vector> +using namespace commons; +using namespace std; +using namespace tr1; +#define foreach BOOST_FOREACH +#define exception std::exception +#define STAT(t, c, x) \ + long long start_time = current_time_millis(); \ + x \ + ++c; \ + t += current_time_millis() - start_time; + +int bufsize = 1e8, chkpt = 1e4, batch_size = 1e4, thresh = 1e6; +bool verbose = true; +long long start = 0, seltime = 0, readtime = 0, writetime = 0; +int selcnt = 0, readcnt = 0, writecnt = 0; + +typedef array_view<char> arr; +arr mkarr(char *p = nullptr) { return arr(p, false); } + +typedef unordered_map<int, int> map_t; + +fd_set rfds, wfds, efds; + +class reader +{ +private: + array<char> buf_; + char *start_; + char *end_; + int fd_; +public: + reader(int fd) : buf_(bufsize), start_(buf_.get()), end_(start_), fd_(fd) {} + size_t rem() { return buf_.end() - end_; } + size_t amt() { return end_ - start_; } + int fd() { return fd_; } + arr read(size_t req) { + if (req <= amt()) { + arr a = 
mkarr(start_); + start_ += req; + return a; + } + // make sure we have enough space + check(req < buf_.size()); + // shift if necessary + if (req > rem()) { + memmove(buf_.get(), start_, amt()); + size_t diff = start_ - buf_.get(); + start_ -= diff; + end_ -= diff; + } + // read; advance end_ + STAT(readtime, readcnt, int res = ::read(fd(), end_, rem());) + int e = errno; + errno = 0; + //cout << "read res " << res << endl; + if (e == EAGAIN) return mkarr(); + if (res < 1) { close(fd()); throw exception(); } + end_ += res; + // if we still haven't read enough (requested), ret null + if (amt() < req) return mkarr(); + // advance start_ and return the newly consumed range + arr a = mkarr(start_); + start_ += req; + //cout << "offset " << a.get() - buf_.get() << endl; + return a; + } +}; + +class msg_reader +{ +private: + // len_ of 0 means we haven't read the prefix yet + uint32_t len_; + reader r_; +public: + msg_reader(int fd) : len_(0), r_(fd) {} + arr read(uint32_t &len) { + // read prefix + if (len_ == 0) { + arr prefix = r_.read(sizeof len_); + if (prefix.get() == nullptr) return mkarr(); + uint32_t tmp = *reinterpret_cast<uint32_t*>(prefix.get()); + //cout << "tmp " << tmp << endl; + len_ = tmp; + } + // read body + check(len_ > 0); + arr body = r_.read(len_); + if (body.get() != nullptr) { + len = len_; + len_ = 0; + } + return body; + } +}; + +class writer +{ +private: + array<char> buf_; + // start/end of unsent, prepared range + char *start_; + char *end_; + int fd_; +public: + writer(int fd) : buf_(bufsize), start_(buf_.get()), end_(start_), fd_(fd) {} + int fd() { return fd_; }; + fd_set &wfds() { return ::wfds; } + size_t amt() { return end_ - start_; } + size_t rem() { return buf_.end() - end_; } + + arr getbuf(uint32_t req) { + uint32_t tot = req + sizeof req; + check(tot > 0); + check(tot <= buf_.size()); + //cout << "getbuf req " << req << endl; + + // make space? 
+ if (tot > rem()) { + if (tot > buf_.size() - amt()) return mkarr(); // not enough space + memmove(buf_.get(), start_, amt()); // shift + size_t diff = start_ - buf_.get(); + //if (diff > 0) cout << "shifting amt " << amt() << " diff " << diff << endl; + start_ -= diff; + end_ -= diff; + assert(rem() >= tot); + } + + // write length prefix + allocate/return body + *(reinterpret_cast<uint32_t*>(end_)) = req; + end_ += sizeof req; + arr p = mkarr(end_); + end_ += req; + return p; + } + + void write() { + // perform the write + STAT(writetime, writecnt, int res = ::write(fd(), start_, end_ - start_);) + if (res < 0) { close(fd()); throw exception(); } + //cout << "write res " << res << " amt " << amt() << endl; + // advance start_ + start_ += res; + // re-register for writes if we still have things to write + if (end_ - start_ > 0) { + FD_SET(fd(), &wfds()); + } + } +}; + +class replica_channel +{ +private: + int fd_; + writer w_; + char *buf_; + +public: + replica_channel(int fd) : fd_(fd), w_(fd) {} + int fd() { return fd_; } + + void writeint(uint32_t i) { + *(reinterpret_cast<uint32_t*>(buf_)) = i; + buf_ += sizeof i; + } + + void handle_write() { + //cout << "writing" << endl; + uint32_t npairs = batch_size; + uint32_t len = 2 * sizeof(uint32_t) * npairs; + arr a = w_.getbuf(len); + buf_ = a; + if (buf_ == nullptr) return; + for (uint32_t i = 0; i < npairs; ++i) { + writeint(1); + writeint(2); + } + w_.write(); + } + +}; + +class replica +{ +private: + int fd_; + msg_reader r_; + const char *buf_; + map_t map_; + int counter_; + int readcount_; + long long start_; + +public: + replica(int fd) : fd_(fd), r_(fd), counter_(0), readcount_(0), start_(current_time_millis()) {} + int fd() { return fd_; } + + uint32_t readint() { + uint32_t i = *reinterpret_cast<const uint32_t*>(buf_); + buf_ += sizeof i; + return i; + } + + void handle_read() { + ++readcount_; + while (true) { + uint32_t len = 0; + arr a = r_.read(len); + buf_ = a.get(); + if (buf_ == nullptr) break; + 
uint32_t npairs = len / sizeof(uint32_t) / 2; + check(2 * sizeof(uint32_t) * npairs == len); // should be whole count + for (uint32_t i = 0; i < npairs; ++i) { + uint32_t k = readint(); + uint32_t v = readint(); + map_[k] = v; + ++counter_; + if (counter_ % chkpt == 0) { + //if (verbose) cout << current_time_millis() << ": count " << counter_ << endl; + if (counter_ > thresh) { + long long end = current_time_millis(); + double rate = double(counter_) / (end - start_) * 1000; + cout << "rate " << rate << " pairs/s " << rate / 5 << " tps; readcount " << readcount_ << endl; + throw exception(); + } + } + } + } + } + +}; + +class mainer +{ +private: + vector<replica_channel*> rs; + +public: + int main(int argc, char **argv) { + bool is_leader; + string host; + + namespace po = boost::program_options; + po::options_description desc("Allowed options"); + desc.add_options() + ("help,h", "show this help message") + ("leader,l", po::bool_switch(&is_leader), "leader") + ("verbose,v",po::bool_switch(&verbose), "verbose") + ("host,H", + po::value<string>(&host)->default_value(string("localhost")), + "hostname or address of the leader") + ("batch,b", po::value<int>(&batch_size)->default_value(1e4), "batch size"); + po::variables_map vm; + try { + po::store(po::parse_command_line(argc, argv, desc), vm); + po::notify(vm); + + if (vm.count("help")) { + cout << desc << endl; + return 0; + } + } catch (exception &ex) { + cerr << ex.what() << endl << endl << desc << endl; + return 1; + } + + struct timeval tv; + tv.tv_sec = 5; + tv.tv_usec = 0; + FD_ZERO(&rfds); + FD_ZERO(&wfds); + FD_ZERO(&efds); + + int srv = is_leader ? tcp_listen(7654, true) : -1; + int cli = is_leader ? 
-1 : tcp_connect(host.c_str(), 7654); + if (cli >= 0) checknnegerr(fcntl(cli, F_SETFL, O_NONBLOCK | fcntl(cli, F_GETFL, 0))); + int nfds = max(srv, cli); + if (srv >= 0) FD_SET(srv, &rfds); + if (cli >= 0) FD_SET(cli, &rfds); + replica rep(cli); + if (cli >= 0) { start = current_time_millis(); seltime = 0; } + + while (true) { + //sleep(1); + //cout << endl; + STAT(seltime, selcnt, checknnegerr(select(nfds + 1, &rfds, &wfds, &efds, nullptr));) + //cout << "select waited " << diff << endl; + + // accept new connections + if (srv >= 0 && FD_ISSET(srv, &rfds)) { + if (start == 0) { start = current_time_millis(); seltime = 0; } + cout << "accept" << endl; + int r = checknnegerr(accept(srv, nullptr, nullptr)); + cout << fcntl(r, F_GETFL, 0) << ' '; + checknnegerr(fcntl(r, F_SETFL, O_NONBLOCK | fcntl(r, F_GETFL, 0))); + cout << fcntl(r, F_GETFL, 0) << endl; + rs.push_back(new replica_channel(r)); + nfds = max(nfds, r); + FD_SET(r, &wfds); + } + + if (cli >= 0 && FD_ISSET(cli, &rfds)) { + rep.handle_read(); + } + + // handle ready events + foreach (replica_channel *p, rs) { + replica_channel &r = *p; + if (FD_ISSET(r.fd(), &rfds)) { + //r.handle_read(); + } + if (FD_ISSET(r.fd(), &wfds)) { + r.handle_write(); + } + FD_SET(r.fd(), &rfds); + } + } + + return 0; + } +}; + +void dump() { + long long tot = current_time_millis() - start; + cout << "readtime " << readtime << " writetime " << writetime << " seltime " << seltime << " tot " << tot << endl; + cout << "readcnt " << readcnt << " writecnt " << writecnt << " selcnt " << selcnt << endl; +} + +int main(int argc, char **argv) { + int ret; + atexit(dump); + try { + ret = mainer().main(argc, argv); + } catch (...) 
{ + ret = 1; + } + return ret; +} Modified: ydb/trunk/src/ydb.proto =================================================================== --- ydb/trunk/src/ydb.proto 2009-02-13 20:54:46 UTC (rev 1177) +++ ydb/trunk/src/ydb.proto 2009-02-13 20:57:05 UTC (rev 1178) @@ -78,4 +78,8 @@ message TxnBatch { repeated Txn txn = 1; -} \ No newline at end of file +} + +message ResponseBatch { + repeated Response res = 1; +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |