[Assorted-commits] SF.net SVN: assorted:[1204] ydb/trunk
Brought to you by:
yangzhang
From: <yan...@us...> - 2009-02-20 03:22:10
|
Revision: 1204 http://assorted.svn.sourceforge.net/assorted/?rev=1204&view=rev Author: yangzhang Date: 2009-02-20 01:41:23 +0000 (Fri, 20 Feb 2009) Log Message: ----------- - upgraded pb-switching from static to dynamic (using template hackery) - added --use-pb - added batches of types, etc. - broke recovery; need to fix this/speed it up - updated ser; this was a playground for moving to dynamic switching - fixed writer::show() - fixed start_op/txn and add_op/txn so that the start_ operations would initialize the counts to 0 - fixed the clamp preprocessing so as to separate the fwd decls from the defs - updated README with new TODOs Modified Paths: -------------- ydb/trunk/README ydb/trunk/src/Makefile ydb/trunk/src/main.lzz.clamp ydb/trunk/src/ser.cc ydb/trunk/src/ser.h Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-02-19 23:02:34 UTC (rev 1203) +++ ydb/trunk/README 2009-02-20 01:41:23 UTC (rev 1204) @@ -391,9 +391,15 @@ Period 2/17- -- TODO dynamic switch between pb and zero-copy +- DONE removed class outstream +- TODO refactor st_reader, etc.
to be generic opportunistic buffered readers +- TODO see how streambuf read/write is actually implemented (whether it's too + slow) +- TODO try making a streambuf for st_write, then try it in conj with + struct-less pb +- DONE dynamic switch between pb and zero-copy - TODO async (threaded) wal -- TODO 0-node 0-copy (need to use threads) +- TODO 0-node 0-copy (don't need to use threads, just process each batch immed) - TODO google dense hash map - TODO show aries-write Modified: ydb/trunk/src/Makefile =================================================================== --- ydb/trunk/src/Makefile 2009-02-19 23:02:34 UTC (rev 1203) +++ ydb/trunk/src/Makefile 2009-02-20 01:41:23 UTC (rev 1204) @@ -62,12 +62,10 @@ %.o: %.pb.cc %.pb.h $(CXX) -c $(PBCXXFLAGS) $(OUTPUT_OPTION) $< -%.cc: %.lzz +%.cc %.hh: %.lzz lzz -hx hh -sx cc -hl -sl -hd -sd $< + python -c 'pars = file("lambda_impl.clamp_h").read().split("\n\n"); hh = file("main.hh").read(); print >> file("main.cc", "a"), pars[-1]; print >> file("main.hh", "w"), "\n\n".join(pars[:-1] + [hh])' -%.hh: %.lzz - lzz -hx hh -sx cc -hl -sl -hd -sd $< - %.pb.cc: %.proto protoc --cpp_out=. $< @@ -75,7 +73,7 @@ protoc --cpp_out=. 
$< %.lzz: %.lzz.clamp - clamp < $< | sed "`echo -e '1i#src\n1a#end'`" > $@ + clamp < $< | sed '1d' > $@ main.o: ser.h Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-02-19 23:02:34 UTC (rev 1203) +++ ydb/trunk/src/main.lzz.clamp 2009-02-20 01:41:23 UTC (rev 1204) @@ -28,14 +28,12 @@ #include <unistd.h> // pipe, write #include <vector> #include "ydb.pb.h" -//#define USE_PB #include "ser.h" #define function boost::function #define foreach BOOST_FOREACH #define shared_ptr boost::shared_ptr #define ref boost::ref -#define REUSE_SER using namespace boost; using namespace boost::archive; @@ -55,11 +53,8 @@ using ydb::pb::Init; using ydb::pb::Join; using ydb::pb::SockAddr; -#ifdef USE_PB using namespace ydb::pb; -#else using namespace ydb::msg; -#endif #define GETMSG(buf) \ checkeqnneg(st_read_fully(src, buf, len, timeout), (int) len); \ @@ -82,7 +77,7 @@ size_t accept_joiner_size, buf_size; bool verbose, yield_during_build_up, yield_during_catch_up, dump, show_updates, count_updates, stop_on_recovery, general_txns, profile_threads, - debug_threads, multirecover, disk, debug_memory, use_wal, + debug_threads, multirecover, disk, debug_memory, use_wal, use_pb, suppress_txn_msgs, use_bcast_async, fake_bcast, force_ser, fake_exec; long long timelim, read_thresh, write_thresh; @@ -534,10 +529,14 @@ /** * Keep issuing transactions to the replicas. 
*/ +template<typename Types> void issue_txns(st_channel<replica_info> &newreps, int &seqno, st_bool &accept_joiner) { + typedef typename Types::TxnBatch TxnBatch; + typedef typename Types::Txn Txn; + typedef typename Types::Op Op; Op_OpType types[] = {Op::read, Op::write, Op::del}; vector<st_netfd_t> fds; @@ -566,7 +565,8 @@ checkeqnneg(st_write(dst, buf, len, ST_UTIME_NO_TIMEOUT), ssize_t(len)); }, buf_size); stream s(r,w); - TxnBatch batch NPBONLY((s)); + scoped_ptr<TxnBatch> pbatch(new_TxnBatch<TxnBatch>(s)); + TxnBatch batch = *pbatch; for (int t = 0; t < batch_size; ++t) batch.add_txn(); while (!stop_hub) { @@ -589,12 +589,12 @@ } // Generate some random transactions. - NPBONLY(batch.start_txn()); + start_txn(batch); for (int t = 0; t < batch_size; ++t) { Txn &txn = *batch.add_txn(); txn.set_seqno(seqno); int count = randint(min_ops, max_ops + 1); - NPBONLY(txn.start_op()); + start_op(txn); for (int o = 0; o < count; ++o) { Op *op = txn.add_op(); int rtype = general_txns ? randint(3) : 1, @@ -604,12 +604,12 @@ op->set_key(rkey); op->set_value(rvalue); } - NPBONLY(txn.fin_op()); + fin_op(txn); // Process immediately if not bcasting. if (fds.empty()) { --seqno; - process_txn(g_map, txn, seqno, nullptr); + process_txn<Types>(g_map, txn, seqno, nullptr); } // Checkpoint. @@ -643,8 +643,8 @@ ++seqno; } - NPBONLY(batch.fin_txn()); - NPBONLY(if (batch.txn_size() == 0) w.reset()); + fin_txn(batch); + if (batch.txn_size() == 0) w.reset(); // Broadcast. #ifdef USE_PB @@ -669,13 +669,13 @@ // This means "The End." w.mark(); batch.Clear(); - NPBONLY(batch.start_txn()); + start_txn(batch); Txn &txn = *batch.add_txn(); txn.set_seqno(-1); - NPBONLY(txn.start_op()); - NPBONLY(txn.fin_op()); - NPBONLY(batch.fin_txn()); - PBONLY(bcastmsg(fds, batch)); + start_op(txn); + fin_op(txn); + fin_txn(batch); + if (Types::is_pb()) bcastmsg(fds, batch); w.mark(); w.flush(); } @@ -684,9 +684,12 @@ * Process a transaction: update DB state (incl. seqno) and send response to * leader. 
*/ +template<typename Types> void -process_txn(mii &map, const Txn &txn, int &seqno, Response *res) +process_txn(mii &map, const typename Types::Txn &txn, int &seqno, Response *res) { + typedef typename Types::Txn Txn; + typedef typename Types::Op Op; //wal &wal = *g_wal; checkeq(txn.seqno(), seqno + 1); seqno = txn.seqno(); @@ -770,6 +773,17 @@ } #end +template<typename Txn> shared_ptr<ydb::pb::Txn> to_pb_Txn(Txn txn); +template<> shared_ptr<ydb::pb::Txn> to_pb_Txn(ydb::pb::Txn txn) { + return shared_ptr<ydb::pb::Txn>(new ydb::pb::Txn(txn)); +} +template<> shared_ptr<ydb::pb::Txn> to_pb_Txn(ydb::msg::Txn txn) { + shared_ptr<ydb::pb::Txn> ptxn(new ydb::pb::Txn()); + ptxn->set_seqno(txn.seqno()); + // XXX FIXME + return ptxn; +} + /** * Actually do the work of executing a transaction and sending back the reply. * @@ -795,12 +809,17 @@ * * \param[in] wal The WAL. */ +template<typename Types> void process_txns(st_netfd_t leader, mii &map, int &seqno, st_channel<shared_ptr<Recovery> > &send_states, - st_channel<shared_ptr<Txn> > &backlog, int init_seqno, + st_channel<shared_ptr<ydb::pb::Txn> > &backlog, int init_seqno, int mypos, int nnodes) { + typedef typename Types::TxnBatch TxnBatch; + typedef typename Types::Txn Txn; + typedef typename Types::Op Op; + bool caught_up = init_seqno == 0; long long start_time = current_time_millis(), time_caught_up = caught_up ? 
start_time : -1; @@ -829,7 +848,8 @@ stream s(reader, w); try { - TxnBatch batch NPBONLY((s)); + scoped_ptr<TxnBatch> pbatch(new_TxnBatch<TxnBatch>(s)); + TxnBatch batch = *pbatch; ResponseBatch resbatch; while (true) { long long before_read = -1; @@ -838,8 +858,8 @@ } { st_intr intr(stop_hub); - PBONLY(readmsg(reader, batch)); - NPBONLY(batch.Clear()); + if (Types::is_pb()) readmsg(reader, batch); + else batch.Clear(); } if (read_thresh > 0) { long long read_time = current_time_millis() - before_read; @@ -866,13 +886,14 @@ caught_up = true; } Response *res = resbatch.add_res(); - process_txn(map, txn, seqno, res); + process_txn<Types>(map, txn, seqno, res); action = "processed"; } else { if (first_seqno == -1) first_seqno = txn.seqno(); // Queue up for later processing once a snapshot has been received. - backlog.push(shared_ptr<Txn>(new Txn(txn))); + // XXX + backlog.push(to_pb_Txn(txn)); action = "backlogged"; } @@ -1158,8 +1179,9 @@ st_bool accept_joiner; int seqno = 0; st_channel<replica_info> newreps; - const function0<void> f = bind(issue_txns, ref(newreps), ref(seqno), - ref(accept_joiner)); + const function<void()> f = use_pb ? + bind(issue_txns<pb_types>, ref(newreps), ref(seqno), ref(accept_joiner)) : + bind(issue_txns<rb_types>, ref(newreps), ref(seqno), ref(accept_joiner)); st_thread_t swallower = my_spawn(bind(swallow, f), "issue_txns"); foreach (const replica_info &r, replicas) newreps.push(r); st_joining join_swallower(swallower); @@ -1302,12 +1324,15 @@ } // Process txns. - st_channel<shared_ptr<Txn> > backlog; - st_joining join_proc(my_spawn(bind(process_txns, leader, ref(map), - ref(seqno), ref(send_states), - ref(backlog), init.txnseqno(), - mypos, init.node_size()), - "process_txns")); + st_channel<shared_ptr<ydb::pb::Txn> > backlog; + const function<void()> process_fn = use_pb ? 
+ bind(process_txns<pb_types>, leader, ref(map), ref(seqno), + ref(send_states), ref(backlog), init.txnseqno(), mypos, + init.node_size()) : + bind(process_txns<rb_types>, leader, ref(map), ref(seqno), + ref(send_states), ref(backlog), init.txnseqno(), mypos, + init.node_size()); + st_joining join_proc(my_spawn(process_fn, "process_txns")); st_joining join_rec(my_spawn(bind(recover_joiner, listener, ref(map), ref(seqno), ref(send_states)), "recover_joiner")); @@ -1361,8 +1386,9 @@ int mid_seqno = seqno; while (!backlog.empty()) { + using ydb::pb::Txn; shared_ptr<Txn> p = backlog.take(); - process_txn(map, *p, seqno, nullptr); + process_txn<pb_types>(map, *p, seqno, nullptr); if (p->seqno() % chkpt == 0) { if (verbose) cout << "processed txn " << p->seqno() << " off the backlog; " @@ -1488,6 +1514,8 @@ "count operations that touch (update/read/delete) an existing key") ("general-txns,g", po::bool_switch(&general_txns), "issue read and delete transactions as well as the default of (only) insertion/update transactions (for leader only)") + ("use-pb", po::bool_switch(&use_pb), + "use protocol buffers instead of raw buffers") ("wal", po::bool_switch(&use_wal), "enable ARIES write-ahead logging") ("force-ser", po::bool_switch(&force_ser), Modified: ydb/trunk/src/ser.cc =================================================================== --- ydb/trunk/src/ser.cc 2009-02-19 23:02:34 UTC (rev 1203) +++ ydb/trunk/src/ser.cc 2009-02-20 01:41:23 UTC (rev 1204) @@ -1,17 +1,11 @@ #include "ser.h" #include <commons/st/st.h> +#include <boost/scoped_ptr.hpp> -//#define USE_PB -using ydb::msg::reader; -using ydb::msg::writer; -using ydb::msg::stream; using namespace commons; using namespace std; -#ifdef USE_PB +using namespace ydb::msg; using namespace ydb::pb; -#else -using namespace ydb::msg; -#endif const int nreps = 2; @@ -22,13 +16,18 @@ public: outstream(const vector<st_netfd_t> &dsts) : dsts(dsts) {} void operator()(const void *buf, size_t len) { + cout << "writing " << len << 
endl; foreach (st_netfd_t dst, dsts) checkeqnneg(st_write(dst, buf, len, ST_UTIME_NO_TIMEOUT), ssize_t(len)); } }; +template<typename types> void producer(st_netfd_t dst) { + typedef typename types::TxnBatch TxnBatch; + typedef typename types::Txn Txn; + typedef typename types::Op Op; vector<st_netfd_t> dsts(1, dst); outstream os(dsts); writer w(os, 90); @@ -36,48 +35,56 @@ stream s(r,w); string str; const bool show = true; - TxnBatch batch NPBONLY((s)); + scoped_ptr<TxnBatch> p(new_TxnBatch<TxnBatch>(s)); + TxnBatch &batch = *p; for (int i = 0; i < nreps; ++i) { w.mark(); batch.Clear(); - NPBONLY(batch.start_txn()); + start_txn(batch); for (int t = 0; t < 2; ++t) { Txn &txn = *batch.add_txn(); txn.set_seqno(t + 5); - NPBONLY(txn.start_op()); + start_op(txn); for (int o = 0; o < 2; ++o) { Op &op = *txn.add_op(); op.set_type (Op::del); op.set_key (3 * (o+1)); op.set_value(4 * (o+1)); } - NPBONLY(txn.fin_op()); + fin_op(txn); } - NPBONLY(batch.fin_txn()); + fin_txn(batch); if (show) cout << w.pos() << '/' << w.size() << endl; - PBONLY(check(batch.SerializeToString(&str))); + ser(batch, str); } batch.Clear(); - NPBONLY(batch.start_txn()); - NPBONLY(batch.fin_txn()); + start_txn(batch); + fin_txn(batch); w.mark(); + w.show(); w.flush(); } +template<typename types> void consumer(st_netfd_t src) { + typedef typename types::TxnBatch TxnBatch; + typedef typename types::Txn Txn; + typedef typename types::Op Op; vector<st_netfd_t> v; outstream os(v); writer w(os, 90); reader r(src); stream s(r,w); + string str; // XXX const bool show = true; - TxnBatch batch NPBONLY((s)); - for (int i = 0; i < nreps; ++i) { + scoped_ptr<TxnBatch> p(new_TxnBatch<TxnBatch>(s)); + TxnBatch &batch = *p; + while (true) { batch.Clear(); - PBONLY(check(batch.ParseFromString(str))); + parse(batch, str); if (show) cout << "ntxn " << batch.txn_size() << endl; - //if (batch.txn_size() == 0) break; + if (batch.txn_size() == 0) break; for (int t = 0; t < batch.txn_size(); ++t) { const Txn &txn = 
batch.txn(t); if (show) cout << "txn seqno " << txn.seqno() << " " << txn.seqno() << endl; @@ -98,15 +105,22 @@ int main(int argc, char **argv) { st_init(); - bool is_leader = argc == 1; + bool use_pb = argc > 1 && string("-p") == argv[1]; + bool is_leader = argc == (use_pb ? 2 : 1); if (is_leader) { st_netfd_t listener = st_tcp_listen(7654); st_netfd_t dst = checkerr(st_accept(listener, nullptr, nullptr, ST_UTIME_NO_TIMEOUT)); - producer(dst); + if (use_pb) + producer<pb_types>(dst); + else + producer<rb_types>(dst); } else { st_netfd_t src = st_tcp_connect(argv[1], 7654, ST_UTIME_NO_TIMEOUT); - consumer(src); + if (use_pb) + consumer<pb_types>(src); + else + consumer<rb_types>(src); } return 0; } Modified: ydb/trunk/src/ser.h =================================================================== --- ydb/trunk/src/ser.h 2009-02-19 23:02:34 UTC (rev 1203) +++ ydb/trunk/src/ser.h 2009-02-20 01:41:23 UTC (rev 1204) @@ -7,16 +7,6 @@ #include <iostream> #include "ydb.pb.h" -#ifdef USE_PB -#define PBSWITCH(a,b) a -#define PBONLY(x) x -#define NPBONLY(x) -#else -#define PBSWITCH(a,b) b -#define PBONLY(x) -#define NPBONLY(x) x -#endif - #define BEGIN_NAMESPACE(ns) namespace ns { #define END_NAMESPACE } @@ -75,7 +65,7 @@ void show() { cout << (void*) p_; for (size_t i = 0; i < a_.size(); ++i) - cout << " " << hex << setfill('0') << setw(2) << int(mark_[i]); + cout << " " << hex << setfill('0') << setw(2) << (int)(unsigned char)(a_.get()[i]); cout << endl; cout << (void*) p_; for (size_t i = 0; i < a_.size(); ++i) @@ -136,8 +126,8 @@ void Clear() { w_.reserve(0*50); nop_ = unset; seqno_ = unset; off_ = w_.pos(); } void set_seqno(int x) { w_.write(x); } int seqno() const { return seqno_ == unset ? 
seqno_ = r_.read<int>() : seqno_; } - void start_op() { w_.skip<typeof(nop_)>(); } - Op *add_op() { if (nop_ == unset) nop_ = 0; ++nop_; return &op_; } + void start_op() { if (nop_ == unset) nop_ = 0; w_.skip<typeof(nop_)>(); } + Op *add_op() { ++nop_; return &op_; } void fin_op() { w_.write(nop_, off_ + sizeof(int)); } int op_size() const { if (nop_ == unset) nop_ = r_.read<typeof(nop_)>(); return nop_; } const Op &op(int o) const { return op_; } @@ -155,8 +145,8 @@ public: TxnBatch(stream &s) : s_(s), r_(s.get_reader()), w_(s.get_writer()), off_(w_.pos()), txn_(s), ntxn_(unset) {} void Clear() { w_.reserve(0*100); txn_.Clear(); ntxn_ = unset; off_ = w_.pos(); } - void start_txn() { w_.skip<typeof(ntxn_)>(); } - Txn *add_txn() { if (ntxn_ == unset) ntxn_ = 0; ++ntxn_; txn_.Clear(); return &txn_; } + void start_txn() { if (ntxn_ == unset) ntxn_ = 0; w_.skip<typeof(ntxn_)>(); } + Txn *add_txn() { ++ntxn_; txn_.Clear(); return &txn_; } void fin_txn() { w_.write(ntxn_, off_); } int txn_size() const { if (ntxn_ == unset) @@ -167,8 +157,52 @@ bool AppendToString(string *s) const { throw std::exception(); } bool SerializeToString(string *s) const { throw std::exception(); } bool SerializeToOstream(ostream *s) const { throw std::exception(); } + bool ParseFromArray(void *p, size_t len) { throw std::exception(); } }; +template<typename T> void parse(T &batch, const string &str); +template<> void parse(ydb::pb::TxnBatch &batch, const string &str) { check(batch.ParseFromString(str)); } +template<> void parse(ydb::msg::TxnBatch &batch, const string &str) {} + +template<typename T> void ser(T &batch, string &str); +template<> void ser(ydb::pb::TxnBatch &batch, string &str) { check(batch.SerializeToString(&str)); } +template<> void ser(ydb::msg::TxnBatch &batch, string &str) {} + +template<typename T> void start_txn(T &batch); +template<> void start_txn(ydb::pb::TxnBatch &batch) {} +template<> void start_txn(ydb::msg::TxnBatch &batch) { batch.start_txn(); } + +template<typename 
T> void fin_txn(T &batch); +template<> void fin_txn(ydb::pb::TxnBatch &batch) {} +template<> void fin_txn(ydb::msg::TxnBatch &batch) { batch.fin_txn(); } + +template<typename T> void start_op(T &txn); +template<> void start_op(ydb::pb::Txn &txn) {} +template<> void start_op(ydb::msg::Txn &txn) { txn.start_op(); } + +template<typename T> void fin_op(T &txn); +template<> void fin_op(ydb::pb::Txn &txn) {} +template<> void fin_op(ydb::msg::Txn &txn) { txn.fin_op(); } + +template<typename T> T *new_TxnBatch(stream &s); +template<> ydb::pb::TxnBatch *new_TxnBatch(stream &s) { return new ydb::pb::TxnBatch(); } +template<> ydb::msg::TxnBatch *new_TxnBatch(stream &s) { return new ydb::msg::TxnBatch(s); } + +struct pb_types { + typedef ydb::pb::TxnBatch TxnBatch; + typedef ydb::pb::Txn Txn; + typedef ydb::pb::Op Op; + static bool is_pb() { return true; } +}; + +// rb = raw buffer +struct rb_types { + typedef ydb::msg::TxnBatch TxnBatch; + typedef ydb::msg::Txn Txn; + typedef ydb::msg::Op Op; + static bool is_pb() { return false; } +}; + END_NAMESPACE END_NAMESPACE This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |