[Assorted-commits] SF.net SVN: assorted:[1246] ydb/trunk/src
Brought to you by:
yangzhang
From: <yan...@us...> - 2009-03-04 06:55:47
|
Revision: 1246
          http://assorted.svn.sourceforge.net/assorted/?rev=1246&view=rev
Author:   yangzhang
Date:     2009-03-04 06:55:42 +0000 (Wed, 04 Mar 2009)

Log Message:
-----------
- added byte length prefixes to marked segments in writer
- updated ser

Modified Paths:
--------------
    ydb/trunk/src/main.lzz.clamp
    ydb/trunk/src/ser.cc
    ydb/trunk/src/ser.h

Modified: ydb/trunk/src/main.lzz.clamp
===================================================================
--- ydb/trunk/src/main.lzz.clamp 2009-03-03 18:50:38 UTC (rev 1245)
+++ ydb/trunk/src/main.lzz.clamp 2009-03-04 06:55:42 UTC (rev 1246)
@@ -911,6 +911,7 @@
     ResponseBatch &resbatch = *presbatch;
     ser_t serbuf;
     while (true) {
+      uint32_t prefix = 0;
       long long before_read = -1;
       if (read_thresh > 0) {
         before_read = current_time_millis();
@@ -918,7 +919,7 @@
       {
         st_intr intr(stop_hub);
         if (Types::is_pb()) readmsg(reader, batch);
-        else batch.Clear();
+        else { prefix = reader.read<uint32_t>(); batch.Clear(); }
       }
       if (read_thresh > 0) {
         long long read_time = current_time_millis() - before_read;
@@ -931,7 +932,17 @@
         w.mark();
         resbatch.Clear();
         start_res(resbatch);
+        // XXX
+        //char *start = reader.start();
+        //const Txn &first_txn = batch.txn(0);
+        //if (txn.seqno() < 0) {
+        //} else if (txn.seqno() == seqno + 1) {
+        //} else {
+        //  // Skip entire message.
+        //  reader.
+        //}
         for (int t = 0; t < batch.txn_size(); ++t) {
+          // XXX const Txn &txn = t == 0 ? first_txn : batch.txn(t);
           const Txn &txn = batch.txn(t);
           // Regular transaction.
           const char *action;
@@ -955,8 +966,11 @@
           } else {
             if (first_seqno == -1)
               first_seqno = txn.seqno();
-            // Queue up for later processing once a snapshot has been received.
-            // XXX speed up
+            // Queue up entire buffer for later processing once a snapshot has
+            // been received.
+            // XXX backlog.push(array());
+            // Stop the loop.
+            // XXX t = batch.txn_size();
             backlog.push(to_pb_Txn(txn));
             action = "backlogged";
           }
@@ -1053,6 +1067,7 @@
 
     while (true) {
       finally f(loop_cleanup);
+      uint32_t prefix = 0;
 
       // Read the message, but correctly respond to interrupts so that we can
       // cleanly exit (slightly tricky).
@@ -1061,7 +1076,7 @@
       try {
         st_intr intr(stop_hub);
         if (Types::is_pb()) readmsg(reader, batch);
-        else batch.Clear();
+        else { prefix = reader.read<uint32_t>(); batch.Clear(); }
       } catch (...) { // TODO: only catch interruptions
         // This check on seqnos is OK for termination since the seqno will
         // never grow again if stop_hub is set.
@@ -1079,7 +1094,7 @@
         // to get all the acks back).
         st_intr intr(kill_hub);
         if (Types::is_pb()) readmsg(reader, batch);
-        else batch.Clear();
+        else { prefix = reader.read<uint32_t>(); batch.Clear(); }
       }
 
       for (int i = 0; i < batch.res_size(); ++i) {

Modified: ydb/trunk/src/ser.cc
===================================================================
--- ydb/trunk/src/ser.cc 2009-03-03 18:50:38 UTC (rev 1245)
+++ ydb/trunk/src/ser.cc 2009-03-04 06:55:42 UTC (rev 1246)
@@ -42,8 +42,9 @@
   typedef typename types::Op Op;
   vector<st_netfd_t> dsts(1, dst);
   outstream os(dsts);
-  writer w(os, 90);
-  reader r(dst);
+  char *buf = new char[90];
+  writer w(os, buf, 90);
+  reader r(dst, buf, 90);
   stream s(r,w);
   string str;
   const bool show = true;
@@ -51,6 +52,7 @@
   TxnBatch &batch = *p;
   for (int i = 0; i < nreps; ++i) {
     w.mark();
+    w.show();
     batch.Clear();
     start_txn(batch);
     for (int t = 0; t < 2; ++t) {
@@ -69,12 +71,13 @@
     if (show) cout << w.pos() << '/' << w.size() << endl;
     if (types::is_pb()) push(batch, str, os);
   }
+  w.mark();
+  w.show();
   batch.Clear();
   start_txn(batch);
   fin_txn(batch);
-  w.mark();
+  w.mark_and_flush();
   w.show();
-  w.flush();
   if (types::is_pb()) push(batch, str, os);
 }
 
@@ -86,22 +89,26 @@
   typedef typename types::Op Op;
   vector<st_netfd_t> v;
   outstream os(v);
-  writer w(os, 90);
-  reader r(src);
+  char *buf = new char[90];
+  writer w(os, buf, 90);
+  reader r(src, buf, 90);
   stream s(r,w);
   string str; // XXX
   const bool show = true;
   scoped_ptr<TxnBatch> p(new_TxnBatch<TxnBatch>(s));
   TxnBatch &batch = *p;
   while (true) {
+    uint32_t len;
     if (types::is_pb()) {
-      uint32_t len = r.read<uint32_t>();
+      len = r.read<uint32_t>();
       managed_array<char> a = r.read(len);
       check(batch.ParseFromArray(a.get(), len));
     } else {
+      len = r.read<uint32_t>();
       batch.Clear();
     }
-    if (show) cout << "ntxn " << batch.txn_size() << endl;
+    if (show) w.show();
+    if (show) cout << "len " << len << " ntxn " << batch.txn_size() << endl;
     if (batch.txn_size() == 0) break;
     for (int t = 0; t < batch.txn_size(); ++t) {
       const Txn &txn = batch.txn(t);

Modified: ydb/trunk/src/ser.h
===================================================================
--- ydb/trunk/src/ser.h 2009-03-03 18:50:38 UTC (rev 1245)
+++ ydb/trunk/src/ser.h 2009-03-04 06:55:42 UTC (rev 1246)
@@ -69,45 +69,56 @@
   NONCOPYABLE(writer)
   private:
     sized_array<char> a_;
+    char *unsent_;
+    char *mark_;
     char *p_;
-    char *mark_;
-    char *unsent_;
     boost::function<void(void*, size_t)> flushcb;
     char *reserve(int n, char *p) {
       if (p + n > a_.end()) {
-        assert(size_t(p - mark_ + n) <= a_.size());
+        // check that the reserved space will fit
+        assert(size_t(p - mark_ + n + sizeof(uint32_t)) <= a_.size());
+        // get rid of what we have
         flush();
-        size_t diff = mark_ - a_.get();
-        memmove(a_.get(), mark_, p_ - mark_);
-        unsent_ = mark_ = a_.get();
+        size_t diff = mark_ - (a_.get() + sizeof(uint32_t));
+        memmove(a_.get() + sizeof(uint32_t), mark_, p_ - mark_);
+        mark_ = (unsent_ = a_.get()) + sizeof(uint32_t);
         p_ -= diff;
         p -= diff;
       }
       return p;
     }
+    char *prefix() { return mark_ - sizeof(uint32_t); }
     template<typename T>
     void write_(T x, char *p) {
       *reinterpret_cast<T*>(reserve(sizeof x, p)) = x;
     }
   public:
     writer(boost::function<void(void*, size_t)> flushcb, char *a, size_t buf_size) :
-      a_(a, buf_size), p_(a_.get()), mark_(p_), unsent_(p_), flushcb(flushcb) {}
+      a_(a, buf_size), unsent_(a_.get()), mark_(unsent_ + sizeof(uint32_t)),
+      p_(mark_), flushcb(flushcb) {}
     sized_array<char> &buf() { return a_; }
     char *cur() { return p_; }
     size_t pos() { return p_ - mark_; }
     size_t size() { return a_.size(); }
-    void mark() { mark_ = p_; }
+    void mark() {
+      if (p_ > mark_) {
+        // prefix last segment with its length
+        *reinterpret_cast<uint32_t*>(prefix()) = uint32_t(p_ - mark_);
+        // start new segment
+        mark_ = (p_ += sizeof(uint32_t));
+      }
+    }
     void reset() { p_ = mark_; }
     void reserve(int n) { reserve(n, p_); }
     void mark_and_flush() {
       mark();
       flush();
-      unsent_ = mark_ = p_ = a_.get();
+      mark_ = p_ = (unsent_ = a_.get()) + sizeof(uint32_t);
     }
     void flush() {
-      if (mark_ - unsent_ > 0) {
-        flushcb(unsent_, mark_ - unsent_);
-        unsent_ = mark_;
+      if (prefix() > unsent_) {
+        flushcb(unsent_, prefix() - unsent_);
+        unsent_ = prefix();
       }
     }
     void show() {

This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|