[Assorted-commits] SF.net SVN: assorted:[1245] ydb/trunk/src
Brought to you by:
yangzhang
From: <yan...@us...> - 2009-03-03 18:50:44
|
Revision: 1245 http://assorted.svn.sourceforge.net/assorted/?rev=1245&view=rev Author: yangzhang Date: 2009-03-03 18:50:38 +0000 (Tue, 03 Mar 2009) Log Message: ----------- - fixed bug: was reading caught_up as int instead of char - fixed bug: was not clearing Response in ResponseBatch::add_res() - fixed bug: was serializing batch instead of resbatch in process_txns - add mark_and_flush() so that the buffer can be cleared out - replaced --wal with --twal and --pwal; reintroduced physical logging Modified Paths: -------------- ydb/trunk/src/main.lzz.clamp ydb/trunk/src/ser.h Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-03-03 04:44:03 UTC (rev 1244) +++ ydb/trunk/src/main.lzz.clamp 2009-03-03 18:50:38 UTC (rev 1245) @@ -68,7 +68,8 @@ size_t accept_joiner_size, buf_size, read_buf_size; bool yield_during_build_up, yield_during_catch_up, dump, show_updates, count_updates, stop_on_recovery, general_txns, profile_threads, - debug_threads, multirecover, disk, debug_memory, use_wal, use_pb, use_pb_res, + debug_threads, multirecover, disk, debug_memory, use_pwal, use_twal, + use_pb, use_pb_res, suppress_txn_msgs, fake_bcast, force_ser, fake_exec; long long timelim, read_thresh, write_thresh; @@ -531,7 +532,6 @@ void logbuf(const void *buf, size_t len) { of.write(reinterpret_cast<const char*>(buf), len); } -#if 0 void del(int key) { int op = op_del; // TODO: is this really necessary? 
out & op & key; @@ -544,7 +544,6 @@ int op = op_commit; out & op; } -#endif private: enum { op_del, op_write, op_commit }; ofstream of; @@ -579,7 +578,7 @@ commons::array<char> rbuf(read_buf_size), wbuf(buf_size); reader r(nullptr, rbuf.get(), rbuf.size()); function<void(const void*, size_t)> fn; - if (use_wal) + if (use_twal) fn = bind(&wal::logbuf, g_wal, _1, _2); else fn = lambda(const void *buf, size_t len) { @@ -609,7 +608,9 @@ if (!newreps.empty() && seqno > 0) { start_txn(batch); fin_txn(batch); - w.mark(); + // TODO: verify that this made the catch-up stream more efficient, + // starting it only at the point necessary + w.mark_and_flush(); if (Types::is_pb()) { if (multirecover) bcastmsg(fds, batch); else sendmsg(fds[0], batch); @@ -684,17 +685,17 @@ bool do_bcast = !fds.empty() && !suppress_txn_msgs; if (Types::is_pb()) { // Broadcast/log/serialize. - if (force_ser || do_bcast || use_wal) { + if (force_ser || do_bcast || use_twal) { serbuf.clear(); ser(serbuf, batch); if (do_bcast) bcastbuf(fds, serbuf); - if (use_wal) g_wal->logbuf(serbuf); + if (use_twal) g_wal->logbuf(serbuf); } } else { // Reset if we have nobody to send to (incl. disk) or if we actually have // no txns (possible due to loop structure; want to avoid to avoid // confusing with the 0-txn message signifying "prepare a recovery msg"). 
- if (!do_bcast && !use_wal) { + if (!do_bcast && !use_twal) { w.reset(); } } @@ -715,8 +716,7 @@ fin_op(txn); fin_txn(batch); if (Types::is_pb()) bcastmsg(fds, batch); - w.mark(); - w.flush(); + w.mark_and_flush(); } } @@ -760,14 +760,14 @@ case Op::write: { int value = op.value(); - //if (use_wal) wal.write(key, value); + if (use_pwal) g_wal->write(key, value); if (it == map.end()) map[key] = value; else it->second = value; break; } case Op::del: if (it != map.end()) { - //if (use_wal) wal.del(key); + if (use_pwal) g_wal->del(key); map.erase(it); } break; @@ -776,7 +776,7 @@ } if (res != nullptr) fin_result(*res); - //if (use_wal) wal.commit(); + if (use_pwal) g_wal->commit(); } void @@ -880,6 +880,15 @@ // issued more since the Init message). int first_seqno = -1; + commons::array<char> rbuf(read_buf_size), wbuf(buf_size); + st_reader reader(leader, rbuf.get(), rbuf.size()); + vector<st_netfd_t> leader_v(1, leader); + writer w(lambda(const void *buf, size_t len) { + checkeqnneg(st_write(__ref(leader), buf, len, ST_UTIME_NO_TIMEOUT), + static_cast<ssize_t>(len)); + }, wbuf.get(), wbuf.size()); + stream s(reader, w); + finally f(lambda () { long long now = current_time_millis(); showtput("processed", now, __ref(start_time), __ref(seqno), @@ -891,17 +900,10 @@ __ref(seqno_caught_up)); } __ref(send_states).push(shared_ptr<Recovery>()); + __ref(w).mark_and_flush(); + st_sleep(1); }); - commons::array<char> rbuf(read_buf_size), wbuf(buf_size); - st_reader reader(leader, rbuf.get(), rbuf.size()); - vector<st_netfd_t> leader_v(1, leader); - writer w(lambda(const void *buf, size_t len) { - checkeqnneg(st_write(__ref(leader), buf, len, ST_UTIME_NO_TIMEOUT), - static_cast<ssize_t>(len)); - }, wbuf.get(), wbuf.size()); - stream s(reader, w); - try { scoped_ptr<TxnBatch> pbatch(new_TxnBatch<TxnBatch>(s)); TxnBatch &batch = *pbatch; @@ -954,7 +956,7 @@ if (first_seqno == -1) first_seqno = txn.seqno(); // Queue up for later processing once a snapshot has been received. 
- // XXX + // XXX speed up backlog.push(to_pb_Txn(txn)); action = "backlogged"; } @@ -970,7 +972,7 @@ fin_res(resbatch); if (RTypes::is_pb() && resbatch.res_size() > 0) { serbuf.clear(); - ser(serbuf, batch); + ser(serbuf, resbatch); sendbuf(leader, serbuf); } } else if (multirecover || mypos == 0) { @@ -1086,7 +1088,14 @@ // catching up. If it has, then broadcast a signal so that all response // handlers will know about this event. int rseqno = res.seqno(); + if (rseqno <= last_seqno) + throw msg_exception(string("response seqno decreased from ") + + lexical_cast<string>(last_seqno) + " to " + + lexical_cast<string>(rseqno)); bool rcaught_up = res.caught_up(); + for (int r = 0; r < res.result_size(); ++r) { + cout << rseqno << last_seqno << res.result_size() << " " << r << " " << res.result(r) << endl; + } if (!caught_up && rcaught_up) { long long now = current_time_millis(), timediff = now - start_time; caught_up = true; @@ -1476,18 +1485,16 @@ while (!backlog.empty()) { using pb::Txn; shared_ptr<Txn> p = backlog.take(); - if (p->seqno() > seqno) { - process_txn<pb_types, pb_types>(map, *p, seqno, nullptr); - if (check_interval(p->seqno(), catch_up_display)) { - cout << "processed txn " << p->seqno() << " off the backlog; " - << "backlog.size = " << backlog.queue().size() << endl; - } - if (check_interval(p->seqno(), yield_interval)) { - // Explicitly yield. (Note that yielding does still effectively - // happen anyway because process_txn is a yield point.) - st_sleep(0); - } + process_txn<pb_types, pb_types>(map, *p, seqno, nullptr); + if (check_interval(p->seqno(), catch_up_display)) { + cout << "processed txn " << p->seqno() << " off the backlog; " + << "backlog.size = " << backlog.queue().size() << endl; } + if (check_interval(p->seqno(), yield_interval)) { + // Explicitly yield. (Note that yielding does still effectively + // happen anyway because process_txn is a yield point.) 
+ st_sleep(0); + } } showtput("replayer caught up; from backlog replayed", current_time_millis(), mid_time, seqno, mid_seqno); @@ -1611,8 +1618,10 @@ "use protocol buffers instead of raw buffers for txns") ("use-pb-res", po::bool_switch(&use_pb_res), "use protocol buffers instead of raw buffers for responses") - ("wal", po::bool_switch(&use_wal), - "enable ARIES write-ahead logging") + ("twal", po::bool_switch(&use_twal), + "enable transactional write-ahead logging") + ("pwal", po::bool_switch(&use_pwal), + "enable physical write-ahead logging") ("force-ser", po::bool_switch(&force_ser), "force issue_txns to serialize its Txns") ("leader,l", po::bool_switch(&is_leader), Modified: ydb/trunk/src/ser.h =================================================================== --- ydb/trunk/src/ser.h 2009-03-03 04:44:03 UTC (rev 1244) +++ ydb/trunk/src/ser.h 2009-03-03 18:50:38 UTC (rev 1245) @@ -91,7 +91,7 @@ } public: writer(boost::function<void(void*, size_t)> flushcb, char *a, size_t buf_size) : - a_(a, buf_size), p_(a_.get()), mark_(p_), unsent_(a_.get()), flushcb(flushcb) {} + a_(a, buf_size), p_(a_.get()), mark_(p_), unsent_(p_), flushcb(flushcb) {} sized_array<char> &buf() { return a_; } char *cur() { return p_; } size_t pos() { return p_ - mark_; } @@ -99,6 +99,11 @@ void mark() { mark_ = p_; } void reset() { p_ = mark_; } void reserve(int n) { reserve(n, p_); } + void mark_and_flush() { + mark(); + flush(); + unsent_ = mark_ = p_ = a_.get(); + } void flush() { if (mark_ - unsent_ > 0) { flushcb(unsent_, mark_ - unsent_); @@ -220,7 +225,7 @@ void set_seqno(int x) { w_.write(x); } void set_caught_up(char x) { w_.write(x); } int seqno() const { return r_.read<int>(); } - bool caught_up() const { return r_.read<int>(); } + bool caught_up() const { return r_.read<char>(); } void start_result() { if (nres_ == unset) nres_ = 0; w_.skip<typeof(nres_)>(); } void add_result(int x) { w_.write(x); } void fin_result() { w_.write(nres_, off_ + sizeof(int) + sizeof(char)); } @@ 
-244,7 +249,7 @@ ResponseBatch(stream &s) : s_(s), r_(s.get_reader()), w_(s.get_writer()), off_(w_.pos()), res_(s), nres_(unset) {} void Clear() { res_.Clear(); nres_ = unset; off_ = w_.pos(); } void start_res() { if (nres_ == unset) nres_ = 0; w_.skip<typeof(nres_)>(); } - Response *add_res() { ++nres_; return &res_; } + Response *add_res() { ++nres_; res_.Clear(); return &res_; } void fin_res() { w_.write(nres_, off_); } int res_size() const { if (nres_ == unset) This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |