[Assorted-commits] SF.net SVN: assorted:[1187] ydb/trunk/src
Brought to you by:
yangzhang
From: <yan...@us...> - 2009-02-17 04:36:21
|
Revision: 1187
          http://assorted.svn.sourceforge.net/assorted/?rev=1187&view=rev
Author:   yangzhang
Date:     2009-02-17 04:36:14 +0000 (Tue, 17 Feb 2009)

Log Message:
-----------
- added st-based separate client/server to ser demo
- added seqno-caching
- using st_reader as reader
- added writer::flush()

Modified Paths:
--------------
    ydb/trunk/src/ser.cc
    ydb/trunk/src/ser.h

Modified: ydb/trunk/src/ser.cc
===================================================================
--- ydb/trunk/src/ser.cc 2009-02-17 04:33:52 UTC (rev 1186)
+++ ydb/trunk/src/ser.cc 2009-02-17 04:36:14 UTC (rev 1187)
@@ -1,4 +1,5 @@
 #include "ser.h"
+#include <commons/st/st.h>
 
 //#define USE_PB
 using ydb::msg::reader;
@@ -22,65 +23,85 @@
 #define NPBONLY(x) x
 #endif
 
-//template<typename TxnBatch, typename Txn, typename Op>
-void run()
+const int nreps = 2;
+
+void producer(st_netfd_t dst)
 {
-  array<char> a(1e8);
-  writer w(a);
-  reader r(a);
+  writer w(dst);
+  reader r(dst);
   stream s(r,w);
   string str;
-  const int nreps = 2;
-
-  {
-    TxnBatch batch NPBONLY((s));
-    for (int i = 0; i < nreps; ++i) {
-      w.mark();
-      batch.Clear();
-      NPBONLY(batch.start_txn());
-      for (int t = 0; t < 2; ++t) {
-        Txn &txn = *batch.add_txn();
-        txn.set_seqno(t + 5);
-        NPBONLY(txn.start_op());
-        for (int o = 0; o < 2; ++o) {
-          Op &op = *txn.add_op();
-          op.set_type (Op::del);
-          op.set_key (3 * (o+1));
-          op.set_value(4 * (o+1));
-        }
-        NPBONLY(txn.fin_op());
+  const bool show = true;
+  TxnBatch batch NPBONLY((s));
+  for (int i = 0; i < nreps; ++i) {
+    w.mark();
+    batch.Clear();
+    NPBONLY(batch.start_txn());
+    for (int t = 0; t < 2; ++t) {
+      Txn &txn = *batch.add_txn();
+      txn.set_seqno(t + 5);
+      NPBONLY(txn.start_op());
+      for (int o = 0; o < 2; ++o) {
+        Op &op = *txn.add_op();
+        op.set_type (Op::del);
+        op.set_key (3 * (o+1));
+        op.set_value(4 * (o+1));
       }
-      NPBONLY(batch.fin_txn());
-      cout << w.pos() << '/' << w.size() << endl;
-      PBONLY(check(batch.SerializeToString(&str)));
+      NPBONLY(txn.fin_op());
     }
+    NPBONLY(batch.fin_txn());
+    if (show) cout << w.pos() << '/' << w.size() << endl;
+    PBONLY(check(batch.SerializeToString(&str)));
   }
+  batch.Clear();
+  NPBONLY(batch.start_txn());
+  NPBONLY(batch.fin_txn());
+  w.mark();
   w.flush();
+}
 
+void consumer(st_netfd_t src)
+{
+  array<char> a(1e8);
+  writer w(src);
+  reader r(src);
+  stream s(r,w);
   const bool show = true;
-  {
-    TxnBatch batch NPBONLY((s));
-    for (int i = 0; i < nreps; ++i) {
-      batch.Clear();
-      PBONLY(check(batch.ParseFromString(str)));
-      if (show) cout << "ntxn " << batch.txn_size() << endl;
-      for (int t = 0; t < batch.txn_size(); ++t) {
-        const Txn &txn = batch.txn(t);
-        if (show) cout << "txn seqno " << txn.seqno() << endl;
-        for (int o = 0; o < txn.op_size(); ++o) {
-          const Op &op = txn.op(o);
-          int otype = op.type();
-          int okey = op.key();
-          int oval = op.value();
-          if (show) cout << "op type " << otype << " key " << okey << " value " << oval << endl;
-        }
+  TxnBatch batch NPBONLY((s));
+  for (int i = 0; i < nreps; ++i) {
+    batch.Clear();
+    PBONLY(check(batch.ParseFromString(str)));
+    if (show) cout << "ntxn " << batch.txn_size() << endl;
+    //if (batch.txn_size() == 0) break;
+    for (int t = 0; t < batch.txn_size(); ++t) {
+      const Txn &txn = batch.txn(t);
+      if (show) cout << "txn seqno " << txn.seqno() << " " << txn.seqno() << endl;
+      for (int o = 0; o < txn.op_size(); ++o) {
+        const Op &op = txn.op(o);
+        int otype = op.type();
+        int okey = op.key();
+        int oval = op.value();
+        if (show)
+          cout << "op type " << otype
+            << " key " << okey
+            << " value " << oval << endl;
       }
     }
   }
 }
 
-int main()
+int main(int argc, char **argv)
 {
-  run();
+  st_init();
+  bool is_leader = argc == 1;
+  if (is_leader) {
+    st_netfd_t listener = st_tcp_listen(7654);
+    st_netfd_t dst = checkerr(st_accept(listener, nullptr, nullptr,
+                                        ST_UTIME_NO_TIMEOUT));
+    producer(dst);
+  } else {
+    st_netfd_t src = st_tcp_connect(argv[1], 7654, ST_UTIME_NO_TIMEOUT);
+    consumer(src);
+  }
   return 0;
 }

Modified: ydb/trunk/src/ser.h
===================================================================
--- ydb/trunk/src/ser.h 2009-02-17 04:33:52 UTC (rev 1186)
+++ ydb/trunk/src/ser.h 2009-02-17 04:36:14 UTC (rev 1187)
@@ -2,6 +2,7 @@
 #define YDB_MSG_H
 
 #include <commons/array.h>
+#include <commons/st/st.h>
 #include <iomanip>
 #include <iostream>
 #include "ydb.pb.h"
@@ -15,7 +16,7 @@
 using namespace commons;
 using namespace std;
 
-short unset = -1;
+short unset = -7654;
 
 using ydb::pb::Op_OpType;
 
@@ -25,30 +26,37 @@
     array<char> a_;
     char *p_;
     char *mark_;
-    array<char> &out_;
+    char *unsent_;
+    st_netfd_t out_;
     template<typename T>
     void write_(T x, char *p) {
       reserve(sizeof x, p);
       *reinterpret_cast<T*>(p) = x;
     }
   public:
-    writer(array<char> &out) : a_(90), p_(a_.get()), mark_(p_), out_(out) {}
+    writer(st_netfd_t out) :
+      a_(90), p_(a_.get()), mark_(p_), unsent_(a_.get()), out_(out) {}
     array<char> &buf() { return a_; }
     size_t pos() { return p_ - mark_; }
     size_t size() { return a_.size(); }
-    void mark() { mark_ = p_; /*skip<int>();*/ }
-    void flush() { memcpy(out_.get(), a_.get(), mark_ - a_.get()); }
+    void mark() { mark_ = p_; }
     void reserve(int n) { reserve(n, p_); }
     void reserve(int n, char *p) {
       if (p + n > a_.end()) {
         flush();
-        memmove(a_.get(), mark_, a_.end() - mark_);
         size_t diff = mark_ - a_.get();
-        mark_ -= diff;
+        memmove(a_.get(), mark_, diff);
+        unsent_ = mark_ = a_.get();
         p_ -= diff;
         p -= diff;
       }
     }
+    void flush() {
+      if (mark_ - unsent_ > 0) {
+        st_write(out_, unsent_, mark_ - unsent_, ST_UTIME_NO_TIMEOUT);
+        unsent_ = mark_;
+      }
+    }
     void show() {
       cout << (void*) p_;
       for (size_t i = 0; i < a_.size(); ++i)
@@ -64,20 +72,7 @@
     template<typename T> void write(T x, size_t off) { write_(x, mark_ + off); }
 };
 
-class reader
-{
-  private:
-    array<char> &a_;
-    const char *p_;
-  public:
-    reader(array<char> &a) : a_(a), p_(a.get()) {}
-    template<typename T> T read() {
-      T x = *reinterpret_cast<const T*>(p_);
-      p_ += sizeof(T);
-      return x;
-    }
-    void jump(ssize_t off) { p_ += off; }
-};
+typedef st_reader reader;
 
 class stream
 {
@@ -122,10 +117,10 @@
   public:
     Txn(stream &s) : s_(s), r_(s.get_reader()), w_(s.get_writer()),
       off_(w_.pos()), op_(s),
-      nop_(unset) {}
-    void Clear() { w_.reserve(0*50); nop_ = unset; off_ = w_.pos(); }
+      nop_(unset), seqno_(unset) {}
+    void Clear() { w_.reserve(0*50); nop_ = unset; seqno_ = unset; off_ = w_.pos(); }
     void set_seqno(int x) { w_.write(x); }
-    int seqno() const { return r_.read<int>(); }
+    int seqno() const { return seqno_ == unset ? seqno_ = r_.read<int>() : seqno_; }
     void start_op() { w_.skip<typeof(nop_)>(); }
     Op *add_op() { if (nop_ == unset) nop_ = 0; ++nop_; return &op_; }
     void fin_op() { w_.write(nop_, off_ + sizeof(int)); }

This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|