assorted-commits Mailing List for Assorted projects (Page 29)
Brought to you by:
yangzhang
You can subscribe to this list here.
2007 |
Jan
|
Feb
|
Mar
|
Apr
|
May
|
Jun
|
Jul
|
Aug
|
Sep
|
Oct
|
Nov
(9) |
Dec
(12) |
---|---|---|---|---|---|---|---|---|---|---|---|---|
2008 |
Jan
(86) |
Feb
(265) |
Mar
(96) |
Apr
(47) |
May
(136) |
Jun
(28) |
Jul
(57) |
Aug
(42) |
Sep
(20) |
Oct
(67) |
Nov
(37) |
Dec
(34) |
2009 |
Jan
(39) |
Feb
(85) |
Mar
(96) |
Apr
(24) |
May
(82) |
Jun
(13) |
Jul
(10) |
Aug
(8) |
Sep
(2) |
Oct
(20) |
Nov
(31) |
Dec
(17) |
2010 |
Jan
(16) |
Feb
(11) |
Mar
(17) |
Apr
(53) |
May
(31) |
Jun
(13) |
Jul
(3) |
Aug
(6) |
Sep
(11) |
Oct
(4) |
Nov
(17) |
Dec
(17) |
2011 |
Jan
(3) |
Feb
(19) |
Mar
(5) |
Apr
(17) |
May
(3) |
Jun
(4) |
Jul
(14) |
Aug
(3) |
Sep
(2) |
Oct
(1) |
Nov
(3) |
Dec
(2) |
2012 |
Jan
(3) |
Feb
(7) |
Mar
(1) |
Apr
|
May
(1) |
Jun
|
Jul
(4) |
Aug
(5) |
Sep
(2) |
Oct
(3) |
Nov
|
Dec
|
2013 |
Jan
|
Feb
|
Mar
(9) |
Apr
(5) |
May
|
Jun
(2) |
Jul
(1) |
Aug
(10) |
Sep
(1) |
Oct
(2) |
Nov
|
Dec
|
2014 |
Jan
(1) |
Feb
(3) |
Mar
(3) |
Apr
(1) |
May
(4) |
Jun
|
Jul
|
Aug
|
Sep
(2) |
Oct
|
Nov
|
Dec
|
2015 |
Jan
|
Feb
|
Mar
|
Apr
|
May
|
Jun
|
Jul
|
Aug
|
Sep
|
Oct
(1) |
Nov
|
Dec
|
2016 |
Jan
(1) |
Feb
|
Mar
(2) |
Apr
|
May
|
Jun
|
Jul
|
Aug
|
Sep
(1) |
Oct
|
Nov
|
Dec
|
2017 |
Jan
|
Feb
|
Mar
(1) |
Apr
|
May
(5) |
Jun
(1) |
Jul
|
Aug
|
Sep
|
Oct
|
Nov
|
Dec
(2) |
2018 |
Jan
|
Feb
|
Mar
|
Apr
|
May
|
Jun
(1) |
Jul
|
Aug
|
Sep
|
Oct
|
Nov
|
Dec
|
From: <yan...@us...> - 2009-02-19 08:30:23
|
Revision: 1202 http://assorted.svn.sourceforge.net/assorted/?rev=1202&view=rev Author: yangzhang Date: 2009-02-19 08:30:19 +0000 (Thu, 19 Feb 2009) Log Message: ----------- - simplified bcast-switching logic - removed REUSE_SER logic; pointless to keep orig - tried adding aspectc++, no work - removed outstream, replaced with lambdas (and moved into ser.cc) Modified Paths: -------------- ydb/trunk/src/Makefile ydb/trunk/src/main.lzz.clamp ydb/trunk/src/ser.cc ydb/trunk/src/ser.h Modified: ydb/trunk/src/Makefile =================================================================== --- ydb/trunk/src/Makefile 2009-02-19 08:15:53 UTC (rev 1201) +++ ydb/trunk/src/Makefile 2009-02-19 08:30:19 UTC (rev 1202) @@ -36,6 +36,7 @@ ifneq ($(PB),) PB := -DUSE_PB endif +# CXX := $(WTF) ag++ -k --Xcompiler # $(CXX) CXX := $(WTF) $(CXX) LDFLAGS := -pthread $(GPROF) LDLIBS := -lstx -lst -lresolv -lprotobuf -lgtest \ Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-02-19 08:15:53 UTC (rev 1201) +++ ydb/trunk/src/main.lzz.clamp 2009-02-19 08:30:19 UTC (rev 1202) @@ -48,7 +48,6 @@ using ydb::msg::reader; using ydb::msg::writer; using ydb::msg::stream; -using ydb::msg::outstream; using ydb::pb::ResponseBatch; using ydb::pb::Response; using ydb::pb::Recovery; @@ -383,7 +382,7 @@ */ template<typename T> void -bcastmsg(const vector<st_netfd_t> &dsts, const T & msg) +bcastmsg_sync(const vector<st_netfd_t> &dsts, const T &msg) { ser_t s; ser(s, msg); @@ -395,22 +394,26 @@ } /** - * Send a message to a single recipient. + * Send a message to some destinations, using whichever method of network IO + * was chosen (sync or async). 
*/ template<typename T> void -sendmsg(st_netfd_t dst, const T &msg) +bcastmsg(const vector<st_netfd_t> &dsts, const T &msg) { - vector<st_netfd_t> dsts(1, dst); - bcastmsg(dsts, msg); + if (use_bcast_async) bcastmsg_async(dsts, msg); + else bcastmsg_sync(dsts, msg); } +/** + * Send a message to a single recipient. + */ template<typename T> void -sendmsg_async(st_netfd_t dst, const T &msg) +sendmsg(st_netfd_t dst, const T &msg) { vector<st_netfd_t> dsts(1, dst); - bcastmsg_async(dsts, msg); + bcastmsg(dsts, msg); } /** @@ -528,15 +531,6 @@ mii g_map; wal *g_wal; -// Function pointer types. -typedef void (*bcasttxn_t)(const vector<st_netfd_t> &dsts, const TxnBatch &msg); -bcasttxn_t bcasttxn_async = bcastmsg_async<TxnBatch>; -bcasttxn_t bcasttxn_sync = bcastmsg<TxnBatch>; - -typedef void (*sendres_t)(st_netfd_t dst, const ResponseBatch &msg); -sendres_t sendres_async = sendmsg_async<ResponseBatch>; -sendres_t sendres_sync = sendmsg<ResponseBatch>; - /** * Keep issuing transactions to the replicas. */ @@ -544,46 +538,47 @@ issue_txns(st_channel<replica_info> &newreps, int &seqno, st_bool &accept_joiner) { - bcasttxn_t bcast = use_bcast_async ? bcasttxn_async : bcasttxn_sync; - st_thread_t bcaster_thread = bcast == bcasttxn_async ? - my_spawn(bcaster, "bcaster") : nullptr; Op_OpType types[] = {Op::read, Op::write, Op::del}; vector<st_netfd_t> fds; long long start_time = current_time_millis(); finally f(lambda () { - if (__ref(bcaster_thread) != nullptr) st_join(__ref(bcaster_thread)); showtput("issued", current_time_millis(), __ref(start_time), __ref(seqno), 0); }); reader r(nullptr); - outstream os(fds); - function<void(const void*, size_t)> fn; - if (use_wal) fn = os; - else fn = lambda(const void *buf, size_t len) { g_wal->logbuf(buf, len); }; + //function<void(const void*, size_t)> fn = use_wal ? 
+ // lambda(const void *buf, size_t len) { g_wal->logbuf(buf, len); } : + // lambda(const void *buf, size_t len) { + // }; + //if (use_wal) fn = lambda(const void *buf, size_t len) {}; + //else fn = lambda(const void *buf, size_t len) { g_wal->logbuf(buf, len); }; // TODO why doesn't this work? // else fn = boost::bind(&wal::logbuf, g_wal); - writer w(fn, buf_size); + + writer w(lambda(const void *buf, size_t len) { + if (__ref(use_wal)) + g_wal->logbuf(buf, len); + else + foreach (st_netfd_t dst, __ref(fds)) + checkeqnneg(st_write(dst, buf, len, ST_UTIME_NO_TIMEOUT), ssize_t(len)); + }, buf_size); stream s(r,w); TxnBatch batch NPBONLY((s)); for (int t = 0; t < batch_size; ++t) batch.add_txn(); while (!stop_hub) { w.mark(); -#ifdef REUSE_SER batch.Clear(); -#else - TxnBatch batch; -#endif // Did we get a new member? If so, notify an arbitrary member (the first // one) to prepare to send recovery information (by sending an // empty/default Txn). if (!newreps.empty() && seqno > 0) { if (multirecover) { - bcast(fds, batch); + bcastmsg(fds, batch); } else { sendmsg(fds[0], batch); } @@ -654,7 +649,7 @@ // Broadcast. #ifdef USE_PB if (!fds.empty() && !suppress_txn_msgs) { - bcast(fds, batch); + bcastmsg(fds, batch); } else if (use_wal) { g_wal->log(batch); } else if (force_ser) { @@ -663,6 +658,9 @@ } #endif + if (fds.empty()) + w.reset(); + // Pause? if (do_pause) do_pause.waitreset(); @@ -677,12 +675,9 @@ NPBONLY(txn.start_op()); NPBONLY(txn.fin_op()); NPBONLY(batch.fin_txn()); - PBONLY(bcast(fds, batch)); + PBONLY(bcastmsg(fds, batch)); w.mark(); w.flush(); - if (bcaster_thread != nullptr) { - msgs.push(make_pair(nullptr, shared_ptr<string>())); - } } /** @@ -815,16 +810,7 @@ // issued more since the Init message). int first_seqno = -1; - st_thread_t bcaster_thread = use_bcast_async ? - my_spawn(bcaster, "bcaster") : nullptr; - sendres_t sendmsg = use_bcast_async ? 
sendres_async : sendres_sync; - finally f(lambda () { - if (__ref(bcaster_thread) != nullptr) { - msgs.push(make_pair(nullptr, shared_ptr<string>())); - st_join(__ref(bcaster_thread)); - } - long long now = current_time_millis(); showtput("processed", now, __ref(start_time), __ref(seqno), __ref(init_seqno)); @@ -839,8 +825,7 @@ st_reader reader(leader); vector<st_netfd_t> leader_v(1, leader); - outstream os(leader_v); - writer w(os, buf_size); + writer w(lambda(const void*, size_t) { throw std::exception(); }, buf_size); stream s(reader, w); try { @@ -864,11 +849,7 @@ } } if (batch.txn_size() > 0) { -#ifdef REUSE_SER resbatch.Clear(); -#else - ResponseBatch resbatch; -#endif for (int t = 0; t < batch.txn_size(); ++t) { const Txn &txn = batch.txn(t); // Regular transaction. @@ -1614,9 +1595,20 @@ my_spawn(memmon, "memmon"); } + // Start the message broadcaster thread, if requested. + st_thread_t bcaster_thread = use_bcast_async ? + my_spawn(bcaster, "bcaster") : nullptr; + long long start = thread_start_time = current_time_millis(); - // At the end, print thread profiling information. + + // At the end, cleanly stop the bcaster thread and print thread profiling + // information. 
finally f(lambda() { + if (use_bcast_async) { + msgs.push(make_pair(nullptr, shared_ptr<string>())); + st_join(__ref(bcaster_thread)); + } + if (profile_threads) { long long end = current_time_millis(); long long all = end - __ref(start); @@ -1657,7 +1649,6 @@ /* * Compile-time options: * - * - REUSE_SER * - map, unordered_map, dense_hash_map * - SERIALIZATION METHOD */ Modified: ydb/trunk/src/ser.cc =================================================================== --- ydb/trunk/src/ser.cc 2009-02-19 08:15:53 UTC (rev 1201) +++ ydb/trunk/src/ser.cc 2009-02-19 08:30:19 UTC (rev 1202) @@ -5,7 +5,6 @@ using ydb::msg::reader; using ydb::msg::writer; using ydb::msg::stream; -using ydb::msg::outstream; using namespace commons; using namespace std; #ifdef USE_PB @@ -16,6 +15,18 @@ const int nreps = 2; +class outstream +{ + private: + const vector<st_netfd_t> &dsts; + public: + outstream(const vector<st_netfd_t> &dsts) : dsts(dsts) {} + void operator()(const void *buf, size_t len) { + foreach (st_netfd_t dst, dsts) + checkeqnneg(st_write(dst, buf, len, ST_UTIME_NO_TIMEOUT), ssize_t(len)); + } +}; + void producer(st_netfd_t dst) { vector<st_netfd_t> dsts(1, dst); Modified: ydb/trunk/src/ser.h =================================================================== --- ydb/trunk/src/ser.h 2009-02-19 08:15:53 UTC (rev 1201) +++ ydb/trunk/src/ser.h 2009-02-19 08:30:19 UTC (rev 1202) @@ -33,18 +33,6 @@ // TODO try to make all of the following conform to the std interfaces, if // amenable -class outstream -{ - private: - const vector<st_netfd_t> &dsts; - public: - outstream(const vector<st_netfd_t> &dsts) : dsts(dsts) {} - void operator()(const void *buf, size_t len) { - foreach (st_netfd_t dst, dsts) - checkeqnneg(st_write(dst, buf, len, ST_UTIME_NO_TIMEOUT), ssize_t(len)); - } -}; - class writer { private: This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-19 08:16:00
|
Revision: 1201 http://assorted.svn.sourceforge.net/assorted/?rev=1201&view=rev Author: yangzhang Date: 2009-02-19 08:15:53 +0000 (Thu, 19 Feb 2009) Log Message: ----------- added demo of throwing dtors Added Paths: ----------- sandbox/trunk/src/cc/throwing_dtors.cc Added: sandbox/trunk/src/cc/throwing_dtors.cc =================================================================== --- sandbox/trunk/src/cc/throwing_dtors.cc (rev 0) +++ sandbox/trunk/src/cc/throwing_dtors.cc 2009-02-19 08:15:53 UTC (rev 1201) @@ -0,0 +1,36 @@ +// From <http://stackoverflow.com/questions/130117/throwing-exceptions-out-of-a-destructor#130123> +// +// This will cause an abort! + +#include <iostream> + +class Bad +{ + public: + ~Bad() + { + throw 1; + } +}; + +int main() +{ + try + { + Bad bad; + } + catch(...) + { + std::cout << "Print This" << std::endl; + } + + try + { + Bad bad; + throw 2; + } + catch(...) + { + std::cout << "Never print this " << std::endl; + } +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-19 06:58:54
|
Revision: 1200 http://assorted.svn.sourceforge.net/assorted/?rev=1200&view=rev Author: yangzhang Date: 2009-02-19 06:58:42 +0000 (Thu, 19 Feb 2009) Log Message: ----------- added sockstream demo Added Paths: ----------- sandbox/trunk/src/cc/sockstream.cc Added: sandbox/trunk/src/cc/sockstream.cc =================================================================== --- sandbox/trunk/src/cc/sockstream.cc (rev 0) +++ sandbox/trunk/src/cc/sockstream.cc 2009-02-19 06:58:42 UTC (rev 1200) @@ -0,0 +1,296 @@ +// +// sockstream.cc - various tests on socket streams, using C++ with homemade +// networkbuf, C++ with gnu's stdio_filebuf, and C with +// plain old fopen and friends. Compiles on gcc 3.3.4. +// +// Copyright 2005, Chris Frey. To God be the glory. +// You are free to use, modify, redistribute, and sublicense this code, +// as long as this copyright message is not removed from the source, +// and as long as the existence of any changes are noted in the source +// as well. (i.e. You don't need a complete history, just don't claim +// that the modified code was written entirely by me -- include your own +// copyright notice as well.) +// +// If you find this code useful, please email me and let me know. +// +// Conclusion: +// C++'s getline() processes the stream one character at a time, in order +// to search for the delimiter. Therefore, with networkbuf, which only +// fills the input buffer as much as it can with the available network +// data and then returns, it doesn't block until it really needs to, +// and gets all available lines from the kernel, even with a nice large +// buffer. +// +// With stdio_filebuf, this is built on top of C's streams, which has its +// own buffer. So the likelyhood of blocking before getting all the +// data is higher, since C's buffers may empty and get forced to fill +// with an internal fread() while C++'s getline is only asking for a +// single char. +// +// C++ streambufs and derived classes are actually pretty cool. 
I just +// wish they had used more user-friendly names. But as is, you can +// still use functions like xsgetn and xsputn as read and write, and +// the streambuf supplies all the needed buffering. Plus you can do it +// on a character basis, while still maintaining efficiency with +// buffered kernel calls. Plus there are iterators to work with these +// things, which I haven't fully investigated. +// +// Time to rethink my design of reuse lib's buffer classes, and turn +// them into streambufs perhaps, or at least derive streambuf interface +// classes to make use of them. Also, the transfer classes should be +// able to use streambufs as well. +// +// This stuff is complicated, and not commonly well documented, but +// it sure is useful. +// +// Chris Frey +// <cd...@ne...> +// 2005/02/13 +// + +// c++ io +#include <iostream> +#include <ext/stdio_filebuf.h> + +// c io +#include <cstdio> + +// networking +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> + +// posix io +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> + +// debugging +#include <assert.h> + +using namespace std; + +void cpp_networkbuf(int s); +void cpp_stdio_filebuf(int s); +void cpp_work(istream &nin, ostream &nout); +void c_work(int s); + +// +// networkbuf - gives a simple buffer to posix read/write calls, suitable +// for use with a network. Do not use this for plain files, +// since there is no conflict checking code for reading/writing +// to different sections of the same file. It is meant for +// processing two independent streams, just like a network. 
+// +class networkbuf : public streambuf +{ + int _fd; + char_type *_getbuf, *_putbuf; + size_t _getbuf_size; + size_t _putbuf_size; + +public: + networkbuf(int fd, size_t size) + : _fd(fd) + { + _getbuf = new char_type[size]; + _getbuf_size = size; + // set the input buffer to "nothing there" + setg(_getbuf, _getbuf, _getbuf); + + _putbuf = new char_type[size + 1]; // one more, for when + // overflow has extra byte + _putbuf_size = size; + // set the output buffer to "empty" + setp(_putbuf, _putbuf + _putbuf_size); + } + ~networkbuf() { + sync(); + close(_fd); + delete [] _getbuf; + _getbuf_size = 0; + delete [] _putbuf; + _putbuf_size = 0; + } + + // write out any data in the out buffer, and write c as well if c!=eof() + virtual int_type overflow (int_type c) { +// cout << "overflow called: " << c << endl; + assert( pbase() ); + bool have_extra = c != traits_type::eof(); + + // pbase() is a pointer at the start of our buffer, + // and pptr() is a pointer to the next free spot, + // so we can check for data in buffer by comparing them + if( pptr() > pbase() || have_extra ) { + // we have something to write! + ssize_t count = pptr() - pbase(); + if( have_extra ) { + // tack extra value onto the end of the buf + // (note constructor added a byte for us here, + // just in case) + *(pptr()) = traits_type::to_char_type(c); + count++; + } + + // loop and write all bytes, even if ::write() returns + // less than what we asked for + ssize_t written; + char_type *wp = pbase(); + do { + written = ::write(_fd, wp, count); + if( written > 0 ) { + count -= written; + assert(count >= 0); + wp += written; + } + else { + // fixme... do we set badbit here? + // failbit? + return traits_type::eof(); + } + } while(count); + // reset output buffer to empty state + setp(_putbuf, _putbuf + _putbuf_size); + } + return traits_type::not_eof(c); + } + + // called on ostream::flush()... 
return -1 here on failure + virtual int sync() { + if( overflow(traits_type::eof()) == traits_type::eof() ) + return -1; + return 0; + } + + // fill the empty buffer, and return the first char in it, + // without advancing the pointer. if EOF, return eof() and + // leave gptr() == egptr() (this is the default, since this + // function won't be called if this wasn't the case, so we + // just do nothing if EOF, below) + virtual int_type underflow () { + int_type ret = traits_type::eof(); + ssize_t count; + if( (count = ::read(_fd, _getbuf, _getbuf_size)) > 0 ) { + // set our input buffer to indicate new data + setg(_getbuf, _getbuf, _getbuf + count); + + // return new character + ret = traits_type::to_int_type(*_getbuf); + } +// cout << "underflow called: " << endl; + return ret; + } +}; + +void cpp_networkbuf(int s) +{ + networkbuf nb(s, 100); + iostream mail(&nb); + cpp_work(mail, mail); +} + +void cpp_filebuf(int s) +{ + // You must make separate filebufs here, since it seems the filebuf + // classes assume a file is seekable, while a socket is really just + // two independent data streams, one in, ont out. So treat it like + // that in code. + __gnu_cxx::stdio_filebuf<char> fbin(s, ios::in, 100); + __gnu_cxx::stdio_filebuf<char> fbout(s, ios::out, 100); + istream mailin(&fbin); + ostream mailout(&fbout); + + cpp_work(mailin, mailout); +} + +void cpp_work(istream &nin, ostream &nout) +{ + cout << "Sending data to socket..." << endl; + nout << "MAIL from: cdfrey" << endl; + nout << "RCPT to: destination-dude" << endl; +// nout << "quit" << endl; // commented out to see buffer behaviour + + cout << "Looping for data..." 
<< endl; + char buffer[1024]; + while( nin.getline(buffer, sizeof(buffer)) ) { + cout << "socket data: " << buffer << endl; + + if( nin.fail() ) { + cout << "failbit \r" << flush << endl; + } + if( nin.bad() ) { + cout << "badbit \r" << flush << endl; + } + if( nin.eof() ) { + cout << "eofbit \r" << flush << endl; + } + } + +/* + // alternate method of copying a streambuf + cout << "Copying all socket data..." << endl; + cout << nin.rdbuf(); +*/ +} + +void c_work(int s) +{ + FILE *mail = fdopen(s, "r+"); + if( mail == NULL ) + return; + + cout << "Sending data to socket..." << endl; + + fputs("MAIL from: cdfrey\n", mail); + fputs("RCPT to: destination-dude\n", mail); + fputs("quit\n", mail); + + cout << "Looping for data..." << endl; + char buffer[1024]; + while( !feof(mail) ) { + // normal method +// fgets(buffer, sizeof(buffer), mail); + + // play with seeking on a network to test its behaviour + fseek(mail, 10, SEEK_CUR); + size_t count = fread(buffer, 1, 1, mail); + if( count == 1 ) + ungetc(buffer[0], mail); + count = fread(buffer, 1, 1, mail); + buffer[count] = 0; + if(count == 0) + cout << "(" << count << ")" << flush; + cout << buffer << flush; + } + + fclose(mail); +} + + +int main() +{ + std::ios::sync_with_stdio(false); // optional, only affects cout, etc. + + // setup socket and connect to local mail host + int s = socket(PF_INET, SOCK_STREAM, 0); + if( s == -1 ) { cerr << "socket error" << endl; return 1; } + + sockaddr_in dest; + dest.sin_family = AF_INET; + dest.sin_port = htons(25); + dest.sin_addr.s_addr = inet_addr("127.0.0.1"); + if( connect(s, (sockaddr *) &dest, sizeof(dest)) == -1 ) { + cerr << "connect error" << endl; return 1; + } + + // or use a file (for testing) +// int s = open("blah.txt", O_RDWR); + + cpp_networkbuf(s); +// cpp_filebuf(s); +// c_work(s); + return 0; +} + This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-18 23:23:18
|
Revision: 1199 http://assorted.svn.sourceforge.net/assorted/?rev=1199&view=rev Author: yangzhang Date: 2009-02-18 21:37:09 +0000 (Wed, 18 Feb 2009) Log Message: ----------- added alternative approach to template polymorphism Added Paths: ----------- sandbox/trunk/src/cc/tmpl_fn_ptrs2.cc Added: sandbox/trunk/src/cc/tmpl_fn_ptrs2.cc =================================================================== --- sandbox/trunk/src/cc/tmpl_fn_ptrs2.cc (rev 0) +++ sandbox/trunk/src/cc/tmpl_fn_ptrs2.cc 2009-02-18 21:37:09 UTC (rev 1199) @@ -0,0 +1,46 @@ +// An alternative approach that requires propagation of a template parameter. + +#include <iostream> + +using namespace std; + +class Write0 { + public: + template< typename tMsg > + void operator()( /*const*/ tMsg msg ) { cout << "write0: " << msg.name() << endl; }; +}; + +class Write1 { + public: + template< typename tMsg > + void operator()( /*const*/ tMsg msg ) { cout << "write1: "<< msg.name() << endl; }; +}; + +struct MsgA { const char *name() { return "MsgA"; } }; +struct MsgB { const char *name() { return "MsgB"; } }; +struct MsgC { const char *name() { return "MsgC"; } }; +struct MsgD { const char *name() { return "MsgD"; } }; + +// the Tmain does the real action +// +template< typename Writer > +int Tmain( Writer write, int argc, char** args ) { + + write( MsgA() ); + write( MsgB() ); + write( MsgB() ); + write( MsgD() ); + + return 0; +} + +// the main merely chooses the writer to use +// +int main( int argc, char** args ) { + + if( argc==1 ) + return Tmain( Write0(), argc, args); + else + return Tmain( Write1(), argc, args); + +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-18 21:24:02
|
Revision: 1198 http://assorted.svn.sourceforge.net/assorted/?rev=1198&view=rev Author: yangzhang Date: 2009-02-18 21:23:56 +0000 (Wed, 18 Feb 2009) Log Message: ----------- added ac++/boost demo Added Paths: ----------- sandbox/trunk/src/cc/acpp_boost.cc Added: sandbox/trunk/src/cc/acpp_boost.cc =================================================================== --- sandbox/trunk/src/cc/acpp_boost.cc (rev 0) +++ sandbox/trunk/src/cc/acpp_boost.cc 2009-02-18 21:23:56 UTC (rev 1198) @@ -0,0 +1,91 @@ +// Demo that AC++ is incompat with boost. + +#include <boost/archive/binary_iarchive.hpp> +#include <boost/archive/binary_oarchive.hpp> +#include <boost/bind.hpp> +#include <boost/program_options.hpp> +#include <boost/range/iterator_range.hpp> +#include <boost/scoped_array.hpp> +#include <boost/shared_ptr.hpp> +int main() { return 0; } + +#if 0 +$ ag++ acpp_boost.cc +/opt/armed/include/boost/mpl/aux_/config/compiler.hpp:40: error: Unexpected token `# else' +/opt/armed/include/boost/mpl/aux_/include_preprocessed.hpp:37: error: Empty file name in `#include' +/opt/armed/include/boost/mpl/aux_/include_preprocessed.hpp:37: error: Empty file name in `#include' +/opt/armed/include/boost/mpl/aux_/include_preprocessed.hpp:37: error: Empty file name in `#include' +/opt/armed/include/boost/mpl/aux_/include_preprocessed.hpp:37: error: Empty file name in `#include' +/opt/armed/include/boost/mpl/aux_/include_preprocessed.hpp:37: error: Empty file name in `#include' +/opt/armed/include/boost/mpl/aux_/include_preprocessed.hpp:37: error: Empty file name in `#include' +/opt/armed/include/boost/mpl/aux_/include_preprocessed.hpp:37: error: Empty file name in `#include' +/opt/armed/include/boost/mpl/aux_/include_preprocessed.hpp:37: error: Empty file name in `#include' +/opt/armed/include/boost/mpl/aux_/include_preprocessed.hpp:37: error: Empty file name in `#include' +/opt/armed/include/boost/mpl/aux_/include_preprocessed.hpp:37: error: Empty file name in `#include' 
+/opt/armed/include/boost/mpl/aux_/include_preprocessed.hpp:37: error: Empty file name in `#include' +/opt/armed/include/boost/mpl/aux_/include_preprocessed.hpp:37: error: Empty file name in `#include' +/opt/armed/include/boost/mpl/aux_/include_preprocessed.hpp:37: error: Empty file name in `#include' +/opt/armed/include/boost/mpl/aux_/include_preprocessed.hpp:37: error: Empty file name in `#include' +/opt/armed/include/boost/mpl/aux_/include_preprocessed.hpp:37: error: Empty file name in `#include' +/opt/armed/include/boost/mpl/aux_/include_preprocessed.hpp:37: error: Empty file name in `#include' +/opt/armed/include/boost/mpl/list.hpp:36: error: Empty file name in `#include' +/opt/armed/include/boost/mpl/aux_/include_preprocessed.hpp:37: error: Empty file name in `#include' +/opt/armed/include/boost/function.hpp:64: error: `#include' expects "filename" or <filename> +/opt/armed/include/boost/function.hpp:64: error: `#include' expects "filename" or <filename> +/opt/armed/include/boost/mpl/aux_/numeric_cast_utils.hpp:32: error: `apply_wrap2' undeclared here +/opt/armed/include/boost/mpl/aux_/numeric_cast_utils.hpp:58: error: `apply_wrap2' undeclared here +/opt/armed/include/boost/serialization/tracking.hpp:49: error: invalid member declaration near token `<' +/opt/armed/include/boost/mpl/sequence_tag.hpp:107: error: invalid declaration near token `template' +/opt/armed/include/boost/mpl/sequence_tag.hpp:120: error: expression expected +/opt/armed/include/boost/mpl/sequence_tag.hpp:120: error: invalid declaration near token `template' +/opt/armed/include/boost/mpl/empty.hpp:25: error: invalid declaration near token `template' +/opt/armed/include/boost/archive/detail/iserializer.hpp:351: error: `mpl::equal_to' undeclared here +/opt/armed/include/boost/archive/binary_iarchive.hpp:91: error: `boost::mpl::apply1' undeclared here +/opt/armed/include/boost/archive/detail/oserializer.hpp:254: error: `mpl::equal_to' undeclared here 
+/opt/armed/include/boost/archive/detail/oserializer.hpp:513: error: invalid member declaration near token `<' +/opt/armed/include/boost/archive/binary_oarchive.hpp:53: error: `boost::mpl::apply1' undeclared here +/opt/armed/include/boost/mpl/size_t_fwd.hpp:23: error: wrong use of `size_t' +/opt/armed/include/boost/mpl/size_t_fwd.hpp:26: error: `size_t' is not a member of `::mpl_' +/opt/armed/include/boost/utility/base_from_member.hpp:75: error: invalid member declaration near token `template' +/opt/armed/include/boost/iterator/iterator_categories.hpp:158: error: invalid type in declaration of parameter `%anon19950' +/opt/armed/include/boost/iterator/interoperable.hpp:34: error: invalid declaration near token `template' +/opt/armed/include/boost/type_traits/is_function.hpp:56: error: invalid declaration near token `template' +/opt/armed/include/boost/type_traits/is_function.hpp:97: error: `::boost::detail::is_function_impl' undeclared here +/opt/armed/include/boost/detail/indirect_traits.hpp:60: error: invalid declaration near token `template' +/opt/armed/include/boost/detail/indirect_traits.hpp:72: error: invalid declaration near token `template' +/opt/armed/include/boost/detail/indirect_traits.hpp:96: error: invalid declaration near token `template' +/opt/armed/include/boost/detail/indirect_traits.hpp:115: error: `is_reference_to_function_pointer_aux' undeclared here +/opt/armed/include/boost/detail/indirect_traits.hpp:120: error: invalid declaration near token `template' +/opt/armed/include/boost/detail/indirect_traits.hpp:136: error: invalid declaration near token `template' +/opt/armed/include/boost/detail/indirect_traits.hpp:154: error: invalid declaration near token `template' +/opt/armed/include/boost/detail/indirect_traits.hpp:159: error: invalid declaration near token `template' +/opt/armed/include/boost/detail/indirect_traits.hpp:164: error: invalid declaration near token `template' +/opt/armed/include/boost/detail/indirect_traits.hpp:169: error: invalid 
declaration near token `template' +/opt/armed/include/boost/detail/indirect_traits.hpp:174: error: invalid declaration near token `template' +/opt/armed/include/boost/detail/indirect_traits.hpp:188: error: invalid declaration near token `template' +/opt/armed/include/boost/iterator/detail/facade_iterator_category.hpp:58: error: invalid declaration near token `template' +/opt/armed/include/boost/iterator/detail/facade_iterator_category.hpp:89: error: `mpl::and_' undeclared here +/opt/armed/include/boost/iterator/detail/facade_iterator_category.hpp:117: error: invalid declaration near token `template' +/opt/armed/include/boost/iterator/detail/facade_iterator_category.hpp:151: error: `mpl_assertion_in_line_141' already defined +/opt/armed/include/boost/iterator/detail/facade_iterator_category.hpp:145: previously defined here +/opt/armed/include/boost/iterator/detail/facade_iterator_category.hpp:166: error: invalid member declaration near token `enum' +/opt/armed/include/boost/iterator/detail/facade_iterator_category.hpp:189: error: `is_iterator_category' undeclared here +/opt/armed/include/boost/iterator/detail/enable_if.hpp:66: error: invalid declaration near token `template' +/opt/armed/include/boost/iterator/iterator_facade.hpp:62: error: invalid declaration near token `template' +/opt/armed/include/boost/iterator/iterator_facade.hpp:102: error: invalid member declaration near token `typename' +/opt/armed/include/boost/iterator/iterator_facade.hpp:272: error: `mpl::and_' undeclared here +/opt/armed/include/boost/iterator/iterator_facade.hpp:375: error: `mpl::and_' undeclared here +/opt/armed/include/boost/iterator/iterator_facade.hpp:379: error: `iterator_writability_disabled' undeclared here +/opt/armed/include/boost/iterator/iterator_facade.hpp:388: error: `use_operator_brackets_proxy' undeclared here +/opt/armed/include/boost/iterator/iterator_facade.hpp:484: error: invalid member declaration near token `template' 
+/opt/armed/include/boost/iterator/iterator_facade.hpp:616: error: cannot declare `boost::detail::iterator_facade_types' within `iterator_facade' +/opt/armed/include/boost/iterator/iterator_facade.hpp:724: error: invalid declaration near token `template' +/opt/armed/include/boost/iterator/iterator_adaptor.hpp:148: error: invalid declaration near token `template' +/opt/armed/include/boost/iterator/iterator_adaptor.hpp:207: error: `iterator_traversal' undeclared here +/opt/armed/include/boost/iterator/iterator_adaptor.hpp:208: error: cannot declare `::type' within `iterator_adaptor_base' +/opt/armed/include/boost/iterator/iterator_adaptor.hpp:259: error: invalid declaration near token `template' +/opt/armed/include/boost/iterator/reverse_iterator.hpp:20: error: invalid declaration near token `template' +/opt/armed/include/boost/iterator/reverse_iterator.hpp:61: error: invalid declaration near token `template' +/opt/armed/include/boost/range/reverse_iterator.hpp:32: error: invalid member declaration near token `<' +error: Execution failed: "ac++" --config "/tmp/agxx_pcfgFqhTIn" -p. -c "acpp_boost.cc" -o "/tmp/acpp_boost.cc_agxx_cAwBb5" +#endif This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-18 18:32:02
|
Revision: 1197 http://assorted.svn.sourceforge.net/assorted/?rev=1197&view=rev Author: yangzhang Date: 2009-02-18 18:31:55 +0000 (Wed, 18 Feb 2009) Log Message: ----------- added question submitted to stackoverflow Added Paths: ----------- sandbox/trunk/src/cc/tmpl_fn_ptrs.cc Added: sandbox/trunk/src/cc/tmpl_fn_ptrs.cc =================================================================== --- sandbox/trunk/src/cc/tmpl_fn_ptrs.cc (rev 0) +++ sandbox/trunk/src/cc/tmpl_fn_ptrs.cc 2009-02-18 18:31:55 UTC (rev 1197) @@ -0,0 +1,46 @@ + // How do I do set templated function pointers? + // + // See <http://stackoverflow.com/questions/560322/how-else-to-achieve-templated-function-pointers>. + + #include <iostream> + using namespace std; + + template<typename T> void write0(T msg) { cout << "write0: " << msg.name() << endl; } + template<typename T> void write1(T msg) { cout << "write1: " << msg.name() << endl; } + + // This isn't so bad, since it's just a conditional (which the processor will + // likely predict correctly most of the time). + bool use_write0; + template<typename T> void write(T msg) { if (use_write0) write0(msg); else write1(msg); } + + struct MsgA { const char *name() { return "MsgA"; } }; + struct MsgB { const char *name() { return "MsgB"; } }; + struct MsgC { const char *name() { return "MsgC"; } }; + struct MsgD { const char *name() { return "MsgD"; } }; + + // This doesn't work: templates may not be virtual. + #if 0 + struct Writer { template<typename T> virtual void write(T msg) = 0; }; + struct Writer0 { template<typename T> virtual void write(T msg) { cout << "write0: " << msg.name() << endl; } }; + struct Writer1 { template<typename T> virtual void write(T msg) { cout << "write0: " << msg.name() << endl; } }; + #endif + + int main(int argc, char **argv) { + use_write0 = argc == 1; + + // I can do this: + write(MsgA()); + + // Can I achieve the following without the verbosity (manual setup, named + // template instantiations, etc.)? 
+ void (*pwriteA)(MsgA) = use_write0 ? (void(*)(MsgA)) write0<MsgA> : (void(*)(MsgA)) write1<MsgA>; + void (*pwriteB)(MsgB) = use_write0 ? (void(*)(MsgB)) write0<MsgB> : (void(*)(MsgB)) write1<MsgB>; + void (*pwriteC)(MsgC) = use_write0 ? (void(*)(MsgC)) write0<MsgC> : (void(*)(MsgC)) write1<MsgC>; + void (*pwriteD)(MsgD) = use_write0 ? (void(*)(MsgD)) write0<MsgD> : (void(*)(MsgD)) write1<MsgD>; + pwriteA(MsgA()); + pwriteB(MsgB()); + pwriteC(MsgC()); + pwriteD(MsgD()); + + return 0; + } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-18 18:30:56
|
Revision: 1196 http://assorted.svn.sourceforge.net/assorted/?rev=1196&view=rev Author: yangzhang Date: 2009-02-18 18:30:53 +0000 (Wed, 18 Feb 2009) Log Message: ----------- added demo of templated virtual functions Added Paths: ----------- sandbox/trunk/src/cc/tmpl_virt_fn.cc Added: sandbox/trunk/src/cc/tmpl_virt_fn.cc =================================================================== --- sandbox/trunk/src/cc/tmpl_virt_fn.cc (rev 0) +++ sandbox/trunk/src/cc/tmpl_virt_fn.cc 2009-02-18 18:30:53 UTC (rev 1196) @@ -0,0 +1,27 @@ +// Demo that you can't have templated virtual functions. + +class IWrite { + public: + template< typename tMsg > + virtual void write( const tMsg& msg ) = 0; +}; + +class Write0 { + public: + template< typename tMsg > + virtual void write( const tMsg& msg ) { cout << "Write0" << msg.name(); }; +}; +class Write1 { + public: + template< typename tMsg > + virtual void write( const tMsg& msg ) { cout << msg.name(); }; +}; + +int main( int argc, char** args ) { + // here's your function pointer, neatly wrapped inside a vtable :) + IWrite* pWriter = argc==1? new Write0 : new Write1; + + pWriter->write( CMsgA() ); + pWriter->write( CMsgC() ); + pWriter->write( CMsgB() ); +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-18 08:22:46
|
Revision: 1195 http://assorted.svn.sourceforge.net/assorted/?rev=1195&view=rev Author: yangzhang Date: 2009-02-18 08:22:40 +0000 (Wed, 18 Feb 2009) Log Message: ----------- added demo of a conditional auto variable Added Paths: ----------- sandbox/trunk/src/cc/conditional_auto.cc Added: sandbox/trunk/src/cc/conditional_auto.cc =================================================================== --- sandbox/trunk/src/cc/conditional_auto.cc (rev 0) +++ sandbox/trunk/src/cc/conditional_auto.cc 2009-02-18 08:22:40 UTC (rev 1195) @@ -0,0 +1,32 @@ +// Demo of conditionally introducing a stack variable. +// +// This came from an issue in ydb where I wanted to be able to dynamically +// control whether protocol buffer objects were reused. + +#include <alloca.h> +#include <iostream> + +using namespace std; + +struct c { + c() { cout << "c()" << endl; } + ~c() { cout << "~c()" << endl; } + void f() { cout << "c.f()" << endl; } +}; + +void f(bool b) { + cout << "f()" << endl; + c outer; + void *buf = alloca(sizeof(c)); + do { + c *p = b ? new(buf) c() : &outer; + c &ref = *p; + ref.f(); + if (&ref == buf) ref.~c(); + } while (false); +} + +int main() { + f(false); + f(true); +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-18 06:01:28
|
Revision: 1194 http://assorted.svn.sourceforge.net/assorted/?rev=1194&view=rev Author: yangzhang Date: 2009-02-18 06:01:21 +0000 (Wed, 18 Feb 2009) Log Message: ----------- added cog demo Added Paths: ----------- sandbox/trunk/src/misc/cog/ sandbox/trunk/src/misc/cog/test.cc.cog sandbox/trunk/src/misc/cog/test.mk Added: sandbox/trunk/src/misc/cog/test.cc.cog =================================================================== --- sandbox/trunk/src/misc/cog/test.cc.cog (rev 0) +++ sandbox/trunk/src/misc/cog/test.cc.cog 2009-02-18 06:01:21 UTC (rev 1194) @@ -0,0 +1,11 @@ +#include <iostream> +using namespace std; +int main() { + // [[[cog + // import cog + // for s in ['hello', 'world']: + // cog.outl('cout << "%s" << endl;' % s) + // ]]] + // [[[end]]] + return 0; +} Added: sandbox/trunk/src/misc/cog/test.mk =================================================================== --- sandbox/trunk/src/misc/cog/test.mk (rev 0) +++ sandbox/trunk/src/misc/cog/test.mk 2009-02-18 06:01:21 UTC (rev 1194) @@ -0,0 +1,4 @@ +all: test + +test.cc: test.cc.cog + cog.py $< > $@ This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-18 00:30:10
|
Revision: 1193 http://assorted.svn.sourceforge.net/assorted/?rev=1193&view=rev Author: yangzhang Date: 2009-02-18 00:30:07 +0000 (Wed, 18 Feb 2009) Log Message: ----------- updated boost_function.cc with alternative syntax Modified Paths: -------------- sandbox/trunk/src/cc/boost_function.cc Modified: sandbox/trunk/src/cc/boost_function.cc =================================================================== --- sandbox/trunk/src/cc/boost_function.cc 2009-02-18 00:29:41 UTC (rev 1192) +++ sandbox/trunk/src/cc/boost_function.cc 2009-02-18 00:30:07 UTC (rev 1193) @@ -9,7 +9,20 @@ // This works... function<void()> f = bind(show, 0); bind(twice, f)(); - // ...but this does not compile. - bind(twice, bind(show, 0))(); + + // ...but this does not compile: + // + // bind(twice, bind(show, 0))(); + // + // error: conversion from ‘void’ to non-scalar type + // ‘boost::function0<void>’ requested. + + // This works... + { + function<void(int)> f = show; + function1<void, int> g = f; + function<void(int)> h = g; + h(0); + } return 0; } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-18 00:29:46
|
Revision: 1192 http://assorted.svn.sourceforge.net/assorted/?rev=1192&view=rev Author: yangzhang Date: 2009-02-18 00:29:41 +0000 (Wed, 18 Feb 2009) Log Message: ----------- added google dense hash map demo Added Paths: ----------- sandbox/trunk/src/cc/google_sparsehash.cc Added: sandbox/trunk/src/cc/google_sparsehash.cc =================================================================== --- sandbox/trunk/src/cc/google_sparsehash.cc (rev 0) +++ sandbox/trunk/src/cc/google_sparsehash.cc 2009-02-18 00:29:41 UTC (rev 1192) @@ -0,0 +1,9 @@ +#include <google/dense_hash_map> +using namespace google; +int main() { + dense_hash_map<int,int> m; + // You need to set this! + m.set_empty_key(0); + m[1] = 2; + return 0; +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-18 00:29:34
|
Revision: 1191 http://assorted.svn.sourceforge.net/assorted/?rev=1191&view=rev Author: yangzhang Date: 2009-02-18 00:29:29 +0000 (Wed, 18 Feb 2009) Log Message: ----------- added clamp memory safety demo Added Paths: ----------- sandbox/trunk/src/cc/clamp.cc.clamp sandbox/trunk/src/cc/clamp.mk Added: sandbox/trunk/src/cc/clamp.cc.clamp =================================================================== --- sandbox/trunk/src/cc/clamp.cc.clamp (rev 0) +++ sandbox/trunk/src/cc/clamp.cc.clamp 2009-02-18 00:29:29 UTC (rev 1191) @@ -0,0 +1,27 @@ +// Examine memory safety of using context-less lambdas. +// Turns out that the lambdas can be copy constructed +// and that function0<> stores a copy of the functor. +// Everything works as expected. +#include <iostream> +#include <boost/function.hpp> +using namespace std; +using namespace boost; +#include "lambda_impl.clamp_h" +struct save { + save(function<void()> f) : f(f) {} + void call() { f(); } + function<void()> f; +}; +int main() { + int x = 0; + save s(lambda() { cout << "0. " << __ctx(x) << endl; }); + ++x; + save t(lambda() { cout << "1. " << __ctx(x) << endl; }); + s.call(); + t.call(); + + ++x; + function<void()> f = lambda() { cout << "2. " << __ctx(x) << endl; }; + f(); + return 0; +} Added: sandbox/trunk/src/cc/clamp.mk =================================================================== --- sandbox/trunk/src/cc/clamp.mk (rev 0) +++ sandbox/trunk/src/cc/clamp.mk 2009-02-18 00:29:29 UTC (rev 1191) @@ -0,0 +1,4 @@ +all: clamp + +clamp.cc: clamp.cc.clamp + clamp < $< | sed '1d' > $@ This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-17 23:16:08
|
Revision: 1188 http://assorted.svn.sourceforge.net/assorted/?rev=1188&view=rev Author: yangzhang Date: 2009-02-17 22:38:36 +0000 (Tue, 17 Feb 2009) Log Message: ----------- - made writer more generic; takes any callback for flush/overflow - added zero-copy (ydb::msg) to main ydb - fixed some bugs in reader/writer - updated default --batch-size=100 and --write-buf=10000 Modified Paths: -------------- ydb/trunk/src/Makefile ydb/trunk/src/main.lzz.clamp ydb/trunk/src/ser.cc ydb/trunk/src/ser.h Modified: ydb/trunk/src/Makefile =================================================================== --- ydb/trunk/src/Makefile 2009-02-17 04:36:14 UTC (rev 1187) +++ ydb/trunk/src/Makefile 2009-02-17 22:38:36 UTC (rev 1188) @@ -33,6 +33,9 @@ else OPT := -g3 endif +ifneq ($(PB),) + PB := -DUSE_PB +endif CXX := $(WTF) $(CXX) LDFLAGS := -pthread $(GPROF) LDLIBS := -lstx -lst -lresolv -lprotobuf -lgtest \ @@ -44,7 +47,7 @@ -Wpointer-arith -Wcast-qual -Wcast-align -Wwrite-strings \ -Winit-self -Wsign-promo -Wno-unused-parameter -Wc++0x-compat \ -Wparentheses -Wmissing-format-attribute -Wfloat-equal \ - -Wno-inline -Wsynth -std=gnu++0x $(CXXFLAGS) + -Wno-inline -Wsynth -std=gnu++0x $(PB) $(CXXFLAGS) PBCXXFLAGS := $(OPT) -Wall -Werror $(GPROF) all: $(TARGET) @@ -73,6 +76,8 @@ %.lzz: %.lzz.clamp clamp < $< | sed "`echo -e '1i#src\n1a#end'`" > $@ +main.o: ser.h + all.h: fgrep '#include' main.lzz.clamp > all.h Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-02-17 04:36:14 UTC (rev 1187) +++ ydb/trunk/src/main.lzz.clamp 2009-02-17 22:38:36 UTC (rev 1188) @@ -28,18 +28,40 @@ #include <unistd.h> // pipe, write #include <vector> #include "ydb.pb.h" +//#define USE_PB +#include "ser.h" + +#define function boost::function #define foreach BOOST_FOREACH #define shared_ptr boost::shared_ptr #define ref boost::ref #define REUSE_SER + using namespace boost; using namespace boost::archive; using namespace 
commons; using namespace google; using namespace std; +using namespace std::tr1; using namespace testing; -using namespace tr1; +using ydb::msg::reader; +using ydb::msg::writer; +using ydb::msg::stream; +using ydb::msg::outstream; +using ydb::pb::ResponseBatch; +using ydb::pb::Response; +using ydb::pb::Recovery; +using ydb::pb::Recovery_Pair; +using ydb::pb::Init; +using ydb::pb::Join; +using ydb::pb::SockAddr; +#ifdef USE_PB +using namespace ydb::pb; +#else +using namespace ydb::msg; +#endif + #define GETMSG(buf) \ checkeqnneg(st_read_fully(src, buf, len, timeout), (int) len); \ if (stop_time != nullptr) \ @@ -58,7 +80,7 @@ st_utime_t timeout; int chkpt, accept_joiner_seqno, issuing_interval, min_ops, max_ops, stop_on_seqno, batch_size; -size_t accept_joiner_size; +size_t accept_joiner_size, buf_size; bool verbose, yield_during_build_up, yield_during_catch_up, dump, show_updates, count_updates, stop_on_recovery, general_txns, profile_threads, debug_threads, multirecover, disk, debug_memory, use_wal, @@ -479,6 +501,9 @@ wal() : of("wal"), out(of) {} template <typename T> void log(const T &msg) { ser(of, msg); } + void logbuf(const void *buf, size_t len) { + of.write(reinterpret_cast<const char*>(buf), len); + } #if 0 void del(int key) { int op = op_del; // TODO: is this really necessary? @@ -533,10 +558,20 @@ 0); }); - TxnBatch batch; + reader r(nullptr); + outstream os(fds); + function<void(const void*, size_t)> fn; + if (use_wal) fn = os; + else fn = lambda(const void *buf, size_t len) { g_wal->logbuf(buf, len); }; + // TODO why doesn't this work? + // else fn = boost::bind(&wal::logbuf, g_wal); + writer w(fn, buf_size); + stream s(r,w); + TxnBatch batch NPBONLY((s)); for (int t = 0; t < batch_size; ++t) batch.add_txn(); while (!stop_hub) { + w.mark(); #ifdef REUSE_SER batch.Clear(); #else @@ -559,10 +594,12 @@ } // Generate some random transactions. 
+ NPBONLY(batch.start_txn()); for (int t = 0; t < batch_size; ++t) { Txn &txn = *batch.add_txn(); txn.set_seqno(seqno); int count = randint(min_ops, max_ops + 1); + NPBONLY(txn.start_op()); for (int o = 0; o < count; ++o) { Op *op = txn.add_op(); int rtype = general_txns ? randint(3) : 1, @@ -572,20 +609,20 @@ op->set_key(rkey); op->set_value(rvalue); } + NPBONLY(txn.fin_op()); // Process immediately if not bcasting. if (fds.empty()) { --seqno; process_txn(g_map, txn, seqno, nullptr); } - ++seqno; // Checkpoint. - if (txn.seqno() % chkpt == 0) { + if (seqno % chkpt == 0) { if (verbose) - cout << "issued txn " << txn.seqno() << endl; + cout << "issued txn " << seqno << endl; if (timelim > 0 && current_time_millis() - start_time > timelim) { - cout << "time's up; issued " << txn.seqno() << " txns in " << timelim + cout << "time's up; issued " << seqno << " txns in " << timelim << " ms" << endl; stop_hub.set(); } @@ -598,19 +635,24 @@ } // Are we to accept a new joiner? - if (txn.seqno() == accept_joiner_seqno) { + if (seqno == accept_joiner_seqno) { accept_joiner.set(); } // Set the stopping seqno. - if (txn.seqno() == stop_on_seqno) { - cout << "stopping on issue of seqno " << txn.seqno() << endl; + if (seqno == stop_on_seqno) { + cout << "stopping on issue of seqno " << seqno << endl; stop_hub.set(); break; } + + ++seqno; } + NPBONLY(batch.fin_txn()); + NPBONLY(if (batch.txn_size() == 0) w.reset()); // Broadcast. +#ifdef USE_PB if (!fds.empty() && !suppress_txn_msgs) { bcast(fds, batch); } else if (use_wal) { @@ -619,6 +661,7 @@ string s; ser(s, batch); } +#endif // Pause? if (do_pause) @@ -626,10 +669,17 @@ } // This means "The End." 
+ w.mark(); batch.Clear(); + NPBONLY(batch.start_txn()); Txn &txn = *batch.add_txn(); txn.set_seqno(-1); - bcast(fds, batch); + NPBONLY(txn.start_op()); + NPBONLY(txn.fin_op()); + NPBONLY(batch.fin_txn()); + PBONLY(bcast(fds, batch)); + w.mark(); + w.flush(); if (bcaster_thread != nullptr) { msgs.push(make_pair(nullptr, shared_ptr<string>())); } @@ -652,6 +702,7 @@ if (!fake_exec) { for (int o = 0; o < txn.op_size(); ++o) { const Op &op = txn.op(o); + const char type = op.type(); const int key = op.key(); mii::iterator it = map.find(key); if (show_updates || count_updates) { @@ -660,7 +711,7 @@ if (count_updates) ++updates; } } - switch (op.type()) { + switch (type) { case Op::read: if (res != nullptr) { if (it == map.end()) res->add_result(0); @@ -668,10 +719,13 @@ } break; case Op::write: - //if (use_wal) wal.write(key, op.value()); - if (it == map.end()) map[key] = op.value(); - else it->second = op.value(); - break; + { + int value = op.value(); + //if (use_wal) wal.write(key, value); + if (it == map.end()) map[key] = value; + else it->second = value; + break; + } case Op::del: if (it != map.end()) { //if (use_wal) wal.del(key); @@ -784,9 +838,13 @@ }); st_reader reader(leader); + vector<st_netfd_t> leader_v(1, leader); + outstream os(leader_v); + writer w(os, buf_size); + stream s(reader, w); try { - TxnBatch batch; + TxnBatch batch NPBONLY((s)); ResponseBatch resbatch; while (true) { long long before_read = -1; @@ -795,7 +853,8 @@ } { st_intr intr(stop_hub); - readmsg(reader, batch); + PBONLY(readmsg(reader, batch)); + NPBONLY(batch.Clear()); } if (read_thresh > 0) { long long read_time = current_time_millis() - before_read; @@ -849,7 +908,7 @@ if (resbatch.res_size() > 0) sendmsg(leader, resbatch); } else { - // Empty (default) Txn means "generate a snapshot." + // Empty (default) TxnBatch means "generate a snapshot." 
// TODO make this faster shared_ptr<Recovery> recovery(new Recovery); typedef ::map<int, int> mii_; @@ -1456,7 +1515,7 @@ "run the leader (run replica by default)") ("exit-on-recovery,x", po::bool_switch(&stop_on_recovery), "exit after the joiner fully recovers (for leader only)") - ("batch-size,b", po::value<int>(&batch_size)->default_value(10), + ("batch-size,b", po::value<int>(&batch_size)->default_value(100), "number of txns to batch up in each msg (for leader only)") ("exit-on-seqno,X",po::value<int>(&stop_on_seqno)->default_value(-1), "exit after txn seqno is issued (for leader only)") @@ -1483,6 +1542,8 @@ ("leader-port,P", po::value<uint16_t>(&leader_port)->default_value(7654), "port the leader listens on") + ("write-buf", po::value<size_t>(&buf_size)->default_value(1e5), + "size of the outgoing (write) buffer in bytes") ("chkpt,c", po::value<int>(&chkpt)->default_value(1000), "number of txns before yielding/verbose printing") ("timelim,T", po::value<long long>(&timelim)->default_value(0), @@ -1592,3 +1653,11 @@ return 1; } } + +/* + * Compile-time options: + * + * - REUSE_SER + * - map, unordered_map, dense_hash_map + * - SERIALIZATION METHOD + */ Modified: ydb/trunk/src/ser.cc =================================================================== --- ydb/trunk/src/ser.cc 2009-02-17 04:36:14 UTC (rev 1187) +++ ydb/trunk/src/ser.cc 2009-02-17 22:38:36 UTC (rev 1188) @@ -5,6 +5,7 @@ using ydb::msg::reader; using ydb::msg::writer; using ydb::msg::stream; +using ydb::msg::outstream; using namespace commons; using namespace std; #ifdef USE_PB @@ -13,21 +14,13 @@ using namespace ydb::msg; #endif -#ifdef USE_PB -#define PBSWITCH(a,b) a -#define PBONLY(x) x -#define NPBONLY(x) -#else -#define PBSWITCH(a,b) b -#define PBONLY(x) -#define NPBONLY(x) x -#endif - const int nreps = 2; void producer(st_netfd_t dst) { - writer w(dst); + vector<st_netfd_t> dsts(1, dst); + outstream os(dsts); + writer w(os, 90); reader r(dst); stream s(r,w); string str; @@ -62,8 +55,9 @@ void 
consumer(st_netfd_t src) { - array<char> a(1e8); - writer w(src); + vector<st_netfd_t> v; + outstream os(v); + writer w(os, 90); reader r(src); stream s(r,w); const bool show = true; Modified: ydb/trunk/src/ser.h =================================================================== --- ydb/trunk/src/ser.h 2009-02-17 04:36:14 UTC (rev 1187) +++ ydb/trunk/src/ser.h 2009-02-17 22:38:36 UTC (rev 1188) @@ -7,6 +7,16 @@ #include <iostream> #include "ydb.pb.h" +#ifdef USE_PB +#define PBSWITCH(a,b) a +#define PBONLY(x) x +#define NPBONLY(x) +#else +#define PBSWITCH(a,b) b +#define PBONLY(x) +#define NPBONLY(x) x +#endif + #define BEGIN_NAMESPACE(ns) namespace ns { #define END_NAMESPACE } @@ -20,29 +30,32 @@ using ydb::pb::Op_OpType; +// TODO try to make all of the following conform to the std interfaces, if +// amenable + +class outstream +{ + private: + const vector<st_netfd_t> &dsts; + public: + outstream(const vector<st_netfd_t> &dsts) : dsts(dsts) {} + void operator()(const void *buf, size_t len) { + foreach (st_netfd_t dst, dsts) + checkeqnneg(st_write(dst, buf, len, ST_UTIME_NO_TIMEOUT), ssize_t(len)); + } +}; + class writer { private: - array<char> a_; + commons::array<char> a_; char *p_; char *mark_; char *unsent_; - st_netfd_t out_; - template<typename T> - void write_(T x, char *p) { - reserve(sizeof x, p); - *reinterpret_cast<T*>(p) = x; - } - public: - writer(st_netfd_t out) : - a_(90), p_(a_.get()), mark_(p_), unsent_(a_.get()), out_(out) {} - array<char> &buf() { return a_; } - size_t pos() { return p_ - mark_; } - size_t size() { return a_.size(); } - void mark() { mark_ = p_; } - void reserve(int n) { reserve(n, p_); } - void reserve(int n, char *p) { + boost::function<void(void*, size_t)> flushcb; + char *reserve(int n, char *p) { if (p + n > a_.end()) { + assert(size_t(p - mark_ + n) <= a_.size()); flush(); size_t diff = mark_ - a_.get(); memmove(a_.get(), mark_, diff); @@ -50,10 +63,24 @@ p_ -= diff; p -= diff; } + return p; } + template<typename T> + void 
write_(T x, char *p) { + *reinterpret_cast<T*>(reserve(sizeof x, p)) = x; + } + public: + writer(boost::function<void(void*, size_t)> flushcb, size_t buf_size) : + a_(buf_size), p_(a_.get()), mark_(p_), unsent_(a_.get()), flushcb(flushcb) {} + commons::array<char> &buf() { return a_; } + size_t pos() { return p_ - mark_; } + size_t size() { return a_.size(); } + void mark() { mark_ = p_; } + void reset() { p_ = mark_; } + void reserve(int n) { reserve(n, p_); } void flush() { if (mark_ - unsent_ > 0) { - st_write(out_, unsent_, mark_ - unsent_, ST_UTIME_NO_TIMEOUT); + flushcb(unsent_, mark_ - unsent_); unsent_ = mark_; } } @@ -143,8 +170,15 @@ void start_txn() { w_.skip<typeof(ntxn_)>(); } Txn *add_txn() { if (ntxn_ == unset) ntxn_ = 0; ++ntxn_; txn_.Clear(); return &txn_; } void fin_txn() { w_.write(ntxn_, off_); } - int txn_size() const { if (ntxn_ == unset) ntxn_ = r_.read<typeof(ntxn_)>(); return ntxn_; } + int txn_size() const { + if (ntxn_ == unset) + ntxn_ = r_.read<typeof(ntxn_)>(); + return ntxn_; + } const Txn &txn(int t) const { txn_.Clear(); return txn_; } + bool AppendToString(string *s) const { throw std::exception(); } + bool SerializeToString(string *s) const { throw std::exception(); } + bool SerializeToOstream(ostream *s) const { throw std::exception(); } }; END_NAMESPACE This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-17 23:16:08
|
Revision: 1189 http://assorted.svn.sourceforge.net/assorted/?rev=1189&view=rev Author: yangzhang Date: 2009-02-17 22:39:12 +0000 (Tue, 17 Feb 2009) Log Message: ----------- introduced PB-building; disabled p2-building Modified Paths: -------------- ydb/trunk/tools/test.bash Modified: ydb/trunk/tools/test.bash =================================================================== --- ydb/trunk/tools/test.bash 2009-02-17 22:38:36 UTC (rev 1188) +++ ydb/trunk/tools/test.bash 2009-02-17 22:39:12 UTC (rev 1189) @@ -198,8 +198,9 @@ refresh-local cd ~/ydb/src make clean + # PB=1 PPROF=1 OPT=1 make WTF= PPROF=1 OPT=1 make WTF= - PPROF=1 OPT=1 make WTF= p2 + # PPROF=1 OPT=1 make WTF= p2 } init-setup() { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-17 23:11:09
|
Revision: 1190 http://assorted.svn.sourceforge.net/assorted/?rev=1190&view=rev Author: yangzhang Date: 2009-02-17 22:39:20 +0000 (Tue, 17 Feb 2009) Log Message: ----------- added some more notes post-meeting Modified Paths: -------------- ydb/trunk/README Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-02-17 22:39:12 UTC (rev 1189) +++ ydb/trunk/README 2009-02-17 22:39:20 UTC (rev 1190) @@ -333,7 +333,7 @@ - DONE make readmsg perform fewer syscalls (buffer opportunistically) - like magic: now can sustain 90 Ktps all the way up through 3 xacts! -Period 2/12-2/19 +Period 2/12-2/17 - DONE p2 prototype - some interesting performance bugs @@ -373,13 +373,33 @@ - 1: 220 -> <240 - 3: 220 -> <240 - DONE try adding fake-execution - - made a huge difference + - made a huge difference, but not in the direction we want - -1: 680K - 0: 2M - 1: 730K - 2: 600K - 3: 450K -- TODO commit +- DONE try reverting to map + - still not in the direction we want + - -1: 122K + - 0: 140K + - 1: 122K + - 2: 122K/107K + - 3: 122K/122K/97K +- DONE commit +- DONE add zero-copy structs/(de-)serialization + +Period 2/17- + +- TODO dynamic switch between pb and zero-copy +- TODO async (threaded) wal +- TODO 0-node 0-copy (need to use threads) +- TODO google dense hash map + +- TODO show aries-write +- TODO checkpointing + replaying log from replicas (not from disk) +- TODO scale-up on multicore + - TODO remove extraneous copies; use custom buffer-backed data structures designed for serialization/deserialization - TODO flushing This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-17 04:36:21
|
Revision: 1187 http://assorted.svn.sourceforge.net/assorted/?rev=1187&view=rev Author: yangzhang Date: 2009-02-17 04:36:14 +0000 (Tue, 17 Feb 2009) Log Message: ----------- - added st-based separate client/server to ser demo - added seqno-caching - using st_reader as reader - added writer::flush() Modified Paths: -------------- ydb/trunk/src/ser.cc ydb/trunk/src/ser.h Modified: ydb/trunk/src/ser.cc =================================================================== --- ydb/trunk/src/ser.cc 2009-02-17 04:33:52 UTC (rev 1186) +++ ydb/trunk/src/ser.cc 2009-02-17 04:36:14 UTC (rev 1187) @@ -1,4 +1,5 @@ #include "ser.h" +#include <commons/st/st.h> //#define USE_PB using ydb::msg::reader; @@ -22,65 +23,85 @@ #define NPBONLY(x) x #endif -//template<typename TxnBatch, typename Txn, typename Op> -void run() +const int nreps = 2; + +void producer(st_netfd_t dst) { - array<char> a(1e8); - writer w(a); - reader r(a); + writer w(dst); + reader r(dst); stream s(r,w); string str; - const int nreps = 2; - - { - TxnBatch batch NPBONLY((s)); - for (int i = 0; i < nreps; ++i) { - w.mark(); - batch.Clear(); - NPBONLY(batch.start_txn()); - for (int t = 0; t < 2; ++t) { - Txn &txn = *batch.add_txn(); - txn.set_seqno(t + 5); - NPBONLY(txn.start_op()); - for (int o = 0; o < 2; ++o) { - Op &op = *txn.add_op(); - op.set_type (Op::del); - op.set_key (3 * (o+1)); - op.set_value(4 * (o+1)); - } - NPBONLY(txn.fin_op()); + const bool show = true; + TxnBatch batch NPBONLY((s)); + for (int i = 0; i < nreps; ++i) { + w.mark(); + batch.Clear(); + NPBONLY(batch.start_txn()); + for (int t = 0; t < 2; ++t) { + Txn &txn = *batch.add_txn(); + txn.set_seqno(t + 5); + NPBONLY(txn.start_op()); + for (int o = 0; o < 2; ++o) { + Op &op = *txn.add_op(); + op.set_type (Op::del); + op.set_key (3 * (o+1)); + op.set_value(4 * (o+1)); } - NPBONLY(batch.fin_txn()); - cout << w.pos() << '/' << w.size() << endl; - PBONLY(check(batch.SerializeToString(&str))); + NPBONLY(txn.fin_op()); } + NPBONLY(batch.fin_txn()); + if 
(show) cout << w.pos() << '/' << w.size() << endl; + PBONLY(check(batch.SerializeToString(&str))); } + batch.Clear(); + NPBONLY(batch.start_txn()); + NPBONLY(batch.fin_txn()); + w.mark(); w.flush(); +} +void consumer(st_netfd_t src) +{ + array<char> a(1e8); + writer w(src); + reader r(src); + stream s(r,w); const bool show = true; - { - TxnBatch batch NPBONLY((s)); - for (int i = 0; i < nreps; ++i) { - batch.Clear(); - PBONLY(check(batch.ParseFromString(str))); - if (show) cout << "ntxn " << batch.txn_size() << endl; - for (int t = 0; t < batch.txn_size(); ++t) { - const Txn &txn = batch.txn(t); - if (show) cout << "txn seqno " << txn.seqno() << endl; - for (int o = 0; o < txn.op_size(); ++o) { - const Op &op = txn.op(o); - int otype = op.type(); - int okey = op.key(); - int oval = op.value(); - if (show) cout << "op type " << otype << " key " << okey << " value " << oval << endl; - } + TxnBatch batch NPBONLY((s)); + for (int i = 0; i < nreps; ++i) { + batch.Clear(); + PBONLY(check(batch.ParseFromString(str))); + if (show) cout << "ntxn " << batch.txn_size() << endl; + //if (batch.txn_size() == 0) break; + for (int t = 0; t < batch.txn_size(); ++t) { + const Txn &txn = batch.txn(t); + if (show) cout << "txn seqno " << txn.seqno() << " " << txn.seqno() << endl; + for (int o = 0; o < txn.op_size(); ++o) { + const Op &op = txn.op(o); + int otype = op.type(); + int okey = op.key(); + int oval = op.value(); + if (show) + cout << "op type " << otype + << " key " << okey + << " value " << oval << endl; } } } } -int main() +int main(int argc, char **argv) { - run(); + st_init(); + bool is_leader = argc == 1; + if (is_leader) { + st_netfd_t listener = st_tcp_listen(7654); + st_netfd_t dst = checkerr(st_accept(listener, nullptr, nullptr, + ST_UTIME_NO_TIMEOUT)); + producer(dst); + } else { + st_netfd_t src = st_tcp_connect(argv[1], 7654, ST_UTIME_NO_TIMEOUT); + consumer(src); + } return 0; } Modified: ydb/trunk/src/ser.h 
=================================================================== --- ydb/trunk/src/ser.h 2009-02-17 04:33:52 UTC (rev 1186) +++ ydb/trunk/src/ser.h 2009-02-17 04:36:14 UTC (rev 1187) @@ -2,6 +2,7 @@ #define YDB_MSG_H #include <commons/array.h> +#include <commons/st/st.h> #include <iomanip> #include <iostream> #include "ydb.pb.h" @@ -15,7 +16,7 @@ using namespace commons; using namespace std; -short unset = -1; +short unset = -7654; using ydb::pb::Op_OpType; @@ -25,30 +26,37 @@ array<char> a_; char *p_; char *mark_; - array<char> &out_; + char *unsent_; + st_netfd_t out_; template<typename T> void write_(T x, char *p) { reserve(sizeof x, p); *reinterpret_cast<T*>(p) = x; } public: - writer(array<char> &out) : a_(90), p_(a_.get()), mark_(p_), out_(out) {} + writer(st_netfd_t out) : + a_(90), p_(a_.get()), mark_(p_), unsent_(a_.get()), out_(out) {} array<char> &buf() { return a_; } size_t pos() { return p_ - mark_; } size_t size() { return a_.size(); } - void mark() { mark_ = p_; /*skip<int>();*/ } - void flush() { memcpy(out_.get(), a_.get(), mark_ - a_.get()); } + void mark() { mark_ = p_; } void reserve(int n) { reserve(n, p_); } void reserve(int n, char *p) { if (p + n > a_.end()) { flush(); - memmove(a_.get(), mark_, a_.end() - mark_); size_t diff = mark_ - a_.get(); - mark_ -= diff; + memmove(a_.get(), mark_, diff); + unsent_ = mark_ = a_.get(); p_ -= diff; p -= diff; } } + void flush() { + if (mark_ - unsent_ > 0) { + st_write(out_, unsent_, mark_ - unsent_, ST_UTIME_NO_TIMEOUT); + unsent_ = mark_; + } + } void show() { cout << (void*) p_; for (size_t i = 0; i < a_.size(); ++i) @@ -64,20 +72,7 @@ template<typename T> void write(T x, size_t off) { write_(x, mark_ + off); } }; -class reader -{ - private: - array<char> &a_; - const char *p_; - public: - reader(array<char> &a) : a_(a), p_(a.get()) {} - template<typename T> T read() { - T x = *reinterpret_cast<const T*>(p_); - p_ += sizeof(T); - return x; - } - void jump(ssize_t off) { p_ += off; } -}; +typedef 
st_reader reader; class stream { @@ -122,10 +117,10 @@ public: Txn(stream &s) : s_(s), r_(s.get_reader()), w_(s.get_writer()), off_(w_.pos()), op_(s), - nop_(unset) {} - void Clear() { w_.reserve(0*50); nop_ = unset; off_ = w_.pos(); } + nop_(unset), seqno_(unset) {} + void Clear() { w_.reserve(0*50); nop_ = unset; seqno_ = unset; off_ = w_.pos(); } void set_seqno(int x) { w_.write(x); } - int seqno() const { return r_.read<int>(); } + int seqno() const { return seqno_ == unset ? seqno_ = r_.read<int>() : seqno_; } void start_op() { w_.skip<typeof(nop_)>(); } Op *add_op() { if (nop_ == unset) nop_ = 0; ++nop_; return &op_; } void fin_op() { w_.write(nop_, off_ + sizeof(int)); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-17 04:33:57
|
Revision: 1186 http://assorted.svn.sourceforge.net/assorted/?rev=1186&view=rev Author: yangzhang Date: 2009-02-17 04:33:52 +0000 (Tue, 17 Feb 2009) Log Message: ----------- - renamed array_view to managed_array - added read<T>() to st_reader Modified Paths: -------------- cpp-commons/trunk/src/commons/array.h cpp-commons/trunk/src/commons/st/st.h Modified: cpp-commons/trunk/src/commons/array.h =================================================================== --- cpp-commons/trunk/src/commons/array.h 2009-02-16 21:05:26 UTC (rev 1185) +++ cpp-commons/trunk/src/commons/array.h 2009-02-17 04:33:52 UTC (rev 1186) @@ -24,7 +24,7 @@ }; /** - * A release-able array. + * A scoped, release-able array. */ template<typename T> class auto_array { @@ -41,18 +41,16 @@ }; /** - * Move-able, conditionally-scoped array. - * - * TODO: rename to managed_array + * Conditionally-scoped, move-able, release-able, un-sized array. */ template<typename T> - class array_view { + class managed_array { public: - array_view(T *p, bool scoped) : p_(p), scoped_(scoped) {} + managed_array(T *p, bool scoped) : p_(p), scoped_(scoped) {} #ifdef __GXX_EXPERIMENTAL_CXX0X__ - array_view(array_view<T> &&a) : p_(a.p_), scoped_(a.scoped_) { a.release(); } + managed_array(managed_array<T> &&a) : p_(a.p_), scoped_(a.scoped_) { a.release(); } #endif - ~array_view() { if (scoped_) delete [] p_; } + ~managed_array() { if (scoped_) delete [] p_; } T *release() { T *p = p_; p_ = nullptr; scoped_ = false; return p; } T *get() { return p_; } const T *get() const { return p_; } Modified: cpp-commons/trunk/src/commons/st/st.h =================================================================== --- cpp-commons/trunk/src/commons/st/st.h 2009-02-16 21:05:26 UTC (rev 1185) +++ cpp-commons/trunk/src/commons/st/st.h 2009-02-17 04:33:52 UTC (rev 1186) @@ -420,17 +420,17 @@ * Returns a char array that contains the requested number of bytes. If * we hit an error or EOF, then an exception is thrown. 
*/ - array_view<char> read(size_t req = 0, st_utime_t to = ST_UTIME_NO_TIMEOUT) { + managed_array<char> read(size_t req = 0, st_utime_t to = ST_UTIME_NO_TIMEOUT) { // Do we already have the requested data? if (amt() >= req) { - array_view<char> p(start_, false); + managed_array<char> p(start_, false); start_ += req; return p; } // Handle large arrays specially. if (req > buf_.size()) { - array_view<char> p(checkpass(new char[req]), true); + managed_array<char> p(checkpass(new char[req]), true); copy(start_, end_, p.get()); checkeqnneg(st_read_fully(fd_, p + amt(), req, to), static_cast<ssize_t>(req)); start_ = end_ = buf_.get(); @@ -457,11 +457,50 @@ if (amt() < req) throw eof_exception(); - array_view<char> p(start_, false); + managed_array<char> p(start_, false); start_ += req; return p; } + template<typename T> + T read(st_utime_t to = ST_UTIME_NO_TIMEOUT) + { + size_t req = sizeof(T); + + // Do we already have the requested data? + if (amt() >= req) { + T x = *reinterpret_cast<const T*>(start_); + start_ += req; + return x; + } + + assert(req <= buf_.size()); + + // Shift things down if necessary. + if (req > static_cast<size_t>(buf_.end() - end_)) { + copy(start_, end_, buf_.get()); + size_t diff = start_ - buf_.get(); + start_ -= diff; + end_ -= diff; + } + + // Keep reading until we have enough. + while (amt() < req) { + ssize_t res = st_read(fd_, end_, rem(), to); + checknneg(res); + if (res == 0) break; + else end_ += res; + } + + // If we got a premature EOF. + if (amt() < req) + throw eof_exception(); + + T x = *reinterpret_cast<const T*>(start_); + start_ += req; + return x; + } + private: st_reader(const st_reader &); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-16 21:05:31
|
Revision: 1185 http://assorted.svn.sourceforge.net/assorted/?rev=1185&view=rev Author: yangzhang Date: 2009-02-16 21:05:26 +0000 (Mon, 16 Feb 2009) Log Message: ----------- - moved protobufs to ydb::pb - added ser Modified Paths: -------------- ydb/trunk/src/Makefile ydb/trunk/src/ydb.proto Added Paths: ----------- ydb/trunk/src/ser.cc ydb/trunk/src/ser.h Modified: ydb/trunk/src/Makefile =================================================================== --- ydb/trunk/src/Makefile 2009-02-15 03:22:34 UTC (rev 1184) +++ ydb/trunk/src/Makefile 2009-02-16 21:05:26 UTC (rev 1185) @@ -101,3 +101,6 @@ p2: p2.cc $(LINK.cc) $^ $(LOADLIBES) $(LDLIBS) $(OUTPUT_OPTION) + +ser: ser.cc ser.h ydb.o + $(LINK.cc) $^ $(LOADLIBES) $(LDLIBS) $(OUTPUT_OPTION) Added: ydb/trunk/src/ser.cc =================================================================== --- ydb/trunk/src/ser.cc (rev 0) +++ ydb/trunk/src/ser.cc 2009-02-16 21:05:26 UTC (rev 1185) @@ -0,0 +1,86 @@ +#include "ser.h" + +//#define USE_PB +using ydb::msg::reader; +using ydb::msg::writer; +using ydb::msg::stream; +using namespace commons; +using namespace std; +#ifdef USE_PB +using namespace ydb::pb; +#else +using namespace ydb::msg; +#endif + +#ifdef USE_PB +#define PBSWITCH(a,b) a +#define PBONLY(x) x +#define NPBONLY(x) +#else +#define PBSWITCH(a,b) b +#define PBONLY(x) +#define NPBONLY(x) x +#endif + +//template<typename TxnBatch, typename Txn, typename Op> +void run() +{ + array<char> a(1e8); + writer w(a); + reader r(a); + stream s(r,w); + string str; + const int nreps = 2; + + { + TxnBatch batch NPBONLY((s)); + for (int i = 0; i < nreps; ++i) { + w.mark(); + batch.Clear(); + NPBONLY(batch.start_txn()); + for (int t = 0; t < 2; ++t) { + Txn &txn = *batch.add_txn(); + txn.set_seqno(t + 5); + NPBONLY(txn.start_op()); + for (int o = 0; o < 2; ++o) { + Op &op = *txn.add_op(); + op.set_type (Op::del); + op.set_key (3 * (o+1)); + op.set_value(4 * (o+1)); + } + NPBONLY(txn.fin_op()); + } + NPBONLY(batch.fin_txn()); + cout << 
w.pos() << '/' << w.size() << endl; + PBONLY(check(batch.SerializeToString(&str))); + } + } + w.flush(); + + const bool show = true; + { + TxnBatch batch NPBONLY((s)); + for (int i = 0; i < nreps; ++i) { + batch.Clear(); + PBONLY(check(batch.ParseFromString(str))); + if (show) cout << "ntxn " << batch.txn_size() << endl; + for (int t = 0; t < batch.txn_size(); ++t) { + const Txn &txn = batch.txn(t); + if (show) cout << "txn seqno " << txn.seqno() << endl; + for (int o = 0; o < txn.op_size(); ++o) { + const Op &op = txn.op(o); + int otype = op.type(); + int okey = op.key(); + int oval = op.value(); + if (show) cout << "op type " << otype << " key " << okey << " value " << oval << endl; + } + } + } + } +} + +int main() +{ + run(); + return 0; +} Added: ydb/trunk/src/ser.h =================================================================== --- ydb/trunk/src/ser.h (rev 0) +++ ydb/trunk/src/ser.h 2009-02-16 21:05:26 UTC (rev 1185) @@ -0,0 +1,158 @@ +#ifndef YDB_MSG_H +#define YDB_MSG_H + +#include <commons/array.h> +#include <iomanip> +#include <iostream> +#include "ydb.pb.h" + +#define BEGIN_NAMESPACE(ns) namespace ns { +#define END_NAMESPACE } + +BEGIN_NAMESPACE(ydb) +BEGIN_NAMESPACE(msg) + +using namespace commons; +using namespace std; + +short unset = -1; + +using ydb::pb::Op_OpType; + +class writer +{ + private: + array<char> a_; + char *p_; + char *mark_; + array<char> &out_; + template<typename T> + void write_(T x, char *p) { + reserve(sizeof x, p); + *reinterpret_cast<T*>(p) = x; + } + public: + writer(array<char> &out) : a_(90), p_(a_.get()), mark_(p_), out_(out) {} + array<char> &buf() { return a_; } + size_t pos() { return p_ - mark_; } + size_t size() { return a_.size(); } + void mark() { mark_ = p_; /*skip<int>();*/ } + void flush() { memcpy(out_.get(), a_.get(), mark_ - a_.get()); } + void reserve(int n) { reserve(n, p_); } + void reserve(int n, char *p) { + if (p + n > a_.end()) { + flush(); + memmove(a_.get(), mark_, a_.end() - mark_); + size_t diff = 
mark_ - a_.get(); + mark_ -= diff; + p_ -= diff; + p -= diff; + } + } + void show() { + cout << (void*) p_; + for (size_t i = 0; i < a_.size(); ++i) + cout << " " << hex << setfill('0') << setw(2) << int(mark_[i]); + cout << endl; + cout << (void*) p_; + for (size_t i = 0; i < a_.size(); ++i) + cout << " " << setfill(' ') << setw(2) << (i == pos() ? "^^" : ""); + cout << endl; + } + template<typename T> void skip() { reserve(sizeof(T)); p_ += sizeof(T); } + template<typename T> void write(T x) { write_(x, p_); p_ += sizeof x; } + template<typename T> void write(T x, size_t off) { write_(x, mark_ + off); } +}; + +class reader +{ + private: + array<char> &a_; + const char *p_; + public: + reader(array<char> &a) : a_(a), p_(a.get()) {} + template<typename T> T read() { + T x = *reinterpret_cast<const T*>(p_); + p_ += sizeof(T); + return x; + } + void jump(ssize_t off) { p_ += off; } +}; + +class stream +{ + private: + reader &r_; + writer &w_; + public: + stream(reader &r, writer &w) : r_(r), w_(w) {} + reader &get_reader() { return r_; } + writer &get_writer() { return w_; } +}; + +class Op +{ + private: + stream &s_; + reader &r_; + writer &w_; + public: + static const Op_OpType read = ydb::pb::Op_OpType_read; + static const Op_OpType write = ydb::pb::Op_OpType_write; + static const Op_OpType del = ydb::pb::Op_OpType_del; + Op(stream &s) : s_(s), r_(s.get_reader()), w_(s.get_writer()) {} + void set_type (char x) { w_.write(x); } + void set_key (int x) { w_.write(x); } + void set_value(int x) { w_.write(x); } + char type() const { return r_.read<char>(); } + int key() const { return r_.read<int>(); } + int value() const { return r_.read<int>(); } +}; + +class Txn +{ + private: + stream &s_; + reader &r_; + writer &w_; + size_t off_; + Op op_; + mutable short nop_; + mutable int seqno_; + public: + Txn(stream &s) : + s_(s), r_(s.get_reader()), w_(s.get_writer()), off_(w_.pos()), op_(s), + nop_(unset) {} + void Clear() { w_.reserve(0*50); nop_ = unset; off_ = w_.pos(); 
} + void set_seqno(int x) { w_.write(x); } + int seqno() const { return r_.read<int>(); } + void start_op() { w_.skip<typeof(nop_)>(); } + Op *add_op() { if (nop_ == unset) nop_ = 0; ++nop_; return &op_; } + void fin_op() { w_.write(nop_, off_ + sizeof(int)); } + int op_size() const { if (nop_ == unset) nop_ = r_.read<typeof(nop_)>(); return nop_; } + const Op &op(int o) const { return op_; } +}; + +class TxnBatch +{ + private: + stream &s_; + reader &r_; + writer &w_; + size_t off_; + mutable Txn txn_; + mutable short ntxn_; + public: + TxnBatch(stream &s) : s_(s), r_(s.get_reader()), w_(s.get_writer()), off_(w_.pos()), txn_(s), ntxn_(unset) {} + void Clear() { w_.reserve(0*100); txn_.Clear(); ntxn_ = unset; off_ = w_.pos(); } + void start_txn() { w_.skip<typeof(ntxn_)>(); } + Txn *add_txn() { if (ntxn_ == unset) ntxn_ = 0; ++ntxn_; txn_.Clear(); return &txn_; } + void fin_txn() { w_.write(ntxn_, off_); } + int txn_size() const { if (ntxn_ == unset) ntxn_ = r_.read<typeof(ntxn_)>(); return ntxn_; } + const Txn &txn(int t) const { txn_.Clear(); return txn_; } +}; + +END_NAMESPACE +END_NAMESPACE + +#endif Modified: ydb/trunk/src/ydb.proto =================================================================== --- ydb/trunk/src/ydb.proto 2009-02-15 03:22:34 UTC (rev 1184) +++ ydb/trunk/src/ydb.proto 2009-02-16 21:05:26 UTC (rev 1185) @@ -1,3 +1,5 @@ +package ydb.pb; + option optimize_for = SPEED; // A socket address (host:port). This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-15 03:22:41
|
Revision: 1184 http://assorted.svn.sourceforge.net/assorted/?rev=1184&view=rev Author: yangzhang Date: 2009-02-15 03:22:34 +0000 (Sun, 15 Feb 2009) Log Message: ----------- more progress/notes Modified Paths: -------------- ydb/trunk/README Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-02-15 03:22:07 UTC (rev 1183) +++ ydb/trunk/README 2009-02-15 03:22:34 UTC (rev 1184) @@ -30,6 +30,7 @@ - [C++ Commons] svn r1082 - [clamp] 153 - [GCC] 4.3.2 +- [google-sparsehash] 1.4 - [googletest] 1.2.1 - [Lazy C++] 2.8.0 - [Protocol Buffers] 2.0.0 @@ -39,6 +40,7 @@ [C++ Commons]: http://assorted.sourceforge.net/cpp-commons/ [clamp]: http://home.clara.net/raoulgough/clamp/ [GCC]: http://gcc.gnu.org/ +[google-sparsehash]: http://code.google.com/p/google-sparsehash/ [googletest]: http://code.google.com/p/googletest/ [Lazy C++]: http://www.lazycplusplus.com/ [Protocol Buffers]: http://code.google.com/p/protobuf/ @@ -280,7 +282,7 @@ - what to do? limit parallelism? how? - include actual separate clients? -Period: 2/5- +Period: 2/5-2/12 - DONE commit!!! - DONE google profiling @@ -331,21 +333,55 @@ - DONE make readmsg perform fewer syscalls (buffer opportunistically) - like magic: now can sustain 90 Ktps all the way up through 3 xacts! 
-Period +Period 2/12-2/19 - DONE p2 prototype - some interesting performance bugs - forgot to make some sockets non-blocking, eg accepted client socket, eg the client's socket to server; everything still works with select - i was indeed forgetting to set this as well in ydb + - this made bcast-async irrelevant - was always inadvertently calling read() whenever i requested some # bytes - made a big diff in leveling the field between smallerish to largerish msg sizes - - this was hurting me only slightly in ydb, it seems + - this was hurting me only slightly in ydb - was not aggressively consuming as many msgs as i could, only 1 at a time (per return from select) + - not having this problem in ydb - DONE batch responses - made a marked difference; ~100Ktps -> ~140Ktps (for 1-4 reps) +- found a possible perf bug: string.c_str() instead of .data() +- DONE make regular bcastmsg use a single st_write call instead of two (by + serializing the len-prefix in) + - this maybe improved things only a teeny bit +- DONE try introducing protobuf serialization into both wal and solo to see how + much of the perf degradation is due to ser + - lowered solo from 220 Ktps to 190 Ktps + - lowered wal from 180 Ktps to 170 Ktps +- DONE try making process_txns also use st_reader + - didn't help much; in fact, seemed to hurt?! +- DONE try lifting txnbatch in process_txns + - made a huge diff: 140 Ktps to 220 Ktps + - network is now faster than local!!! +- DONE try lifting resbatch in handle_responses + - added a little bit more: 220 Ktps to 225 Ktps +- DONE try reusing the serialized msgs + - easier than expected; just call .Clear()! 
+ - lost the amazing new breakthrough above + - -1: 190 -> 240 + - 0: 220 -> 320 Ktps + - 1: 220 -> <240 + - 3: 220 -> <240 +- DONE try adding fake-execution + - made a huge difference + - -1: 680K + - 0: 2M + - 1: 730K + - 2: 600K + - 3: 450K +- TODO commit +- TODO remove extraneous copies; use custom buffer-backed data structures + designed for serialization/deserialization - TODO flushing - TODO make the logger a "single replica" - TODO oprofile This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-15 03:22:14
|
Revision: 1183 http://assorted.svn.sourceforge.net/assorted/?rev=1183&view=rev Author: yangzhang Date: 2009-02-15 03:22:07 +0000 (Sun, 15 Feb 2009) Log Message: ----------- added node-setup-ghash Modified Paths: -------------- ydb/trunk/tools/test.bash Modified: ydb/trunk/tools/test.bash =================================================================== --- ydb/trunk/tools/test.bash 2009-02-15 03:21:58 UTC (rev 1182) +++ ydb/trunk/tools/test.bash 2009-02-15 03:22:07 UTC (rev 1183) @@ -173,10 +173,14 @@ node-setup-gtest() { check-remote - cd /tmp/ toast --quiet arm googletest } +node-setup-ghash() { + check-remote + toast --quiet arm 'http://google-sparsehash.googlecode.com/files/sparsehash-1.4.tar.gz' +} + node-setup-ydb-1() { check-remote if [[ ! -L ~/ydb ]] This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-15 03:22:05
|
Revision: 1182 http://assorted.svn.sourceforge.net/assorted/?rev=1182&view=rev Author: yangzhang Date: 2009-02-15 03:21:58 +0000 (Sun, 15 Feb 2009) Log Message: ----------- distinguish between "issued"/"handled" for scaling analysis Modified Paths: -------------- ydb/trunk/tools/analysis.py Modified: ydb/trunk/tools/analysis.py =================================================================== --- ydb/trunk/tools/analysis.py 2009-02-15 03:20:46 UTC (rev 1181) +++ ydb/trunk/tools/analysis.py 2009-02-15 03:21:58 UTC (rev 1182) @@ -110,6 +110,11 @@ print 'file:', getname(scalingpath) res = logextract(scalingpath, 'n', [ r'=== n=(?P<n>-?\d+) ', + r'handled .*\((?P<tps>[.\d]+) tps\)' ]) + + print 'file:', getname(scalingpath) + res0 = logextract(scalingpath, 'n', [ + r'=== n=(?P<n>-?\d+) ', r'issued .*\((?P<tps>[.\d]+) tps\)' ]) print 'file:', getname(ariespath) @@ -117,9 +122,14 @@ r'=== n=(?P<n>-?\d+) ', r'issued .*\((?P<tps>[.\d]+) tps\)' ]) - errorbar(hstack([res2['n'], res['n']]), - hstack([res2['tps mean'], res['tps mean']]), - hstack([res2['tps sd'], res['tps sd']])) + print hstack([res2['n'], res0['n'][:1], res['n']]) + print hstack([res2['tps mean'], res0['tps mean'][:1], res['tps mean']]) + print hstack([res2['tps sd'], res0['tps sd'][:1], res['tps sd']]) + + errorbar( + hstack([res2['n'], res0['n'][:1], res['n']]), + hstack([res2['tps mean'], res0['tps mean'][:1], res['tps mean']]), + hstack([res2['tps sd'], res0['tps sd'][:1], res['tps sd']])) title('Scaling of baseline throughput with number of nodes') xlabel('Node count') ylabel('Mean TPS (stdev error bars)') This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-15 03:20:54
|
Revision: 1181 http://assorted.svn.sourceforge.net/assorted/?rev=1181&view=rev Author: yangzhang Date: 2009-02-15 03:20:46 +0000 (Sun, 15 Feb 2009) Log Message: ----------- - rename array_view to managed_array - tried adding google dense_hash_map - added REUSE_SER macro to control whether serialized stuff is reused - refactored serialization/network code: added ser(), st_timed_write() - added --force-ser - added --fake-exec - changed wal to use same txn serialization as net rather than its own in-line serialization - added st_reader for txnbatches Modified Paths: -------------- ydb/trunk/src/main.lzz.clamp ydb/trunk/src/p2.cc Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-02-13 20:57:48 UTC (rev 1180) +++ ydb/trunk/src/main.lzz.clamp 2009-02-15 03:20:46 UTC (rev 1181) @@ -7,7 +7,6 @@ #include <boost/range/iterator_range.hpp> #include <boost/scoped_array.hpp> #include <boost/shared_ptr.hpp> -//#include <boost/thread.hpp> #include <commons/nullptr.h> #include <commons/rand.h> #include <commons/st/st.h> @@ -17,6 +16,7 @@ #include <cstring> // strsignal #include <iostream> #include <fstream> // ofstream +#include <google/dense_hash_map> #include <gtest/gtest.h> #include <malloc.h> #include <map> @@ -31,9 +31,11 @@ #define foreach BOOST_FOREACH #define shared_ptr boost::shared_ptr #define ref boost::ref +#define REUSE_SER using namespace boost; using namespace boost::archive; using namespace commons; +using namespace google; using namespace std; using namespace testing; using namespace tr1; @@ -46,8 +48,11 @@ #end #define map_t unordered_map +//#define map_t map +//#define map_t dense_hash_map typedef pair<int, int> pii; typedef map_t<int, int> mii; +typedef string ser_t; // Configuration. 
st_utime_t timeout; @@ -57,7 +62,7 @@ bool verbose, yield_during_build_up, yield_during_catch_up, dump, show_updates, count_updates, stop_on_recovery, general_txns, profile_threads, debug_threads, multirecover, disk, debug_memory, use_wal, - suppress_txn_msgs, use_bcast_async, fake_bcast; + suppress_txn_msgs, use_bcast_async, fake_bcast, force_ser, fake_exec; long long timelim, read_thresh, write_thresh; // Control. @@ -247,12 +252,59 @@ st_channel<pair<st_netfd_t, shared_ptr<string> > > msgs; /** + * Serialization. + * + * TODO: experiment with which method is the fastest: using a string as shown + * here or computing the bytesize then allocating (or grabbing/reserving) the + * array. + */ +template<typename T> +void +ser(string &s, const T &msg) +{ + // Serialize message to a buffer. + uint32_t len; + s.append(sizeof len, '\0'); + check(msg.AppendToString(&s)); + + // Warn if the message is large. + if (s.size() > 1000000) + cout << "serializing large message of " << s.size() << " bytes" << endl; + + // Prefix the message with a four-byte length. + len = htonl(static_cast<uint32_t>(s.size() - sizeof len)); + char *plen = reinterpret_cast<char*>(&len); + copy(plen, plen + sizeof len, s.begin()); +} + +/** + * Helper for getting the cached ByteSize of a message. + */ +template <typename T> +int +pb_size(const T &msg) { + int len = msg.GetCachedSize(); + return len == 0 ? msg.ByteSize() : len; +} + +/** + * Serialization. + */ +template<typename T> +void +ser(ostream &s, const T &msg) +{ + uint32_t len = htonl(pb_size(msg)); + s.write(reinterpret_cast<const char*>(&len), sizeof len); + check(msg.SerializeToOstream(&s)); +} + +/** * The worker that performs the actual broadcasting. 
*/ void bcaster() { - int counter = 0; while (!kill_hub) { pair<st_netfd_t, shared_ptr<string> > pr; { @@ -264,32 +316,8 @@ if (p.get() == nullptr) break; string &s = *p.get(); - int dstno = 0; - // XXX - // foreach (st_netfd_t dst, *gdsts) { - if (!fake_bcast) { - long long before_write = -1; - if (write_thresh > 0) { - before_write = current_time_millis(); - } - - checksize(st_write(dst, s.data(), s.size(), ST_UTIME_NO_TIMEOUT), - s.size()); - - if (write_thresh > 0) { - long long write_time = current_time_millis() - before_write; - if (write_time > write_thresh) { - cout << "thread " << threadname() - << ": write #" << counter - << " of size " << s.size() - //<< " bytes to dst #" << dstno - << " bytes" - << " took " << write_time << " ms" << endl; - } - } - ++dstno; - } - ++counter; + if (!fake_bcast) + st_timed_write(dst, s.data(), s.size()); } } @@ -298,24 +326,34 @@ */ template<typename T> void -bcastmsg_async(const vector<st_netfd_t> &dsts, const T & msg) +bcastmsg_async(const vector<st_netfd_t> &dsts, const T &msg) { - // Serialize message to a buffer. - uint32_t len; - shared_ptr<string> p(new string(sizeof len, '\0')); - string &s = *p.get(); - check(msg.AppendToString(&s)); + shared_ptr<string> p(new string); + ser(*p.get(), msg); + foreach (st_netfd_t dst, dsts) msgs.push(make_pair(dst, p)); +} - if (s.size() > 1000000) - cout << "sending large message to " << dsts.size() << " dsts, size = " - << s.size() << " bytes" << endl; +/** + * Perform an st_write but warn if it took over write_thresh ms. + */ +void +st_timed_write(st_netfd_t dst, const void *buf, size_t len) +{ + long long before_write = -1; + if (write_thresh > 0) { + before_write = current_time_millis(); + } - // Prefix the message with a four-byte length. 
- len = htonl(static_cast<uint32_t>(s.size() - sizeof len)); - char *plen = reinterpret_cast<char*>(&len); - copy(plen, plen + sizeof len, s.begin()); + checkeqnneg(st_write(dst, buf, len, ST_UTIME_NO_TIMEOUT), + static_cast<ssize_t>(len)); - foreach (st_netfd_t dst, dsts) msgs.push(make_pair(dst, p)); + if (write_thresh > 0) { + long long write_time = current_time_millis() - before_write; + if (write_time > write_thresh) { + cout << "thread " << threadname() << " write of " << len + << " bytes took " << write_time << " ms" << endl; + } + } } /** @@ -325,48 +363,12 @@ void bcastmsg(const vector<st_netfd_t> &dsts, const T & msg) { - // Serialize message to a buffer. - string s; - check(msg.SerializeToString(&s)); - const char *buf = s.c_str(); - - if (s.size() > 1000000) - cout << "sending large message to " << dsts.size() << " dsts, size = " - << s.size() << " bytes" << endl; - - // Prefix the message with a four-byte length. - uint32_t len = htonl(static_cast<uint32_t>(s.size())); - - // Broadcast the length-prefixed message to replicas. 
- int dstno = 0; - foreach (st_netfd_t dst, dsts) { - size_t resid = sizeof len; -#define checksize(x,y) checkeqnneg(x, static_cast<ssize_t>(y)) - int res = st_write_resid(dst, static_cast<void*>(&len), &resid, timeout); - long long before_write = -1; - if (write_thresh > 0) { - before_write = current_time_millis(); + ser_t s; + ser(s, msg); + if (!fake_bcast) { + foreach (st_netfd_t dst, dsts) { + st_timed_write(dst, s.data(), s.size()); } - if (res == -1 && errno == ETIME) { - checksize(st_write(dst, - reinterpret_cast<char*>(&len) + sizeof len - resid, - resid, - ST_UTIME_NO_TIMEOUT), - resid); - } else { - check0x(res); - } - if (write_thresh > 0) { - long long write_time = current_time_millis() - before_write; - if (write_time > write_thresh) { - cout << "thread " << threadname() - << ": write to dst #" << dstno - << " took " << write_time << " ms" << endl; - } - } - checksize(st_write(dst, buf, s.size(), ST_UTIME_NO_TIMEOUT), - s.size()); - ++dstno; } } @@ -463,7 +465,7 @@ void readmsg(st_reader &src, T & msg) { - array_view<char> a = src.read(sizeof(uint32_t)); + managed_array<char> a = src.read(sizeof(uint32_t)); uint32_t len = ntohl(*reinterpret_cast<const uint32_t*>(a.get())); check(msg.ParseFromArray(src.read(len), len)); } @@ -475,6 +477,9 @@ { public: wal() : of("wal"), out(of) {} + template <typename T> + void log(const T &msg) { ser(of, msg); } +#if 0 void del(int key) { int op = op_del; // TODO: is this really necessary? out & op & key; @@ -487,6 +492,7 @@ int op = op_commit; out & op; } +#endif private: enum { op_del, op_write, op_commit }; ofstream of; @@ -527,15 +533,24 @@ 0); }); + TxnBatch batch; + for (int t = 0; t < batch_size; ++t) batch.add_txn(); + while (!stop_hub) { +#ifdef REUSE_SER + batch.Clear(); +#else + TxnBatch batch; +#endif + // Did we get a new member? If so, notify an arbitrary member (the first // one) to prepare to send recovery information (by sending an // empty/default Txn). 
if (!newreps.empty() && seqno > 0) { if (multirecover) { - bcast(fds, TxnBatch()); + bcast(fds, batch); } else { - sendmsg(fds[0], TxnBatch()); + sendmsg(fds[0], batch); } } // Bring in any new members. @@ -544,7 +559,6 @@ } // Generate some random transactions. - TxnBatch batch; for (int t = 0; t < batch_size; ++t) { Txn &txn = *batch.add_txn(); txn.set_seqno(seqno); @@ -597,8 +611,14 @@ } // Broadcast. - if (!fds.empty() && !suppress_txn_msgs) + if (!fds.empty() && !suppress_txn_msgs) { bcast(fds, batch); + } else if (use_wal) { + g_wal->log(batch); + } else if (force_ser) { + string s; + ser(s, batch); + } // Pause? if (do_pause) @@ -606,7 +626,7 @@ } // This means "The End." - TxnBatch batch; + batch.Clear(); Txn &txn = *batch.add_txn(); txn.set_seqno(-1); bcast(fds, batch); @@ -622,44 +642,46 @@ void process_txn(mii &map, const Txn &txn, int &seqno, Response *res) { - wal &wal = *g_wal; + //wal &wal = *g_wal; checkeq(txn.seqno(), seqno + 1); seqno = txn.seqno(); if (res != nullptr) { res->set_seqno(seqno); res->set_caught_up(true); } - for (int o = 0; o < txn.op_size(); ++o) { - const Op &op = txn.op(o); - const int key = op.key(); - mii::iterator it = map.find(key); - if (show_updates || count_updates) { - if (it != map.end()) { - if (show_updates) cout << "existing key: " << key << endl; - if (count_updates) ++updates; - } - } - switch (op.type()) { - case Op::read: - if (res != nullptr) { - if (it == map.end()) res->add_result(0); - else res->add_result(it->second); - } - break; - case Op::write: - if (use_wal) wal.write(key, op.value()); - if (it == map.end()) map[key] = op.value(); - else it->second = op.value(); - break; - case Op::del: + if (!fake_exec) { + for (int o = 0; o < txn.op_size(); ++o) { + const Op &op = txn.op(o); + const int key = op.key(); + mii::iterator it = map.find(key); + if (show_updates || count_updates) { if (it != map.end()) { - if (use_wal) wal.del(key); - map.erase(it); + if (show_updates) cout << "existing key: " << key << 
endl; + if (count_updates) ++updates; } - break; + } + switch (op.type()) { + case Op::read: + if (res != nullptr) { + if (it == map.end()) res->add_result(0); + else res->add_result(it->second); + } + break; + case Op::write: + //if (use_wal) wal.write(key, op.value()); + if (it == map.end()) map[key] = op.value(); + else it->second = op.value(); + break; + case Op::del: + if (it != map.end()) { + //if (use_wal) wal.del(key); + map.erase(it); + } + break; + } } } - if (use_wal) wal.commit(); + //if (use_wal) wal.commit(); } void @@ -761,16 +783,19 @@ __ref(send_states).push(shared_ptr<Recovery>()); }); + st_reader reader(leader); + try { + TxnBatch batch; + ResponseBatch resbatch; while (true) { - TxnBatch batch; long long before_read = -1; if (read_thresh > 0) { before_read = current_time_millis(); } { st_intr intr(stop_hub); - readmsg(leader, batch); + readmsg(reader, batch); } if (read_thresh > 0) { long long read_time = current_time_millis() - before_read; @@ -780,7 +805,11 @@ } } if (batch.txn_size() > 0) { +#ifdef REUSE_SER + resbatch.Clear(); +#else ResponseBatch resbatch; +#endif for (int t = 0; t < batch.txn_size(); ++t) { const Txn &txn = batch.txn(t); // Regular transaction. @@ -876,11 +905,11 @@ finally f(boost::bind(&response_handler::cleanup, this)); st_reader reader(replica); + ResponseBatch batch; while (true) { finally f(boost::bind(&response_handler::loop_cleanup, this)); - ResponseBatch batch; // Read the message, but correctly respond to interrupts so that we can // cleanly exit (slightly tricky). 
if (last_seqno + 1 == seqno) { @@ -1095,7 +1124,6 @@ foreach (const replica_info &r, replicas) newreps.push(r); st_joining join_swallower(swallower); - // XXX finally fin(lambda () { cout << "LEADER SUMMARY" << endl; cout << "- total updates = " << updates << endl; @@ -1408,6 +1436,8 @@ "inspection/diffing") ("suppress-txn-msgs", po::bool_switch(&suppress_txn_msgs), "suppress txn msgs") + ("fake-exec", po::bool_switch(&fake_exec), + "don't actually execute txns") ("fake-bcast", po::bool_switch(&fake_bcast), "when using --bcast-async, don't actually perform the socket write") ("show-updates,U", po::bool_switch(&show_updates), @@ -1420,6 +1450,8 @@ "issue read and delete transactions as well as the default of (only) insertion/update transactions (for leader only)") ("wal", po::bool_switch(&use_wal), "enable ARIES write-ahead logging") + ("force-ser", po::bool_switch(&force_ser), + "force issue_txns to serialize its Txns") ("leader,l", po::bool_switch(&is_leader), "run the leader (run replica by default)") ("exit-on-recovery,x", po::bool_switch(&stop_on_recovery), Modified: ydb/trunk/src/p2.cc =================================================================== --- ydb/trunk/src/p2.cc 2009-02-13 20:57:48 UTC (rev 1180) +++ ydb/trunk/src/p2.cc 2009-02-15 03:20:46 UTC (rev 1181) @@ -29,7 +29,7 @@ long long start = 0, seltime = 0, readtime = 0, writetime = 0; int selcnt = 0, readcnt = 0, writecnt = 0; -typedef array_view<char> arr; +typedef managed_array<char> arr; arr mkarr(char *p = nullptr) { return arr(p, false); } typedef unordered_map<int, int> map_t; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-13 20:57:52
|
Revision: 1180 http://assorted.svn.sourceforge.net/assorted/?rev=1180&view=rev Author: yangzhang Date: 2009-02-13 20:57:48 +0000 (Fri, 13 Feb 2009) Log Message: ----------- added notes Modified Paths: -------------- ydb/trunk/README Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-02-13 20:57:32 UTC (rev 1179) +++ ydb/trunk/README 2009-02-13 20:57:48 UTC (rev 1180) @@ -282,12 +282,79 @@ Period: 2/5- -- TODO commit!!! +- DONE commit!!! +- DONE google profiling + - doesn't work well with this app since no samples are generated while in a + blocking syscall + - top: randint, readmsg, ... lots of ties, hard to tell +- DONE thread profiling + - leader: + - 60% in compute: 40% handle_responses, 60% issue_txns + - 40% unaccounted + - replica: + - 90-100% in process_txns thread + - perftools showed a lot of samples in map operations and in process_txn, + but not sure how to trim down process_txn any more +- DONE replace the map with unordered_map, start using -O3 + - this gave me some performance boost, from around 45 Ktps to 65 Ktps + - now performing better than unoptimized, simple disk + - issues ~72Ktps + - optimized, simple disk does much better as well, unfortunately (nearly + 200Ktps) + - this is close to the 70 Ktps from Abadi's H-Store paper +- DONE asynchronously bcast txns + - issuing, handle_responses %, issuing net throughput, processing + - 0: 260 Ktps, 0%, N/A, 260 Ktps + - 1: <250 Ktps, 70%, 21 MB/s, 65 Ktps + - 2: 75 Ktps, 38% 38%, 6.4 MB/s, 65 Ktps + - 3: 53 Ktps, 27% 27% 27%, 4.5 MB/s, 50 Ktps + - from 0 to 1: it seems that Response serialization is taking up a lot of + time? 
+ - tried removing Response construction, but that barely changed anything + - at under 3 nodes, we are bottlenecked by the CPU throughput of process_txn + (65 Ktps) +- DONE asynchronously bcast responses + - async: handling + - 0: 260 Ktps + - 1: 90 Ktps + - 2: 66 Ktps + - 3: 46 Ktps + - sync + - 0: 260 Ktps + - 1: 63 Ktps: this shows that async does make a big diff + - 2: 60 Ktps: this + next are more similar to async bc of readmsg + bottleneck + - 3: 43 Ktps + - at 3, we are bottlenecked by the leader in handle_responses (apparently) to + 50 Ktps + - we are definitely not bottlenecked by IO throughput +- DONE make readmsg perform fewer syscalls (buffer opportunistically) + - like magic: now can sustain 90 Ktps all the way up through 3 xacts! + +Period + +- DONE p2 prototype + - some interesting performance bugs + - forgot to make some sockets non-blocking, eg accepted client socket, eg + the client's socket to server; everything still works with select + - i was indeed forgetting to set this as well in ydb + - was always inadvertently calling read() whenever i requested some # bytes + - made a big diff in leveling the field between smallerish to largerish + msg sizes + - this was hurting me only slightly in ydb, it seems + - was not aggressively consuming as many msgs as i could, only 1 at a time + (per return from select) +- DONE batch responses + - made a marked difference; ~100Ktps -> ~140Ktps (for 1-4 reps) +- TODO flushing +- TODO make the logger a "single replica" +- TODO oprofile + - not giving much info either for things that are stalled on IO - TODO serialization bench (multiple layers, control batch sizes) - TODO network throughput bench - TODO associative container bench - TODO combine the analyses of the above three; integrate with actual message - formats, etc. 
+ formats, etc.; updated README - TODO batching, serialization, disk speed - TODO better wal - TODO better understand multihost recovery @@ -295,7 +362,7 @@ - TODO data structures benchmark - TODO implement checkpointing disk-based scheme - TODO implement log-based recovery; show that it sucks -- TODO implement group (batch) commit for log-based recovery +- TODO implement group (batch) commit (sync) for log-based recovery - TODO try scaling up - TODO serialize outputs from the various clients to a single merger to (1) have ordering over the (timestamped) messages, and (2) avoid interleaved @@ -359,3 +426,62 @@ - TODO differences from: harbor, harp, aries - TODO understand 2pc, paxos, etc. + +Notes +----- + +### IO limits + +Theoretically, with a GigE network connection, you can max out at roughly 800 +Mb/s or 100 MB/s. Assuming each transaction can be encoded in about 50 bytes, +you can push 100e6 B/s / 50 B/txn = 2e6 txn/s. + +Imagine that this is the only network traffic we need to worry about, so we +don't have any other replicas to dispatch transactions to or any responses to +receive (though GigE is full-duplex). + +As of 2008, a typical 7200rpm desktop hard drive has a sustained +"disk-to-buffer" data transfer rate of about 70 MB/s[^1]. This is slightly +less than the max rate of a GigE network, so we do expect to do better with the +network---but not substantially. + +[^1]: <http://en.wikipedia.org/wiki/Hard_disk_drive> + +### Compute limits + +To be able to process 2e6 txn/s on a 1GHz CPU, we must spend at most 1e9 +cycle/s / 2e6 txn/s = 500 cycle/txn. Assuming: + +- 100 ns for main memory +- 10 ns for L2 cache +- 1 ns for L1 cache + +At 1 ns/cycle, a single cache miss will take up to a fifth of the allotted +processing time per txn. (TODO: what's the baseline for syscalls?) + +A `std::map` can in a tight loop process 1M sequential insertions in ~500 ms, +or ~2M/s (note that these are *not* random keys, but keys incrementing from 0). 
+If a txn inserts 5 records, this equates to 400Ktps. An `stx::btree_map` takes +~250 ms (4M/s or 800Ktps), and a `tr1::unordered_map` (hash table) takes ~200 +ms (5M/s or 1Mtps). For reference, we can populate a raw array sequentially in +5ms (200M/s or 40Mtps). (These microbenchmarks come from container-bench.) + +These results suggest that we can expect to be bounded by the CPU/memory, and +not IO throughput. With hash tables, we can come close to the target of 2e6 +txn/s, but can't expect to exceed it. + +In the H-Store paper "The End of an Architectural Era", the prototype achieves +70,416 TPC-C txn/s, 82 times faster than the 850 txn/s achieved on the +commercial system. + +### Workloads + +Ebay engages in 26 Btxn/day, or 300Ktps. + +Discussion +---------- + +Disk vs. network: practical considerations + +- Network is simpler, can be stateless +- Disk may quickly grow stale during downtime This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-13 20:57:39
|
Revision: 1179 http://assorted.svn.sourceforge.net/assorted/?rev=1179&view=rev Author: yangzhang Date: 2009-02-13 20:57:32 +0000 (Fri, 13 Feb 2009) Log Message: ----------- added p2 harness Modified Paths: -------------- ydb/trunk/tools/test.bash Modified: ydb/trunk/tools/test.bash =================================================================== --- ydb/trunk/tools/test.bash 2009-02-13 20:57:05 UTC (rev 1178) +++ ydb/trunk/tools/test.bash 2009-02-13 20:57:32 UTC (rev 1179) @@ -195,6 +195,7 @@ cd ~/ydb/src make clean PPROF=1 OPT=1 make WTF= + PPROF=1 OPT=1 make WTF= p2 } init-setup() { @@ -511,6 +512,28 @@ } # +# Prototype 2 +# + +p2-helper() { + local leader="$1" + shift + tagssh "$leader" "ydb/src/p2 -l | tail" & + sleep .1 + { + while (( $# > 0 )) ; do + tagssh "$1" "ydb/src/p2 -H $leader | tail" & + shift + done + time wait + } 2>&1 | fgrep real +} + +p2() { + hostargs p2-helper +} + +# # Main # This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-02-13 20:57:10
|
Revision: 1178 http://assorted.svn.sourceforge.net/assorted/?rev=1178&view=rev Author: yangzhang Date: 2009-02-13 20:57:05 +0000 (Fri, 13 Feb 2009) Log Message: ----------- - added p2 - added ResponseBatch - added gch rules (but not yet incorporated into main build) - moved to g++0x - lifted break_exception - removed bcastmsg_fake - changed bcastmsg_async to queue up (dst,msg) pairs (rather than rely on a cached version of the dsts vector) - changed the core readmsg calls to use st_reader - replaced bcast_async macros with fn ptrs - reworked process_txn interface - added leader summary as well - tolerate EINTR on joiner accept - added --fake-bcast, --bcast-async Modified Paths: -------------- ydb/trunk/src/Makefile ydb/trunk/src/main.lzz.clamp ydb/trunk/src/ydb.proto Added Paths: ----------- ydb/trunk/src/p2.cc Modified: ydb/trunk/src/Makefile =================================================================== --- ydb/trunk/src/Makefile 2009-02-13 20:54:46 UTC (rev 1177) +++ ydb/trunk/src/Makefile 2009-02-13 20:57:05 UTC (rev 1178) @@ -44,7 +44,7 @@ -Wpointer-arith -Wcast-qual -Wcast-align -Wwrite-strings \ -Winit-self -Wsign-promo -Wno-unused-parameter -Wc++0x-compat \ -Wparentheses -Wmissing-format-attribute -Wfloat-equal \ - -Wno-inline -Wsynth $(CXXFLAGS) + -Wno-inline -Wsynth -std=gnu++0x $(CXXFLAGS) PBCXXFLAGS := $(OPT) -Wall -Werror $(GPROF) all: $(TARGET) @@ -73,6 +73,12 @@ %.lzz: %.lzz.clamp clamp < $< | sed "`echo -e '1i#src\n1a#end'`" > $@ +all.h: + fgrep '#include' main.lzz.clamp > all.h + +all.h.gch: all.h + $(COMPILE.cc) $(PBHDRS) $(OUTPUT_OPTION) $< + clean: rm -f $(GENSRCS) $(GENHDRS) $(OBJS) $(TARGET) main.lzz *.clamp_h serperf @@ -89,6 +95,9 @@ ### serperf: serperf.o ydb.o - $(LINK.o) $^ $(LOADLIBES) $(LDLIBS) -o $@ + $(LINK.o) $^ $(LOADLIBES) $(LDLIBS) $(OUTPUT_OPTION) # serperf.cc ydb.pb.h + +p2: p2.cc + $(LINK.cc) $^ $(LOADLIBES) $(LDLIBS) $(OUTPUT_OPTION) Modified: ydb/trunk/src/main.lzz.clamp 
=================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-02-13 20:54:46 UTC (rev 1177) +++ ydb/trunk/src/main.lzz.clamp 2009-02-13 20:57:05 UTC (rev 1178) @@ -29,6 +29,8 @@ #include <vector> #include "ydb.pb.h" #define foreach BOOST_FOREACH +#define shared_ptr boost::shared_ptr +#define ref boost::ref using namespace boost; using namespace boost::archive; using namespace commons; @@ -55,7 +57,7 @@ bool verbose, yield_during_build_up, yield_during_catch_up, dump, show_updates, count_updates, stop_on_recovery, general_txns, profile_threads, debug_threads, multirecover, disk, debug_memory, use_wal, - suppress_txn_msgs; + suppress_txn_msgs, use_bcast_async, fake_bcast; long long timelim, read_thresh, write_thresh; // Control. @@ -68,13 +70,14 @@ /** * Convenience function for calculating percentages. */ -double -pct(double sub, double tot) -{ - return 100 * sub / tot; -} +double pct(double sub, double tot) { return 100 * sub / tot; } /** + * Convenience class for performing long-jumping break. + */ +class break_exception : public std::exception {}; + +/** * The list of all threads. Keep track of these so that we may cleanly shut * down all threads. */ @@ -241,81 +244,30 @@ const vector<st_netfd_t> &rs_; }; -/** - * XXX - */ -template<typename T> -void -bcastmsg_fake(const vector<st_netfd_t> &dsts, const T & msg) -{ - // Serialize message to a buffer. - string s; - check(msg.SerializeToString(&s)); - const char *buf = s.c_str(); +st_channel<pair<st_netfd_t, shared_ptr<string> > > msgs; - if (s.size() > 1000000) - cout << "sending large message to " << dsts.size() << " dsts, size = " - << s.size() << " bytes" << endl; - - // Prefix the message with a four-byte length. - uint32_t len = htonl(static_cast<uint32_t>(s.size())); - - // Broadcast the length-prefixed message to replicas. 
- int dstno = 0; - foreach (st_netfd_t dst, dsts) { - size_t resid = sizeof len; -#define checksize(x,y) checkeqnneg(x, static_cast<ssize_t>(y)) - int res = true ? 0 : st_write_resid(dst, static_cast<void*>(&len), &resid, timeout); - long long before_write = -1; - if (write_thresh > 0) { - before_write = current_time_millis(); - } - if (res == -1 && errno == ETIME) { - checksize(st_write(dst, - reinterpret_cast<char*>(&len) + sizeof len - resid, - resid, - ST_UTIME_NO_TIMEOUT), - resid); - } else { - check0x(res); - } - if (write_thresh > 0) { - long long write_time = current_time_millis() - before_write; - if (write_time > write_thresh) { - cout << "thread " << threadname() - << ": write to dst #" << dstno - << " took " << write_time << " ms" << endl; - } - } - if (false) - checksize(st_write(dst, buf, s.size(), ST_UTIME_NO_TIMEOUT), - s.size()); - ++dstno; - } -} - -st_channel<shared_ptr<string> > msgs; - -const vector<st_netfd_t> *gdsts; - /** - * XXX + * The worker that performs the actual broadcasting. */ void bcaster() { int counter = 0; while (!kill_hub) { - shared_ptr<string> p; + pair<st_netfd_t, shared_ptr<string> > pr; { st_intr intr(kill_hub); - p = msgs.take(); + pr = msgs.take(); } + st_netfd_t dst = pr.first; + shared_ptr<string> &p = pr.second; if (p.get() == nullptr) break; string &s = *p.get(); int dstno = 0; - foreach (st_netfd_t dst, *gdsts) { + // XXX + // foreach (st_netfd_t dst, *gdsts) { + if (!fake_bcast) { long long before_write = -1; if (write_thresh > 0) { before_write = current_time_millis(); @@ -330,7 +282,8 @@ cout << "thread " << threadname() << ": write #" << counter << " of size " << s.size() - << " bytes to dst #" << dstno + //<< " bytes to dst #" << dstno + << " bytes" << " took " << write_time << " ms" << endl; } } @@ -341,14 +294,12 @@ } /** - * XXX + * Asynchronous version of the broadcaster. 
*/ template<typename T> void bcastmsg_async(const vector<st_netfd_t> &dsts, const T & msg) { - gdsts = &dsts; - // Serialize message to a buffer. uint32_t len; shared_ptr<string> p(new string(sizeof len, '\0')); @@ -362,13 +313,11 @@ // Prefix the message with a four-byte length. len = htonl(static_cast<uint32_t>(s.size() - sizeof len)); char *plen = reinterpret_cast<char*>(&len); - for (size_t i = 0; i < sizeof len; ++i) - s[i] = plen[i]; + copy(plen, plen + sizeof len, s.begin()); - msgs.push(p); + foreach (st_netfd_t dst, dsts) msgs.push(make_pair(dst, p)); } - /** * Send a message to some destinations (sequentially). */ @@ -432,6 +381,14 @@ bcastmsg(dsts, msg); } +template<typename T> +void +sendmsg_async(st_netfd_t dst, const T &msg) +{ + vector<st_netfd_t> dsts(1, dst); + bcastmsg_async(dsts, msg); +} + /** * Read a message. This is done in two steps: first by reading the length * prefix, then by reading the actual body. This function also provides a way @@ -476,7 +433,7 @@ char buf[len]; GETMSG(buf); } else { - cout << "receiving large msg; heap-allocating " << len << " bytes" << endl; + //cout << "receiving large msg; heap-allocating " << len << " bytes" << endl; scoped_array<char> buf(new char[len]); GETMSG(buf.get()); } @@ -499,6 +456,19 @@ } /** + * Same as the above readmsg() but uses an st_reader instead of a raw + * st_netfd_t. + */ +template <typename T> +void +readmsg(st_reader &src, T & msg) +{ + array_view<char> a = src.read(sizeof(uint32_t)); + uint32_t len = ntohl(*reinterpret_cast<const uint32_t*>(a.get())); + check(msg.ParseFromArray(src.read(len), len)); +} + +/** * ARIES write-ahead log. No undo logging necessary (no steal). */ class wal @@ -527,6 +497,15 @@ mii g_map; wal *g_wal; +// Function pointer types. 
+typedef void (*bcasttxn_t)(const vector<st_netfd_t> &dsts, const TxnBatch &msg); +bcasttxn_t bcasttxn_async = bcastmsg_async<TxnBatch>; +bcasttxn_t bcasttxn_sync = bcastmsg<TxnBatch>; + +typedef void (*sendres_t)(st_netfd_t dst, const ResponseBatch &msg); +sendres_t sendres_async = sendmsg_async<ResponseBatch>; +sendres_t sendres_sync = sendmsg<ResponseBatch>; + /** * Keep issuing transactions to the replicas. */ @@ -534,16 +513,16 @@ issue_txns(st_channel<replica_info> &newreps, int &seqno, st_bool &accept_joiner) { -#define bcastmsg bcastmsg_async + bcasttxn_t bcast = use_bcast_async ? bcasttxn_async : bcasttxn_sync; + st_thread_t bcaster_thread = bcast == bcasttxn_async ? + my_spawn(bcaster, "bcaster") : nullptr; + Op_OpType types[] = {Op::read, Op::write, Op::del}; vector<st_netfd_t> fds; long long start_time = current_time_millis(); -#if bcastmsg == bcastmsg_async - st_joining join_bcaster(my_spawn(bcaster, "bcaster")); -#endif - finally f(lambda () { + if (__ref(bcaster_thread) != nullptr) st_join(__ref(bcaster_thread)); showtput("issued", current_time_millis(), __ref(start_time), __ref(seqno), 0); }); @@ -554,7 +533,7 @@ // empty/default Txn). if (!newreps.empty() && seqno > 0) { if (multirecover) { - bcastmsg(fds, TxnBatch()); + bcast(fds, TxnBatch()); } else { sendmsg(fds[0], TxnBatch()); } @@ -583,7 +562,7 @@ // Process immediately if not bcasting. if (fds.empty()) { --seqno; - process_txn(nullptr, g_map, txn, seqno, true); + process_txn(g_map, txn, seqno, nullptr); } ++seqno; @@ -619,7 +598,7 @@ // Broadcast. if (!fds.empty() && !suppress_txn_msgs) - bcastmsg(fds, batch); + bcast(fds, batch); // Pause? 
if (do_pause) @@ -630,11 +609,10 @@ TxnBatch batch; Txn &txn = *batch.add_txn(); txn.set_seqno(-1); - bcastmsg(fds, batch); -#if bcastmsg == bcastmsg_any - msgs.push(shared_ptr<string>()); -#endif -#undef bcastmsg + bcast(fds, batch); + if (bcaster_thread != nullptr) { + msgs.push(make_pair(nullptr, shared_ptr<string>())); + } } /** @@ -642,15 +620,15 @@ * leader. */ void -process_txn(st_netfd_t leader, mii &map, const Txn &txn, int &seqno, - bool caught_up) +process_txn(mii &map, const Txn &txn, int &seqno, Response *res) { wal &wal = *g_wal; checkeq(txn.seqno(), seqno + 1); - Response res; - res.set_seqno(txn.seqno()); - res.set_caught_up(caught_up); seqno = txn.seqno(); + if (res != nullptr) { + res->set_seqno(seqno); + res->set_caught_up(true); + } for (int o = 0; o < txn.op_size(); ++o) { const Op &op = txn.op(o); const int key = op.key(); @@ -663,8 +641,10 @@ } switch (op.type()) { case Op::read: - if (it == map.end()) res.add_result(0); - else res.add_result(it->second); + if (res != nullptr) { + if (it == map.end()) res->add_result(0); + else res->add_result(it->second); + } break; case Op::write: if (use_wal) wal.write(key, op.value()); @@ -680,7 +660,6 @@ } } if (use_wal) wal.commit(); - if (caught_up && leader != nullptr) sendmsg(leader, res); } void @@ -760,7 +739,16 @@ // issued more since the Init message). int first_seqno = -1; + st_thread_t bcaster_thread = use_bcast_async ? + my_spawn(bcaster, "bcaster") : nullptr; + sendres_t sendmsg = use_bcast_async ? 
sendres_async : sendres_sync; + finally f(lambda () { + if (__ref(bcaster_thread) != nullptr) { + msgs.push(make_pair(nullptr, shared_ptr<string>())); + st_join(__ref(bcaster_thread)); + } + long long now = current_time_millis(); showtput("processed", now, __ref(start_time), __ref(seqno), __ref(init_seqno)); @@ -773,8 +761,6 @@ __ref(send_states).push(shared_ptr<Recovery>()); }); - class break_exception : public std::exception {}; - try { while (true) { TxnBatch batch; @@ -794,6 +780,7 @@ } } if (batch.txn_size() > 0) { + ResponseBatch resbatch; for (int t = 0; t < batch.txn_size(); ++t) { const Txn &txn = batch.txn(t); // Regular transaction. @@ -809,7 +796,8 @@ first_seqno == -1 ? init_seqno - 1 : first_seqno); caught_up = true; } - process_txn(leader, map, txn, seqno, true); + Response *res = resbatch.add_res(); + process_txn(map, txn, seqno, res); action = "processed"; } else { if (first_seqno == -1) @@ -829,6 +817,8 @@ st_sleep(0); } } + if (resbatch.res_size() > 0) + sendmsg(leader, resbatch); } else { // Empty (default) Txn means "generate a snapshot." // TODO make this faster @@ -876,39 +866,28 @@ start_time(current_time_millis()), recovery_start_time(caught_up ? -1 : start_time), recovery_end_time(-1), + start_seqno(seqno), recovery_start_seqno(caught_up ? -1 : seqno), recovery_end_seqno(-1), last_seqno(-1) {} void run() { - //start_time = current_time_millis(); - //recovery_start_time = caught_up ? -1 : start_time; - //recovery_end_time = -1; - //recovery_start_seqno = caught_up ? 
-1 : seqno; - //recovery_end_seqno = -1; - //last_seqno = -1; + finally f(boost::bind(&response_handler::cleanup, this)); - finally f(lambda () { - long long end_time = current_time_millis(); - if (__ref(recovery_end_time) > -1) { - cout << __ref(rid) << ": "; - showtput("after recovery, finished", end_time, __ref(recovery_end_time), - __ref(seqno), __ref(recovery_end_seqno)); - } - }); + st_reader reader(replica); while (true) { finally f(boost::bind(&response_handler::loop_cleanup, this)); - Response res; + ResponseBatch batch; // Read the message, but correctly respond to interrupts so that we can // cleanly exit (slightly tricky). if (last_seqno + 1 == seqno) { // Stop-interruptible in case we're already caught up. try { st_intr intr(stop_hub); - readmsg(replica, res); + readmsg(reader, batch); } catch (...) { // TODO: only catch interruptions // This check on seqnos is OK for termination since the seqno will // never grow again if stop_hub is set. @@ -925,34 +904,38 @@ // Only kill-interruptible because we want a clean termination (want // to get all the acks back). st_intr intr(kill_hub); - readmsg(replica, res); + readmsg(reader, batch); } - // Determine if this response handler's host (the only joiner) has finished - // catching up. If it has, then broadcast a signal so that all response - // handlers will know about this event. - if (!caught_up && res.caught_up()) { - long long now = current_time_millis(), timediff = now - start_time; - caught_up = true; - recover_signals.push(now); - cout << rid << ": "; - cout << "recovering node caught up; took " - << timediff << " ms" << endl; - // This will cause the program to exit eventually, but cleanly, such that - // the recovery time will be set first, before the eventual exit (which - // may not even happen in the current iteration). 
- if (stop_on_recovery) { - cout << "stopping on recovery" << endl; - stop_hub.set(); - } - } - if (res.seqno() % chkpt == 0) { - if (verbose) { + + for (int i = 0; i < batch.res_size(); ++i) { + const Response &res = batch.res(i); + // Determine if this response handler's host (the only joiner) has finished + // catching up. If it has, then broadcast a signal so that all response + // handlers will know about this event. + if (!caught_up && res.caught_up()) { + long long now = current_time_millis(), timediff = now - start_time; + caught_up = true; + recover_signals.push(now); cout << rid << ": "; - cout << "got response " << res.seqno() << " from " << replica << endl; + cout << "recovering node caught up; took " + << timediff << " ms" << endl; + // This will cause the program to exit eventually, but cleanly, such that + // the recovery time will be set first, before the eventual exit (which + // may not even happen in the current iteration). + if (stop_on_recovery) { + cout << "stopping on recovery" << endl; + stop_hub.set(); + } } - st_sleep(0); + if (res.seqno() % chkpt == 0) { + if (verbose) { + cout << rid << ": "; + cout << "got response " << res.seqno() << " from " << replica << endl; + } + st_sleep(0); + } + last_seqno = res.seqno(); } - last_seqno = res.seqno(); } } @@ -977,6 +960,17 @@ } } + void cleanup() { + long long end_time = current_time_millis(); + cout << rid << ": "; + showtput("handled", end_time, start_time, seqno, start_seqno); + if (recovery_end_time > -1) { + cout << rid << ": "; + showtput("after recovery, finished", end_time, recovery_end_time, + seqno, recovery_end_seqno); + } + } + st_netfd_t replica; const int &seqno; int rid; @@ -984,7 +978,7 @@ bool caught_up; st_channel<long long> &sub; long long start_time, recovery_start_time, recovery_end_time; - int recovery_start_seqno, recovery_end_seqno, last_seqno; + int start_seqno, recovery_start_seqno, recovery_end_seqno, last_seqno; }; /** @@ -1101,6 +1095,23 @@ foreach (const replica_info 
&r, replicas) newreps.push(r); st_joining join_swallower(swallower); + // XXX + finally fin(lambda () { + cout << "LEADER SUMMARY" << endl; + cout << "- total updates = " << updates << endl; + cout << "- final DB state: seqno = " << __ref(seqno) << ", size = " + << g_map.size() << endl; + string fname = string("/tmp/ydb") + lexical_cast<string>(getpid()); + if (dump) { + cout << "- dumping to " << fname << endl; + ofstream of(fname.c_str()); + of << "seqno: " << __ref(seqno) << endl; + foreach (const pii &p, g_map) { + of << p.first << ": " << p.second << endl; + } + } + }); + try { // Start handling responses. st_thread_group handlers; @@ -1113,11 +1124,17 @@ // Accept the recovering node, and tell it about the online replicas. st_netfd_t joiner; - { + try { st_intr intr(stop_hub); joiner = checkerr(st_accept(listener, nullptr, nullptr, ST_UTIME_NO_TIMEOUT)); accept_joiner.waitset(); + } catch (std::exception &ex) { + string s(ex.what()); + if (s.find("Interrupted system call") == s.npos) + throw; + else + throw break_exception(); } Join join = readmsg<Join>(joiner); replicas.push_back(replica_info(joiner, static_cast<uint16_t>(join.port()))); @@ -1133,6 +1150,7 @@ handlers.insert(my_spawn(bind(handle_responses, joiner, ref(seqno), rid++, ref(recover_signals), false), "handle_responses_joiner")); + } catch (break_exception &ex) { } catch (std::exception &ex) { // TODO: maybe there's a cleaner way to do this final step before waiting with the join cerr_thread_ex(ex) << endl; @@ -1276,7 +1294,7 @@ int mid_seqno = seqno; while (!backlog.empty()) { shared_ptr<Txn> p = backlog.take(); - process_txn(leader, map, *p, seqno, false); + process_txn(map, *p, seqno, nullptr); if (p->seqno() % chkpt == 0) { if (verbose) cout << "processed txn " << p->seqno() << " off the backlog; " @@ -1390,8 +1408,12 @@ "inspection/diffing") ("suppress-txn-msgs", po::bool_switch(&suppress_txn_msgs), "suppress txn msgs") + ("fake-bcast", po::bool_switch(&fake_bcast), + "when using 
--bcast-async, don't actually perform the socket write") ("show-updates,U", po::bool_switch(&show_updates), "log operations that touch (update/read/delete) an existing key") + ("bcast-async", po::bool_switch(&use_bcast_async), + "broadcast messages asynchronously") ("count-updates,u",po::bool_switch(&count_updates), "count operations that touch (update/read/delete) an existing key") ("general-txns,g", po::bool_switch(&general_txns), Added: ydb/trunk/src/p2.cc =================================================================== --- ydb/trunk/src/p2.cc (rev 0) +++ ydb/trunk/src/p2.cc 2009-02-13 20:57:05 UTC (rev 1178) @@ -0,0 +1,353 @@ +#include <algorithm> +#include <boost/foreach.hpp> +#include <boost/program_options.hpp> +#include <commons/array.h> +#include <commons/nullptr.h> +#include <commons/rand.h> +#include <commons/sockets.h> +#include <commons/time.h> +#include <exception> +#include <iostream> +#include <set> +#include <string> +#include <sys/select.h> +#include <tr1/unordered_map> +#include <vector> +using namespace commons; +using namespace std; +using namespace tr1; +#define foreach BOOST_FOREACH +#define exception std::exception +#define STAT(t, c, x) \ + long long start_time = current_time_millis(); \ + x \ + ++c; \ + t += current_time_millis() - start_time; + +int bufsize = 1e8, chkpt = 1e4, batch_size = 1e4, thresh = 1e6; +bool verbose = true; +long long start = 0, seltime = 0, readtime = 0, writetime = 0; +int selcnt = 0, readcnt = 0, writecnt = 0; + +typedef array_view<char> arr; +arr mkarr(char *p = nullptr) { return arr(p, false); } + +typedef unordered_map<int, int> map_t; + +fd_set rfds, wfds, efds; + +class reader +{ +private: + array<char> buf_; + char *start_; + char *end_; + int fd_; +public: + reader(int fd) : buf_(bufsize), start_(buf_.get()), end_(start_), fd_(fd) {} + size_t rem() { return buf_.end() - end_; } + size_t amt() { return end_ - start_; } + int fd() { return fd_; } + arr read(size_t req) { + if (req <= amt()) { + arr a = 
mkarr(start_); + start_ += req; + return a; + } + // make sure we have enough space + check(req < buf_.size()); + // shift if necessary + if (req > rem()) { + memmove(buf_.get(), start_, amt()); + size_t diff = start_ - buf_.get(); + start_ -= diff; + end_ -= diff; + } + // read; advance end_ + STAT(readtime, readcnt, int res = ::read(fd(), end_, rem());) + int e = errno; + errno = 0; + //cout << "read res " << res << endl; + if (e == EAGAIN) return mkarr(); + if (res < 1) { close(fd()); throw exception(); } + end_ += res; + // if we still haven't read enough (requested), ret null + if (amt() < req) return mkarr(); + // advance start_ and return the newly consumed range + arr a = mkarr(start_); + start_ += req; + //cout << "offset " << a.get() - buf_.get() << endl; + return a; + } +}; + +class msg_reader +{ +private: + // len_ of 0 means we haven't read the prefix yet + uint32_t len_; + reader r_; +public: + msg_reader(int fd) : len_(0), r_(fd) {} + arr read(uint32_t &len) { + // read prefix + if (len_ == 0) { + arr prefix = r_.read(sizeof len_); + if (prefix.get() == nullptr) return mkarr(); + uint32_t tmp = *reinterpret_cast<uint32_t*>(prefix.get()); + //cout << "tmp " << tmp << endl; + len_ = tmp; + } + // read body + check(len_ > 0); + arr body = r_.read(len_); + if (body.get() != nullptr) { + len = len_; + len_ = 0; + } + return body; + } +}; + +class writer +{ +private: + array<char> buf_; + // start/end of unsent, prepared range + char *start_; + char *end_; + int fd_; +public: + writer(int fd) : buf_(bufsize), start_(buf_.get()), end_(start_), fd_(fd) {} + int fd() { return fd_; }; + fd_set &wfds() { return ::wfds; } + size_t amt() { return end_ - start_; } + size_t rem() { return buf_.end() - end_; } + + arr getbuf(uint32_t req) { + uint32_t tot = req + sizeof req; + check(tot > 0); + check(tot <= buf_.size()); + //cout << "getbuf req " << req << endl; + + // make space? 
+ if (tot > rem()) { + if (tot > buf_.size() - amt()) return mkarr(); // not enough space + memmove(buf_.get(), start_, amt()); // shift + size_t diff = start_ - buf_.get(); + //if (diff > 0) cout << "shifting amt " << amt() << " diff " << diff << endl; + start_ -= diff; + end_ -= diff; + assert(rem() >= tot); + } + + // write length prefix + allocate/return body + *(reinterpret_cast<uint32_t*>(end_)) = req; + end_ += sizeof req; + arr p = mkarr(end_); + end_ += req; + return p; + } + + void write() { + // perform the write + STAT(writetime, writecnt, int res = ::write(fd(), start_, end_ - start_);) + if (res < 0) { close(fd()); throw exception(); } + //cout << "write res " << res << " amt " << amt() << endl; + // advance start_ + start_ += res; + // re-register for writes if we still have things to write + if (end_ - start_ > 0) { + FD_SET(fd(), &wfds()); + } + } +}; + +class replica_channel +{ +private: + int fd_; + writer w_; + char *buf_; + +public: + replica_channel(int fd) : fd_(fd), w_(fd) {} + int fd() { return fd_; } + + void writeint(uint32_t i) { + *(reinterpret_cast<uint32_t*>(buf_)) = i; + buf_ += sizeof i; + } + + void handle_write() { + //cout << "writing" << endl; + uint32_t npairs = batch_size; + uint32_t len = 2 * sizeof(uint32_t) * npairs; + arr a = w_.getbuf(len); + buf_ = a; + if (buf_ == nullptr) return; + for (uint32_t i = 0; i < npairs; ++i) { + writeint(1); + writeint(2); + } + w_.write(); + } + +}; + +class replica +{ +private: + int fd_; + msg_reader r_; + const char *buf_; + map_t map_; + int counter_; + int readcount_; + long long start_; + +public: + replica(int fd) : fd_(fd), r_(fd), counter_(0), readcount_(0), start_(current_time_millis()) {} + int fd() { return fd_; } + + uint32_t readint() { + uint32_t i = *reinterpret_cast<const uint32_t*>(buf_); + buf_ += sizeof i; + return i; + } + + void handle_read() { + ++readcount_; + while (true) { + uint32_t len = 0; + arr a = r_.read(len); + buf_ = a.get(); + if (buf_ == nullptr) break; + 
uint32_t npairs = len / sizeof(uint32_t) / 2; + check(2 * sizeof(uint32_t) * npairs == len); // should be whole count + for (uint32_t i = 0; i < npairs; ++i) { + uint32_t k = readint(); + uint32_t v = readint(); + map_[k] = v; + ++counter_; + if (counter_ % chkpt == 0) { + //if (verbose) cout << current_time_millis() << ": count " << counter_ << endl; + if (counter_ > thresh) { + long long end = current_time_millis(); + double rate = double(counter_) / (end - start_) * 1000; + cout << "rate " << rate << " pairs/s " << rate / 5 << " tps; readcount " << readcount_ << endl; + throw exception(); + } + } + } + } + } + +}; + +class mainer +{ +private: + vector<replica_channel*> rs; + +public: + int main(int argc, char **argv) { + bool is_leader; + string host; + + namespace po = boost::program_options; + po::options_description desc("Allowed options"); + desc.add_options() + ("help,h", "show this help message") + ("leader,l", po::bool_switch(&is_leader), "leader") + ("verbose,v",po::bool_switch(&verbose), "verbose") + ("host,H", + po::value<string>(&host)->default_value(string("localhost")), + "hostname or address of the leader") + ("batch,b", po::value<int>(&batch_size)->default_value(1e4), "batch size"); + po::variables_map vm; + try { + po::store(po::parse_command_line(argc, argv, desc), vm); + po::notify(vm); + + if (vm.count("help")) { + cout << desc << endl; + return 0; + } + } catch (exception &ex) { + cerr << ex.what() << endl << endl << desc << endl; + return 1; + } + + struct timeval tv; + tv.tv_sec = 5; + tv.tv_usec = 0; + FD_ZERO(&rfds); + FD_ZERO(&wfds); + FD_ZERO(&efds); + + int srv = is_leader ? tcp_listen(7654, true) : -1; + int cli = is_leader ? 
-1 : tcp_connect(host.c_str(), 7654); + if (cli >= 0) checknnegerr(fcntl(cli, F_SETFL, O_NONBLOCK | fcntl(cli, F_GETFL, 0))); + int nfds = max(srv, cli); + if (srv >= 0) FD_SET(srv, &rfds); + if (cli >= 0) FD_SET(cli, &rfds); + replica rep(cli); + if (cli >= 0) { start = current_time_millis(); seltime = 0; } + + while (true) { + //sleep(1); + //cout << endl; + STAT(seltime, selcnt, checknnegerr(select(nfds + 1, &rfds, &wfds, &efds, nullptr));) + //cout << "select waited " << diff << endl; + + // accept new connections + if (srv >= 0 && FD_ISSET(srv, &rfds)) { + if (start == 0) { start = current_time_millis(); seltime = 0; } + cout << "accept" << endl; + int r = checknnegerr(accept(srv, nullptr, nullptr)); + cout << fcntl(r, F_GETFL, 0) << ' '; + checknnegerr(fcntl(r, F_SETFL, O_NONBLOCK | fcntl(r, F_GETFL, 0))); + cout << fcntl(r, F_GETFL, 0) << endl; + rs.push_back(new replica_channel(r)); + nfds = max(nfds, r); + FD_SET(r, &wfds); + } + + if (cli >= 0 && FD_ISSET(cli, &rfds)) { + rep.handle_read(); + } + + // handle ready events + foreach (replica_channel *p, rs) { + replica_channel &r = *p; + if (FD_ISSET(r.fd(), &rfds)) { + //r.handle_read(); + } + if (FD_ISSET(r.fd(), &wfds)) { + r.handle_write(); + } + FD_SET(r.fd(), &rfds); + } + } + + return 0; + } +}; + +void dump() { + long long tot = current_time_millis() - start; + cout << "readtime " << readtime << " writetime " << writetime << " seltime " << seltime << " tot " << tot << endl; + cout << "readcnt " << readcnt << " writecnt " << writecnt << " selcnt " << selcnt << endl; +} + +int main(int argc, char **argv) { + int ret; + atexit(dump); + try { + ret = mainer().main(argc, argv); + } catch (...) 
{ + ret = 1; + } + return ret; +} Modified: ydb/trunk/src/ydb.proto =================================================================== --- ydb/trunk/src/ydb.proto 2009-02-13 20:54:46 UTC (rev 1177) +++ ydb/trunk/src/ydb.proto 2009-02-13 20:57:05 UTC (rev 1178) @@ -78,4 +78,8 @@ message TxnBatch { repeated Txn txn = 1; -} \ No newline at end of file +} + +message ResponseBatch { + repeated Response res = 1; +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |