[Assorted-commits] SF.net SVN: assorted:[1255] ydb/trunk/src/main.lzz.clamp
Brought to you by: yangzhang
From: <yan...@us...> - 2009-03-05 23:16:57
Revision: 1255 http://assorted.svn.sourceforge.net/assorted/?rev=1255&view=rev Author: yangzhang Date: 2009-03-05 23:16:46 +0000 (Thu, 05 Mar 2009) Log Message: ----------- added fast rb backlogging/catch-up Modified Paths: -------------- ydb/trunk/src/main.lzz.clamp Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-03-05 23:15:25 UTC (rev 1254) +++ ydb/trunk/src/main.lzz.clamp 2009-03-05 23:16:46 UTC (rev 1255) @@ -7,6 +7,7 @@ #include <boost/range/iterator_range.hpp> #include <boost/scoped_array.hpp> #include <boost/shared_ptr.hpp> +#include <boost/tuple/tuple.hpp> #include <commons/nullptr.h> #include <commons/rand.h> #include <commons/st/st.h> @@ -27,13 +28,15 @@ #include <tr1/unordered_map> #include <unistd.h> // pipe, write #include <vector> +#include "ser.h" #include "ydb.pb.h" -#include "ser.h" #define function boost::function #define foreach BOOST_FOREACH #define shared_ptr boost::shared_ptr #define ref boost::ref +#define tuple boost::tuple +#define make_tuple boost::make_tuple using namespace boost; using namespace boost::archive; @@ -53,6 +56,8 @@ typedef pair<int, int> pii; typedef map_t<int, int> mii; +typedef tuple<sized_array<char>, char*, char*> chunk; + template<typename T> void init_map(T &map) {} template<> void init_map(dense_hash_map<int, int> &map) { map.set_empty_key(-1); @@ -69,7 +74,7 @@ bool yield_during_build_up, yield_during_catch_up, dump, show_updates, count_updates, stop_on_recovery, general_txns, profile_threads, debug_threads, multirecover, disk, debug_memory, use_pwal, use_twal, - use_pb, use_pb_res, + use_pb, use_pb_res, g_caught_up, suppress_txn_msgs, fake_bcast, force_ser, fake_exec; long long timelim, read_thresh, write_thresh; @@ -862,7 +867,8 @@ void process_txns(st_netfd_t leader, mii &map, int &seqno, st_channel<shared_ptr<Recovery> > &send_states, - st_channel<shared_ptr<pb::Txn> > &backlog, int init_seqno, + /* XXX 
st_channel<shared_ptr<pb::Txn> > &backlog */ + st_channel<chunk> &backlog, int init_seqno, int mypos, int nnodes) { typedef typename Types::TxnBatch TxnBatch; @@ -880,7 +886,8 @@ // issued more since the Init message). int first_seqno = -1; - commons::array<char> rbuf(read_buf_size), wbuf(buf_size); + sized_array<char> rbuf(new char[read_buf_size], read_buf_size); + commons::array<char> wbuf(buf_size); st_reader reader(leader, rbuf.get(), rbuf.size()); vector<st_netfd_t> leader_v(1, leader); writer w(lambda(const void *buf, size_t len) { @@ -910,85 +917,125 @@ scoped_ptr<ResponseBatch> presbatch(new_ResponseBatch<ResponseBatch>(s)); ResponseBatch &resbatch = *presbatch; ser_t serbuf; + char *first_start = reader.start(); + assert(first_start == rbuf.get()); + const size_t headerlen = sizeof(uint32_t) + sizeof(short) + sizeof(int); while (true) { uint32_t prefix = 0; - long long before_read = -1; - if (read_thresh > 0) { - before_read = current_time_millis(); + char *start = reader.start(); + + // Will overflow on next few reads ("header")? 
+ if (reader.unread() + reader.rem() < headerlen) { + sized_array<char> buf(new char[read_buf_size], read_buf_size); + memcpy(buf.get(), reader.start(), reader.unread()); + swap(buf, reader.buf()); + reader.reset_range(reader.buf().get(), reader.buf().get() + reader.unread()); + backlog.push(make_tuple(buf, first_start, start)); + first_start = reader.start(); } - { + + if (Types::is_pb()) { + long long before_read = -1; + if (read_thresh > 0) { + before_read = current_time_millis(); + } + { + st_intr intr(stop_hub); + readmsg(reader, batch); + } + if (read_thresh > 0) { + long long read_time = current_time_millis() - before_read; + if (read_time > read_thresh) { + cout << "thread " << threadname() + << ": read took " << read_time << " ms" << endl; + } + } + } else { st_intr intr(stop_hub); - if (Types::is_pb()) readmsg(reader, batch); - else { prefix = reader.read<uint32_t>(); batch.Clear(); } + prefix = reader.read<uint32_t>(); + check(prefix < 10000); + batch.Clear(); } - if (read_thresh > 0) { - long long read_time = current_time_millis() - before_read; - if (read_time > read_thresh) { - cout << "thread " << threadname() - << ": read took " << read_time << " ms" << endl; - } - } + if (batch.txn_size() > 0) { - w.mark(); - resbatch.Clear(); - start_res(resbatch); - // XXX - //char *start = reader.start(); - //const Txn &first_txn = batch.txn(0); - //if (txn.seqno() < 0) { - //} else if (txn.seqno() == seqno + 1) { - //} else { - // // Skip entire message. - // reader. - //} - for (int t = 0; t < batch.txn_size(); ++t) { - // XXX const Txn &txn = t == 0 ? first_txn : batch.txn(t); - const Txn &txn = batch.txn(t); - // Regular transaction. - const char *action; - if (txn.seqno() < 0) { - throw break_exception(); - } else if (txn.seqno() == seqno + 1) { - if (!caught_up) { - time_caught_up = current_time_millis(); - seqno_caught_up = seqno; - showtput("process_txns caught up; backlogged", - time_caught_up, start_time, seqno_caught_up, - first_seqno == -1 ? 
init_seqno - 1 : first_seqno); - caught_up = true; - } + const Txn &first_txn = batch.txn(0); + if (first_txn.seqno() < 0) { + break; + } else if (first_txn.seqno() > seqno + 1) { + // In backlogging mode? + + // Skip entire message, pushing it to the thread that's handling + // recovery for later processing once snapshot is received. + // TODO: implement and use anchors instead? + if (first_seqno == -1) + cout << "first seqno: " << (first_seqno = first_txn.seqno()) << endl; + + // Caught up? + if (first_seqno == seqno + 1) { + // Rewind so we process accumulated messages. + reader.reset_range(first_start, reader.end()); + continue; + } + + // About to overflow? + if (reader.unread() + reader.rem() < prefix + sizeof(uint32_t) - headerlen) { + // Move current partial message to new buffer. + sized_array<char> tmp(new char[read_buf_size], read_buf_size); + *reinterpret_cast<uint32_t*>(tmp.get()) = prefix; + *reinterpret_cast<short*>(tmp.get() + sizeof(uint32_t)) = short(batch.txn_size()); + *reinterpret_cast<int*>(tmp.get() + sizeof(uint32_t) + sizeof(short)) = first_txn.seqno(); + memcpy(tmp.get() + headerlen, reader.start(), reader.unread()); + + // Swap the buffers. + swap(tmp, reader.buf()); + reader.reset_range(reader.buf().get() + headerlen, reader.buf().get() + headerlen + reader.unread()); + assert(tmp.get() <= first_start && first_start < tmp.end()); + assert(tmp.get() < start && start < tmp.end()); + assert(first_start < start); + backlog.push(make_tuple(tmp, first_start, start)); + first_start = reader.buf().get(); + first_seqno = first_txn.seqno(); + } + + // Fill up rest of the message + assert(reader.unread() + reader.rem() >= prefix + sizeof(uint32_t) - headerlen); + check0x(reader.accum(prefix + sizeof(uint32_t) - headerlen)); + } else { + // Regular transaction batch. 
+ if (!caught_up) { + time_caught_up = current_time_millis(); + seqno_caught_up = seqno; + showtput("process_txns caught up; backlogged", + time_caught_up, start_time, seqno_caught_up, + first_seqno == -1 ? init_seqno - 1 : first_seqno); + caught_up = true; + } + w.mark(); + resbatch.Clear(); + start_res(resbatch); + for (int t = 0; t < batch.txn_size(); ++t) { + const Txn &txn = t == 0 ? first_txn : batch.txn(t); Response *res = resbatch.add_res(); process_txn<Types, RTypes>(map, txn, seqno, res); if (fake_exec && !Types::is_pb()) { reader.skip(txn.op_size() * Op_Size); } - action = "processed"; - } else { - if (first_seqno == -1) - first_seqno = txn.seqno(); - // Queue up entire buffer for later processing once a snapshot has - // been received. - // XXX backlog.push(array()); - // Stop the loop. - // XXX t = batch.txn_size(); - backlog.push(to_pb_Txn(txn)); - action = "backlogged"; - } - if (check_interval(txn.seqno(), yield_interval)) st_sleep(0); - if (check_interval(txn.seqno(), process_display)) { - cout << action << " txn " << txn.seqno() - << "; db size = " << map.size() - << "; seqno = " << seqno - << "; backlog.size = " << backlog.queue().size() << endl; + if (check_interval(txn.seqno(), yield_interval)) st_sleep(0); + if (check_interval(txn.seqno(), process_display)) { + cout << "processed txn " << txn.seqno() + << "; db size = " << map.size() + << "; seqno = " << seqno + << "; backlog.size = " << backlog.queue().size() << endl; + } } + fin_res(resbatch); + if (RTypes::is_pb() && resbatch.res_size() > 0) { + serbuf.clear(); + ser(serbuf, resbatch); + sendbuf(leader, serbuf); + } } - fin_res(resbatch); - if (RTypes::is_pb() && resbatch.res_size() > 0) { - serbuf.clear(); - ser(serbuf, resbatch); - sendbuf(leader, serbuf); - } } else if (multirecover || mypos == 0) { // Empty (default) TxnBatch means "generate a snapshot." // TODO make this faster @@ -1439,7 +1486,8 @@ } // Process txns. 
- st_channel<shared_ptr<pb::Txn> > backlog; + // XXX st_channel<shared_ptr<pb::Txn> > backlog; + st_channel<chunk> backlog; const function<void()> process_fn = bind(process_txns<Types, RTypes>, leader, ref(map), ref(seqno), ref(send_states), ref(backlog), init.txnseqno(), mypos, @@ -1497,7 +1545,51 @@ long long mid_time = current_time_millis(); int mid_seqno = seqno; + // XXX + using msg::TxnBatch; + using msg::Txn; + commons::array<char> rbuf(0), wbuf(buf_size); + reader reader(nullptr, rbuf.get(), rbuf.size()); + writer writer(lambda(const void*, size_t) { + throw operation_not_supported("should not be writing responses during catch-up phase"); + }, wbuf.get(), wbuf.size()); + stream s(reader, writer); + TxnBatch batch(s); while (!backlog.empty()) { + chunk chunk = backlog.take(); + sized_array<char> &buf = chunk.get<0>(); + assert(buf.get() <= chunk.get<1>() && chunk.get<1>() < buf.end()); + assert(buf.get() < chunk.get<2>() && chunk.get<2>() < buf.end()); + assert(chunk.get<1>() < chunk.get<2>()); + swap(buf, reader.buf()); + reader.reset_range(chunk.get<1>(), chunk.get<2>()); + while (reader.start() < reader.end()) { + char *start = reader.start(); + uint32_t prefix = reader.read<uint32_t>(); + assert(prefix < 10000); + assert(start + sizeof(uint32_t) + prefix <= reader.end()); + batch.Clear(); + for (int t = 0; t < batch.txn_size(); ++t) { + const Txn &txn = batch.txn(t); + process_txn<rb_types, rb_types>(map, txn, seqno, nullptr); + if (fake_exec && !Types::is_pb()) { + reader.skip(txn.op_size() * Op_Size); + } + + if (check_interval(txn.seqno(), yield_interval)) st_sleep(0); + if (check_interval(txn.seqno(), process_display)) { + cout << "caught up txn " << txn.seqno() + << "; db size = " << map.size() + << "; seqno = " << seqno + << "; backlog.size = " << backlog.queue().size() << endl; + } + } + assert(start + sizeof(uint32_t) + prefix == reader.start()); + } + } + g_caught_up = true; +#if 0 + while (!backlog.empty()) { using pb::Txn; shared_ptr<Txn> p = 
backlog.take(); process_txn<pb_types, pb_types>(map, *p, seqno, nullptr); @@ -1511,6 +1603,7 @@ st_sleep(0); } } +#endif showtput("replayer caught up; from backlog replayed", current_time_millis(), mid_time, seqno, mid_seqno); } @@ -1786,6 +1879,8 @@ // Initialize the map. init_map(g_map); + cout << "pid " << getpid() << endl; + // Which role are we? if (is_leader) { if (use_pb) { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |