[Assorted-commits] SF.net SVN: assorted:[1313] ydb/trunk/src
Brought to you by:
yangzhang
From: <yan...@us...> - 2009-03-20 07:58:33
|
Revision: 1313 http://assorted.svn.sourceforge.net/assorted/?rev=1313&view=rev Author: yangzhang Date: 2009-03-20 07:58:28 +0000 (Fri, 20 Mar 2009) Log Message: ----------- - broke up the program into multiple modules - using new custom clamp - ser.h -> msg.h - still some renaming to do Modified Paths: -------------- ydb/trunk/src/Makefile ydb/trunk/src/main.lzz.clamp ydb/trunk/src/ser.cc Added Paths: ----------- ydb/trunk/src/main2.lzz.clamp ydb/trunk/src/msg.h ydb/trunk/src/setprefs.h ydb/trunk/src/stxn.lzz.clamp ydb/trunk/src/tpcc.lzz.clamp ydb/trunk/src/unsetprefs.h ydb/trunk/src/util.lzz Removed Paths: ------------- ydb/trunk/src/ser.h Modified: ydb/trunk/src/Makefile =================================================================== --- ydb/trunk/src/Makefile 2009-03-20 05:35:47 UTC (rev 1312) +++ ydb/trunk/src/Makefile 2009-03-20 07:58:28 UTC (rev 1313) @@ -38,6 +38,7 @@ endif # CXX := $(WTF) ag++ -k --Xcompiler # $(CXX) CXX := $(WTF) ccache $(CXX) -pipe +LD := $(CXX) LDFLAGS := -pthread $(GPROF) LDLIBS := -lstx -lst -lresolv -lprotobuf -lgtest \ -lboost_program_options-gcc43-mt -lboost_thread-gcc43-mt \ @@ -93,13 +94,15 @@ all: $(TARGET) -$(TARGET): $(OBJS) - $(LINK.cc) $^ $(LOADLIBES) $(LDLIBS) -o $@ - %.pb.o: %.pb.cc %.pb.h $(CXX) -c $(PBCXXFLAGS) $(OUTPUT_OPTION) $< -main.o: main.cc $(PBHDRS) +stxn.o: main.hh $(PBHDRS) +main.o: util.hh msg.h $(PBHDRS) +util.o: msg.h $(PBHDRS) +main2.o: main.hh stxn.hh tpcc.hh $(PBHDRS) +tpcc.o: main.hh util.hh $(PBHDRS) +ydb: main.o main2.o util.o tpcc.o stxn.o tpcc/%.o: tpcc/%.cc make -C tpcc/ @@ -109,12 +112,8 @@ .SECONDARY: tpcc/tpcctables.cc tpcc/tpcctables.o -%.o: %.cc - $(COMPILE.cc) $(OUTPUT_OPTION) $< - %.cc %.hh: %.lzz lzz -hx hh -sx cc -hl -sl -hd -sd $< - python -c 'pars = file("lambda_impl.clamp_h").read().split("\n\n"); hh = file("main.hh").read(); print >> file("main.cc", "a"), pars[-1]; print >> file("main.hh", "w"), "\n\n".join(pars[:-1] + [hh])' %.pb.cc: %.proto protoc --cpp_out=. $< @@ -124,11 +123,12 @@ %.lzz: %.lzz.clamp rm -f $@ - clamp < $< | sed '1d' > $@ + mkdir -p clamp/ + clamp --outdir clamp/ --prefix $(basename $@) < $< | \ + sed "$$( echo -e '1i\\\n\#hdr\n1a\\\n\#end' )" | \ + sed "$$( echo -e '$$i\\\n\#src\n$$a\\\n\#end' )" > $@ chmod -w $@ -main.o: ser.h - all.h: fgrep '#include' main.lzz.clamp > all.h @@ -136,7 +136,9 @@ $(COMPILE.cc) $(PBHDRS) $(OUTPUT_OPTION) $< clean: - rm -f $(GENSRCS) $(GENHDRS) $(OBJS) $(TARGET) main.lzz *.clamp_h + rm -rf clamp/ $(GENSRCS) $(GENHDRS) $(OBJS) $(TARGET) \ + main.lzz main2.lzz main.cc main.hh main2.hh main2.cc \ + util.cc util.hh tpcc.lzz tpcc.hh tpcc.cc make -C tpcc/ clean distclean: clean @@ -159,5 +161,5 @@ p2: p2.cc $(LINK.cc) $^ $(LOADLIBES) $(LDLIBS) $(OUTPUT_OPTION) -ser: ser.cc ser.h ydb.pb.o +ser: ser.cc msg.h ydb.pb.o $(LINK.cc) $^ $(LOADLIBES) $(LDLIBS) $(OUTPUT_OPTION) Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-03-20 05:35:47 UTC (rev 1312) +++ ydb/trunk/src/main.lzz.clamp 2009-03-20 07:58:28 UTC (rev 1313) @@ -1,103 +1,52 @@ #hdr -#define __STDC_FORMAT_MACROS +#include "unsetprefs.h" #include <boost/archive/binary_iarchive.hpp> #include <boost/archive/binary_oarchive.hpp> #include <boost/bind.hpp> #include <boost/foreach.hpp> -#include <boost/program_options.hpp> -#include <boost/range/iterator_range.hpp> -#include <boost/scoped_array.hpp> -#include <boost/shared_ptr.hpp> +//#include <boost/range/iterator_range.hpp> +//#include <boost/shared_ptr.hpp> #include <boost/tuple/tuple.hpp> #include <commons/assert.h> -#include <commons/memory.h> #include <commons/nullptr.h> -#include <commons/rand.h> -#include <commons/snap_map.h> #include <commons/st/st.h> #include <commons/time.h> -#include <commons/unique_ptr.h> -#include <csignal> // sigaction etc. -#include <cstdio> -#include <cstring> // strsignal #include <fstream> // ofstream -#include <google/dense_hash_map> -#include <google/protobuf/io/zero_copy_stream_impl.h> -#include <gtest/gtest.h> -#include <inttypes.h> // PRId64 #include <iostream> -#include <malloc.h> -#include <map> -#include <netinet/in.h> // in_addr etc. -#include <set> -#include <sys/socket.h> // getpeername -#include <sys/types.h> // ssize_t -#include <tr1/unordered_map> -#include <unistd.h> // pipe, write, sync #include <vector> -#include "ser.h" -#include "tpcc/clock.h" -#include "tpcc/randomgenerator.h" -#include "tpcc/tpccclient.h" -#include "tpcc/tpccgenerator.h" -#include "tpcc/tpcctables.h" -#include "ydb.pb.h" +#include "msg.h" +#include "util.hh" +#include "setprefs.h" -#define function boost::function -#define foreach BOOST_FOREACH -#define shared_ptr boost::shared_ptr -#define ref boost::ref -#define tuple boost::tuple -#define make_tuple boost::make_tuple - using namespace boost; using namespace boost::archive; using namespace commons; -using namespace google; -using namespace google::protobuf::io; using namespace std; -using namespace std::tr1; -using namespace testing; using namespace ydb; -using namespace ydb::pb; using namespace ydb::msg; #end -//#define map_t unordered_map -//#define map_t map -//#define map_t dense_hash_map -#define map_t snap_map -typedef map_t<int, int> mii; -typedef mii::value_type entry; +#src +#include <unistd.h> // pipe, write, sync +#end typedef tuple<sized_array<char>, char*, char*> chunk; -//typedef unique_ptr<Recovery> recovery_t; typedef commons::array<char> recovery_t; -template<typename T> void init_map(T &map) {} -template<> void init_map(dense_hash_map<int, int> &map) { - map.set_empty_key(-1); - map.set_deleted_key(-2); -} -template<> void init_map(snap_map<int, int> &map) { - map.set_empty_key(-1); - map.set_deleted_key(-2); -} - // Configuration. st_utime_t timeout; int yield_interval, accept_joiner_seqno, issuing_interval, min_ops, max_ops, stop_on_seqno, batch_size, handle_responses_display, fail_seqno, catch_up_display, issue_display, nwarehouses, process_display; -size_t accept_joiner_size, buf_size, read_buf_size; +size_t accept_joiner_size, read_buf_size; bool yield_during_build_up, yield_during_catch_up, dump, show_updates, - count_updates, stop_on_recovery, general_txns, profile_threads, - debug_threads, multirecover, disk, debug_memory, use_pwal, use_twal, + count_updates, stop_on_recovery, general_txns, + disk, debug_memory, use_pwal, use_twal, use_pb, use_pb_res, g_caught_up, rec_pwal, rec_twal, do_tpcc, - suppress_txn_msgs, fake_bcast, force_ser, fake_exec, ship_log; -long long timelim, read_thresh, write_thresh; + suppress_txn_msgs, force_ser, fake_exec, ship_log; +long long timelim, read_thresh; // Control. st_intr_bool stop_hub, kill_hub; @@ -112,147 +61,6 @@ int updates; /** - * Convenience function for calculating percentages. - */ -template<typename T> -inline double pct(T sub, T tot) -{ - return 100 * double(sub) / double(tot); -} - -/** - * Convenience class for performing long-jumping break. - */ -class break_exception : public std::exception {}; - -/** - * The list of all threads. Keep track of these so that we may cleanly shut - * down all threads. - */ -set<st_thread_t> threads; - -/** - * RAII for adding/removing the current thread from the global threads set. - */ -class thread_eraser -{ - public: - thread_eraser() { threads.insert(st_thread_self()); } - ~thread_eraser() { threads.erase(st_thread_self()); } -}; - -/** - * For debug/error-printing purposes. - */ -map<st_thread_t, string> threadnames; -st_thread_t last_thread; - -/** - * For profiling. - */ -map<st_thread_t, long long> threadtimes; -long long thread_start_time; - -/** - * Look up thread name, or just show thread ID. - */ -inline string -threadname(st_thread_t t = st_thread_self()) { - if (threadnames.find(t) != threadnames.end()) { - return threadnames[t]; - } else { - return lexical_cast<string>(t); - } -} - -/** - * Debug function for thread names. Remember what we're switching from. - */ -inline void -switch_out_cb() -{ - if (debug_threads) last_thread = st_thread_self(); - if (profile_threads) - threadtimes[st_thread_self()] += current_time_millis() - thread_start_time; -} - -/** - * Debug function for thread names. Show what we're switching from/to. - */ -inline void switch_in_cb() -{ - if (debug_threads && last_thread != st_thread_self()) { - cout << "switching"; - if (last_thread != 0) cout << " from " << threadname(last_thread); - cout << " to " << threadname() << endl; - } - if (profile_threads) - thread_start_time = current_time_millis(); -} - -/** - * Print to cerr a thread exception. - */ -ostream& -cerr_thread_ex(const std::exception &ex) -{ - return cerr << "exception in thread " << threadname() - << ": " << ex.what(); -} - -/** - * Delegate for running thread targets. - * \param[in] f The function to execute. - * \param[in] intr Whether to signal stop_hub on an exception. - */ -void -my_spawn_helper(const function0<void> f, bool intr) -{ - thread_eraser eraser; - try { - f(); - } catch (std::exception &ex) { - cerr_thread_ex(ex) << (intr ? "; interrupting!" : "") << endl; - if (intr) stop_hub.set(); - } -} - -/** - * Spawn a thread using ST but wrap it in an exception handler that interrupts - * all other threads (hopefully causing them to unwind). - * \param[in] f The function to execute. - * \param[in] intr Whether to signal stop_hub on an exception. Not actually - * used anywhere. - */ -st_thread_t -my_spawn(const function0<void> &f, string name, bool intr = false) -{ - st_thread_t t = st_spawn(bind(my_spawn_helper, f, intr)); - threads.insert(t); - threadnames[t] = name; - return t; -} - -char * -show_sockaddr(st_netfd_t fd) -{ - sockaddr_in sa; - socklen_t salen = sizeof sa; - check0x(getpeername(st_netfd_fileno(fd), - reinterpret_cast<sockaddr*>(&sa), - &salen)); - return inet_ntoa(sa.sin_addr); -} - -map<st_netfd_t, string> nfdnames; - -inline const string& -nfd2name(st_netfd_t fd) -{ - return nfdnames[fd]; -} - -/** * Used by the leader to bookkeep information about replicas. */ class replica_info @@ -309,95 +117,10 @@ const vector<st_netfd_t> &rs_; }; +#if 0 st_channel<pair<st_netfd_t, shared_ptr<string> > > msgs; /** - * Adapter for arrays to look like strings (for PB serialization). - */ -class ser_array -{ - commons::array<char> a_; - size_t size_; -public: - ser_array(size_t size = buf_size) : a_(size), size_(0) {} - char *data() const { return a_.get(); } - size_t size() const { return size_; } - void clear() { size_ = 0; } - void stretch(size_t size) { - if (size > a_.size()) - a_.reset(new char[size], size); - size_ = size; - } -}; - -//typedef string ser_t; -typedef ser_array ser_t; - -template<typename T> -void -ser(writer &w, const T &msg) -{ - uint32_t len = msg.ByteSize(); - w.mark(); - w.reserve(len); - check(msg.SerializeToArray(w.cur(), len)); - w.skip(len); -} - -/** - * Serialization. - * - * TODO: experiment with which method is the fastest: using a string as shown - * here or computing the bytesize then allocating (or grabbing/reserving) the - * array. - */ -template<typename T> -void -ser(string &s, const T &msg) -{ - // Serialize message to a buffer. - uint32_t len; - s.append(sizeof len, '\0'); - check(msg.AppendToString(&s)); - - // Warn if the message is large. - if (s.size() > 1000000) - cout << "serializing large message of " << s.size() << " bytes" << endl; - - // Prefix the message with a four-byte length. - len = htonl(static_cast<uint32_t>(s.size() - sizeof len)); - char *plen = reinterpret_cast<char*>(&len); - copy(plen, plen + sizeof len, s.begin()); -} - -template<typename T> -inline void -ser(ser_array &s, const T &msg) -{ - int len = msg.ByteSize(); - - // Grow the array as needed. - s.stretch(len + sizeof(uint32_t)); - - // Serialize message to a buffer with four-byte length prefix. - check(msg.SerializeToArray(s.data() + sizeof(uint32_t), len)); - *reinterpret_cast<uint32_t*>(s.data()) = htonl(uint32_t(len)); -} - -/** - * Serialization. - */ -template<typename T> -inline void -ser(ostream &s, const T &msg) -{ - uint32_t len = htonl(uint32_t(msg.ByteSize())); - s.write(reinterpret_cast<const char*>(&len), sizeof len); - check(msg.SerializeToOstream(&s)); -} - -#if 0 -/** * The worker that performs the actual broadcasting. */ void @@ -431,193 +154,6 @@ } #endif -/** - * Perform an st_write but warn if it took over write_thresh ms. - */ -void -st_timed_write(st_netfd_t dst, const void *buf, size_t len) -{ - long long before_write = -1; - if (write_thresh > 0) { - before_write = current_time_millis(); - } - - checkeqnneg(st_write(dst, buf, len, ST_UTIME_NO_TIMEOUT), - static_cast<ssize_t>(len)); - - if (write_thresh > 0) { - long long write_time = current_time_millis() - before_write; - if (write_time > write_thresh) { - cout << "thread " << threadname() << " write of " << len - << " bytes to dst " << show_sockaddr(dst) << " blocked for " - << write_time << " ms" << endl; - } - } -} - -/** - * Send a message to some destinations. - */ -inline void -bcastbuf(const vector<st_netfd_t> &dsts, const ser_t &msg) -{ - if (!fake_bcast) { - foreach (st_netfd_t dst, dsts) { - st_timed_write(dst, msg.data(), msg.size()); - } - } -} - -/** - * Send a message to some destinations, using whichever method of network IO - * was chosen (sync or async). - */ -template<typename T> -inline void -bcastmsg(const vector<st_netfd_t> &dsts, const T &msg) -{ - ser_t s; - ser(s, msg); - bcastbuf(dsts, s); -} - -/** - * Send a message to a single recipient. - */ -inline void -sendbuf(st_netfd_t dst, const ser_t &msg) -{ - if (!fake_bcast) - st_timed_write(dst, msg.data(), msg.size()); -} - -/** - * Send a message to a single recipient. - */ -template<typename T> -inline void -sendmsg(st_netfd_t dst, const T &msg) -{ - ser_t s; - ser(s, msg); - sendbuf(dst, s); -} - -/** - * Read a message. This is done in two steps: first by reading the length - * prefix, then by reading the actual body. This function also provides a way - * to measure how much time is spent actually reading the message from the - * network. Such measurement only makes sense for large messages which take a - * long time to receive. - * - * \param[in] src The socket from which to read. - * - * \param[in] msg The protobuf to read into. - * - * \param[out] start_time If not null, record the time at which we start to - * receive the message (after the length is received). - * - * \param[out] stop_time If not null, record the time at which we finish - * receiving the message (before we deserialize the protobuf). - * - * \param[out] len If not null, record the size of the serialized message - * in bytes. - * - * \param[in] timeout on each of the two read operations (first one is on - * length, second one is on the rest). - * - * \return The length of the serialized message. - */ -template <typename T> -size_t -readmsg(st_netfd_t src, T & msg, long long *start_time = nullptr, long long - *stop_time = nullptr, st_utime_t timeout = ST_UTIME_NO_TIMEOUT) -{ - // Read the message length. - uint32_t len; - checkeqnneg(st_read_fully(src, static_cast<void*>(&len), sizeof len, - timeout), - static_cast<ssize_t>(sizeof len)); - if (start_time != nullptr) - *start_time = current_time_millis(); - len = ntohl(len); - - // Parse the message body. Try stack-allocation if possible. - scoped_array<char> sbuf; - char *buf; - if (len <= 4096) buf = reinterpret_cast<char*>(alloca(len)); - else sbuf.reset(buf = new char[len]); - checkeqnneg(st_read_fully(src, buf, len, timeout), int(len)); - if (stop_time != nullptr) - *stop_time = current_time_millis(); - check(msg.ParseFromArray(buf, len)); - - return len; -} - -/** - * Same as the above readmsg(), but returns an internally constructed message. - * This is a "higher-level" readmsg() that relies on return-value optimization - * for avoiding unnecessary copies. - */ -template <typename T> -inline T -readmsg(st_netfd_t src, st_utime_t timeout = ST_UTIME_NO_TIMEOUT) -{ - T msg; - readmsg(src, msg, nullptr, nullptr, timeout); - return msg; -} - -/** - * Same as the above readmsg() but uses an st_reader instead of a raw - * st_netfd_t. - */ -template <typename T> -inline void -readmsg(st_reader &src, T & msg) -{ - managed_array<char> a = src.read(sizeof(uint32_t)); - uint32_t len = ntohl(*reinterpret_cast<const uint32_t*>(a.get())); - check(msg.ParseFromArray(src.read(len), len)); -} - -template<typename T> -inline void -readmsg(anchored_stream_reader &src, T &msg) -{ - uint32_t len = ntohl(src.read<uint32_t>()); - check(msg.ParseFromArray(checkpass(src.read(len)), len)); -} - -template<typename T> -inline void -readmsg(istream &src, T &msg) -{ - uint32_t len; - src.read(reinterpret_cast<char*>(&len), sizeof len); - len = ntohl(len); -#if 0 - IstreamInputStream iis(&src); - LimitingInputStream lis(&iis, len); - check(msg.ParseFromZeroCopyStream(&lis)); -#else - char buf[len]; - src.read(buf, len); - check(msg.ParseFromArray(buf, len)); -#endif -} - -inline uint32_t -readlen(istream &src) -{ - uint32_t len; - src.read(reinterpret_cast<char*>(&len), sizeof len); - len = ntohl(len); - ASSERT(len < 10000); - return len; -} - enum { op_del, op_write, op_commit }; /** @@ -665,971 +201,10 @@ }; // Globals -mii g_map; wal *g_wal; txn_wal *g_twal; //tpcc_wal *g_tpcc_wal; -/** - * Keep issuing transactions to the replicas. - */ -template<typename Types> -void -issue_txns(st_channel<replica_info> &newreps, int &seqno, - st_bool &accept_joiner) -{ - typedef typename Types::TxnBatch TxnBatch; - typedef typename Types::Txn Txn; - typedef typename Types::Op Op; - - Op_OpType types[] = {Op::read, Op::write, Op::del}; - vector<st_netfd_t> fds; - long long start_time = current_time_millis(); - - finally f(lambda () { - showtput("issued", current_time_millis(), __ref(start_time), __ref(seqno), - 0); - }); - - commons::array<char> rbuf(read_buf_size), wbuf(buf_size); - reader r(nullptr, rbuf.get(), rbuf.size()); - function<void(const void*, size_t)> fn; - if (use_twal) - fn = bind(&txn_wal::logbuf, g_twal, _1, _2); - else - fn = lambda(const void *buf, size_t len) { - foreach (st_netfd_t dst, __ref(fds)) - st_timed_write(dst, buf, len); - }; - - char *real_wbuf = newreps.empty() ? rbuf.get() : wbuf.get(); - size_t real_wbuf_size = newreps.empty() ? rbuf.size() : wbuf.size(); - writer w(fn, real_wbuf, real_wbuf_size); - stream s(r,w); - scoped_ptr<TxnBatch> pbatch(new_TxnBatch<TxnBatch>(s)); - TxnBatch batch = *pbatch; - if (Types::is_pb()) - for (int t = 0; t < batch_size; ++t) - batch.add_txn(); - - ser_t serbuf; - while (!stop_hub) { - w.mark(); - batch.Clear(); - - // Did we get a new member? If so, notify an arbitrary member (the first - // one) to prepare to send recovery information (by sending an - // empty/default Txn). - // XXX rec_pwal - if (!newreps.empty() && seqno > 0 && !rec_pwal) { - start_txn(batch); - fin_txn(batch); - // TODO: verify that this made the catch-up stream more efficient, - // starting it only at the point necessary - w.mark_and_flush(); - if (Types::is_pb()) { - if (multirecover) bcastmsg(fds, batch); - else sendmsg(fds[0], batch); - } - batch.Clear(); - } - // Bring in any new members. - // TODO more efficient: copy/extend/append - while (!newreps.empty()) { - fds.push_back(newreps.take().fd()); - } - - // Generate some random transactions. - start_txn(batch); - for (int t = 0; t < batch_size && !stop_hub; ++t) { - char *txn_start = w.cur(); - Txn &txn = *batch.add_txn(); - txn.set_seqno(seqno); - int count = randint(min_ops, max_ops + 1); - start_op(txn); - for (int o = 0; o < count; ++o) { - Op *op = txn.add_op(); - int rtype = general_txns ? randint(3) : 1, - rkey = randint(), - rvalue = randint(); - op->set_type(types[rtype]); - op->set_key(rkey); - op->set_value(rvalue); - } - fin_op(txn); - - // Process immediately if not bcasting. - if (fds.empty()) { - --seqno; - r.reset_range(txn_start, w.cur()); - if (!Types::is_pb()) txn.Clear(); - process_txn<Types, pb_traits>(g_map, txn, seqno, nullptr); - } - - // Checkpoint. - if (check_interval(seqno, yield_interval)) st_sleep(0); - if (check_interval(seqno, issue_display)) { - cout << "issued txn " << seqno << endl; - if (timelim > 0 && current_time_millis() - start_time > timelim) { - cout << "time's up; issued " << seqno << " txns in " << timelim - << " ms" << endl; - stop_hub.set(); - } - } - - // For debugging purposes. - if (issuing_interval > 0) { - st_sleep(issuing_interval); - } - - // Are we to accept a new joiner? - if (seqno == accept_joiner_seqno) { - accept_joiner.set(); - } - - // Set the stopping seqno. - if (seqno == stop_on_seqno) { - cout << "stopping on issue of seqno " << seqno << endl; - stop_hub.set(); - } - - ++seqno; - } - fin_txn(batch); - - bool do_bcast = !fds.empty() && !suppress_txn_msgs; - if (Types::is_pb()) { - // Broadcast/log/serialize. - if (force_ser || do_bcast || use_twal) { - serbuf.clear(); - ser(serbuf, batch); - if (do_bcast) bcastbuf(fds, serbuf); - if (use_twal) g_twal->logbuf(serbuf); - } - } else { - // Reset if we have nobody to send to (incl. disk) or if we actually have - // no txns (possible due to loop structure; want to avoid to avoid - // confusing with the 0-txn message signifying "prepare a recovery msg"). - if (!do_bcast && !use_twal) { - w.reset(); - } - } - - // Pause? - if (do_pause) - do_pause.waitreset(); - } - - // This means "The End." - if (!fds.empty()) { - w.mark(); - batch.Clear(); - start_txn(batch); - Txn &txn = *batch.add_txn(); - txn.set_seqno(-1); - start_op(txn); - fin_op(txn); - fin_txn(batch); - if (Types::is_pb()) bcastmsg(fds, batch); - w.mark_and_flush(); - } -} - -#if 0 -template<typename Types, typename RTypes> -void -process_txn_ext(mii &map, const typename Types::Txn &txn, int &seqno, - typename RTypes::Response *res, ext_map ext) -{ - response -} -#endif - -/** - * Process a transaction: update DB state (incl. seqno) and send response to - * leader. - */ -template<typename Types, typename RTypes> -void -process_txn(mii &map, const typename Types::Txn &txn, int &seqno, - typename RTypes::Response *res) -{ - typedef typename Types::Txn Txn; - typedef typename Types::Op Op; - checkeq(txn.seqno(), seqno + 1); - seqno = txn.seqno(); - if (res != nullptr) { - res->set_seqno(seqno); - res->set_caught_up(true); - start_result(*res); - } - if (!fake_exec) { - for (int o = 0; o < txn.op_size(); ++o) { - const Op &op = txn.op(o); - const char type = op.type(); - const int key = op.key(); - mii::iterator it = map.find(key); - if (show_updates || count_updates) { - if (it != map.end()) { - if (show_updates) cout << "existing key: " << key << endl; - if (count_updates) ++updates; - } - } - switch (type) { - case Op::read: - if (res != nullptr) { - if (it == map.end()) res->add_result(0); - else res->add_result(it->second); - } - break; - case Op::write: - { - int value = op.value(); - if (use_pwal) g_wal->logwrite(key, value); - if (it == map.end()) map[key] = value; - else it->second = value; - break; - } - case Op::del: - if (it != map.end()) { - if (use_pwal) g_wal->logdel(key); - map.erase(it); - } - break; - } - } - } - if (res != nullptr) - fin_result(*res); - if (use_pwal) g_wal->logcommit(); -} - -void -showdatarate(const char *action, streamoff len, long long time) -{ - cout << action << " of " << len << " bytes in " << time << " ms (" - << double(len) / double(time) / 1000 << " MB/s)" << endl; -} - -void -showdatarate(const char *action, size_t len, long long time) -{ - cout << action << " of " << len << " bytes in " << time << " ms (" - << double(len) / double(time) / 1000 << " MB/s)" << endl; -} - -void -showtput(const char *action, long long stop_time, long long start_time, - int stop_count, int start_count) -{ - long long time_diff = stop_time - start_time; - int count_diff = stop_count - start_count; - double rate = double(count_diff) * 1000. / double(time_diff); - cout << action << " " << count_diff << " txns [" - << start_count << ".." << stop_count - << "] in " << time_diff << " ms [" - << start_time << ".." << stop_time - << "] (" - << rate << " tps)" << endl; -} - -/** - * Return range * part / nparts, but with proper casting. Assumes that part < - * nparts. - */ -inline int -interp(int range, int part, int nparts) { - return static_cast<int>(static_cast<long long>(range) * part / nparts); -} - -#src -TEST(interp_test, basics) { - EXPECT_EQ(0, interp(3, 0, 3)); - EXPECT_EQ(1, interp(3, 1, 3)); - EXPECT_EQ(2, interp(3, 2, 3)); - EXPECT_EQ(3, interp(3, 3, 3)); - - EXPECT_EQ(0, interp(RAND_MAX, 0, 2)); - EXPECT_EQ(RAND_MAX / 2, interp(RAND_MAX, 1, 2)); - EXPECT_EQ(RAND_MAX, interp(RAND_MAX, 2, 2)); -} -#end - -unique_ptr<TPCCTables> g_tables; - -void -mkres(TpccRes *res, const OrderStatusOutput &output) -{ - OrderStatusOutputMsg &msg = *res->mutable_order_status(); - msg.set_c_id(output.c_id); - msg.set_c_balance(output.c_balance); - msg.set_o_id(output.o_id); - msg.set_o_carrier_id(output.o_carrier_id); - foreach (const OrderStatusOutput::OrderLineSubset &src, output.lines) { - OrderLineSubsetMsg &dst = *msg.add_line(); - dst.set_ol_i_id(src.ol_i_id); - dst.set_ol_supply_w_id(src.ol_supply_w_id); - dst.set_ol_quantity(src.ol_quantity); - dst.set_ol_amount(src.ol_amount); - dst.set_ol_delivery_d(src.ol_delivery_d); - } - msg.set_c_first(output.c_first); - msg.set_c_middle(output.c_middle); - msg.set_c_last(output.c_last); - msg.set_o_entry_d(output.o_entry_d); -} - -void -mkres(TpccRes *res, const PaymentOutput &output) -{ - PaymentOutputMsg &msg = *res->mutable_payment(); - - WarehouseMsg &w = *msg.mutable_warehouse(); - w.set_w_id(output.warehouse.w_id); - w.set_w_tax(output.warehouse.w_tax); - w.set_w_ytd(output.warehouse.w_ytd); - w.set_w_name(output.warehouse.w_name); - w.set_w_street_1(output.warehouse.w_street_1); - w.set_w_street_2(output.warehouse.w_street_2); - w.set_w_city(output.warehouse.w_city); - w.set_w_state(output.warehouse.w_state); - w.set_w_zip(output.warehouse.w_zip); - - DistrictMsg &d = *msg.mutable_district(); - d.set_d_id(output.district.d_id); - d.set_d_w_id(output.district.d_w_id); - d.set_d_tax(output.district.d_tax); - d.set_d_ytd(output.district.d_ytd); - d.set_d_next_o_id(output.district.d_next_o_id); - d.set_d_name(output.district.d_name); - d.set_d_street_1(output.district.d_street_1); - d.set_d_street_2(output.district.d_street_2); - d.set_d_city(output.district.d_city); - d.set_d_state(output.district.d_state); - d.set_d_zip(output.district.d_zip); - - CustomerMsg &c = *msg.mutable_customer(); - c.set_c_id(output.customer.c_id); - c.set_c_d_id(output.customer.c_d_id); - c.set_c_w_id(output.customer.c_w_id); - c.set_c_credit_lim(output.customer.c_credit_lim); - c.set_c_discount(output.customer.c_discount); - c.set_c_balance(output.customer.c_balance); - c.set_c_ytd_payment(output.customer.c_ytd_payment); - c.set_c_payment_cnt(output.customer.c_payment_cnt); - c.set_c_delivery_cnt(output.customer.c_delivery_cnt); - c.set_c_first(output.customer.c_first); - c.set_c_middle(output.customer.c_middle); - c.set_c_last(output.customer.c_last); - c.set_c_street_1(output.customer.c_street_1); - c.set_c_street_2(output.customer.c_street_2); - c.set_c_city(output.customer.c_city); - c.set_c_state(output.customer.c_state); - c.set_c_zip(output.customer.c_zip); - c.set_c_phone(output.customer.c_phone); - c.set_c_since(output.customer.c_since); - c.set_c_credit(output.customer.c_credit); - c.set_c_data(output.customer.c_data); -} - -void -process_tpcc(const TpccReq &req, int &seqno, TpccRes *res) -{ - checkeq(req.seqno(), seqno + 1); - ++seqno; - if (res != nullptr) { - res->Clear(); - res->set_seqno(seqno); - } - // First three are read-only txns, so doesn't make sense to exec if no res to - // put results. They constitute only 8% of the workload. - if (req.has_stock_level()) { - if (res != nullptr) { - const StockLevelMsg &sl = req.stock_level(); - int result = g_tables->stockLevel(sl.warehouse_id(), sl.district_id(), sl.threshold()); - StockLevelOutputMsg &msg = *res->mutable_stock_level(); - msg.set_result(result); - } - } else if (req.has_order_status_1()) { - if (res != nullptr) { - const OrderStatusMsg1 &os = req.order_status_1(); - OrderStatusOutput output; - g_tables->orderStatus(os.warehouse_id(), os.district_id(), os.customer_id(), &output); - mkres(res, output); - } - } else if (req.has_order_status_2()) { - if (res != nullptr) { - const OrderStatusMsg2 &os = req.order_status_2(); - OrderStatusOutput output; - g_tables->orderStatus(os.warehouse_id(), os.district_id(), os.c_last().c_str(), &output); - mkres(res, output); - } - } else if (req.has_new_order()) { - const NewOrderMsg &no = req.new_order(); - vector<NewOrderItem> items(no.item_size()); - for (int i = 0; i < no.item_size(); ++i) { - NewOrderItem &dst = items[i]; - const NewOrderItemMsg &src = no.item(i); - dst.i_id = src.i_id(); - dst.ol_supply_w_id = src.ol_supply_w_id(); - dst.ol_quantity = src.ol_quantity(); - } - NewOrderOutput output; - g_tables->newOrder(no.warehouse_id(), no.district_id(), - no.customer_id(), items, no.now().c_str(), - &output); - if (res != nullptr) { - NewOrderOutputMsg &msg = *res->mutable_new_order(); - msg.set_w_tax(output.w_tax); - msg.set_d_tax(output.d_tax); - msg.set_o_id(output.o_id); - msg.set_c_discount(output.c_discount); - msg.set_total(output.total); - foreach (const NewOrderOutput::ItemInfo &src, output.items) { - ItemInfoMsg &dst = *msg.add_item(); - dst.set_s_quantity(src.s_quantity); - dst.set_i_price(src.i_price); - dst.set_ol_amount(src.ol_amount); - dst.set_brand_generic(src.brand_generic); - dst.set_i_name(src.i_name); - } - msg.set_c_last(output.c_last); - msg.set_c_credit(output.c_credit); - msg.set_status(output.status); - } - } else if (req.has_payment_1()) { - const PaymentMsg1 &p = req.payment_1(); - PaymentOutput output; - g_tables->payment(p.warehouse_id(), p.district_id(), p.c_warehouse_id(), - p.c_district_id(), p.customer_id(), p.h_amount(), - p.now().c_str(), &output); - if (res != nullptr) mkres(res, output); - } else if (req.has_payment_2()) { - const PaymentMsg2 &p = req.payment_2(); - PaymentOutput output; - g_tables->payment(p.warehouse_id(), p.district_id(), p.c_warehouse_id(), - p.c_district_id(), p.c_last().c_str(), p.h_amount(), - p.now().c_str(), &output); - if (res != nullptr) mkres(res, output); - } else if (req.has_delivery()) { - const DeliveryMsg &d = req.delivery(); - vector<DeliveryOrderInfo> orders; - g_tables->delivery(d.warehouse_id(), d.carrier_id(), d.now().c_str(), &orders); - if (res != nullptr) { - DeliveryOutputMsg &msg = *res->mutable_delivery(); - foreach (const DeliveryOrderInfo &src, orders) { - DeliveryOrderInfoMsg &dst = *msg.add_order(); - dst.set_d_id(src.d_id); - dst.set_o_id(src.o_id); - } - } - } else { - ASSERT(false); - } -} - -void -process_tpccs(st_netfd_t leader, int &seqno, - st_channel<recovery_t> &send_states, - st_channel<chunk> &backlog, int init_seqno, - int mypos, int nnodes) -{ - bool caught_up = init_seqno == 0; - // Means we're currently ignoring the incoming txns until we see a fail-ack - // from the leader. - bool depleting = false; - long long start_time = current_time_millis(), - time_failed = -1, - time_caught_up = caught_up ? start_time : -1; - int seqno_caught_up = caught_up ? seqno : -1; - // Used by joiner only to tell where we actually started (init_seqno is just - // the seqno reported by the leader in the Init message, but it may have - // issued more since the Init message). - int first_seqno = -1; - char *marker = nullptr; - int first_seqno_in_chunk = -1; - TpccReq req; - TpccRes res; - txn_wal &wal = *g_twal; - - function<void(anchored_stream_reader& reader)> overflow_fn = - lambda(anchored_stream_reader &reader) { - if (__ref(caught_up)) { - // Anchor should already be correctly set, so just shift down. - shift_reader(reader); - } else if (__ref(first_seqno_in_chunk) == __ref(seqno) + 1) { - // Has the replayer just caught up to the start of the chunk? - ASSERT(reader.buf().get() == reader.anchor()); - // Replay all messages up to but not included the current unprocessed - // message (which we may be in the middle of receiving, triggering this - // overflow). - process_buf(reader.anchor(), __ref(marker), __ref(req), __ref(seqno)); - // Update the anchor to point to the unprocessed message, so that we - // shift the unprocessed message down. - reader.anchor() = __ref(marker); - shift_reader(reader); - } else { - // Push onto backlog and put in new buffer. - ASSERT(reader.buf().get() == reader.anchor()); - __ref(backlog).push(make_tuple(reader.buf(), reader.anchor(), __ref(marker))); - reader.anchor() = __ref(marker); - replace_reader(reader); - cout << "added to backlog, now has " << __ref(backlog).queue().size() - << " chunks" << endl; - } - __ref(marker) = reader.buf().get(); - }; - - sized_array<char> rbuf(new char[read_buf_size], read_buf_size); - commons::array<char> wbuf(buf_size); - anchored_stream_reader reader(st_read_fn(leader), st_read_fully_fn(leader), - overflow_fn, rbuf.get(), rbuf.size()); - writer w(lambda(const void *buf, size_t len) { - st_write(__ref(leader), buf, len); - }, wbuf.get(), wbuf.size()); - - finally f(lambda () { - long long now = current_time_millis(); - showtput("processed", now, __ref(start_time), __ref(seqno), - __ref(init_seqno)); - if (!__ref(caught_up)) { - cout << "live-processing: never entered this phase (never caught up)" << endl; - } else { - showtput("live-processed", now, __ref(time_caught_up), __ref(seqno), - __ref(seqno_caught_up)); - } - __ref(send_states).push(recovery_t()); - __ref(w).mark_and_flush(); - }); - - function<void()> send_failure_msg = lambda() { - TpccRes &res = __ref(res); - writer &w = __ref(w); - res.Clear(); - res.set_seqno(-1); - ser(w, res); - w.mark_and_flush(); - }; - - while (true) - { - marker = reader.start(); - - { - st_intr intr(stop_hub); - readmsg(reader, req); - } - - if (req.seqno() == -1) { - // End of stream. - break; - } else if (req.seqno() == -2) { - // Prepare recovery msg. - send_states.push(make_tpcc_recovery(mypos, nnodes, seqno)); - } else { - - if (depleting) { - if (req.seqno() == -3) { - // Fail-ack. Should not be receiving anything until we resume. - failed.waitreset(); - send_failure_msg(); - // Note that we don't reset depleting; we want the next iteration to - // fall through to the next case in this if-else chain.... - - // Adjust reader so that the next xact (the first one after failure) - // will go to the start of the buffer; this is necessary for - // backlogging. - reader.set_anchor(); - shift_reader(reader); - } else if (!failed) { - // This is the first txn after resuming. Tell the recoverer task - // that this is the seqno to build up to (from another replica's - // log). - resume.push(req.seqno()); - depleting = false; - } - // Ignore all other messages. - } - - if (!depleting) { - if (req.seqno() == -3) { - // Ignore the fail-ack. - } else { - if (use_twal) wal.logbuf(marker, reader.start() - marker); - - // Backlog (auto/implicit) or process. - if (!caught_up) { - // If we were at the start of a new buffer (our chunk was recently reset). - if (reader.buf().get() == marker) - first_seqno_in_chunk = req.seqno(); - // If we fully caught up. - if (req.seqno() == seqno + 1) { - time_caught_up = current_time_millis(); - seqno_caught_up = seqno; - showtput("process_tpccs caught up; backlogged", - time_caught_up, start_time, seqno_caught_up, - first_seqno == -1 ? init_seqno - 1 : first_seqno); - caught_up = true; - } - } - if (caught_up) { - // Process. - process_tpcc(req, seqno, &res); - ser(w, res); - reader.set_anchor(); - } - - // Display/yield. - if (check_interval(req.seqno(), process_display)) - cout << (caught_up ? "processed req " : "backlogged req ") - << req.seqno() << endl; - if (check_interval(req.seqno(), yield_interval)) st_sleep(0); - - // Die. - if (fail_seqno > 0 && req.seqno() == fail_seqno) { - cout << "process_tpccs failing on seqno " << fail_seqno; - time_failed = current_time_millis(); - showtput("; live-processed ", time_failed, start_time, seqno, 0); - ASSERT(init_seqno == 0); - caught_up = false; - depleting = true; - seqno = -1; - - failed.set(); - send_failure_msg(); - } - } - } - - } - } - -} - -void -process_buf(char *begin, char *end, TpccReq &req, int &seqno) -{ - ASSERT(begin < end); - raw_reader reader(begin); - while (reader.ptr() < end) { - uint32_t len = ntohl(reader.read<uint32_t>()); - ASSERT(len < 10000); - ASSERT(reinterpret_cast<char*>(reader.ptr()) + len <= end); - check(req.ParseFromArray(reader.readptr(len), len)); - process_tpcc(req, seqno, nullptr); - if (check_interval(req.seqno(), yield_interval)) st_sleep(0); - if (check_interval(req.seqno(), process_display)) { - cout << "caught up to req " << req.seqno() << endl; - } - } -} - -recovery_t -make_tpcc_recovery(int mypos, int nnodes, int seqno) -{ - long long start_time = current_time_millis(); - cout << "serializing recovery, db state is now at seqno " - << seqno << ":" << endl; - g_tables->show(); - recovery_t recovery = g_tables->ser(mypos, nnodes, seqno); - showdatarate("serialized recovery", recovery.size(), - current_time_millis() - start_time); - return recovery; -} - -/** - * Actually do the work of executing a transaction and sending back the reply. - * - * \param[in] leader The connection to the leader. - * - * \param[in] map The data store. - * - * \param[in] seqno The sequence number last seen. This always starts at 0, - * but may be bumped up by the recovery procedure. - * - * \param[in] send_states Channel of snapshots of the database state to send to - * recovering nodes (sent to recover_joiner). - * - * \param[in] backlog The backlog of txns that need to be processed. - * - * \param[in] init_seqno The seqno that was sent in the Init message from the - * leader. The first expected seqno. - * - * \param[in] mypos This host's position in the Init message list. Used for - * calculating the sub-range of the map for which this node is responsible. - * - * \param[in] nnodes The total number nodes in the Init message list. - * - * \param[in] wal The WAL. - */ -template<typename Types, typename RTypes> -void -process_txns(st_netfd_t leader, mii &map, int &seqno, - st_channel<recovery_t> &send_states, - /* XXX st_channel<shared_ptr<pb::Txn> > &backlog */ - st_channel<chunk> &backlog, int init_seqno, - int mypos, int nnodes) -{ - typedef typename Types::TxnBatch TxnBatch; - typedef typename Types::Txn Txn; - typedef typename Types::Op Op; - typedef typename RTypes::Response Response; - typedef typename RTypes::ResponseBatch ResponseBatch; - - bool caught_up = init_seqno == 0; - long long start_time = current_time_millis(), - time_caught_up = caught_up ? start_time : -1; - int seqno_caught_up = caught_up ? seqno : -1; - // Used by joiner only to tell where we actually started (init_seqno is just - // the seqno reported by the leader in the Init message, but it may have - // issued more since the Init message). - int first_seqno = -1; - - sized_array<char> rbuf(new char[read_buf_size], read_buf_size); - commons::array<char> wbuf(buf_size); - st_reader reader(leader, rbuf.get(), rbuf.size()); - writer w(lambda(const void *buf, size_t len) { - checkeqnneg(st_write(__ref(leader), buf, len, ST_UTIME_NO_TIMEOUT), - static_cast<ssize_t>(len)); - }, wbuf.get(), wbuf.size()); - stream s(reader, w); - - finally f(lambda () { - long long now = current_time_millis(); - showtput("processed", now, __ref(start_time), __ref(seqno), - __ref(init_seqno)); - if (!__ref(caught_up)) { - cout << "live-processing: never entered this phase (never caught up)" << endl; - } else { - showtput("live-processed", now, __ref(time_caught_up), __ref(seqno), - __ref(seqno_caught_up)); - } - __ref(send_states).push(recovery_t()); - __ref(w).mark_and_flush(); - }); - - try { - scoped_ptr<TxnBatch> pbatch(new_TxnBatch<TxnBatch>(s)); - TxnBatch &batch = *pbatch; - scoped_ptr<ResponseBatch> presbatch(new_ResponseBatch<ResponseBatch>(s)); - ResponseBatch &resbatch = *presbatch; - ser_t serbuf; - char *first_start = reader.start(); - ASSERT(first_start == rbuf.get()); - const size_t headerlen = sizeof(uint32_t) + sizeof(short) + sizeof(int); - while (true) { - uint32_t prefix = 0; - char *start = reader.start(); - - // Will overflow on next few reads ("header")? - if (!caught_up && reader.unread() + reader.rem() < headerlen) { - sized_array<char> buf(new char[read_buf_size], read_buf_size); - memcpy(buf.get(), reader.start(), reader.unread()); - swap(buf, reader.buf()); - reader.reset_range(reader.buf().get(), reader.buf().get() + reader.unread()); - backlog.push(make_tuple(buf, first_start, start)); - first_start = reader.start(); - } - - if (Types::is_pb()) { - long long before_read = -1; - if (read_thresh > 0) { - before_read = current_time_millis(); - } - { - st_intr intr(stop_hub); - readmsg(reader, batch); - } - if (read_thresh > 0) { - long long read_time = current_time_millis() - before_read; - if (read_time > read_thresh) { - cout << "thread " << threadname() - << ": read took " << read_time << " ms" << endl; - } - } - } else { - st_intr intr(stop_hub); - prefix = ntohl(reader.read<uint32_t>()); - check(prefix < 10000); - batch.Clear(); - } - - if (batch.txn_size() > 0) { - const Txn &first_txn = batch.txn(0); - if (first_txn.seqno() < 0) { - break; - } else if (first_txn.seqno() > seqno + 1) { - // In backlogging mode? - - // Skip entire message, pushing it to the thread that's handling - // recovery for later processing once snapshot is received. - // TODO: implement and use anchors instead? - if (first_seqno == -1) - cout << "first seqno: " << (first_seqno = first_txn.seqno()) << endl; - - // Caught up? - if (first_seqno == seqno + 1) { - // Rewind so we process accumulated messages. - reader.reset_range(first_start, reader.end()); - continue; - } - - // About to overflow? - if (reader.unread() + reader.rem() < prefix + sizeof(uint32_t) - headerlen) { - // Move current partial message to new buffer. - sized_array<char> tmp(new char[read_buf_size], read_buf_size); - raw_writer ser(tmp.get()); - ser.write(prefix); - ser.write(short(batch.txn_size())); - ser.write(first_txn.seqno()); - memcpy(tmp.get() + headerlen, reader.start(), reader.unread()); - - // Swap the buffers. - swap(tmp, reader.buf()); - reader.reset_range(reader.buf().get() + headerlen, reader.buf().get() + headerlen + reader.unread()); - ASSERT(tmp.get() <= first_start && first_start < tmp.end()); - ASSERT(tmp.get() < start && start < tmp.end()); - ASSERT(first_start < start); - backlog.push(make_tuple(tmp, first_start, start)); - first_start = reader.buf().get(); - first_seqno = first_txn.seqno(); - } - - // Fill up rest of the message - ASSERT(reader.unread() + reader.rem() >= prefix + sizeof(uint32_t) - headerlen); - check0x(reader.accum(prefix + sizeof(uint32_t) - headerlen)); - } else { - // Regular transaction batch. - if (!caught_up) { - time_caught_up = current_time_millis(); - seqno_caught_up = seqno; - showtput("process_txns caught up; backlogged", - time_caught_up, start_time, seqno_caught_up, - first_seqno == -1 ? init_seqno - 1 : first_seqno); - caught_up = true; - } - w.mark(); - resbatch.Clear(); - start_res(resbatch); - for (int t = 0; t < batch.txn_size(); ++t) { - const Txn &txn = t == 0 ? first_txn : batch.txn(t); - Response *res = resbatch.add_res(); - process_txn<Types, RTypes>(map, txn, seqno, res); -#if 0 - if (!sending_recovery) { - process_txn<Types, RTypes>(map, txn, seqno, res); - } else { - process_txn_ext(map, txn, seqno, res, ext); - } -#endif - if (fake_exec && !Types::is_pb()) { - reader.skip(txn.op_size() * Op_Size); - } - - if (check_interval(txn.seqno(), yield_interval)) st_sleep(0); - if (check_interval(txn.seqno(), process_display)) { - cout << "processed txn " << txn.seqno() - << "; db size = " << map.size() - << "; seqno = " << seqno - << "; backlog.size = " << backlog.queue().size() << endl; - } - } - fin_res(resbatch); - if (RTypes::is_pb() && resbatch.res_size() > 0) { - serbuf.clear(); - ser(serbuf, resbatch); - sendbuf(leader, serbuf); - } - } - } else if (multirecover || mypos == 0) { - // Empty (default) TxnBatch means "generate a snapshot." - send_states.push(make_recovery(map, mypos, nnodes, seqno)); - } - } - } catch (break_exception &ex) { - } - -} - -#if 0 -template<typename mii> -unique_ptr<Recovery> -make_recovery(const mii &map, int mypos, int nnodes, int seqno) -{ - // TODO make this faster - cout << "generating recovery..." << endl; - unique_ptr<Recovery> recovery(new Recovery); - typedef ::map<int, int> mii_; - mii_ map_(map.begin(), map.end()); - mii_::const_iterator begin = - map_.lower_bound(multirecover ? interp(RAND_MAX, mypos, nnodes) : 0); - mii_::const_iterator end = multirecover && mypos < nnodes - 1 ? - map_.lower_bound(interp(RAND_MAX, mypos + 1, nnodes)) : map_.end(); - cout << "generating recovery over " << begin->first << ".." - << (end == map_.end() ? "end" : lexical_cast<string>(end->first)); - if (multirecover) - cout << " (node " << mypos << " of " << nnodes << ")"; - cout << endl; - long long start_snap = current_time_millis(); - foreach (const pii &p, make_iterator_range(begin, end)) { - Recovery_Pair *pair = recovery->add_pair(); - pair->set_key(p.first); - pair->set_value(p.second); - } - cout << "generating recovery took " - << current_time_millis() - start_snap << " ms" << endl; - recovery->set_seqno(seqno); - return move(recovery); -} -#endif - -template<typename mii> -recovery_t -make_recovery(const mii &map, int mypos, int nnodes, int seqno) -{ - return recovery_t(); -} - -struct recovery_header -{ - int seqno; - size_t count; - size_t total; - size_t size; -}; - -pair<size_t, size_t> -recovery_range(size_t size, int mypos, int nnodes) -{ - return make_pair(multirecover ? size * mypos / size_t(nnodes) : 0, - multirecover ? size * (mypos + 1) / size_t(nnodes) : size); -} - -template<> -recovery_t -make_recovery(const snap_map<int, int> &map, int mypos, int nnodes, int seqno) -{ - const commons::array<entry> &src = map.get_table(); - pair<size_t, size_t> range = recovery_range(src.size(), mypos, nnodes); - size_t begin = range.first, end = range.second; - ASSERT(end > begin); - recovery_header hdr = { seqno, end - begin, src.size(), map.size() }; - size_t bodylen = sizeof(entry) * hdr.count; - cout << "generating recovery of " << hdr.size << " records in " - << hdr.count << " slots (" - << bodylen << " bytes); range is [" - << begin << ".." << end << "]; seqno is " << hdr.seqno << endl; - long long start_time = current_time_millis(); - commons::array<char> recovery(sizeof(size_t) + sizeof hdr + bodylen); - raw_writer ser(recovery.begin()); - ser.write(recovery.size()); - ser.write(hdr); - memcpy(ser.ptr(), src.begin() + begin, bodylen); - showdatarate("serialized recovery", recovery.size(), - current_time_millis() - start_time); - return recovery; -} - class response_handler { public: @@ -1804,156 +379,6 @@ h.run<Types>(); } -class tpcc_response_handler -{ -public: - tpcc_response_handler(st_netfd_t replica, const int &seqno, int rid, - st_multichannel<long long> &recover_signals, - st_channel<st_netfd_t> &delreps, bool caught_up) - : - replica(replica), - seqno(seqno), - rid(rid), - recover_signals(recover_signals), - delreps(delreps), - caught_up(caught_up), - sub(recover_signals.subscribe()), - start_time(current_time_millis()), - recovery_start_time(caught_up ? -1 : start_time), - recovery_end_time(-1), - start_seqno(seqno), - recovery_start_seqno(caught_up ? -1 : seqno), - recovery_end_seqno(-1), - last_seqno(-1) - {} - - void run() { - finally f(bind(&tpcc_response_handler::cleanup, this)); - - commons::array<char> rbuf(read_buf_size), wbuf(buf_size); - st_reader reader(replica, rbuf.get(), rbuf.size()); - writer w(lambda(const void*, size_t) { - throw not_supported_exception("response handler should not be writing"); - }, wbuf.get(), wbuf.size()); - stream s(reader,w); - - long long last_display_time = current_time_millis(); - - function<void()> loop_cleanup = - bind(&tpcc_response_handler::loop_cleanup, this); - - TpccRes res; - - while (true) { - finally f(loop_cleanup); - - // Read the message, but correctly respond to interrupts so that we can - // cleanly exit (slightly tricky). - if (stopped_issuing && last_seqno + 1 == seqno) { - break; - } else { - st_intr intr(kill_hub); - readmsg(reader, res); - } - - if (res.seqno() == -1) { - st_intr intr(stop_hub); - cout << "got a failed node" << endl; - delreps.push(replica); - readmsg(reader, res); - last_seqno = seqno - 1; - } else { - - if (res.seqno() < last_seqno) - throw msg_exception(string("response seqno decreased from ") + - lexical_cast<string>(last_seqno) + " to " + - lexical_cast<string>(res.seqno())); - - if (!caught_up) { - long long now = current_time_millis(), time_diff = now - start_time; - caught_up = true; - recover_signals.push(now); - cout << rid << ": " << "recovering node caught up; took " - << time_diff << " ms" << endl; - // This will cause the program to exit eventually, but cleanly, such that - // the recovery time will be set first, before the eventual exit (which - // may not even happen in the current iteration). - if (stop_on_recovery) { - cout << "stopping on recovery" << endl; - stop_hub.set(); - } - } - - if (check_interval(res.seqno(), handle_responses_display)) { - cout << rid << ": " << "got response " << res.seqno() << " from " - << replica << "; "; - long long display_time = current_time_millis(); - showtput("handling", display_time, last_display_time, res.seqno(), - res.seqno() - handle_responses_display); - last_display_time = display_time; - } - if (check_interval(res.seqno(), yield_interval)) { - st_sleep(0); - } - last_seqno = res.seqno(); - - } - } - } - -private: - void loop_cleanup() { - // The first timestamp that comes down the subscription pipeline is the - // recovery start time, issued by the main thread. The second one is the - // recovery end time, issued by the response handler associated with the - // joiner. - if (recovery_start_time == -1 && !sub.empty()) { - recovery_start_time = sub.take(); - recovery_start_seqno = last_seqno; - cout << rid << ": "; - showtput("before recovery, finished", recovery_start_time, start_time, - recovery_start_seqno, 0); - } else if (recovery_end_time == -1 && !sub.empty()) { - recovery_end_time = sub.take(); - recovery_end_seqno = last_seqno; - cout << rid << ": "; - showtput("during recovery, finished roughly", recovery_end_time, - recovery_start_time, recovery_end_seqno, recovery_start_seqno); - } - } - - void cleanup() { - long long end_time = current_time_millis(); - cout << rid << ": "; - showtput("handled roughly", end_time, start_time, seqno, start_seqno); - if (recovery_end_time > -1) { - cout << rid << ": "; - showtput("after recovery, finished", end_time, recovery_end_time, - seqno, recovery_end_seqno); - } - } - - st_netfd_t replica; - const int &seqno; - int rid; - st_multichannel<long long> &recover_signals; - st_channel<st_netfd_t> &delreps; - bool caught_up; - st_channel<long long> ⊂ - long long start_time, recovery_start_time, recovery_end_time; - int start_seqno, recovery_start_seqno, recovery_end_seqno, last_seqno; -}; - -void -handle_tpcc_responses(st_netfd_t replica, const int &seqno, int rid, - st_multichannel<long long> &recover_signals, - st_channel<st_netfd_t> &delreps, bool caught_up) -{ - tpcc_response_handler h(replica, seqno, rid, recover_signals, delreps, - caught_up); - h.run(); -} - struct recreq { int start_seqno, end_seqno; }; @@ -2048,1305 +473,3 @@ cout << "AAAAAAAAAAAAAAAAAAAAAA" << endl; } } - -/** - * Run the leader. - */ -template<typename Types, typename RTypes> -void -run_leader(int minreps, uint16_t leader_port) -{ - cout << "starting as leader" << endl; - st_multichannel<long long> recover_signals; - - scoped_ptr<txn_wal> twal(new txn_wal(use_twal ? "twal" : "/dev/null")); - g_twal = twal.get(); - scoped_ptr<wal> pwal(new wal(use_pwal ? "pwal" : "/dev/null")); - g_wal = pwal.get(); - - // Wait until all replicas have joined. - st_netfd_t listener = st_tcp_listen(leader_port); - st_closing close_listener(listener); - vector<replica_info> replicas; - st_closing_all_infos close_replicas(replicas); - cout << "waiting for at least " << minreps << " replicas to join" << endl; - for (int i = 0; i < minreps; ++i) { - st_netfd_t fd; - { - st_intr intr(stop_hub); - fd = checkerr(st_accept(listener, nullptr, nullptr, - ST_UTIME_NO_TIMEOUT)); - } - Join join = readmsg<Join>(fd); - replicas.push_back(replica_info(fd, static_cast<uint16_t>(join.port()))); - } - cout << "got all " << minreps << " replicas" << endl; - - // Construct the initialization message. - Init init; - init.set_txnseqno(0); - init.set_multirecover(multirecover); - foreach (replica_info r, replicas) { - SockAddr *psa = init.add_node(); - psa->set_host(r.host()); - psa->set_port(r.port()); - } - - // Send init to each initial replica. - foreach (replica_info r, replicas) { - init.set_yourhost(r.host()); - sendmsg(r.fd(), init); - } - - // Start dispatching queries. - st_bool accept_joiner; - int seqno = 0; - st_channel<replica_info> newreps; - st_channel<st_netfd_t> delreps; - foreach (const replica_info &r, replicas) newreps.push(r); - function<void()> f; - if (do_tpcc) - f = bind(issue_tpcc, ref(newreps), ref(delreps), ref(seqno), ref(accept_joiner)); - else - f = bind(issue_txns<Types>, ref(newreps), ref(seqno), ref(accept_joiner)); - st_joining join_issue_txns(my_spawn(f, "issue_txns")); - - finally fin(bind(summarize, "LEADER", ref(seqno))); - - try { - // Start handling responses. - st_thread_group handlers; - int rid = 0; - foreach (replica_info r, replicas) { - function<void()> fn; - if (do_tpcc) - fn = bind(handle_tpcc_responses, r.fd(), ref(seqno), rid++, - ref(recover_signals), ref(delreps), true); - else - fn = bind(handle_responses<RTypes>, r.fd(), ref(seqno), rid++, - ref(recover_signals), true); - handlers.insert(my_spawn(fn, "handle_responses")); - } - - // Accept the recovering node, and tell it about the online replicas. - st_netfd_t joiner; - try { - st_intr intr(stop_hub); - joiner = checkerr(st_accept(listener, nullptr, nullptr, - ST_UTIME_NO_TIMEOUT)); - accept_joiner.waitset(); - } catch (std::exception &ex) { - string s(ex.what()); - if (s.find("Interrupted system call") == s.npos) - throw; - else - throw break_exception(); - } - Join join = readmsg<Join>(joiner); - replicas.push_back(replica_info(joiner, static_cast<uint16_t>(join.port()))); - cout << "setting seqno to " << seqno << endl; - init.set_txnseqno(seqno); - init.set_yourhost(replicas.back().host()); - sendmsg(joiner, init); - recover_signals.push(current_time_millis()); - - // Start streaming txns to joiner. - cout << "start streaming txns to joiner" << endl; - function<void()> handle_responses_joiner_fn; - if (do_tpcc) - handle_responses_joiner_fn = - bind(handle_tpcc_responses, joiner, ref(seqno), rid++, - ref(recover_signals), ref(delreps), false); - else - ... [truncated message content] |