[Assorted-commits] SF.net SVN: assorted:[1206] ydb/trunk
Brought to you by:
yangzhang
From: <yan...@us...> - 2009-02-20 06:22:10
|
Revision: 1206 http://assorted.svn.sourceforge.net/assorted/?rev=1206&view=rev Author: yangzhang Date: 2009-02-20 06:22:02 +0000 (Fri, 20 Feb 2009) Log Message: ----------- - removed the parse/ser calls - renamed ydb.o -> ydb.pb.o - got pb path working in ser - added notes to reuse serialization buffers for pb path in ydb - added some more notes/todos Modified Paths: -------------- ydb/trunk/README ydb/trunk/src/Makefile ydb/trunk/src/main.lzz.clamp ydb/trunk/src/ser.cc ydb/trunk/src/ser.h Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-02-20 05:52:31 UTC (rev 1205) +++ ydb/trunk/README 2009-02-20 06:22:02 UTC (rev 1206) @@ -398,6 +398,8 @@ - TODO try making a streambuf for st_write, then try it in conj with struct-less pb - DONE dynamic switch between pb and zero-copy +- TODO fix pb recovery +- TODO implement new recovery (add buffer swapping, add buffers to a list) - TODO async (threaded) wal - TODO 0-node 0-copy (don't need to use threads, just process each batch immed) - TODO google dense hash map Modified: ydb/trunk/src/Makefile =================================================================== --- ydb/trunk/src/Makefile 2009-02-20 05:52:31 UTC (rev 1205) +++ ydb/trunk/src/Makefile 2009-02-20 06:22:02 UTC (rev 1206) @@ -9,7 +9,7 @@ PBS := $(wildcard *.proto) PBHDRS := $(foreach pb,$(PBS),$(patsubst %.proto,%.pb.h,$(pb))) PBSRCS := $(foreach pb,$(PBS),$(patsubst %.proto,%.pb.cc,$(pb))) -PBOBJS := $(foreach pb,$(PBS),$(patsubst %.proto,%.o,$(pb))) +PBOBJS := $(foreach pb,$(PBS),$(patsubst %.proto,%.pb.o,$(pb))) GENHDRS := $(LZZHDRS) $(PBHDRS) GENSRCS := $(LZZSRCS) $(PBSRCS) @@ -56,12 +56,12 @@ $(TARGET): $(OBJS) $(LINK.o) $^ $(LOADLIBES) $(LDLIBS) -o $@ +%.pb.o: %.pb.cc %.pb.h + $(CXX) -c $(PBCXXFLAGS) $(OUTPUT_OPTION) $< + %.o: %.cc $(PBHDRS) $(COMPILE.cc) $(OUTPUT_OPTION) $< -%.o: %.pb.cc %.pb.h - $(CXX) -c $(PBCXXFLAGS) $(OUTPUT_OPTION) $< - %.cc %.hh: %.lzz lzz -hx hh -sx cc -hl -sl -hd -sd $< python -c 'pars = file("lambda_impl.clamp_h").read().split("\n\n"); hh = file("main.hh").read(); print >> file("main.cc", "a"), pars[-1]; print >> file("main.hh", "w"), "\n\n".join(pars[:-1] + [hh])' @@ -98,7 +98,7 @@ ### -serperf: serperf.o ydb.o +serperf: serperf.o ydb.pb.o $(LINK.o) $^ $(LOADLIBES) $(LDLIBS) $(OUTPUT_OPTION) # serperf.cc ydb.pb.h @@ -106,5 +106,5 @@ p2: p2.cc $(LINK.cc) $^ $(LOADLIBES) $(LDLIBS) $(OUTPUT_OPTION) -ser: ser.cc ser.h ydb.o +ser: ser.cc ser.h ydb.pb.o $(LINK.cc) $^ $(LOADLIBES) $(LDLIBS) $(OUTPUT_OPTION) Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-02-20 05:52:31 UTC (rev 1205) +++ ydb/trunk/src/main.lzz.clamp 2009-02-20 06:22:02 UTC (rev 1206) @@ -377,7 +377,7 @@ */ template<typename T> void -bcastmsg_sync(const vector<st_netfd_t> &dsts, const T &msg) +bcastmsg_sync(const vector<st_netfd_t> &dsts, const T &msg /*, ser_t &s */) { ser_t s; ser(s, msg); @@ -394,7 +394,7 @@ */ template<typename T> void -bcastmsg(const vector<st_netfd_t> &dsts, const T &msg) +bcastmsg(const vector<st_netfd_t> &dsts, const T &msg /* XXX optimize this , ser_t &s */) { if (use_bcast_async) bcastmsg_async(dsts, msg); else bcastmsg_sync(dsts, msg); @@ -407,6 +407,7 @@ void sendmsg(st_netfd_t dst, const T &msg) { + // XXX optimize this vector<st_netfd_t> dsts(1, dst); bcastmsg(dsts, msg); } Modified: ydb/trunk/src/ser.cc =================================================================== --- ydb/trunk/src/ser.cc 2009-02-20 05:52:31 UTC (rev 1205) +++ ydb/trunk/src/ser.cc 2009-02-20 06:22:02 UTC (rev 1206) @@ -22,6 +22,17 @@ } }; +template<typename TxnBatch> +void push(TxnBatch &batch, string &str, outstream &os) { + str.clear(); + uint32_t len = 0; + str.append(sizeof len, '\0'); + check(batch.AppendToString(&str)); + len = str.size() - sizeof len; + copy((char*) &len, (char*) &len + sizeof len, str.begin()); + os(str.data(), str.size()); +} + template<typename types> void producer(st_netfd_t dst) { @@ -55,7 +66,7 @@ } fin_txn(batch); if (show) cout << w.pos() << '/' << w.size() << endl; - ser(batch, str); + if (types::is_pb()) push(batch, str, os); } batch.Clear(); start_txn(batch); @@ -63,6 +74,7 @@ w.mark(); w.show(); w.flush(); + if (types::is_pb()) push(batch, str, os); } template<typename types> @@ -81,8 +93,13 @@ scoped_ptr<TxnBatch> p(new_TxnBatch<TxnBatch>(s)); TxnBatch &batch = *p; while (true) { - batch.Clear(); - parse(batch, str); + if (types::is_pb()) { + uint32_t len = r.read<uint32_t>(); + managed_array<char> a = r.read(len); + check(batch.ParseFromArray(a.get(), len)); + } else { + batch.Clear(); + } if (show) cout << "ntxn " << batch.txn_size() << endl; if (batch.txn_size() == 0) break; for (int t = 0; t < batch.txn_size(); ++t) { @@ -107,6 +124,7 @@ st_init(); bool use_pb = argc > 1 && string("-p") == argv[1]; bool is_leader = argc == (use_pb ? 2 : 1); + cout << "use_pb " << use_pb << " is_leader " << is_leader << endl; if (is_leader) { st_netfd_t listener = st_tcp_listen(7654); st_netfd_t dst = checkerr(st_accept(listener, nullptr, nullptr, @@ -116,7 +134,7 @@ else producer<rb_types>(dst); } else { - st_netfd_t src = st_tcp_connect(argv[1], 7654, ST_UTIME_NO_TIMEOUT); + st_netfd_t src = st_tcp_connect(argv[use_pb ? 2 : 1], 7654, ST_UTIME_NO_TIMEOUT); if (use_pb) consumer<pb_types>(src); else Modified: ydb/trunk/src/ser.h =================================================================== --- ydb/trunk/src/ser.h 2009-02-20 05:52:31 UTC (rev 1205) +++ ydb/trunk/src/ser.h 2009-02-20 06:22:02 UTC (rev 1206) @@ -160,14 +160,6 @@ bool ParseFromArray(void *p, size_t len) { throw std::exception(); } }; -template<typename T> void parse(T &batch, const string &str); -template<> void parse(ydb::pb::TxnBatch &batch, const string &str) { check(batch.ParseFromString(str)); } -template<> void parse(ydb::msg::TxnBatch &batch, const string &str) {} - -template<typename T> void ser(T &batch, string &str); -template<> void ser(ydb::pb::TxnBatch &batch, string &str) { check(batch.SerializeToString(&str)); } -template<> void ser(ydb::msg::TxnBatch &batch, string &str) {} - template<typename T> void start_txn(T &batch); template<> void start_txn(ydb::pb::TxnBatch &batch) {} template<> void start_txn(ydb::msg::TxnBatch &batch) { batch.start_txn(); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |