[Assorted-commits] SF.net SVN: assorted:[1314] ydb/trunk/src
Brought to you by:
yangzhang
From: <yan...@us...> - 2009-03-20 17:45:49
|
Revision: 1314 http://assorted.svn.sourceforge.net/assorted/?rev=1314&view=rev Author: yangzhang Date: 2009-03-20 17:45:38 +0000 (Fri, 20 Mar 2009) Log Message: ----------- - added precompiled headers - renamed main2 to ydb - moved some more code around Modified Paths: -------------- ydb/trunk/src/Makefile ydb/trunk/src/main.lzz.clamp ydb/trunk/src/msg.h ydb/trunk/src/stxn.lzz.clamp Added Paths: ----------- ydb/trunk/src/ydb.lzz.clamp Removed Paths: ------------- ydb/trunk/src/main2.lzz.clamp Modified: ydb/trunk/src/Makefile =================================================================== --- ydb/trunk/src/Makefile 2009-03-20 07:58:28 UTC (rev 1313) +++ ydb/trunk/src/Makefile 2009-03-20 17:45:38 UTC (rev 1314) @@ -1,7 +1,9 @@ TARGET := ydb WTF := wtf -LZZS := $(patsubst %.clamp,%,$(wildcard *.lzz.clamp)) +CLAMPS := $(wildcard *.lzz.clamp) +PURELZZS := $(foreach lzz,$(wildcard *.lzz),$(if $(wildcard $(lzz).clamp),,$(lzz))) +LZZS := $(patsubst %.clamp,%,$(CLAMPS)) $(PURELZZS) LZZHDRS := $(foreach lzz,$(LZZS),$(patsubst %.lzz,%.hh,$(lzz))) LZZSRCS := $(foreach lzz,$(LZZS),$(patsubst %.lzz,%.cc,$(lzz))) LZZOBJS := $(foreach lzz,$(LZZS),$(patsubst %.lzz,%.o,$(lzz))) @@ -38,13 +40,13 @@ endif # CXX := $(WTF) ag++ -k --Xcompiler # $(CXX) CXX := $(WTF) ccache $(CXX) -pipe -LD := $(CXX) +CC := $(CXX) # for linking LDFLAGS := -pthread $(GPROF) LDLIBS := -lstx -lst -lresolv -lprotobuf -lgtest \ -lboost_program_options-gcc43-mt -lboost_thread-gcc43-mt \ -lboost_serialization-gcc43-mt $(PPROF) -CXXFLAGS := $(OPT) -pthread $(GPROF) \ +CXXFLAGS0 := $(OPT) -pthread $(GPROF) \ -Wall \ -Werror \ -Wextra \ @@ -77,6 +79,7 @@ -std=gnu++0x \ -march=native \ $(CXXFLAGS) +CXXFLAGS := $(CXXFLAGS0) -include pch.h # \ -Wmissing-noreturn \ @@ -97,12 +100,13 @@ %.pb.o: %.pb.cc %.pb.h $(CXX) -c $(PBCXXFLAGS) $(OUTPUT_OPTION) $< +%.o: pch.h.gch stxn.o: main.hh $(PBHDRS) main.o: util.hh msg.h $(PBHDRS) util.o: msg.h $(PBHDRS) -main2.o: main.hh stxn.hh tpcc.hh $(PBHDRS) +ydb.o: main.hh stxn.hh 
tpcc.hh util.hh $(PBHDRS) tpcc.o: main.hh util.hh $(PBHDRS) -ydb: main.o main2.o util.o tpcc.o stxn.o +ydb: ydb.o tpcc.o main.o util.o stxn.o ydb.pb.o $(TPCC_OBJS) # $(OBJS) tpcc/%.o: tpcc/%.cc make -C tpcc/ @@ -126,23 +130,27 @@ mkdir -p clamp/ clamp --outdir clamp/ --prefix $(basename $@) < $< | \ sed "$$( echo -e '1i\\\n\#hdr\n1a\\\n\#end' )" | \ - sed "$$( echo -e '$$i\\\n\#src\n$$a\\\n\#end' )" > $@ + sed "$$( echo -e '$$i\\\n\#hdr\n$$a\\\n\#end' )" > $@ chmod -w $@ -all.h: - fgrep '#include' main.lzz.clamp > all.h +pch.h: + svn ls -rHEAD -R | \ + grep -v '/$$' | \ + xargs sed 's/.*\binclude\b *<\(.*\)>.*/\#include <\1>/; t succ; d; :succ /commons/ d' | \ + sort -u > $@ -all.h.gch: all.h - $(COMPILE.cc) $(PBHDRS) $(OUTPUT_OPTION) $< +%.h.gch: CXXFLAGS = $(CXXFLAGS0) +%.h.gch: %.h + $(LINK.cc) $(OUTPUT_OPTION) $< clean: rm -rf clamp/ $(GENSRCS) $(GENHDRS) $(OBJS) $(TARGET) \ - main.lzz main2.lzz main.cc main.hh main2.hh main2.cc \ + main.lzz ydb.lzz main.cc main.hh ydb.hh ydb.cc \ util.cc util.hh tpcc.lzz tpcc.hh tpcc.cc make -C tpcc/ clean distclean: clean - rm -f all.h all.h.gch + rm -f pch.h pch.h.gch doc: $(SRCS) $(HDRS) doxygen Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-03-20 07:58:28 UTC (rev 1313) +++ ydb/trunk/src/main.lzz.clamp 2009-03-20 17:45:38 UTC (rev 1314) @@ -205,180 +205,6 @@ txn_wal *g_twal; //tpcc_wal *g_tpcc_wal; -class response_handler -{ -public: - response_handler(st_netfd_t replica, const int &seqno, int rid, - st_multichannel<long long> &recover_signals, bool caught_up) - : - replica(replica), - seqno(seqno), - rid(rid), - recover_signals(recover_signals), - caught_up(caught_up), - sub(recover_signals.subscribe()), - start_time(current_time_millis()), - recovery_start_time(caught_up ? -1 : start_time), - recovery_end_time(-1), - start_seqno(seqno), - recovery_start_seqno(caught_up ? 
-1 : seqno), - recovery_end_seqno(-1), - last_seqno(-1) - {} - - template<typename Types> - void run() { - typedef typename Types::Response Response; - typedef typename Types::ResponseBatch ResponseBatch; - - finally f(bind(&response_handler::cleanup, this)); - - commons::array<char> rbuf(read_buf_size), wbuf(buf_size); - st_reader reader(replica, rbuf.get(), rbuf.size()); - writer w(lambda(const void*, size_t) { - throw not_supported_exception("response handler should not be writing"); - }, wbuf.get(), wbuf.size()); - stream s(reader,w); - - scoped_ptr<ResponseBatch> pbatch(new_ResponseBatch<ResponseBatch>(s)); - ResponseBatch &batch = *pbatch; - - long long last_display_time = current_time_millis(); - - function<void()> loop_cleanup = - bind(&response_handler::loop_cleanup, this); - - while (true) { - finally f(loop_cleanup); - uint32_t prefix = 0; - - // Read the message, but correctly respond to interrupts so that we can - // cleanly exit (slightly tricky). - if (last_seqno + 1 == seqno) { - // Stop-interruptible in case we're already caught up. - try { - st_intr intr(stop_hub); - if (Types::is_pb()) readmsg(reader, batch); - else { prefix = ntohl(reader.read<uint32_t>()); batch.Clear(); } - } catch (...) { // TODO: only catch interruptions - // This check on seqnos is OK for termination since the seqno will - // never grow again if stop_hub is set. - if (last_seqno + 1 == seqno) { - cout << rid << ": "; - cout << "clean stop; next expected seqno is " << seqno - << " (last seqno was " << last_seqno << ")" << endl; - break; - } else { - continue; - } - } - } else { - // Only kill-interruptible because we want a clean termination (want - // to get all the acks back). 
- st_intr intr(kill_hub); - if (Types::is_pb()) readmsg(reader, batch); - else { prefix = ntohl(reader.read<uint32_t>()); batch.Clear(); } - } - - for (int i = 0; i < batch.res_size(); ++i) { - const Response &res = batch.res(i); - // Determine if this response handler's host (the only joiner) has finished - // catching up. If it has, then broadcast a signal so that all response - // handlers will know about this event. - int rseqno = res.seqno(); - if (rseqno <= last_seqno) - throw msg_exception(string("response seqno decreased from ") + - lexical_cast<string>(last_seqno) + " to " + - lexical_cast<string>(rseqno)); - bool rcaught_up = res.caught_up(); - for (int r = 0; r < res.result_size(); ++r) { - cout << rseqno << last_seqno << res.result_size() << " " << r << " " << res.result(r) << endl; - } - if (!caught_up && rcaught_up) { - long long now = current_time_millis(), time_diff = now - start_time; - caught_up = true; - recover_signals.push(now); - cout << rid << ": "; - cout << "recovering node caught up; took " - << time_diff << " ms" << endl; - // This will cause the program to exit eventually, but cleanly, such that - // the recovery time will be set first, before the eventual exit (which - // may not even happen in the current iteration). - if (stop_on_recovery) { - cout << "stopping on recovery" << endl; - stop_hub.set(); - } - } - if (check_interval(rseqno, handle_responses_display)) { - cout << rid << ": " << "got response " << rseqno << " from " - << replica << "; "; - long long display_time = current_time_millis(); - showtput("handling", display_time, last_display_time, rseqno, - rseqno - handle_responses_display); - last_display_time = display_time; - } - if (check_interval(rseqno, yield_interval)) { - st_sleep(0); - } - last_seqno = rseqno; - } - } - } - -private: - void loop_cleanup() { - // The first timestamp that comes down the subscription pipeline is the - // recovery start time, issued by the main thread. 
The second one is the - // recovery end time, issued by the response handler associated with the - // joiner. - if (recovery_start_time == -1 && !sub.empty()) { - recovery_start_time = sub.take(); - recovery_start_seqno = last_seqno; - cout << rid << ": "; - showtput("before recovery, finished", recovery_start_time, start_time, - recovery_start_seqno, 0); - } else if (recovery_end_time == -1 && !sub.empty()) { - recovery_end_time = sub.take(); - recovery_end_seqno = last_seqno; - cout << rid << ": "; - showtput("during recovery, finished roughly", recovery_end_time, - recovery_start_time, recovery_end_seqno, recovery_start_seqno); - } - } - - void cleanup() { - long long end_time = current_time_millis(); - cout << rid << ": "; - showtput("handled roughly", end_time, start_time, seqno, start_seqno); - if (recovery_end_time > -1) { - cout << rid << ": "; - showtput("after recovery, finished", end_time, recovery_end_time, - seqno, recovery_end_seqno); - } - } - - st_netfd_t replica; - const int &seqno; - int rid; - st_multichannel<long long> &recover_signals; - bool caught_up; - st_channel<long long> ⊂ - long long start_time, recovery_start_time, recovery_end_time; - int start_seqno, recovery_start_seqno, recovery_end_seqno, last_seqno; -}; - -/** - * Swallow replica responses. 
- */ -template<typename Types> -void -handle_responses(st_netfd_t replica, const int &seqno, int rid, - st_multichannel<long long> &recover_signals, bool caught_up) -{ - response_handler h(replica, seqno, rid, recover_signals, caught_up); - h.run<Types>(); -} - struct recreq { int start_seqno, end_seqno; }; Deleted: ydb/trunk/src/main2.lzz.clamp =================================================================== --- ydb/trunk/src/main2.lzz.clamp 2009-03-20 07:58:28 UTC (rev 1313) +++ ydb/trunk/src/main2.lzz.clamp 2009-03-20 17:45:38 UTC (rev 1314) @@ -1,1073 +0,0 @@ -#hdr -#include "unsetprefs.h" -#include <boost/function.hpp> -#include <boost/scoped_ptr.hpp> -#include <string> -#include <iostream> -#include <st.h> -#include <commons/st/st.h> -#include "tpcc/clock.h" -#include "tpcc/randomgenerator.h" -#include "tpcc/tpccclient.h" -#include "tpcc/tpccgenerator.h" -#include "tpcc/tpcctables.h" -#include "util.hh" -#include "tpcc.hh" -#include "stxn.hh" -#include "main.hh" - -using namespace boost; -using namespace std; -using namespace commons; -#end - -#src -#include "unsetprefs.h" -#include <csignal> // sigaction, etc. -#include <cstring> // strsignal, size_t -#include <boost/program_options.hpp> -#include <gtest/gtest.h> -#include <malloc.h> -#include <string> -#include "setprefs.h" -#end - -using namespace google; -using namespace testing; - -// -// Utilities/system -// - -/** - * Delegate for running thread targets. - * \param[in] f The function to execute. - * \param[in] intr Whether to signal stop_hub on an exception. - */ -void -my_spawn_helper(const function0<void> f, bool intr) -{ - thread_eraser eraser; - try { - f(); - } catch (std::exception &ex) { - cerr_thread_ex(ex) << (intr ? "; interrupting!" : "") << endl; - if (intr) stop_hub.set(); - } -} - -/** - * Spawn a thread using ST but wrap it in an exception handler that interrupts - * all other threads (hopefully causing them to unwind). - * \param[in] f The function to execute. 
- * \param[in] intr Whether to signal stop_hub on an exception. Not actually - * used anywhere. - */ -st_thread_t -my_spawn(const function0<void> &f, string name, bool intr = false) -{ - st_thread_t t = st_spawn(bind(my_spawn_helper, f, intr)); - threads.insert(t); - threadnames[t] = name; - return t; -} - -/** - * Memory monitor. - */ -void -memmon() -{ - while (!stop_hub) { - { - st_intr intr(stop_hub); - st_sleep(1); - } - malloc_stats(); - } -} - -int sig_pipe[2]; - -// -// Signals -// - -/** - * Raw signal handler that triggers the (synchronous) handler. - */ -void handle_sig(int sig) { - int err = errno; - cerr << "got signal: " << strsignal(sig) << " (" << sig << ")" << endl; - checkeqnneg(write(sig_pipe[1], &sig, sizeof sig), - static_cast<ssize_t>(sizeof sig)); - errno = err; -} - -/** - * Synchronous part of the signal handler; cleanly interrrupts any threads that - * have marked themselves as interruptible. - */ -void handle_sig_sync() { - st_closing fd(checkerr(st_netfd_open(sig_pipe[0]))); - while (true) { - int sig; - checkeqnneg(st_read(fd, &sig, sizeof sig, ST_UTIME_NO_TIMEOUT), - static_cast<ssize_t>(sizeof sig)); - if (sig == SIGINT) { - if (!stop_hub) stop_hub.set(); - else kill_hub.set(); - } else if (sig == SIGTERM) { - foreach (st_thread_t t, threads) { - st_thread_interrupt(t); - } - } else if (sig == SIGUSR1) { - toggle(do_pause); - } - } -} - -// -// Main -// - -/** - * Initialization and command-line parsing. - */ -int -main(int argc, char **argv) -{ - namespace po = boost::program_options; - try { - GOOGLE_PROTOBUF_VERIFY_VERSION; - - bool is_leader, use_epoll; - int minreps; - uint16_t leader_port, listen_port; - string leader_host; - - // Parse options. 
- po::options_description desc("Allowed options"); - desc.add_options() - ("help,h", "show this help message") - ("debug-memory,M", po::bool_switch(&debug_memory), - "enable memory monitoring/debug outputs") - ("debug-threads,d",po::bool_switch(&debug_threads), - "enable context switch debug outputs") - ("profile-threads,q",po::bool_switch(&profile_threads), - "enable profiling of threads") - ("epoll,e", po::bool_switch(&use_epoll), - "use epoll (select is used by default)") - ("yield-build-up", po::bool_switch(&yield_during_build_up), - "yield periodically during build-up phase of recovery (for recoverer)") - ("yield-catch-up", po::bool_switch(&yield_during_catch_up), - "yield periodically during catch-up phase of recovery (for recoverer)") - ("multirecover,m", po::bool_switch(&multirecover), - "recover from multiple hosts, instead of just one (specified via leader)") - ("rec-twal", po::bool_switch(&rec_twal), - "recover from twal") - ("rec-pwal", po::bool_switch(&rec_pwal), - "recover from pwal") - ("disk,k", po::bool_switch(&disk), - "use disk-based recovery") - ("ship-log", po::bool_switch(&ship_log), - "ship the log instead of the complete database state") - ("dump,D", po::bool_switch(&dump), - "replicas should finally dump their state to a tmp file for " - "inspection/diffing") - ("suppress-txn-msgs", po::bool_switch(&suppress_txn_msgs), - "suppress txn msgs") - ("fake-exec", po::bool_switch(&fake_exec), - "don't actually execute txns") - ("fake-bcast", po::bool_switch(&fake_bcast), - "when using --bcast-async, don't actually perform the socket write") - ("show-updates,U", po::bool_switch(&show_updates), - "log operations that touch (update/read/delete) an existing key") - ("count-updates,u",po::bool_switch(&count_updates), - "count operations that touch (update/read/delete) an existing key") - ("general-txns,g", po::bool_switch(&general_txns), - "issue read and delete transactions as well as the default of (only) insertion/update transactions (for leader)") 
- ("use-pb", po::bool_switch(&use_pb), - "use protocol buffers instead of raw buffers for txns") - ("use-pb-res", po::bool_switch(&use_pb_res), - "use protocol buffers instead of raw buffers for responses") - ("twal", po::bool_switch(&use_twal), - "enable transactional write-ahead logging") - ("pwal", po::bool_switch(&use_pwal), - "enable physical write-ahead logging") - ("force-ser", po::bool_switch(&force_ser), - "force issue_txns to serialize its Txns") - ("leader,l", po::bool_switch(&is_leader), - "run the leader (run replica by default)") - ("exit-on-recovery,x", po::bool_switch(&stop_on_recovery), - "exit after the joiner fully recovers (for leader)") - ("batch-size,b", po::value<int>(&batch_size)->default_value(100), - "number of txns to batch up in each msg (for leader)") - ("tpcc", po::bool_switch(&do_tpcc), - "run the TPCC workload") - ("exit-on-seqno,X",po::value<int>(&stop_on_seqno)->default_value(-1), - "exit after txn seqno is issued (for leader)") - ("accept-joiner-size,s", - po::value<size_t>(&accept_joiner_size)->default_value(0), - "accept recovering joiner (start recovery) after DB grows to this size " - "(for leader)") - ("handle-responses-display", - po::value<int>(&handle_responses_display)->default_value(0), - "number of responses before printing current handling rate (for leader)") - ("catch-up-display", - po::value<int>(&catch_up_display)->default_value(0), - "number of catch-up txns before printing current recovery rate and queue length (for recoverer)") - ("issue-display", - po::value<int>(&issue_display)->default_value(0), - "number of txns before showing the current issue rate (for leader)") - ("process-display", - po::value<int>(&process_display)->default_value(0), - "number of txns before showing the current issue rate (for worker)") - ("issuing-interval", - po::value<int>(&issuing_interval)->default_value(0), - "seconds to sleep between issuing txns (for leader)") - ("min-ops,o", - po::value<int>(&min_ops)->default_value(5), - "lower 
bound on randomly generated number of operations per txn (for leader)") - ("max-ops,O", - po::value<int>(&max_ops)->default_value(5), - "upper bound on randomly generated number of operations per txn (for leader)") - ("warehouses,w", - po::value<int>(&nwarehouses)->default_value(1), - "number of warehouses to model") - ("fail-seqno", - po::value<int>(&fail_seqno)->default_value(0), - "fail after processing this seqno (for replica only)") - ("accept-joiner-seqno,j", - po::value<int>(&accept_joiner_seqno)->default_value(0), - "accept recovering joiner (start recovery) after this seqno (for leader " - "only)") - ("leader-host,H", - po::value<string>(&leader_host)->default_value(string("localhost")), - "hostname or address of the leader") - ("leader-port,P", - po::value<uint16_t>(&leader_port)->default_value(7654), - "port the leader listens on") - ("read-buf", po::value<size_t>(&read_buf_size)->default_value(1e7), - "size of the incoming (read) buffer in bytes") - ("write-buf", po::value<size_t>(&buf_size)->default_value(1e5), - "size of the outgoing (write) buffer in bytes") - ("yield-interval,y", po::value<int>(&yield_interval)->default_value(1000), - "number of txns before yielding") - ("timelim,T", po::value<long long>(&timelim)->default_value(0), - "general network IO time limit in milliseconds, or 0 for none") - ("write-thresh,w", po::value<long long>(&write_thresh)->default_value(200), - "if positive and any txn write exceeds this, then print a message") - ("read-thresh,r", po::value<long long>(&read_thresh)->default_value(0), - "if positive and any txn read exceeds this, then print a message") - ("listen-port,p", po::value<uint16_t>(&listen_port)->default_value(7654), - "port to listen on (for worker)") - ("timeout,t", po::value<st_utime_t>(&timeout)->default_value(200000), - "timeout for some IO operations that should actually time out (in microseconds)") - ("test", "execute unit tests instead of running the normal system") - ("minreps,n", 
po::value<int>(&minreps)->default_value(2), - "minimum number of replicas the system is willing to process txns on"); - - po::variables_map vm; - try { - po::store(po::parse_command_line(argc, argv, desc), vm); - po::notify(vm); - - if (vm.count("help")) { - cout << desc << endl; - return 0; - } - - // Validate arguments. - check(min_ops > 0); - check(max_ops > 0); - check(max_ops >= min_ops); - } catch (std::exception &ex) { - cerr << ex.what() << endl << endl << desc << endl; - return 1; - } - - // Run unit-tests. - if (vm.count("test")) { - InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); - } - - // Initialize support for ST working with asynchronous signals. - check0x(pipe(sig_pipe)); - struct sigaction sa; - sa.sa_handler = handle_sig; - check0x(sigemptyset(&sa.sa_mask)); - sa.sa_flags = 0; - check0x(sigaction(SIGINT, &sa, nullptr)); - check0x(sigaction(SIGTERM, &sa, nullptr)); - check0x(sigaction(SIGUSR1, &sa, nullptr)); - - // Initialize ST. - if (use_epoll) check0x(st_set_eventsys(ST_EVENTSYS_ALT)); - check0x(st_init()); - my_spawn(bind(handle_sig_sync), "handle_sig_sync"); - if (debug_threads || profile_threads) { - st_set_switch_out_cb(switch_out_cb); - st_set_switch_in_cb(switch_in_cb); - } - - // Initialize thread manager for clean shutdown of all threads. - thread_eraser eraser; - threads.insert(st_thread_self()); - threadnames[st_thread_self()] = "main"; - - // Print memory debugging information. - if (debug_memory) { - my_spawn(memmon, "memmon"); - } - - long long start = thread_start_time = current_time_millis(); - - // At the end, cleanly stop the bcaster thread and print thread profiling - // information. 
- finally f(lambda() { - if (profile_threads) { - long long end = current_time_millis(); - long long all = end - __ref(start); - cout << "thread profiling results:" << endl; - long long total = 0; - typedef pair<st_thread_t, long long> entry; - foreach (entry p, threadtimes) { - total += p.second; - } - foreach (entry p, threadtimes) { - cout << "- " << threadname(p.first) << ": " << p.second << " ms (" - << pct(p.second, total) << "% of total, " - << pct(p.second, all) << "% of all)" << endl; - } - cout << "- total: " << total << " ms (" << pct(total, all) - << "% of all)" << endl; - cout << "- unaccounted: " << all - total << " ms (" - << pct(all - total, all) << "% of all)" << endl; - cout << "- all: " << all << " ms" << endl; - } - }); - - // Initialize the map. - init_map(g_map); - - cout << "pid " << getpid() << endl; - - // Which role are we? - if (is_leader) { - if (use_pb) { - if (use_pb_res) { - run_leader<pb_traits, pb_traits>(minreps, leader_port); - } else { - run_leader<pb_traits, rb_traits>(minreps, leader_port); - } - } else { - if (use_pb_res) { - run_leader<rb_traits, pb_traits>(minreps, leader_port); - } else { - run_leader<rb_traits, rb_traits>(minreps, leader_port); - } - } - } else { - if (use_pb) { - if (use_pb_res) { - run_replica<pb_traits, pb_traits>(leader_host, leader_port, listen_port); - } else { - run_replica<pb_traits, rb_traits>(leader_host, leader_port, listen_port); - } - } else { - if (use_pb_res) { - run_replica<rb_traits, pb_traits>(leader_host, leader_port, listen_port); - } else { - run_replica<rb_traits, rb_traits>(leader_host, leader_port, listen_port); - } - } - } - - return 0; - } catch (std::exception &ex) { - // Must catch all exceptions at the top to make the stack unwind. - cerr_thread_ex(ex) << endl; - return 1; - } -} - -/** - * Run the leader. 
- */ -template<typename Types, typename RTypes> -void -run_leader(int minreps, uint16_t leader_port) -{ - cout << "starting as leader" << endl; - st_multichannel<long long> recover_signals; - - scoped_ptr<txn_wal> twal(new txn_wal(use_twal ? "twal" : "/dev/null")); - g_twal = twal.get(); - scoped_ptr<wal> pwal(new wal(use_pwal ? "pwal" : "/dev/null")); - g_wal = pwal.get(); - - // Wait until all replicas have joined. - st_netfd_t listener = st_tcp_listen(leader_port); - st_closing close_listener(listener); - vector<replica_info> replicas; - st_closing_all_infos close_replicas(replicas); - cout << "waiting for at least " << minreps << " replicas to join" << endl; - for (int i = 0; i < minreps; ++i) { - st_netfd_t fd; - { - st_intr intr(stop_hub); - fd = checkerr(st_accept(listener, nullptr, nullptr, - ST_UTIME_NO_TIMEOUT)); - } - Join join = readmsg<Join>(fd); - replicas.push_back(replica_info(fd, static_cast<uint16_t>(join.port()))); - } - cout << "got all " << minreps << " replicas" << endl; - - // Construct the initialization message. - Init init; - init.set_txnseqno(0); - init.set_multirecover(multirecover); - foreach (replica_info r, replicas) { - SockAddr *psa = init.add_node(); - psa->set_host(r.host()); - psa->set_port(r.port()); - } - - // Send init to each initial replica. - foreach (replica_info r, replicas) { - init.set_yourhost(r.host()); - sendmsg(r.fd(), init); - } - - // Start dispatching queries. - st_bool accept_joiner; - int seqno = 0; - st_channel<replica_info> newreps; - st_channel<st_netfd_t> delreps; - foreach (const replica_info &r, replicas) newreps.push(r); - function<void()> f; - if (do_tpcc) - f = bind(issue_tpcc, ref(newreps), ref(delreps), ref(seqno), ref(accept_joiner)); - else - f = bind(issue_txns<Types>, ref(newreps), ref(seqno), ref(accept_joiner)); - st_joining join_issue_txns(my_spawn(f, "issue_txns")); - - finally fin(bind(summarize, "LEADER", ref(seqno))); - - try { - // Start handling responses. 
- st_thread_group handlers; - int rid = 0; - foreach (replica_info r, replicas) { - function<void()> fn; - if (do_tpcc) - fn = bind(handle_tpcc_responses, r.fd(), ref(seqno), rid++, - ref(recover_signals), ref(delreps), true); - else - fn = bind(handle_responses<RTypes>, r.fd(), ref(seqno), rid++, - ref(recover_signals), true); - handlers.insert(my_spawn(fn, "handle_responses")); - } - - // Accept the recovering node, and tell it about the online replicas. - st_netfd_t joiner; - try { - st_intr intr(stop_hub); - joiner = checkerr(st_accept(listener, nullptr, nullptr, - ST_UTIME_NO_TIMEOUT)); - accept_joiner.waitset(); - } catch (std::exception &ex) { - string s(ex.what()); - if (s.find("Interrupted system call") == s.npos) - throw; - else - throw break_exception(); - } - Join join = readmsg<Join>(joiner); - replicas.push_back(replica_info(joiner, static_cast<uint16_t>(join.port()))); - cout << "setting seqno to " << seqno << endl; - init.set_txnseqno(seqno); - init.set_yourhost(replicas.back().host()); - sendmsg(joiner, init); - recover_signals.push(current_time_millis()); - - // Start streaming txns to joiner. 
- cout << "start streaming txns to joiner" << endl; - function<void()> handle_responses_joiner_fn; - if (do_tpcc) - handle_responses_joiner_fn = - bind(handle_tpcc_responses, joiner, ref(seqno), rid++, - ref(recover_signals), ref(delreps), false); - else - handle_responses_joiner_fn = - bind(handle_responses<RTypes>, joiner, ref(seqno), rid++, - ref(recover_signals), false); - newreps.push(replicas.back()); - handlers.insert(my_spawn(handle_responses_joiner_fn, - "handle_responses_joiner")); - } catch (break_exception &ex) { - } catch (std::exception &ex) { - // TODO: maybe there's a cleaner way to do this final step before waiting with the join - cerr_thread_ex(ex) << endl; - throw; - } -} - -void -summarize(const char *role, int seqno) -{ - cout << role << " SUMMARY\n"; - if (do_tpcc) { - cout << "seqno: " << seqno << endl; - if (g_tables != nullptr) { - cout << "state:\n"; - g_tables->show(); - string fname = string("/tmp/ydb") + lexical_cast<string>(getpid()); - if (dump) { - // XXX iterate & dump - } - } - } else { - cout << "- total updates = " << updates << "\n" - << "- final DB state: seqno = " << seqno << ", size = " - << g_map.size() << endl; - string fname = string("/tmp/ydb") + lexical_cast<string>(getpid()); - if (dump) { - cout << "- dumping to " << fname << endl; - ofstream of(fname.c_str()); - of << "seqno: " << seqno << endl; - foreach (const entry &p, g_map) { - of << p.first << ": " << p.second << endl; - } - } - } -} - -/** - * Run a replica. - */ -template<typename Types, typename RTypes> -void -run_replica(string leader_host, uint16_t leader_port, uint16_t listen_port) -{ - if (disk) { - // Disk IO threads. - for (int i = 0; i < 5; ++i) { - //thread somethread(threadfunc); - } - } - - // Initialize database state. 
- int seqno = -1; - mii &map = g_map; - commons::array<char> recarr(0); - if (do_tpcc) { - TPCCTables *tables = new TPCCTables(); - g_tables.reset(tables); - SystemClock* clock = new SystemClock(); - - // Create a generator for filling the database. - RealRandomGenerator* random = new RealRandomGenerator(); - NURandC cLoad = NURandC::makeRandom(random); - random->setC(cLoad); - - // Generate the data - cout << "loading " << nwarehouses << " warehouses" << endl; - char now[Clock::DATETIME_SIZE+1]; - clock->getDateTimestamp(now); - TPCCGenerator generator(random, now, Item::NUM_ITEMS, - District::NUM_PER_WAREHOUSE, - Customer::NUM_PER_DISTRICT, - NewOrder::INITIAL_NUM_PER_DISTRICT); - long long start_time = current_time_millis(); - generator.makeItemsTable(tables); - for (int i = 0; i < nwarehouses; ++i) { - generator.makeWarehouse(tables, i+1); - } - cout << "loaded " << nwarehouses << " warehouses in " - << current_time_millis() - start_time << " ms" << endl; - tables->show(); - } - recovery_t orig = rec_twal ? g_tables->ser(0, 0, seqno) : recovery_t(); - - finally f(bind(summarize, "REPLICA", ref(seqno))); - st_channel<recovery_t> send_states; - - cout << "starting as replica on port " << listen_port << endl; - - // Listen for connections from other replicas. - st_netfd_t listener = st_tcp_listen(listen_port); - - // Connect to the leader and join the system. - st_netfd_t leader = st_tcp_connect(leader_host.c_str(), leader_port, - timeout); - st_closing closing(leader); - Join join; - join.set_port(listen_port); - sendmsg(leader, join); - Init init; - { - st_intr intr(stop_hub); - readmsg(leader, init); - } - uint32_t listen_host = init.yourhost(); - multirecover = init.multirecover(); - - // Display the info. 
- cout << "got init msg with txn seqno " << init.txnseqno() - << " and hosts:" << endl; - vector<st_netfd_t> replicas; - st_closing_all close_replicas(replicas); - int mypos = -1; - for (int i = 0; i < init.node_size(); ++i) { - const SockAddr &sa = init.node(i); - char buf[INET_ADDRSTRLEN]; - in_addr host = { sa.host() }; - bool is_self = sa.host() == listen_host && sa.port() == listen_port; - cout << "- " << checkerr(inet_ntop(AF_INET, &host, buf, - INET_ADDRSTRLEN)) - << ':' << sa.port() << (is_self ? " (self)" : "") << endl; - if (is_self) mypos = i; - if (!is_self && (init.txnseqno() > 0 || rec_twal)) { - replicas.push_back(st_tcp_connect(host, - static_cast<uint16_t>(sa.port()), - timeout)); - } - } - - // Initialize physical or txn log. - scoped_ptr<txn_wal> twal(new txn_wal(use_twal ? "twal" : "/dev/null")); - g_twal = twal.get(); - scoped_ptr<wal> pwal(new wal(use_pwal ? "pwal" : "/dev/null")); - g_wal = pwal.get(); - - // Process txns. - st_channel<chunk> backlog; - function<void()> process_fn; - if (do_tpcc) - process_fn = bind(process_tpccs, leader, ref(seqno), ref(send_states), - ref(backlog), init.txnseqno(), mypos, init.node_size()); - else - process_fn = bind(process_txns<Types, RTypes>, leader, ref(map), ref(seqno), - ref(send_states), ref(backlog), init.txnseqno(), mypos, - init.node_size()); - st_joining join_proc(my_spawn(process_fn, "process_txns")); - st_joining join_rec(init.txnseqno() == 0 && (multirecover || mypos == 0) ? - my_spawn(bind(recover_joiner, listener, ref(send_states)), - "recover_joiner") : - nullptr); - - try { - // If there's anything to recover. 
- if (init.txnseqno() > 0 || fail_seqno > 0) { - if (do_tpcc) { - - // - // TPCC txns - // - - function<void()> rec_twal_fn = lambda() { - int &seqno = __ref(seqno); - cout << "recovering from twal" << endl; - long long start_time = current_time_millis(); - g_twal->flush(); - sync(); - ifstream inf("twal"); - TpccReq req; - while (inf.peek() != ifstream::traits_type::eof()) { - ASSERT(inf.good()); - readmsg(inf, req); - process_tpcc(req, seqno, nullptr); - if (check_interval(seqno, yield_interval)) st_sleep(0); - } - showdatarate("recovered from twal", inf.tellg(), - current_time_millis() - start_time); - cout << "now at seqno " << seqno << endl; - }; - - function<void()> recv_log_fn = lambda() { - st_netfd_t src = __ref(replicas[0]); - int &seqno = __ref(seqno); - ASSERT(fail_seqno == seqno); - recreq r = { fail_seqno + 1, resume.take() }; - st_write(src, r); - sized_array<char> rbuf(new char[read_buf_size], read_buf_size); - function<void(anchored_stream_reader &reader)> overflow_fn = - lambda(anchored_stream_reader &reader) { - shift_reader(reader); - }; - anchored_stream_reader reader(st_read_fn(src), - st_read_fully_fn(src), - overflow_fn, rbuf.get(), rbuf.size()); - TpccReq req; - while (seqno < r.end_seqno) { - { st_intr intr(stop_hub); readmsg(reader, req); } - process_tpcc(req, seqno, nullptr); - reader.set_anchor(); - if (check_interval(seqno, yield_interval)) st_sleep(0); - } - }; - - if (rec_twal) { - failed.waitset(); - g_tables.reset(new TPCCTables); - tpcc_recovery_header &hdr = *reinterpret_cast<tpcc_recovery_header*>(orig.begin()); - commons::array<char> body(orig.begin() + sizeof(tpcc_recovery_header), - orig.size() - sizeof(tpcc_recovery_header)); - g_tables->deser(mypos, init.node_size(), hdr, body); - body.release(); - rec_twal_fn(); - failed.reset(); - recv_log_fn(); - } - -#if 0 - st_thread_t rec_twal_thread = my_spawn(rec_twal_fn, "rec_twal"); - st_thread_t recv_log_thread = my_spawn(recv_log_fn, "recv_log"); - - st_join(rec_twal_thread); - 
st_join(recv_log_thread); -#endif - - if (rec_pwal) { - // Recover from phy log. - } else if (rec_twal) { - // Recover from txn log. - } else { - - g_tables.reset(new TPCCTables); - - // - // Build-up - // - - if (ship_log) { - } else { - // XXX indent - - cout << "waiting for recovery message" << (multirecover ? "s" : "") - << endl; - long long before_recv = current_time_millis(); - - vector<st_thread_t> recovery_builders; - ASSERT(seqno == -1); - bool first = true; - for (int i = 0; i < (multirecover ? init.node_size() : 1); ++i) { - recovery_builders.push_back(my_spawn(lambda() { - // Read the recovery message length and header. - tpcc_recovery_header hdr; - checkeqnneg(st_read_fully(__ref(replicas[i]), - &hdr, sizeof hdr, - ST_UTIME_NO_TIMEOUT), - ssize_t(sizeof hdr)); - check(hdr.seqno >= 0); - - cout << "receiving recovery of " << hdr.len << " bytes" << endl; - - long long start_time = current_time_millis(); - __ref(recarr).reset(new char[hdr.len], hdr.len); - checkeqnneg(st_read_fully(__ref(replicas[i]), - __ref(recarr).get(), hdr.len, - ST_UTIME_NO_TIMEOUT), - ssize_t(hdr.len)); - - long long before_deser = current_time_millis(); - showdatarate("received recovery message", size_t(hdr.len), before_deser - start_time); - - if (__ref(seqno) == -1) - __ref(seqno) = hdr.seqno; - else - checkeq(__ref(seqno), hdr.seqno); - - g_tables->deser(__ctx(i), __ref(init).node_size(), hdr, __ref(recarr)); - - long long end_time = current_time_millis(); - showdatarate("deserialized recovery message", size_t(hdr.len), end_time - before_deser); - cout << "receive & deserialize took " << end_time - __ref(before_recv) - << " ms total; now at seqno " << hdr.seqno << endl; - cout << "after deserialize, db state is now at seqno " - << hdr.seqno << ":" << endl; - g_tables->show(); - -#if 0 - // Resize the table if necessary. 
- - commons::array<entry> &table = __ref(map).get_table(); - if (!__ref(first)) { - checkeq(table.size(), hdr.total); - checkeq(__ref(map).size(), hdr.size); - } else { - __ref(first) = false; - if (table.size() != hdr.total) { - table.reset(new entry[hdr.total], hdr.total); - } - } - - // Receive straight into the table. - pair<size_t, size_t> range = - recovery_range(table.size(), __ctx(i), __ref(init).node_size()); - // Check that we agree on the number of entries. - checkeq(range.second - range.first, hdr.count); - // Check that the count is a power of two. - checkeq(hdr.count & (hdr.count - 1), size_t(0)); - size_t rangelen = sizeof(entry) * hdr.count; - // Read an extra char to ensure that we're at the EOF. - checkeqnneg(st_read_fully(__ref(replicas[i]), - table.begin() + range.first, rangelen + 1, - ST_UTIME_NO_TIMEOUT), - ssize_t(rangelen)); -#endif - }, "recovery_builder" + lexical_cast<string>(i))); - } - foreach (st_thread_t t, recovery_builders) { - st_join(t); - } - - } - } - - // - // Catch-up - // - - long long mid_time = current_time_millis(); - int mid_seqno = seqno; - TpccReq req; - while (!backlog.empty()) { - chunk chunk = backlog.take(); - cout << "took from backlog, now has " << backlog.queue().size() - << " chunks" << endl; - sized_array<char> &buf = chunk.get<0>(); - char *begin = chunk.get<1>(), *end = chunk.get<2>(); - ASSERT(buf.get() <= begin && begin < buf.end()); - ASSERT(buf.get() < end && end < buf.end()); - process_buf(begin, end, req, seqno); - } - showtput("replayer caught up; from backlog replayed", - current_time_millis(), mid_time, seqno, mid_seqno); - - } else { - - // - // Simple txns - // - - if (rec_pwal) { - // Recover from physical log. 
- cout << "recovering from pwal" << endl; - long long start_time = current_time_millis(); - ifstream inf("pwal"); - binary_iarchive in(inf); - int rseqno = -1; - while (inf.peek() != ifstream::traits_type::eof()) { - int op; - in & op; - switch (op) { - case op_del: - { - int key; - in & key; - mii::iterator it = map.find(key); - map.erase(it); - break; - } - case op_write: - { - int key, val; - in & key & val; - map[key] = val; - break; - } - case op_commit: - ++rseqno; - break; - } - if (check_interval(rseqno, yield_interval)) st_sleep(0); - } - seqno = init.txnseqno() - 1; - showdatarate("recovered from pwal", inf.tellg(), current_time_millis() - start_time); - cout << "now at seqno " << rseqno << " (really: " << seqno << ")" << endl; - } else { - - // - // Build-up - // - - cout << "waiting for recovery message" << (multirecover ? "s" : "") - << endl; - long long before_recv = current_time_millis(); - - vector<st_thread_t> recovery_builders; - ASSERT(seqno == -1); - bool first = true; - for (int i = 0; i < (multirecover ? init.node_size() : 1); ++i) { - recovery_builders.push_back(my_spawn(lambda() { - // Read the recovery message length and header. - size_t len; - recovery_header hdr; - char buf[sizeof len + sizeof hdr]; - //try { - checkeqnneg(st_read_fully(__ref(replicas[i]), - buf, sizeof len + sizeof hdr, - ST_UTIME_NO_TIMEOUT), - ssize_t(sizeof len + sizeof hdr)); - //} catch (...) { // TODO just catch "Connection reset by peer" - //return; - //} - raw_reader rdr(buf); - rdr.read(len); - rdr.read(hdr); - check(hdr.seqno >= 0); - - // Resize the table if necessary. - commons::array<entry> &table = __ref(map).get_table(); - if (!__ref(first)) { - checkeq(table.size(), hdr.total); - checkeq(__ref(map).size(), hdr.size); - } else { - __ref(first) = false; - __ref(map).set_size(hdr.size); - if (table.size() != hdr.total) { - table.reset(new entry[hdr.total], hdr.total); - } - } - - // Receive straight into the table. 
- pair<size_t, size_t> range = - recovery_range(table.size(), __ctx(i), __ref(init).node_size()); - // Check that we agree on the number of entries. - checkeq(range.second - range.first, hdr.count); - // Check that the count is a power of two. - checkeq(hdr.count & (hdr.count - 1), size_t(0)); - size_t rangelen = sizeof(entry) * hdr.count; - // Read an extra char to ensure that we're at the EOF. - long long start_time = current_time_millis(); - checkeqnneg(st_read_fully(__ref(replicas[i]), - table.begin() + range.first, rangelen + 1, - ST_UTIME_NO_TIMEOUT), - ssize_t(rangelen)); - long long end_time = current_time_millis(); - - if (__ref(seqno) != -1) - checkeq(__ref(seqno), hdr.seqno); - __ref(seqno) = hdr.seqno; - showdatarate("got recovery message", len, end_time - start_time); - cout << "receive took " << end_time - __ref(before_recv) - << " ms total; now at seqno " << hdr.seqno << endl; -#if 0 - Recovery recovery; - long long receive_start = 0, receive_end = 0; - size_t len = 0; - { - st_intr intr(stop_hub); - len = readmsg(__ref(replicas)[__ctx(i)], recovery, &receive_start, - &receive_end); - } - long long build_start = current_time_millis(); - cout << "got recovery message of " << len << " bytes in " - << build_start - __ref(before_recv) << " ms: xfer took " - << receive_end - receive_start << " ms, deserialization took " - << build_start - receive_end << " ms" << endl; - for (int i = 0; i < recovery.pair_size(); ++i) { - const Recovery_Pair &p = recovery.pair(i); - __ref(map)[p.key()] = p.value(); - if (i % yield_interval == 0) { - if (yield_during_build_up) st_sleep(0); - } - } - check(recovery.seqno() >= 0); - int seqno = __ref(seqno) = recovery.seqno(); - long long build_end = current_time_millis(); - cout << "receive and build-up took " - << build_end - __ref(before_recv) - << " ms; built up map of " << recovery.pair_size() - << " records in " << build_end - build_start - << " ms; now at seqno " << seqno << endl; -#endif - }, "recovery_builder" + 
lexical_cast<string>(i))); - } - foreach (st_thread_t t, recovery_builders) { - st_join(t); - } - } - - // - // Catch-up - // - - long long mid_time = current_time_millis(); - int mid_seqno = seqno; - // XXX - using msg::TxnBatch; - using msg::Txn; - commons::array<char> rbuf(0), wbuf(buf_size); - reader reader(nullptr, rbuf.get(), rbuf.size()); - writer writer(lambda(const void*, size_t) { - throw not_supported_exception("should not be writing responses during catch-up phase"); - }, wbuf.get(), wbuf.size()); - stream s(reader, writer); - TxnBatch batch(s); - while (!backlog.empty()) { - chunk chunk = backlog.take(); - sized_array<char> &buf = chunk.get<0>(); - ASSERT(buf.get() <= chunk.get<1>() && chunk.get<1>() < buf.end()); - ASSERT(buf.get() < chunk.get<2>() && chunk.get<2>() < buf.end()); - ASSERT(chunk.get<1>() < chunk.get<2>()); - swap(buf, reader.buf()); - reader.reset_range(chunk.get<1>(), chunk.get<2>()); - while (reader.start() < reader.end()) { - char *start = reader.start(); - uint32_t prefix = ntohl(reader.read<uint32_t>()); - ASSERT(prefix < 10000); - ASSERT(start + sizeof(uint32_t) + prefix <= reader.end()); - batch.Clear(); - for (int t = 0; t < batch.txn_size(); ++t) { - const Txn &txn = batch.txn(t); - if (rec_pwal) seqno = txn.seqno() - 1; - process_txn<rb_traits, rb_traits>(map, txn, seqno, nullptr); - if (fake_exec && !Types::is_pb()) { - reader.skip(txn.op_size() * Op_Size); - } - - if (check_interval(txn.seqno(), yield_interval)) st_sleep(0); - if (check_interval(txn.seqno(), process_display)) { - cout << "caught up txn " << txn.seqno() - << "; db size = " << map.size() - << "; seqno = " << seqno - << "; backlog.size = " << backlog.queue().size() << endl; - } - } - ASSERT(start + sizeof(uint32_t) + prefix == reader.start()); - } - } - g_caught_up = true; -#if 0 - while (!backlog.empty()) { - using pb::Txn; - shared_ptr<Txn> p = backlog.take(); - process_txn<pb_traits, pb_traits>(map, *p, seqno, nullptr); - if (check_interval(p->seqno(), 
catch_up_display)) { - cout << "processed txn " << p->seqno() << " off the backlog; " - << "backlog.size = " << backlog.queue().size() << endl; - } - if (check_interval(p->seqno(), yield_interval)) { - // Explicitly yield. (Note that yielding does still effectively - // happen anyway because process_txn is a yield point.) - st_sleep(0); - } - } -#endif - showtput("replayer caught up; from backlog replayed", - current_time_millis(), mid_time, seqno, mid_seqno); - } - } - } catch (std::exception &ex) { - cerr_thread_ex(ex) << endl; - throw; - } - - stop_hub.insert(st_thread_self()); -} Modified: ydb/trunk/src/msg.h =================================================================== --- ydb/trunk/src/msg.h 2009-03-20 07:58:28 UTC (rev 1313) +++ ydb/trunk/src/msg.h 2009-03-20 17:45:38 UTC (rev 1314) @@ -55,7 +55,7 @@ using namespace commons; using namespace std; -short unset = -7654; +static const short unset = -7654; using ydb::pb::Op_OpType; Modified: ydb/trunk/src/stxn.lzz.clamp =================================================================== --- ydb/trunk/src/stxn.lzz.clamp 2009-03-20 07:58:28 UTC (rev 1313) +++ ydb/trunk/src/stxn.lzz.clamp 2009-03-20 17:45:38 UTC (rev 1314) @@ -556,3 +556,178 @@ { return recovery_t(); } + +class response_handler +{ +public: + response_handler(st_netfd_t replica, const int &seqno, int rid, + st_multichannel<long long> &recover_signals, bool caught_up) + : + replica(replica), + seqno(seqno), + rid(rid), + recover_signals(recover_signals), + caught_up(caught_up), + sub(recover_signals.subscribe()), + start_time(current_time_millis()), + recovery_start_time(caught_up ? -1 : start_time), + recovery_end_time(-1), + start_seqno(seqno), + recovery_start_seqno(caught_up ? 
-1 : seqno), + recovery_end_seqno(-1), + last_seqno(-1) + {} + + template<typename Types> + void run() { + typedef typename Types::Response Response; + typedef typename Types::ResponseBatch ResponseBatch; + + finally f(bind(&response_handler::cleanup, this)); + + commons::array<char> rbuf(read_buf_size), wbuf(buf_size); + st_reader reader(replica, rbuf.get(), rbuf.size()); + writer w(lambda(const void*, size_t) { + throw not_supported_exception("response handler should not be writing"); + }, wbuf.get(), wbuf.size()); + stream s(reader,w); + + scoped_ptr<ResponseBatch> pbatch(new_ResponseBatch<ResponseBatch>(s)); + ResponseBatch &batch = *pbatch; + + long long last_display_time = current_time_millis(); + + function<void()> loop_cleanup = + bind(&response_handler::loop_cleanup, this); + + while (true) { + finally f(loop_cleanup); + uint32_t prefix = 0; + + // Read the message, but correctly respond to interrupts so that we can + // cleanly exit (slightly tricky). + if (last_seqno + 1 == seqno) { + // Stop-interruptible in case we're already caught up. + try { + st_intr intr(stop_hub); + if (Types::is_pb()) readmsg(reader, batch); + else { prefix = ntohl(reader.read<uint32_t>()); batch.Clear(); } + } catch (...) { // TODO: only catch interruptions + // This check on seqnos is OK for termination since the seqno will + // never grow again if stop_hub is set. + if (last_seqno + 1 == seqno) { + cout << rid << ": "; + cout << "clean stop; next expected seqno is " << seqno + << " (last seqno was " << last_seqno << ")" << endl; + break; + } else { + continue; + } + } + } else { + // Only kill-interruptible because we want a clean termination (want + // to get all the acks back). 
+ st_intr intr(kill_hub); + if (Types::is_pb()) readmsg(reader, batch); + else { prefix = ntohl(reader.read<uint32_t>()); batch.Clear(); } + } + + for (int i = 0; i < batch.res_size(); ++i) { + const Response &res = batch.res(i); + // Determine if this response handler's host (the only joiner) has finished + // catching up. If it has, then broadcast a signal so that all response + // handlers will know about this event. + int rseqno = res.seqno(); + if (rseqno <= last_seqno) + throw msg_exception(string("response seqno decreased from ") + + lexical_cast<string>(last_seqno) + " to " + + lexical_cast<string>(rseqno)); + bool rcaught_up = res.caught_up(); + for (int r = 0; r < res.result_size(); ++r) { + cout << rseqno << last_seqno << res.result_size() << " " << r << " " << res.result(r) << endl; + } + if (!caught_up && rcaught_up) { + long long now = current_time_millis(), time_diff = now - start_time; + caught_up = true; + recover_signals.push(now); + cout << rid << ": "; + cout << "recovering node caught up; took " + << time_diff << " ms" << endl; + // This will cause the program to exit eventually, but cleanly, such that + // the recovery time will be set first, before the eventual exit (which + // may not even happen in the current iteration). + if (stop_on_recovery) { + cout << "stopping on recovery" << endl; + stop_hub.set(); + } + } + if (check_interval(rseqno, handle_responses_display)) { + cout << rid << ": " << "got response " << rseqno << " from " + << replica << "; "; + long long display_time = current_time_millis(); + showtput("handling", display_time, last_display_time, rseqno, + rseqno - handle_responses_display); + last_display_time = display_time; + } + if (check_interval(rseqno, yield_interval)) { + st_sleep(0); + } + last_seqno = rseqno; + } + } + } + +private: + void loop_cleanup() { + // The first timestamp that comes down the subscription pipeline is the + // recovery start time, issued by the main thread. 
The second one is the + // recovery end time, issued by the response handler associated with the + // joiner. + if (recovery_start_time == -1 && !sub.empty()) { + recovery_start_time = sub.take(); + recovery_start_seqno = last_seqno; + cout << rid << ": "; + showtput("before recovery, finished", recovery_start_time, start_time, + recovery_start_seqno, 0); + } else if (recovery_end_time == -1 && !sub.empty()) { + recovery_end_time = sub.take(); + recovery_end_seqno = last_seqno; + cout << rid << ": "; + showtput("during recovery, finished roughly", recovery_end_time, + recovery_start_time, recovery_end_seqno, recovery_start_seqno); + } + } + + void cleanup() { + long long end_time = current_time_millis(); + cout << rid << ": "; + showtput("handled roughly", end_time, start_time, seqno, start_seqno); + if (recovery_end_time > -1) { + cout << rid << ": "; + showtput("after recovery, finished", end_time, recovery_end_time, + seqno, recovery_end_seqno); + } + } + + st_netfd_t replica; + const int &seqno; + int rid; + st_multichannel<long long> &recover_signals; + bool caught_up; + st_channel<long long> &sub; + long long start_time, recovery_start_time, recovery_end_time; + int start_seqno, recovery_start_seqno, recovery_end_seqno, last_seqno; +}; + +/** + * Swallow replica responses.
+ */ +template<typename Types> +void +handle_responses(st_netfd_t replica, const int &seqno, int rid, + st_multichannel<long long> &recover_signals, bool caught_up) +{ + response_handler h(replica, seqno, rid, recover_signals, caught_up); + h.run<Types>(); +} + Copied: ydb/trunk/src/ydb.lzz.clamp (from rev 1313, ydb/trunk/src/main2.lzz.clamp) =================================================================== --- ydb/trunk/src/ydb.lzz.clamp (rev 0) +++ ydb/trunk/src/ydb.lzz.clamp 2009-03-20 17:45:38 UTC (rev 1314) @@ -0,0 +1,1073 @@ +#hdr +#include "unsetprefs.h" +#include <boost/function.hpp> +#include <boost/scoped_ptr.hpp> +#include <string> +#include <iostream> +#include <st.h> +#include <commons/st/st.h> +#include "tpcc/clock.h" +#include "tpcc/randomgenerator.h" +#include "tpcc/tpccclient.h" +#include "tpcc/tpccgenerator.h" +#include "tpcc/tpcctables.h" +#include "util.hh" +#include "tpcc.hh" +#include "stxn.hh" +#include "main.hh" + +using namespace boost; +using namespace std; +using namespace commons; +#end + +#src +#include "unsetprefs.h" +#include <csignal> // sigaction, etc. +#include <cstring> // strsignal, size_t +#include <boost/program_options.hpp> +#include <gtest/gtest.h> +#include <malloc.h> +#include <string> +#include "setprefs.h" +#end + +using namespace google; +using namespace testing; + +// +// Utilities/system +// + +/** + * Delegate for running thread targets. + * \param[in] f The function to execute. + * \param[in] intr Whether to signal stop_hub on an exception. + */ +void +my_spawn_helper(const function0<void> f, bool intr) +{ + thread_eraser eraser; + try { + f(); + } catch (std::exception &ex) { + cerr_thread_ex(ex) << (intr ? "; interrupting!" : "") << endl; + if (intr) stop_hub.set(); + } +} + +/** + * Spawn a thread using ST but wrap it in an exception handler that interrupts + * all other threads (hopefully causing them to unwind). + * \param[in] f The function to execute. 
+ * \param[in] intr Whether to signal stop_hub on an exception. Not actually + * used anywhere. + */ +st_thread_t +my_spawn(const function0<void> &f, string name, bool intr = false) +{ + st_thread_t t = st_spawn(bind(my_spawn_helper, f, intr)); + threads.insert(t); + threadnames[t] = name; + return t; +} + +/** + * Memory monitor. + */ +void +memmon() +{ + while (!stop_hub) { + { + st_intr intr(stop_hub); + st_sleep(1); + } + malloc_stats(); + } +} + +int sig_pipe[2]; + +// +// Signals +// + +/** + * Raw signal handler that triggers the (synchronous) handler. + */ +void handle_sig(int sig) { + int err = errno; + cerr << "got signal: " << strsignal(sig) << " (" << sig << ")" << endl; + checkeqnneg(write(sig_pipe[1], &sig, sizeof sig), + static_cast<ssize_t>(sizeof sig)); + errno = err; +} + +/** + * Synchronous part of the signal handler; cleanly interrrupts any threads that + * have marked themselves as interruptible. + */ +void handle_sig_sync() { + st_closing fd(checkerr(st_netfd_open(sig_pipe[0]))); + while (true) { + int sig; + checkeqnneg(st_read(fd, &sig, sizeof sig, ST_UTIME_NO_TIMEOUT), + static_cast<ssize_t>(sizeof sig)); + if (sig == SIGINT) { + ... [truncated message content] |