[Assorted-commits] SF.net SVN: assorted:[1330] ydb/trunk/src
Brought to you by:
yangzhang
From: <yan...@us...> - 2009-03-24 06:11:05
|
Revision: 1330 http://assorted.svn.sourceforge.net/assorted/?rev=1330&view=rev Author: yangzhang Date: 2009-03-24 06:10:53 +0000 (Tue, 24 Mar 2009) Log Message: ----------- - updated build system to first lzz then clamp, fixing the issue of where to place lambdas (header or source) - bunch of physical refactoring, particularly trying to reduce the number of includes in headers Modified Paths: -------------- ydb/trunk/src/Makefile ydb/trunk/src/main.lzz.clamp ydb/trunk/src/mkdeps.py ydb/trunk/src/msg.h ydb/trunk/src/stxn.lzz.clamp ydb/trunk/src/tpcc.lzz.clamp ydb/trunk/src/ydb.lzz.clamp Added Paths: ----------- ydb/trunk/src/util.lzz.clamp Removed Paths: ------------- ydb/trunk/src/util.lzz Modified: ydb/trunk/src/Makefile =================================================================== --- ydb/trunk/src/Makefile 2009-03-24 06:10:43 UTC (rev 1329) +++ ydb/trunk/src/Makefile 2009-03-24 06:10:53 UTC (rev 1330) @@ -140,20 +140,41 @@ %.cc: %.cc.cog cog.py $< > $@ -%.cc %.hh: %.lzz - lzz -hx hh -sx cc -hl -sl -hd -sd $< - %.pb.h %.pb.cc: %.proto protoc --cpp_out=. 
$< -%.lzz: %.lzz.clamp +# ORIG +# +#%.cc %.hh: %.lzz +# lzz -hx hh -sx cc -hl -sl -hd -sd $< +# +#%.lzz: %.lzz.clamp +# rm -f $@ +# mkdir -p .clamp/ +# clamp --outdir .clamp/ --prefix $(basename $@) < $< | \ +# sed "$$( echo -e '1i\\\n\#hdr\n1a\\\n\#end' )" | \ +# sed "$$( echo -e '$$i\\\n\#hdr\n$$a\\\n\#end' )" > $@ +# chmod -w $@ + +%.cc.clamp %.hh.clamp: %.lzz.clamp + ln -sf $< $(basename $<) + rm -f $(basename $(basename $<)).{hh,cc}.clamp + lzz -hx hh.clamp -sx cc.clamp -hd -sd $(basename $<) + chmod -w $(basename $(basename $<)).{hh.clamp,cc.clamp} + +%.cc: %.cc.clamp rm -f $@ mkdir -p .clamp/ - clamp --outdir .clamp/ --prefix $(basename $@) < $< | \ - sed "$$( echo -e '1i\\\n\#hdr\n1a\\\n\#end' )" | \ - sed "$$( echo -e '$$i\\\n\#hdr\n$$a\\\n\#end' )" > $@ + clamp --outdir .clamp/ --prefix $(basename $@)_cc < $< | \ + sed 's/"$(basename $@).hh.clamp"/"$(basename $@).hh"/' > $@ chmod -w $@ +%.hh: %.hh.clamp + rm -f $@ + mkdir -p .clamp/ + clamp --outdir .clamp/ --prefix $(basename $@)_hh < $< > $@ + chmod -w $@ + pch.h: svn ls -rHEAD -R $(SVNURL) | \ egrep -v '/$$|Makefile' | \ @@ -186,7 +207,7 @@ -Wno-unused-parameter clean: - rm -rf .clamp/ $(GENSRCS) $(GENHDRS) $(OBJS) $(TARGET) $(CLAMPLZZS) + rm -rf .clamp/ $(GENSRCS) $(GENHDRS) $(OBJS) $(TARGET) $(CLAMPLZZS) *.d *.hh.clamp *.cc.clamp distclean: clean rm -f pch.h pch.h.gch Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-03-24 06:10:43 UTC (rev 1329) +++ ydb/trunk/src/main.lzz.clamp 2009-03-24 06:10:53 UTC (rev 1330) @@ -1,15 +1,16 @@ #hdr #include "unsetprefs.h" -#include <boost/archive/binary_iarchive.hpp> -#include <boost/archive/binary_oarchive.hpp> #include <boost/tuple/tuple.hpp> -#include <commons/st/st.h> +#include <commons/st/intr.h> +#include <commons/st/sync.h> +#include <commons/st/channel.h> #include <fstream> // ofstream #include <vector> -#include "msg.h" #include "util.hh" #include "setprefs.h" 
+namespace boost { namespace archive { class binary_oarchive; } } + using namespace boost; using namespace boost::archive; using namespace commons; @@ -21,11 +22,14 @@ #src #include "unsetprefs.h" #include <boost/foreach.hpp> +#include <boost/archive/binary_oarchive.hpp> #include <commons/assert.h> -#include <commons/nullptr.h> #include <commons/time.h> +#include <commons/st/io.h> +#include <commons/st/sockets.h> #include <iostream> #include <unistd.h> // pipe, write, sync +#include "msg.h" #include "setprefs.h" #end @@ -33,6 +37,7 @@ typedef commons::array<char> recovery_t; + // Configuration. st_utime_t timeout; int yield_interval, accept_joiner_seqno, issuing_interval, min_ops, max_ops, @@ -161,29 +166,36 @@ class wal { public: - wal(const string &fname) : of(fname.c_str()), out(of) {} + wal(const string &fname) : + of_(fname.c_str()), + ar_(new binary_oarchive(of())) + {} + ~wal() { delete ar_; } template <typename T> - void log(const T &msg) { ser(of, msg); } + void log(const T &msg) { ser(of(), msg); } void logbuf(const ser_t &s) { logbuf(s.data(), s.size()); } void logbuf(const void *buf, size_t len) { - of.write(reinterpret_cast<const char*>(buf), len); + of().write(reinterpret_cast<const char*>(buf), len); } void logdel(int key) { int op = op_del; // TODO: is this really necessary? - out & op & key; + ar() & op & key; } void logwrite(int key, int val) { int op = op_write; - out & op & key & val; + ar() & op & key & val; } void logcommit() { int op = op_commit; - out & op; + ar() & op; } - void flush() { of.flush(); } + void flush() { of().flush(); } private: - ofstream of; - binary_oarchive out; + ofstream of_; + //unique_ptr<binary_oarchive> ar_; + binary_oarchive *ar_; + ofstream &of() { return of_; } + binary_oarchive &ar() { return *ar_; }; }; // TODO? 
Modified: ydb/trunk/src/mkdeps.py =================================================================== --- ydb/trunk/src/mkdeps.py 2009-03-24 06:10:43 UTC (rev 1329) +++ ydb/trunk/src/mkdeps.py 2009-03-24 06:10:53 UTC (rev 1330) @@ -39,12 +39,17 @@ for dep in deps(src(hdr)): yield dep -for i in pwd.glob('*.lzz') + pwd.glob('*.lzz.clamp'): - print sub(r'\.lzz(\.clamp)?', '.o', i), ':', ' '.join(deps(i)) +for i in pwd.glob('*.lzz.clamp'): + print sub(r'\.lzz\.clamp', '.o', i), ':', sub(r'\.lzz\.clamp', '.hh', i), ' '.join(deps(i)) for i in pwd.glob('*.d'): with file(i) as f: for line in f: for word in line.split(): - if '.clamp' in word: - print sub(r'(\.clamp/(.+)_lambda_.+\.clamp_h)', r'\1: \2.lzz.clamp', word) + if '.clamp/' in word: + if '_hh_lambda_' in word: + print sub(r'(\.clamp/(.+)_hh_lambda_.+\.clamp_h)', r'\1: \2.hh.clamp', word) + elif '_cc_lambda_' in word: + print sub(r'(\.clamp/(.+)_cc_lambda_.+\.clamp_h)', r'\1: \2.cc.clamp', word) + else: + print sub(r'(\.clamp/(.+)_lambda_.+\.clamp_h)', r'\1: \2.lzz.clamp', word) Modified: ydb/trunk/src/msg.h =================================================================== --- ydb/trunk/src/msg.h 2009-03-24 06:10:43 UTC (rev 1329) +++ ydb/trunk/src/msg.h 2009-03-24 06:10:53 UTC (rev 1330) @@ -3,7 +3,7 @@ #include <commons/array.h> #include <commons/exceptions.h> -#include <commons/st/st.h> +#include <commons/st/reader.h> #include <commons/streamwriter.h> #include <commons/utility.h> #include <iomanip> Modified: ydb/trunk/src/stxn.lzz.clamp =================================================================== --- ydb/trunk/src/stxn.lzz.clamp 2009-03-24 06:10:43 UTC (rev 1329) +++ ydb/trunk/src/stxn.lzz.clamp 2009-03-24 06:10:53 UTC (rev 1330) @@ -1,13 +1,15 @@ #hdr #include "unsetprefs.h" #include <boost/bind.hpp> +#include <boost/foreach.hpp> +#include <boost/tuple/tuple.hpp> +#include <commons/array.h> #include <commons/memory.h> -#include <boost/foreach.hpp> +#include <commons/rand.h> #include 
<commons/snap_map.h> -#include <commons/rand.h> -#include <commons/array.h> +#include <commons/time.h> #include <google/dense_hash_map> -#include <boost/tuple/tuple.hpp> +#include "msg.h" #include "util.hh" #include "main.hh" #include "setprefs.h" @@ -89,6 +91,9 @@ issue_txns(st_channel<replica_info> &newreps, int &seqno, st_bool &accept_joiner) { + USE(newreps); + USE(seqno); + USE(accept_joiner); typedef typename Types::TxnBatch TxnBatch; typedef typename Types::Txn Txn; typedef typename Types::Op Op; @@ -255,6 +260,10 @@ process_txn(mii &map, const typename Types::Txn &txn, int &seqno, typename RTypes::Response *res) { + USE(map); + USE(txn); + USE(seqno); + USE(res); typedef typename Types::Txn Txn; typedef typename Types::Op Op; checkeq(txn.seqno(), seqno + 1); @@ -338,6 +347,14 @@ st_channel<chunk> &backlog, int init_seqno, int mypos, int nnodes) { + USE(leader); + USE(map); + USE(seqno); + USE(send_states); + USE(backlog); + USE(init_seqno); + USE(mypos); + USE(nnodes); typedef typename Types::TxnBatch TxnBatch; typedef typename Types::Txn Txn; typedef typename Types::Op Op; Modified: ydb/trunk/src/tpcc.lzz.clamp =================================================================== --- ydb/trunk/src/tpcc.lzz.clamp 2009-03-24 06:10:43 UTC (rev 1329) +++ ydb/trunk/src/tpcc.lzz.clamp 2009-03-24 06:10:53 UTC (rev 1330) @@ -11,6 +11,8 @@ #src #include "unsetprefs.h" #include <commons/memory.h> +#include <commons/st/io.h> +#include <commons/time.h> #include <string> #include "tpcc/clock.h" #include "tpcc/randomgenerator.h" @@ -18,6 +20,7 @@ #include "tpcc/tpccdb.h" #include "tpcc/tpccgenerator.h" #include "tpcc/tpcctables.h" +#include "msg.h" #include "setprefs.h" #end @@ -32,6 +35,16 @@ unique_ptr<TPCCTables> g_tables; namespace { +class st_bcast { + const vector<st_netfd_t> &fds_; +public: + st_bcast(const vector<st_netfd_t> &fds) : fds_(fds) {} + void operator()(const void *buf, size_t len) { + foreach (st_netfd_t dst, fds_) + st_timed_write(dst, buf, len); + } 
+}; + class st_tpcc : public TPCCDB { private: @@ -43,10 +56,7 @@ public: st_tpcc(const vector<st_netfd_t> &fds) : a_(buf_size), - writer_(lambda(const void *buf, size_t len) { - foreach (st_netfd_t dst, __ref(fds)) - st_timed_write(dst, buf, len); - }, a_, buf_size) {} + writer_(st_bcast(fds), a_, buf_size) {} void flush() { writer_.mark_and_flush(); } void set_seqno(int seqno) { seqno_ = seqno; } @@ -821,4 +831,3 @@ ASSERT(false); } } - Deleted: ydb/trunk/src/util.lzz =================================================================== --- ydb/trunk/src/util.lzz 2009-03-24 06:10:43 UTC (rev 1329) +++ ydb/trunk/src/util.lzz 2009-03-24 06:10:53 UTC (rev 1330) @@ -1,503 +0,0 @@ -#hdr -#include "unsetprefs.h" -#include <cstring> // size_t -#include <ios> // streamoff -#include <st.h> -#include <string> -#include <map> -#include <set> -#include <utility> -#include <boost/scoped_array.hpp> -#include <commons/array.h> -#include <commons/nullptr.h> -#include <commons/time.h> -#include <google/protobuf/io/zero_copy_stream_impl.h> -#include "msg.h" -using namespace std; -using namespace boost; -using namespace ydb::msg; -using namespace google::protobuf::io; -#end -#src -#include <sys/socket.h> // getpeername -#include <gtest/gtest.h> -#include <netinet/in.h> // in_addr etc. 
-#end - -using namespace testing; - -// -// Globals -// - -bool fake_bcast, profile_threads, multirecover, debug_threads; -size_t buf_size; -long long write_thresh; - -// -// Display -// - -void -showdatarate(const char *action, streamoff len, long long time) -{ - cout << action << " of " << len << " bytes in " << time << " ms (" - << double(len) / double(time) / 1000 << " MB/s)" << endl; -} - -void -showdatarate(const char *action, size_t len, long long time) -{ - cout << action << " of " << len << " bytes in " << time << " ms (" - << double(len) / double(time) / 1000 << " MB/s)" << endl; -} - -void -showtput(const char *action, long long stop_time, long long start_time, - int stop_count, int start_count) -{ - long long time_diff = stop_time - start_time; - int count_diff = stop_count - start_count; - double rate = double(count_diff) * 1000. / double(time_diff); - cout << action << " " << count_diff << " txns [" - << start_count << ".." << stop_count - << "] in " << time_diff << " ms [" - << start_time << ".." << stop_time - << "] (" - << rate << " tps)" << endl; -} - -// -// Calculations -// - -pair<size_t, size_t> -recovery_range(size_t size, int mypos, int nnodes) -{ - return make_pair(multirecover ? size * mypos / size_t(nnodes) : 0, - multirecover ? size * (mypos + 1) / size_t(nnodes) : size); -} - -inline bool -check_interval(int seqno, int interval) -{ - return interval > 0 && seqno % interval == interval - 1; -} - -/** - * Return range * part / nparts, but with proper casting. Assumes that part < - * nparts. 
- */ -inline int -interp(int range, int part, int nparts) { - return static_cast<int>(static_cast<long long>(range) * part / nparts); -} - -#src -TEST(interp_test, basics) { - EXPECT_EQ(0, interp(3, 0, 3)); - EXPECT_EQ(1, interp(3, 1, 3)); - EXPECT_EQ(2, interp(3, 2, 3)); - EXPECT_EQ(3, interp(3, 3, 3)); - - EXPECT_EQ(0, interp(RAND_MAX, 0, 2)); - EXPECT_EQ(RAND_MAX / 2, interp(RAND_MAX, 1, 2)); - EXPECT_EQ(RAND_MAX, interp(RAND_MAX, 2, 2)); -} -#end - -/** - * Convenience function for calculating percentages. - */ -template<typename T> -inline double pct(T sub, T tot) -{ - return 100 * double(sub) / double(tot); -} - -// -// ST IO -// - -/** - * Perform an st_write but warn if it took over write_thresh ms. - */ -void -st_timed_write(st_netfd_t dst, const void *buf, size_t len) -{ - long long before_write = -1; - if (write_thresh > 0) { - before_write = current_time_millis(); - } - - checkeqnneg(st_write(dst, buf, len, ST_UTIME_NO_TIMEOUT), - static_cast<ssize_t>(len)); - - if (write_thresh > 0) { - long long write_time = current_time_millis() - before_write; - if (write_time > write_thresh) { - cout << "thread " << threadname() << " write of " << len - << " bytes to dst " << show_sockaddr(dst) << " blocked for " - << write_time << " ms" << endl; - } - } -} - -// -// ST Sockets -// - -char * -show_sockaddr(st_netfd_t fd) -{ - sockaddr_in sa; - socklen_t salen = sizeof sa; - check0x(getpeername(st_netfd_fileno(fd), - reinterpret_cast<sockaddr*>(&sa), - &salen)); - return inet_ntoa(sa.sin_addr); -} - -inline const string& -nfd2name(st_netfd_t fd) -{ - return nfdnames[fd]; -} - -map<st_netfd_t, string> nfdnames; - -// -// ST Threads -// - -/** - * The list of all threads. Keep track of these so that we may cleanly shut - * down all threads. - */ -set<st_thread_t> threads; - -/** - * RAII for adding/removing the current thread from the global threads set. 
- */ -class thread_eraser -{ - public: - thread_eraser() { threads.insert(st_thread_self()); } - ~thread_eraser() { threads.erase(st_thread_self()); } -}; - -/** - * For debug/error-printing purposes. - */ -map<st_thread_t, string> threadnames; -st_thread_t last_thread; - -/** - * For profiling. - */ -map<st_thread_t, long long> threadtimes; -long long thread_start_time; - -/** - * Look up thread name, or just show thread ID. - */ -inline string -threadname(st_thread_t t = st_thread_self()) { - if (threadnames.find(t) != threadnames.end()) { - return threadnames[t]; - } else { - return lexical_cast<string>(t); - } -} - -/** - * Debug function for thread names. Remember what we're switching from. - */ -inline void -switch_out_cb() -{ - if (debug_threads) last_thread = st_thread_self(); - if (profile_threads) - threadtimes[st_thread_self()] += current_time_millis() - thread_start_time; -} - -/** - * Debug function for thread names. Show what we're switching from/to. - */ -inline void switch_in_cb() -{ - if (debug_threads && last_thread != st_thread_self()) { - cout << "switching"; - if (last_thread != 0) cout << " from " << threadname(last_thread); - cout << " to " << threadname() << endl; - } - if (profile_threads) - thread_start_time = current_time_millis(); -} - -/** - * Print to cerr a thread exception. - */ -ostream& -cerr_thread_ex(const std::exception &ex) -{ - return cerr << "exception in thread " << threadname() - << ": " << ex.what(); -} - -// -// Serialization -// - -/** - * Adapter for arrays to look like strings (for PB serialization). 
- */ -class ser_array -{ - commons::array<char> a_; - size_t size_; -public: - ser_array(size_t size = buf_size) : a_(size), size_(0) {} - char *data() const { return a_.get(); } - size_t size() const { return size_; } - void clear() { size_ = 0; } - void stretch(size_t size) { - if (size > a_.size()) - a_.reset(new char[size], size); - size_ = size; - } -}; - -//typedef string ser_t; -typedef ser_array ser_t; - -template<typename T> -void -ser(writer &w, const T &msg) -{ - uint32_t len = msg.ByteSize(); - w.mark(); - w.reserve(len); - check(msg.SerializeToArray(w.cur(), len)); - w.skip(len); -} - -/** - * Serialization. - * - * TODO: experiment with which method is the fastest: using a string as shown - * here or computing the bytesize then allocating (or grabbing/reserving) the - * array. - */ -template<typename T> -void -ser(string &s, const T &msg) -{ - // Serialize message to a buffer. - uint32_t len; - s.append(sizeof len, '\0'); - check(msg.AppendToString(&s)); - - // Warn if the message is large. - if (s.size() > 1000000) - cout << "serializing large message of " << s.size() << " bytes" << endl; - - // Prefix the message with a four-byte length. - len = htonl(static_cast<uint32_t>(s.size() - sizeof len)); - char *plen = reinterpret_cast<char*>(&len); - copy(plen, plen + sizeof len, s.begin()); -} - -template<typename T> -inline void -ser(ser_array &s, const T &msg) -{ - int len = msg.ByteSize(); - - // Grow the array as needed. - s.stretch(len + sizeof(uint32_t)); - - // Serialize message to a buffer with four-byte length prefix. - check(msg.SerializeToArray(s.data() + sizeof(uint32_t), len)); - *reinterpret_cast<uint32_t*>(s.data()) = htonl(uint32_t(len)); -} - -/** - * Serialization. 
- */ -template<typename T> -inline void -ser(ostream &s, const T &msg) -{ - uint32_t len = htonl(uint32_t(msg.ByteSize())); - s.write(reinterpret_cast<const char*>(&len), sizeof len); - check(msg.SerializeToOstream(&s)); -} - -// -// Messaging -// - -/** - * Send a message to some destinations. - */ -inline void -bcastbuf(const vector<st_netfd_t> &dsts, const ser_t &msg) -{ - if (!fake_bcast) { - foreach (st_netfd_t dst, dsts) { - st_timed_write(dst, msg.data(), msg.size()); - } - } -} - -/** - * Send a message to some destinations, using whichever method of network IO - * was chosen (sync or async). - */ -template<typename T> -inline void -bcastmsg(const vector<st_netfd_t> &dsts, const T &msg) -{ - ser_t s; - ser(s, msg); - bcastbuf(dsts, s); -} - -/** - * Send a message to a single recipient. - */ -inline void -sendbuf(st_netfd_t dst, const ser_t &msg) -{ - if (!fake_bcast) - st_timed_write(dst, msg.data(), msg.size()); -} - -/** - * Send a message to a single recipient. - */ -template<typename T> -inline void -sendmsg(st_netfd_t dst, const T &msg) -{ - ser_t s; - ser(s, msg); - sendbuf(dst, s); -} - -/** - * Read a message. This is done in two steps: first by reading the length - * prefix, then by reading the actual body. This function also provides a way - * to measure how much time is spent actually reading the message from the - * network. Such measurement only makes sense for large messages which take a - * long time to receive. - * - * \param[in] src The socket from which to read. - * - * \param[in] msg The protobuf to read into. - * - * \param[out] start_time If not null, record the time at which we start to - * receive the message (after the length is received). - * - * \param[out] stop_time If not null, record the time at which we finish - * receiving the message (before we deserialize the protobuf). - * - * \param[out] len If not null, record the size of the serialized message - * in bytes. 
- * - * \param[in] timeout on each of the two read operations (first one is on - * length, second one is on the rest). - * - * \return The length of the serialized message. - */ -template <typename T> -size_t -readmsg(st_netfd_t src, T & msg, long long *start_time = nullptr, long long - *stop_time = nullptr, st_utime_t timeout = ST_UTIME_NO_TIMEOUT) -{ - // Read the message length. - uint32_t len; - checkeqnneg(st_read_fully(src, static_cast<void*>(&len), sizeof len, - timeout), - static_cast<ssize_t>(sizeof len)); - if (start_time != nullptr) - *start_time = current_time_millis(); - len = ntohl(len); - - // Parse the message body. Try stack-allocation if possible. - scoped_array<char> sbuf; - char *buf; - if (len <= 4096) buf = reinterpret_cast<char*>(alloca(len)); - else sbuf.reset(buf = new char[len]); - checkeqnneg(st_read_fully(src, buf, len, timeout), int(len)); - if (stop_time != nullptr) - *stop_time = current_time_millis(); - check(msg.ParseFromArray(buf, len)); - - return len; -} - -/** - * Same as the above readmsg(), but returns an internally constructed message. - * This is a "higher-level" readmsg() that relies on return-value optimization - * for avoiding unnecessary copies. - */ -template <typename T> -inline T -readmsg(st_netfd_t src, st_utime_t timeout = ST_UTIME_NO_TIMEOUT) -{ - T msg; - readmsg(src, msg, nullptr, nullptr, timeout); - return msg; -} - -/** - * Same as the above readmsg() but uses an st_reader instead of a raw - * st_netfd_t. 
- */ -template <typename T> -inline void -readmsg(st_reader &src, T & msg) -{ - managed_array<char> a = src.read(sizeof(uint32_t)); - uint32_t len = ntohl(*reinterpret_cast<const uint32_t*>(a.get())); - check(msg.ParseFromArray(src.read(len), len)); -} - -template<typename T> -inline void -readmsg(anchored_stream_reader &src, T &msg) -{ - uint32_t len = ntohl(src.read<uint32_t>()); - check(msg.ParseFromArray(checkpass(src.read(len)), len)); -} - -template<typename T> -inline void -readmsg(istream &src, T &msg) -{ - uint32_t len; - src.read(reinterpret_cast<char*>(&len), sizeof len); - len = ntohl(len); -#if 0 - IstreamInputStream iis(&src); - LimitingInputStream lis(&iis, len); - check(msg.ParseFromZeroCopyStream(&lis)); -#else - char buf[len]; - src.read(buf, len); - check(msg.ParseFromArray(buf, len)); -#endif -} - -inline uint32_t -readlen(istream &src) -{ - uint32_t len; - src.read(reinterpret_cast<char*>(&len), sizeof len); - len = ntohl(len); - ASSERT(len < 10000); - return len; -} - Added: ydb/trunk/src/util.lzz.clamp =================================================================== --- ydb/trunk/src/util.lzz.clamp (rev 0) +++ ydb/trunk/src/util.lzz.clamp 2009-03-24 06:10:53 UTC (rev 1330) @@ -0,0 +1,534 @@ +#hdr +#include "unsetprefs.h" +#include <cstring> // size_t +#include <iosfwd> // streamoff +#include <st.h> +#include <string> +#include <map> +#include <set> +#include <utility> // pair +#include <vector> +#include <commons/array.h> +#include <commons/delegates.h> +#include <commons/nullptr.h> +#include <arpa/inet.h> // htonl, ntohl +//#include <commons/st/st.h> + +using namespace std; +using namespace boost; +using namespace commons; + +namespace commons { + class st_reader; + class stream_writer; + class anchored_stream_reader; +} +namespace ydb { namespace msg { typedef stream_writer writer; } } +using namespace ydb::msg; +namespace google { namespace protobuf { class Message; } } +using google::protobuf::Message; +#end + +#src +#include 
"unsetprefs.h" +#include <boost/foreach.hpp> +#include <boost/lexical_cast.hpp> +#include <boost/scoped_array.hpp> +#include <commons/st/reader.h> +#include <commons/st/threads.h> +#include <commons/streamreader.h> +#include <commons/streamwriter.h> +#include <commons/time.h> +#include <sys/socket.h> // getpeername +#include <gtest/gtest.h> +#include <netinet/in.h> // in_addr etc. +#include <google/protobuf/message.h> +//#include <google/protobuf/io/zero_copy_stream_impl.h> +//using namespace google::protobuf::io; +#include "setprefs.h" +#end + +#if 1 + +// +// Globals +// + +bool fake_bcast, profile_threads, multirecover, debug_threads; +size_t buf_size; +long long write_thresh; + +// +// Display +// + +void +showdatarate(const char *action, streamoff len, long long time) +{ + cout << action << " of " << len << " bytes in " << time << " ms (" + << double(len) / double(time) / 1000 << " MB/s)" << endl; +} + +void +showdatarate(const char *action, size_t len, long long time) +{ + cout << action << " of " << len << " bytes in " << time << " ms (" + << double(len) / double(time) / 1000 << " MB/s)" << endl; +} + +void +showtput(const char *action, long long stop_time, long long start_time, + int stop_count, int start_count) +{ + long long time_diff = stop_time - start_time; + int count_diff = stop_count - start_count; + double rate = double(count_diff) * 1000. / double(time_diff); + cout << action << " " << count_diff << " txns [" + << start_count << ".." << stop_count + << "] in " << time_diff << " ms [" + << start_time << ".." << stop_time + << "] (" + << rate << " tps)" << endl; +} + +// +// Calculations +// + +pair<size_t, size_t> +recovery_range(size_t size, int mypos, int nnodes) +{ + return make_pair(multirecover ? size * mypos / size_t(nnodes) : 0, + multirecover ? 
size * (mypos + 1) / size_t(nnodes) : size); +} + +inline bool +check_interval(int seqno, int interval) +{ + return interval > 0 && seqno % interval == interval - 1; +} + +/** + * Return range * part / nparts, but with proper casting. Assumes that part < + * nparts. + */ +inline int +interp(int range, int part, int nparts) { + return static_cast<int>(static_cast<long long>(range) * part / nparts); +} + +#src +TEST(interp_test, basics) { + EXPECT_EQ(0, interp(3, 0, 3)); + EXPECT_EQ(1, interp(3, 1, 3)); + EXPECT_EQ(2, interp(3, 2, 3)); + EXPECT_EQ(3, interp(3, 3, 3)); + + EXPECT_EQ(0, interp(RAND_MAX, 0, 2)); + EXPECT_EQ(RAND_MAX / 2, interp(RAND_MAX, 1, 2)); + EXPECT_EQ(RAND_MAX, interp(RAND_MAX, 2, 2)); +} +#end + +/** + * Convenience function for calculating percentages. + */ +template<typename T> +inline double pct(T sub, T tot) +{ + return 100 * double(sub) / double(tot); +} + +// +// ST IO +// + +/** + * Perform an st_write but warn if it took over write_thresh ms. + */ +void +st_timed_write(st_netfd_t dst, const void *buf, size_t len) +{ + long long before_write = -1; + if (write_thresh > 0) { + before_write = current_time_millis(); + } + + checkeqnneg(st_write(dst, buf, len, ST_UTIME_NO_TIMEOUT), + static_cast<ssize_t>(len)); + + if (write_thresh > 0) { + long long write_time = current_time_millis() - before_write; + if (write_time > write_thresh) { + cout << "thread " << threadname() << " write of " << len + << " bytes to dst " << show_sockaddr(dst) << " blocked for " + << write_time << " ms" << endl; + } + } +} + +// +// ST Sockets +// + +char * +show_sockaddr(st_netfd_t fd) +{ + sockaddr_in sa; + socklen_t salen = sizeof sa; + check0x(getpeername(st_netfd_fileno(fd), + reinterpret_cast<sockaddr*>(&sa), + &salen)); + return inet_ntoa(sa.sin_addr); +} + +inline const string& +nfd2name(st_netfd_t fd) +{ + return nfdnames[fd]; +} + +map<st_netfd_t, string> nfdnames; + +// +// ST Threads +// + +/** + * The list of all threads. 
Keep track of these so that we may cleanly shut + * down all threads. + */ +set<st_thread_t> threads; + +/** + * RAII for adding/removing the current thread from the global threads set. + */ +class thread_eraser +{ + public: + thread_eraser() { threads.insert(st_thread_self()); } + ~thread_eraser() { threads.erase(st_thread_self()); } +}; + +/** + * For debug/error-printing purposes. + */ +typedef map<st_thread_t, string> threadnames_t; +threadnames_t threadnames; +st_thread_t last_thread; + +/** + * For profiling. + */ +map<st_thread_t, long long> threadtimes; +long long thread_start_time; + +/** + * Look up thread name, or just show thread ID. + */ +const string & +threadname(st_thread_t t = st_thread_self()) { + threadnames_t::iterator it = threadnames.find(t); + if (it == threadnames.end()) { + return threadnames[t] = lexical_cast<string>(t); + } else { + return it->second; + } +} + +/** + * Debug function for thread names. Remember what we're switching from. + */ +void +switch_out_cb() +{ + if (debug_threads) last_thread = st_thread_self(); + if (profile_threads) + threadtimes[st_thread_self()] += current_time_millis() - thread_start_time; +} + +/** + * Debug function for thread names. Show what we're switching from/to. + */ +void +switch_in_cb() +{ + if (debug_threads && last_thread != st_thread_self()) { + cout << "switching"; + if (last_thread != 0) cout << " from " << threadname(last_thread); + cout << " to " << threadname() << endl; + } + if (profile_threads) + thread_start_time = current_time_millis(); +} + +/** + * Print to cerr a thread exception. + */ +ostream& +cerr_thread_ex(const std::exception &ex) +{ + return cerr << "exception in thread " << threadname() + << ": " << ex.what(); +} + +/** + * Delegate for running thread targets. + * \param[in] f The function to execute. 
+ */ +void +my_spawn_helper(const fn f) +{ + thread_eraser eraser; + try { f(); } + catch (std::exception &ex) { cerr_thread_ex(ex) << endl; } +} + +/** + * Spawn a thread using ST but wrap it in an exception handler that interrupts + * all other threads (hopefully causing them to unwind). + * \param[in] f The function to execute. + */ +st_thread_t +my_spawn(const fn &f, string name) +{ + st_thread_t t = st_spawn(bind(my_spawn_helper, f)); + threads.insert(t); + threadnames[t] = name; + return t; +} + + +// +// Serialization +// + +/** + * Adapter for arrays to look like strings (for PB serialization). + */ +class ser_array +{ + commons::array<char> a_; + size_t size_; +public: + ser_array(size_t size = buf_size) : a_(size), size_(0) {} + char *data() const { return a_.get(); } + size_t size() const { return size_; } + void clear() { size_ = 0; } + void stretch(size_t size) { + if (size > a_.size()) + a_.reset(new char[size], size); + size_ = size; + } +}; + +//typedef string ser_t; +typedef ser_array ser_t; + +void +ser(writer &w, const Message &msg) +{ + uint32_t len = msg.ByteSize(); + w.mark(); + w.reserve(len); + check(msg.SerializeToArray(w.cur(), len)); + w.skip(len); +} + +/** + * Serialization. + * + * TODO: experiment with which method is the fastest: using a string as shown + * here or computing the bytesize then allocating (or grabbing/reserving) the + * array. + */ +void +ser(string &s, const Message &msg) +{ + // Serialize message to a buffer. + uint32_t len; + s.append(sizeof len, '\0'); + check(msg.AppendToString(&s)); + + // Warn if the message is large. + if (s.size() > 1000000) + cout << "serializing large message of " << s.size() << " bytes" << endl; + + // Prefix the message with a four-byte length. 
+ len = htonl(static_cast<uint32_t>(s.size() - sizeof len)); + char *plen = reinterpret_cast<char*>(&len); + copy(plen, plen + sizeof len, s.begin()); +} + +void +ser(ser_array &s, const Message &msg) +{ + int len = msg.ByteSize(); + + // Grow the array as needed. + s.stretch(len + sizeof(uint32_t)); + + // Serialize message to a buffer with four-byte length prefix. + check(msg.SerializeToArray(s.data() + sizeof(uint32_t), len)); + *reinterpret_cast<uint32_t*>(s.data()) = htonl(uint32_t(len)); +} + +/** + * Serialization. + */ +void +ser(ostream &s, const Message &msg) +{ + uint32_t len = htonl(uint32_t(msg.ByteSize())); + s.write(reinterpret_cast<const char*>(&len), sizeof len); + check(msg.SerializeToOstream(&s)); +} + +// +// Messaging +// + +/** + * Send a message to some destinations. + */ +void +bcastbuf(const vector<st_netfd_t> &dsts, const ser_t &msg) +{ + if (!fake_bcast) { + foreach (st_netfd_t dst, dsts) { + st_timed_write(dst, msg.data(), msg.size()); + } + } +} + +/** + * Send a message to some destinations, using whichever method of network IO + * was chosen (sync or async). + */ +void +bcastmsg(const vector<st_netfd_t> &dsts, const Message &msg) +{ + ser_t s; + ser(s, msg); + bcastbuf(dsts, s); +} + +/** + * Send a message to a single recipient. + */ +void +sendbuf(st_netfd_t dst, const ser_t &msg) +{ + if (!fake_bcast) + st_timed_write(dst, msg.data(), msg.size()); +} + +/** + * Send a message to a single recipient. + */ +void +sendmsg(st_netfd_t dst, const Message &msg) +{ + ser_t s; + ser(s, msg); + sendbuf(dst, s); +} + +/** + * Read a message. This is done in two steps: first by reading the length + * prefix, then by reading the actual body. This function also provides a way + * to measure how much time is spent actually reading the message from the + * network. Such measurement only makes sense for large messages which take a + * long time to receive. + * + * \param[in] src The socket from which to read. 
+ * + * \param[out] msg The protobuf to read into. + * + * \param[out] start_time If not null, record the time at which we start to + * receive the message (after the length is received). + * + * \param[out] stop_time If not null, record the time at which we finish + * receiving the message (before we deserialize the protobuf). + * + * \note The size of the serialized message in bytes is not an out-parameter; + * it is the return value. + * + * \param[in] timeout Timeout for each of the two read operations (the first + * reads the length, the second reads the rest). + * + * \return The length of the serialized message. + */ +size_t +readmsg(st_netfd_t src, Message &msg, long long *start_time = nullptr, long long + *stop_time = nullptr, st_utime_t timeout = ST_UTIME_NO_TIMEOUT) +{ + // Read the message length. + uint32_t len; + checkeqnneg(st_read_fully(src, static_cast<void*>(&len), sizeof len, + timeout), + static_cast<ssize_t>(sizeof len)); + if (start_time != nullptr) + *start_time = current_time_millis(); + len = ntohl(len); + + // Parse the message body. Try stack-allocation if possible. + scoped_array<char> sbuf; + char *buf; + if (len <= 4096) buf = reinterpret_cast<char*>(alloca(len)); + else sbuf.reset(buf = new char[len]); + checkeqnneg(st_read_fully(src, buf, len, timeout), int(len)); + if (stop_time != nullptr) + *stop_time = current_time_millis(); + check(msg.ParseFromArray(buf, len)); + + return len; +} + +/** + * Same as the above readmsg() but uses an st_reader instead of a raw + * st_netfd_t.
+ */ +void +readmsg(st_reader &src, Message &msg) +{ + managed_array<char> a = src.read(sizeof(uint32_t)); + uint32_t len = ntohl(*reinterpret_cast<const uint32_t*>(a.get())); + check(msg.ParseFromArray(src.read(len), len)); +} + +void +readmsg(anchored_stream_reader &src, Message &msg) +{ + uint32_t len = ntohl(src.read<uint32_t>()); + check(msg.ParseFromArray(checkpass(src.read(len)), len)); +} + +void +readmsg(istream &src, Message &msg) +{ + uint32_t len; + src.read(reinterpret_cast<char*>(&len), sizeof len); + len = ntohl(len); +#if 0 + IstreamInputStream iis(&src); + LimitingInputStream lis(&iis, len); + check(msg.ParseFromZeroCopyStream(&lis)); +#else + char buf[len]; + src.read(buf, len); + check(msg.ParseFromArray(buf, len)); +#endif +} + +inline uint32_t +readlen(istream &src) +{ + uint32_t len; + src.read(reinterpret_cast<char*>(&len), sizeof len); + len = ntohl(len); + ASSERT(len < 10000); + return len; +} + +#endif Modified: ydb/trunk/src/ydb.lzz.clamp =================================================================== --- ydb/trunk/src/ydb.lzz.clamp 2009-03-24 06:10:43 UTC (rev 1329) +++ ydb/trunk/src/ydb.lzz.clamp 2009-03-24 06:10:53 UTC (rev 1330) @@ -6,7 +6,6 @@ #include <string> #include <iostream> #include <st.h> -#include <commons/st/st.h> #include "tpcc/clock.h" #include "tpcc/randomgenerator.h" #include "tpcc/tpccclient.h" @@ -26,54 +25,23 @@ #include "unsetprefs.h" #include <csignal> // sigaction, etc. #include <cstring> // strsignal, size_t +#include <boost/archive/binary_iarchive.hpp> #include <boost/program_options.hpp> #include <gtest/gtest.h> #include <malloc.h> #include <string> +#include <commons/st/io.h> +#include <commons/st/sockets.h> +#include <commons/st/threads.h> #include "setprefs.h" #end using namespace google; using namespace testing; +using namespace boost::archive; -// -// Utilities/system -// - +namespace { /** - * Delegate for running thread targets. - * \param[in] f The function to execute. 
- * \param[in] intr Whether to signal stop_hub on an exception. - */ -void -my_spawn_helper(const function0<void> f, bool intr) -{ - thread_eraser eraser; - try { - f(); - } catch (std::exception &ex) { - cerr_thread_ex(ex) << (intr ? "; interrupting!" : "") << endl; - if (intr) stop_hub.set(); - } -} - -/** - * Spawn a thread using ST but wrap it in an exception handler that interrupts - * all other threads (hopefully causing them to unwind). - * \param[in] f The function to execute. - * \param[in] intr Whether to signal stop_hub on an exception. Not actually - * used anywhere. - */ -st_thread_t -my_spawn(const function0<void> &f, string name, bool intr = false) -{ - st_thread_t t = st_spawn(bind(my_spawn_helper, f, intr)); - threads.insert(t); - threadnames[t] = name; - return t; -} - -/** * Memory monitor. */ void @@ -127,6 +95,7 @@ } } } +} // // Main @@ -361,13 +330,13 @@ if (use_pb_res) { run_leader<pb_traits, pb_traits>(minreps, leader_port); } else { - run_leader<pb_traits, rb_traits>(minreps, leader_port); + //run_leader<pb_traits, rb_traits>(minreps, leader_port); } } else { if (use_pb_res) { - run_leader<rb_traits, pb_traits>(minreps, leader_port); + //run_leader<rb_traits, pb_traits>(minreps, leader_port); } else { - run_leader<rb_traits, rb_traits>(minreps, leader_port); + //run_leader<rb_traits, rb_traits>(minreps, leader_port); } } } else { @@ -375,13 +344,13 @@ if (use_pb_res) { run_replica<pb_traits, pb_traits>(leader_host, leader_port, listen_port); } else { - run_replica<pb_traits, rb_traits>(leader_host, leader_port, listen_port); + //run_replica<pb_traits, rb_traits>(leader_host, leader_port, listen_port); } } else { if (use_pb_res) { - run_replica<rb_traits, pb_traits>(leader_host, leader_port, listen_port); + //run_replica<rb_traits, pb_traits>(leader_host, leader_port, listen_port); } else { - run_replica<rb_traits, rb_traits>(leader_host, leader_port, listen_port); + //run_replica<rb_traits, rb_traits>(leader_host, leader_port, 
listen_port); } } } @@ -394,6 +363,17 @@ } } +#if 0 +template<typename Types, typename RTypes> +void +run_leader(int minreps, uint16_t leader_port); +template<typename Types, typename RTypes> +void +run_replica(string leader_host, uint16_t leader_port, uint16_t listen_port); +#endif + +#if 1 +namespace { /** * Run the leader. */ @@ -415,6 +395,7 @@ vector<replica_info> replicas; st_closing_all_infos close_replicas(replicas); cout << "waiting for at least " << minreps << " replicas to join" << endl; + Join join; for (int i = 0; i < minreps; ++i) { st_netfd_t fd; { @@ -422,7 +403,7 @@ fd = checkerr(st_accept(listener, nullptr, nullptr, ST_UTIME_NO_TIMEOUT)); } - Join join = readmsg<Join>(fd); + readmsg(fd, join); replicas.push_back(replica_info(fd, static_cast<uint16_t>(join.port()))); } cout << "got all " << minreps << " replicas" << endl; @@ -487,7 +468,8 @@ else throw break_exception(); } - Join join = readmsg<Join>(joiner); + Join join; + readmsg(joiner, join); replicas.push_back(replica_info(joiner, static_cast<uint16_t>(join.port()))); cout << "setting seqno to " << seqno << endl; init.set_txnseqno(seqno); @@ -1072,3 +1054,5 @@ stop_hub.insert(st_thread_self()); } +} +#endif This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |