[Assorted-commits] SF.net SVN: assorted:[1332] ydb/trunk/src
Brought to you by:
yangzhang
From: <yan...@us...> - 2009-03-24 09:11:34
|
Revision: 1332 http://assorted.svn.sourceforge.net/assorted/?rev=1332&view=rev Author: yangzhang Date: 2009-03-24 09:11:22 +0000 (Tue, 24 Mar 2009) Log Message: ----------- .lzz.clamp -> .clamp.lzz; cleaned up makefile Modified Paths: -------------- ydb/trunk/src/Makefile ydb/trunk/src/mkdeps.py Added Paths: ----------- ydb/trunk/src/leader.clamp.lzz ydb/trunk/src/main.clamp.lzz ydb/trunk/src/rectpcc.clamp.lzz ydb/trunk/src/replica.clamp.lzz ydb/trunk/src/run.clamp.lzz ydb/trunk/src/stxn.clamp.lzz ydb/trunk/src/tpcc.clamp.lzz ydb/trunk/src/util.clamp.lzz ydb/trunk/src/ydb.clamp.lzz Removed Paths: ------------- ydb/trunk/src/leader.lzz.clamp ydb/trunk/src/main.lzz.clamp ydb/trunk/src/rectpcc.lzz.clamp ydb/trunk/src/replica.lzz.clamp ydb/trunk/src/run.lzz.clamp ydb/trunk/src/stxn.lzz.clamp ydb/trunk/src/tpcc.lzz.clamp ydb/trunk/src/util.lzz.clamp ydb/trunk/src/ydb.lzz.clamp Modified: ydb/trunk/src/Makefile =================================================================== --- ydb/trunk/src/Makefile 2009-03-24 07:54:47 UTC (rev 1331) +++ ydb/trunk/src/Makefile 2009-03-24 09:11:22 UTC (rev 1332) @@ -3,10 +3,11 @@ # SHELL := bash -WTF := wtf -ORIGCXX := $(CXX) CCACHE := ccache export CCACHE_PREFIX := distcc +ifneq ($(WTF),) +WTF := wtf +endif CXX := $(WTF) $(CCACHE) $(CXX) -pipe TARGET_ARCH := $(shell [[ "$$(uname -m)" == x86_64 ]] && echo -m64 || echo -m32 ) \ @@ -95,35 +96,28 @@ # SVNURL := https://assorted.svn.sourceforge.net/svnroot/assorted/ydb/trunk/src - TARGET := ydb -CLAMPS := $(wildcard *.lzz.clamp) -CLAMPLZZS:= $(patsubst %.clamp,%,$(CLAMPS)) -PURELZZS := $(foreach lzz,$(wildcard *.lzz),$(if $(wildcard $(lzz).clamp),,$(lzz))) -LZZS := $(CLAMPLZZS) $(PURELZZS) -LZZHDRS := $(foreach lzz,$(LZZS),$(patsubst %.lzz,%.hh,$(lzz))) -LZZSRCS := $(foreach lzz,$(LZZS),$(patsubst %.lzz,%.cc,$(lzz))) -LZZOBJS := $(foreach lzz,$(LZZS),$(patsubst %.lzz,%.o,$(lzz))) +CLAMPLZZS:= $(wildcard *.clamp.lzz) +CLAMPS := $(foreach lzz,$(CLAMPLZZS),$(patsubst %.clamp.lzz,%.hh.clamp,$(lzz))) \ + $(foreach lzz,$(CLAMPLZZS),$(patsubst %.clamp.lzz,%.cc.clamp,$(lzz))) +CLAMPOUTS:= $(foreach clamp,$(CLAMPS),$(patsubst %.clamp,%,$(clamp))) PBS := $(wildcard *.proto) -PBHDRS := $(foreach pb,$(PBS),$(patsubst %.proto,%.pb.h,$(pb))) -PBSRCS := $(foreach pb,$(PBS),$(patsubst %.proto,%.pb.cc,$(pb))) -PBOBJS := $(foreach pb,$(PBS),$(patsubst %.proto,%.pb.o,$(pb))) +PBOUTS := $(foreach pb,$(PBS),$(patsubst %.proto,%.pb.h,$(pb))) \ + $(foreach pb,$(PBS),$(patsubst %.proto,%.pb.cc,$(pb))) COGS := $(wildcard tpcc/*.cog) -COGSRCS := $(foreach cog,$(COGS),$(patsubst %.cog,%,$(cog))) +COGOUTS := $(foreach cog,$(COGS),$(patsubst %.cog,%,$(cog))) -GENHDRS := $(LZZHDRS) $(PBHDRS) $(COGHDRS) -GENSRCS := $(LZZSRCS) $(PBSRCS) $(COGSRCS) GENOBJS := $(LZZOBJS) $(PBOBJS) $(COGOBJS) +GENOUTS := $(CLAMPOUTS) $(PBOUTS) $(COGOUTS) -TPCC_OBJS:= clock randomgenerator tpccclient tpccdb tpccgenerator tpcctables -TPCC_OBJS:= $(foreach i,$(TPCC_OBJS),tpcc/$(i).o) +TPCCOBJS := clock randomgenerator tpccclient tpccdb tpccgenerator tpcctables +TPCCOBJS := $(foreach i,$(TPCCOBJS),tpcc/$(i).o) -HDRS := $(GENHDRS) -SRCS := $(GENSRCS) -OBJS := $(GENOBJS) $(TPCC_OBJS) +SRCS := $(GENOUTS) msg.h +OBJS := $(patsubst %.cc,%.o,$(filter %.cc,$(SRCS))) $(TPCCOBJS) # # Rules @@ -131,7 +125,7 @@ all: $(TARGET) -doc: $(SRCS) $(HDRS) +doc: $(SRCS) doxygen %.pb.o: WARNINGS = -Wall -Werror @@ -143,24 +137,11 @@ %.pb.h %.pb.cc: %.proto protoc --cpp_out=. $< -# ORIG -# -#%.cc %.hh: %.lzz -# lzz -hx hh -sx cc -hl -sl -hd -sd $< -# -#%.lzz: %.lzz.clamp -# rm -f $@ -# mkdir -p .clamp/ -# clamp --outdir .clamp/ --prefix $(basename $@) < $< | \ -# sed "$$( echo -e '1i\\\n\#hdr\n1a\\\n\#end' )" | \ -# sed "$$( echo -e '$$i\\\n\#hdr\n$$a\\\n\#end' )" > $@ -# chmod -w $@ - -%.cc.clamp %.hh.clamp: %.lzz.clamp +%.cc.clamp %.hh.clamp: %.clamp.lzz ln -sf $< $(basename $<) rm -f $(basename $(basename $<)).{hh,cc}.clamp lzz -hx hh.clamp -sx cc.clamp -hd -sd $(basename $<) - chmod -w $(basename $(basename $<)).{hh.clamp,cc.clamp} + chmod -w $(basename $(basename $<)).{hh,cc}.clamp %.cc: %.cc.clamp rm -f $@ @@ -207,12 +188,12 @@ -Wno-unused-parameter clean: - rm -rf .clamp/ $(GENSRCS) $(GENHDRS) $(OBJS) $(TARGET) $(CLAMPLZZS) *.d *.hh.clamp *.cc.clamp + rm -rf $(GENOUTS) $(OBJS) $(TARGET) $(CLAMPLZZS) .clamp/ *.d distclean: clean rm -f pch.h pch.h.gch -.SECONDARY: $(GENSRCS) $(GENHDRS) $(OBJS) main.lzz pch.h.gch +.SECONDARY: $(GENOUTS) $(OBJS) pch.h.gch serperf: ydb.pb.o ser: ydb.pb.o Copied: ydb/trunk/src/leader.clamp.lzz (from rev 1331, ydb/trunk/src/leader.lzz.clamp) =================================================================== --- ydb/trunk/src/leader.clamp.lzz (rev 0) +++ ydb/trunk/src/leader.clamp.lzz 2009-03-24 09:11:22 UTC (rev 1332) @@ -0,0 +1,137 @@ +#hdr +#include <stdint.h> +#end + +#src +#include "unsetprefs.h" +#include <commons/st/sockets.h> +#include <commons/st/threads.h> +#include "run.hh" +#include "stxn.hh" +#include "tpcc.hh" +#include "setprefs.h" +#end + +/** + * Run the leader. + */ +void +run_leader(int minreps, uint16_t leader_port) +{ + cout << "starting as leader" << endl; + st_multichannel<long long> recover_signals; + + scoped_ptr<txn_wal> twal(new txn_wal(use_twal ? "twal" : "/dev/null")); + g_twal = twal.get(); + scoped_ptr<wal> pwal(new wal(use_pwal ? "pwal" : "/dev/null")); + g_wal = pwal.get(); + + // Wait until all replicas have joined. + st_netfd_t listener = st_tcp_listen(leader_port); + st_closing close_listener(listener); + vector<replica_info> replicas; + st_closing_all_infos close_replicas(replicas); + cout << "waiting for at least " << minreps << " replicas to join" << endl; + Join join; + for (int i = 0; i < minreps; ++i) { + st_netfd_t fd; + { + st_intr intr(stop_hub); + fd = checkerr(st_accept(listener, nullptr, nullptr, + ST_UTIME_NO_TIMEOUT)); + } + readmsg(fd, join); + replicas.push_back(replica_info(fd, static_cast<uint16_t>(join.port()))); + } + cout << "got all " << minreps << " replicas" << endl; + + // Construct the initialization message. + Init init; + init.set_txnseqno(0); + init.set_multirecover(multirecover); + foreach (replica_info r, replicas) { + SockAddr *psa = init.add_node(); + psa->set_host(r.host()); + psa->set_port(r.port()); + } + + // Send init to each initial replica. + foreach (replica_info r, replicas) { + init.set_yourhost(r.host()); + sendmsg(r.fd(), init); + } + + // Start dispatching queries. + st_bool accept_joiner; + int seqno = 0; + st_channel<replica_info> newreps; + st_channel<st_netfd_t> delreps; + foreach (const replica_info &r, replicas) newreps.push(r); + function<void()> f; + if (do_tpcc) + f = bind(issue_tpcc, ref(newreps), ref(delreps), ref(seqno), ref(accept_joiner)); + else + f = bind(issue_txns, ref(newreps), ref(seqno), ref(accept_joiner)); + st_joining join_issue_txns(my_spawn(f, "issue_txns")); + + finally fin(bind(summarize, "LEADER", ref(seqno))); + + try { + // Start handling responses. + st_thread_group handlers; + int rid = 0; + foreach (replica_info r, replicas) { + function<void()> fn; + if (do_tpcc) + fn = bind(handle_tpcc_responses, r.fd(), ref(seqno), rid++, + ref(recover_signals), ref(delreps), true); + else + fn = bind(handle_responses, r.fd(), ref(seqno), rid++, + ref(recover_signals), true); + handlers.insert(my_spawn(fn, "handle_responses")); + } + + // Accept the recovering node, and tell it about the online replicas. + st_netfd_t joiner; + try { + st_intr intr(stop_hub); + joiner = checkerr(st_accept(listener, nullptr, nullptr, + ST_UTIME_NO_TIMEOUT)); + accept_joiner.waitset(); + } catch (std::exception &ex) { + string s(ex.what()); + if (s.find("Interrupted system call") == s.npos) + throw; + else + throw break_exception(); + } + Join join; + readmsg(joiner, join); + replicas.push_back(replica_info(joiner, static_cast<uint16_t>(join.port()))); + cout << "setting seqno to " << seqno << endl; + init.set_txnseqno(seqno); + init.set_yourhost(replicas.back().host()); + sendmsg(joiner, init); + recover_signals.push(current_time_millis()); + + // Start streaming txns to joiner. + cout << "start streaming txns to joiner" << endl; + function<void()> handle_responses_joiner_fn; + if (do_tpcc) + handle_responses_joiner_fn = + bind(handle_tpcc_responses, joiner, ref(seqno), rid++, + ref(recover_signals), ref(delreps), false); + else + handle_responses_joiner_fn = + bind(handle_responses, joiner, ref(seqno), rid++, + ref(recover_signals), false); + newreps.push(replicas.back()); + handlers.insert(my_spawn(handle_responses_joiner_fn, + "handle_responses_joiner")); + } catch (break_exception &ex) { + } catch (std::exception &ex) { + // TODO: maybe there's a cleaner way to do this final step before waiting with the join + cerr_thread_ex(ex) << endl; + throw; + } +} Property changes on: ydb/trunk/src/leader.clamp.lzz ___________________________________________________________________ Added: svn:mergeinfo + Deleted: ydb/trunk/src/leader.lzz.clamp =================================================================== --- ydb/trunk/src/leader.lzz.clamp 2009-03-24 07:54:47 UTC (rev 1331) +++ ydb/trunk/src/leader.lzz.clamp 2009-03-24 09:11:22 UTC (rev 1332) @@ -1,137 +0,0 @@ -#hdr -#include <stdint.h> -#end - -#src -#include "unsetprefs.h" -#include <commons/st/sockets.h> -#include <commons/st/threads.h> -#include "run.hh" -#include "stxn.hh" -#include "tpcc.hh" -#include "setprefs.h" -#end - -/** - * Run the leader. - */ -void -run_leader(int minreps, uint16_t leader_port) -{ - cout << "starting as leader" << endl; - st_multichannel<long long> recover_signals; - - scoped_ptr<txn_wal> twal(new txn_wal(use_twal ? "twal" : "/dev/null")); - g_twal = twal.get(); - scoped_ptr<wal> pwal(new wal(use_pwal ? "pwal" : "/dev/null")); - g_wal = pwal.get(); - - // Wait until all replicas have joined. - st_netfd_t listener = st_tcp_listen(leader_port); - st_closing close_listener(listener); - vector<replica_info> replicas; - st_closing_all_infos close_replicas(replicas); - cout << "waiting for at least " << minreps << " replicas to join" << endl; - Join join; - for (int i = 0; i < minreps; ++i) { - st_netfd_t fd; - { - st_intr intr(stop_hub); - fd = checkerr(st_accept(listener, nullptr, nullptr, - ST_UTIME_NO_TIMEOUT)); - } - readmsg(fd, join); - replicas.push_back(replica_info(fd, static_cast<uint16_t>(join.port()))); - } - cout << "got all " << minreps << " replicas" << endl; - - // Construct the initialization message. - Init init; - init.set_txnseqno(0); - init.set_multirecover(multirecover); - foreach (replica_info r, replicas) { - SockAddr *psa = init.add_node(); - psa->set_host(r.host()); - psa->set_port(r.port()); - } - - // Send init to each initial replica. - foreach (replica_info r, replicas) { - init.set_yourhost(r.host()); - sendmsg(r.fd(), init); - } - - // Start dispatching queries. - st_bool accept_joiner; - int seqno = 0; - st_channel<replica_info> newreps; - st_channel<st_netfd_t> delreps; - foreach (const replica_info &r, replicas) newreps.push(r); - function<void()> f; - if (do_tpcc) - f = bind(issue_tpcc, ref(newreps), ref(delreps), ref(seqno), ref(accept_joiner)); - else - f = bind(issue_txns, ref(newreps), ref(seqno), ref(accept_joiner)); - st_joining join_issue_txns(my_spawn(f, "issue_txns")); - - finally fin(bind(summarize, "LEADER", ref(seqno))); - - try { - // Start handling responses. - st_thread_group handlers; - int rid = 0; - foreach (replica_info r, replicas) { - function<void()> fn; - if (do_tpcc) - fn = bind(handle_tpcc_responses, r.fd(), ref(seqno), rid++, - ref(recover_signals), ref(delreps), true); - else - fn = bind(handle_responses, r.fd(), ref(seqno), rid++, - ref(recover_signals), true); - handlers.insert(my_spawn(fn, "handle_responses")); - } - - // Accept the recovering node, and tell it about the online replicas. - st_netfd_t joiner; - try { - st_intr intr(stop_hub); - joiner = checkerr(st_accept(listener, nullptr, nullptr, - ST_UTIME_NO_TIMEOUT)); - accept_joiner.waitset(); - } catch (std::exception &ex) { - string s(ex.what()); - if (s.find("Interrupted system call") == s.npos) - throw; - else - throw break_exception(); - } - Join join; - readmsg(joiner, join); - replicas.push_back(replica_info(joiner, static_cast<uint16_t>(join.port()))); - cout << "setting seqno to " << seqno << endl; - init.set_txnseqno(seqno); - init.set_yourhost(replicas.back().host()); - sendmsg(joiner, init); - recover_signals.push(current_time_millis()); - - // Start streaming txns to joiner. - cout << "start streaming txns to joiner" << endl; - function<void()> handle_responses_joiner_fn; - if (do_tpcc) - handle_responses_joiner_fn = - bind(handle_tpcc_responses, joiner, ref(seqno), rid++, - ref(recover_signals), ref(delreps), false); - else - handle_responses_joiner_fn = - bind(handle_responses, joiner, ref(seqno), rid++, - ref(recover_signals), false); - newreps.push(replicas.back()); - handlers.insert(my_spawn(handle_responses_joiner_fn, - "handle_responses_joiner")); - } catch (break_exception &ex) { - } catch (std::exception &ex) { - // TODO: maybe there's a cleaner way to do this final step before waiting with the join - cerr_thread_ex(ex) << endl; - throw; - } -} Copied: ydb/trunk/src/main.clamp.lzz (from rev 1331, ydb/trunk/src/main.lzz.clamp) =================================================================== --- ydb/trunk/src/main.clamp.lzz (rev 0) +++ ydb/trunk/src/main.clamp.lzz 2009-03-24 09:11:22 UTC (rev 1332) @@ -0,0 +1,313 @@ +#hdr +#include "unsetprefs.h" +#include <boost/tuple/tuple.hpp> +#include <commons/st/intr.h> +#include <commons/st/sync.h> +#include <commons/st/channel.h> +#include <fstream> // ofstream +#include <vector> +#include "util.hh" +#include "setprefs.h" + +namespace boost { namespace archive { class binary_oarchive; } } + +using namespace boost; +using namespace boost::archive; +using namespace commons; +using namespace std; +using namespace ydb; +using namespace ydb::msg; +#end + +#src +#include "unsetprefs.h" +#include <boost/foreach.hpp> +#include <boost/archive/binary_oarchive.hpp> +#include <commons/assert.h> +#include <commons/time.h> +#include <commons/st/io.h> +#include <commons/st/sockets.h> +#include <iostream> +#include <unistd.h> // pipe, write, sync +#include "tpcc/tpcctables.h" +#include "msg.h" +#include "setprefs.h" +#end + +typedef tuple<sized_array<char>, char*, char*> chunk; + +typedef commons::array<char> recovery_t; + + +// Configuration. +st_utime_t timeout; +int yield_interval, accept_joiner_seqno, issuing_interval, min_ops, max_ops, + stop_on_seqno, batch_size, handle_responses_display, fail_seqno, + catch_up_display, issue_display, nwarehouses, + process_display; +size_t accept_joiner_size, read_buf_size; +bool yield_during_build_up, yield_during_catch_up, dump, show_updates, + count_updates, stop_on_recovery, general_txns, + disk, debug_memory, use_pwal, use_twal, + use_pb, use_pb_res, g_caught_up, rec_pwal, rec_twal, do_tpcc, + suppress_txn_msgs, force_ser, fake_exec, ship_log; +long long timelim, read_thresh; + +// Control. +st_intr_bool stop_hub, kill_hub; +st_bool do_pause; +// On leader, signifies that a node is in fail mode. On replica, signifies that a node is in fail mode/recovering from the twal. +st_bool failed; +// The seqno on which we should resume. +st_channel<int> resume; +bool stopped_issuing; + +// Statistics. +int updates; + +/** + * Used by the leader to bookkeep information about replicas. + */ +class replica_info +{ + public: + /** port is the replica's listen port, not the port bound to the fd socket. */ + replica_info(st_netfd_t fd, uint16_t port) : fd_(fd), port_(port) {} + st_netfd_t fd() const { return fd_; } + /** The port on which the replica is listening. */ + uint16_t port() const { return port_; } + /** The port on which the replica connected to us. */ + uint16_t local_port() const { return sockaddr().sin_port; } + uint32_t host() const { return sockaddr().sin_addr.s_addr; } + sockaddr_in sockaddr() const { sockaddr_in sa; sockaddr(sa); return sa; } + void sockaddr(sockaddr_in &sa) const { + socklen_t salen = sizeof sa; + check0x(getpeername(st_netfd_fileno(fd_), + reinterpret_cast<struct sockaddr*>(&sa), + &salen)); + } + private: + st_netfd_t fd_; + uint16_t port_; +}; + +/** + * RAII to close all contained netfds. + */ +class st_closing_all_infos +{ + public: + st_closing_all_infos(const vector<replica_info>& rs) : rs_(rs) {} + ~st_closing_all_infos() { + cout << "closing all conns to replicas (replica_infos)" << endl; + foreach (replica_info r, rs_) + check0x(st_netfd_close(r.fd())); + } + private: + const vector<replica_info> &rs_; +}; + +/** + * RAII to close all contained netfds. + */ +class st_closing_all +{ + public: + st_closing_all(const vector<st_netfd_t>& rs) : rs_(rs) {} + ~st_closing_all() { + foreach (st_netfd_t r, rs_) + check0x(st_netfd_close(r)); + } + private: + const vector<st_netfd_t> &rs_; +}; + +#if 0 +st_channel<pair<st_netfd_t, shared_ptr<string> > > msgs; + +/** + * The worker that performs the actual broadcasting. + */ +void +bcaster() +{ + while (!kill_hub) { + pair<st_netfd_t, shared_ptr<string> > pr; + { + st_intr intr(kill_hub); + pr = msgs.take(); + } + st_netfd_t dst = pr.first; + shared_ptr<string> &p = pr.second; + if (p.get() == nullptr) break; + string &s = *p.get(); + + if (!fake_bcast) + st_timed_write(dst, s.data(), s.size()); + } +} + +/** + * Asynchronous version of the broadcaster. + */ +void +bcastbuf_async(const vector<st_netfd_t> &dsts, const ser_t &msg) +{ + shared_ptr<string> p(new string); + ser(*p.get(), msg); + foreach (st_netfd_t dst, dsts) msgs.push(make_pair(dst, p)); +} +#endif + +enum { op_del, op_write, op_commit }; + +/** + * ARIES write-ahead log. No undo logging necessary (no steal). + */ +class wal +{ +public: + wal(const string &fname) : + of_(fname.c_str()), + ar_(new binary_oarchive(of())) + {} + ~wal() { delete ar_; } + template <typename T> + void log(const T &msg) { ser(of(), msg); } + void logbuf(const ser_t &s) { logbuf(s.data(), s.size()); } + void logbuf(const void *buf, size_t len) { + of().write(reinterpret_cast<const char*>(buf), len); + } + void logdel(int key) { + int op = op_del; // TODO: is this really necessary? + ar() & op & key; + } + void logwrite(int key, int val) { + int op = op_write; + ar() & op & key & val; + } + void logcommit() { + int op = op_commit; + ar() & op; + } + void flush() { of().flush(); } +private: + ofstream of_; + //unique_ptr<binary_oarchive> ar_; + binary_oarchive *ar_; + ofstream &of() { return of_; } + binary_oarchive &ar() { return *ar_; }; +}; + +// TODO? +class txn_wal { +public: + txn_wal(const string &fname) : of(fname.c_str()) {} + void logbuf(const ser_t &s) { logbuf(s.data(), s.size()); } + void logbuf(const void *buf, size_t len) { + of.write(reinterpret_cast<const char*>(buf), len); + } + void flush() { of.flush(); } +private: + ofstream of; +}; + +// Globals +wal *g_wal; +txn_wal *g_twal; +//tpcc_wal *g_tpcc_wal; + +struct recreq { + int start_seqno, end_seqno; +}; + +/** + * Help the recovering node. + * + * \param[in] listener The connection on which we're listening for connections + * from recovering joiners. + * + * \param[in] map The database state. + * + * \param[in] seqno The sequence number. Always starts at 0. + * + * \param[in] send_states Channel of snapshots of the database state to receive + * from process_txns. + */ +void +recover_joiner(st_netfd_t listener, + st_channel<recovery_t> &send_states) +{ + cout << "waiting for joiner" << endl; + recovery_t recovery; + st_netfd_t joiner; + if (ship_log) { + { + st_intr intr(stop_hub); + joiner = checkerr(st_accept(listener, nullptr, nullptr, + ST_UTIME_NO_TIMEOUT)); + } + st_closing closing(joiner); + recreq r; + st_read(joiner, r); + commons::array<char> wbuf(buf_size); + writer writer(lambda(const void *buf, size_t len) { + st_write(__ref(joiner), buf, len); + }, wbuf.get(), wbuf.size()); + cout << "got joiner's connection, sending log from seqnos " + << r.start_seqno << " to " << r.end_seqno << endl; + + g_twal->flush(); + sync(); + ifstream inf("twal"); + long long start_time = current_time_millis(); + for (int seqno = 0; seqno < r.start_seqno; ++seqno) { + ASSERT(inf.good()); + inf.seekg(readlen(inf), ios::cur); + } + long long mid_time = current_time_millis(); + streamoff mid_off = inf.tellg(); + showdatarate("scanned log", mid_off, mid_time - start_time); + for (int seqno = r.start_seqno; seqno < r.end_seqno; ++seqno) { + ASSERT(inf.good()); + uint32_t len = readlen(inf); + inf.read(writer.reserve(len), len); + writer.mark(); + cout << seqno << ' ' << len << endl; + if (check_interval(seqno, yield_interval)) st_sleep(0); + } + writer.mark_and_flush(); + long long end_time = current_time_millis(); + streamoff end_off = inf.tellg(); + showdatarate("shipped log", end_off - mid_off, end_time - mid_time); + } else { + { + st_intr intr(stop_hub); + // Wait for the snapshot. + recovery = send_states.take(); + if (recovery == nullptr) { + return; + } + // Wait for the new joiner. + joiner = checkerr(st_accept(listener, nullptr, nullptr, + ST_UTIME_NO_TIMEOUT)); + } + + st_closing closing(joiner); + cout << "got joiner's connection, sending recovery of " + << recovery.size() << " bytes" << endl; + long long start_time = current_time_millis(); + st_write(joiner, recovery.get(), recovery.size()); + long long diff = current_time_millis() - start_time; + showdatarate("sent recovery", recovery.size(), diff); + } +} + +void +threadfunc() +{ + while (true) { + sleep(3); + cout << "AAAAAAAAAAAAAAAAAAAAAA" << endl; + } +} Deleted: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-03-24 07:54:47 UTC (rev 1331) +++ ydb/trunk/src/main.lzz.clamp 2009-03-24 09:11:22 UTC (rev 1332) @@ -1,313 +0,0 @@ -#hdr -#include "unsetprefs.h" -#include <boost/tuple/tuple.hpp> -#include <commons/st/intr.h> -#include <commons/st/sync.h> -#include <commons/st/channel.h> -#include <fstream> // ofstream -#include <vector> -#include "util.hh" -#include "setprefs.h" - -namespace boost { namespace archive { class binary_oarchive; } } - -using namespace boost; -using namespace boost::archive; -using namespace commons; -using namespace std; -using namespace ydb; -using namespace ydb::msg; -#end - -#src -#include "unsetprefs.h" -#include <boost/foreach.hpp> -#include <boost/archive/binary_oarchive.hpp> -#include <commons/assert.h> -#include <commons/time.h> -#include <commons/st/io.h> -#include <commons/st/sockets.h> -#include <iostream> -#include <unistd.h> // pipe, write, sync -#include "tpcc/tpcctables.h" -#include "msg.h" -#include "setprefs.h" -#end - -typedef tuple<sized_array<char>, char*, char*> chunk; - -typedef commons::array<char> recovery_t; - - -// Configuration. -st_utime_t timeout; -int yield_interval, accept_joiner_seqno, issuing_interval, min_ops, max_ops, - stop_on_seqno, batch_size, handle_responses_display, fail_seqno, - catch_up_display, issue_display, nwarehouses, - process_display; -size_t accept_joiner_size, read_buf_size; -bool yield_during_build_up, yield_during_catch_up, dump, show_updates, - count_updates, stop_on_recovery, general_txns, - disk, debug_memory, use_pwal, use_twal, - use_pb, use_pb_res, g_caught_up, rec_pwal, rec_twal, do_tpcc, - suppress_txn_msgs, force_ser, fake_exec, ship_log; -long long timelim, read_thresh; - -// Control. -st_intr_bool stop_hub, kill_hub; -st_bool do_pause; -// On leader, signifies that a node is in fail mode. On replica, signifies that a node is in fail mode/recovering from the twal. -st_bool failed; -// The seqno on which we should resume. -st_channel<int> resume; -bool stopped_issuing; - -// Statistics. -int updates; - -/** - * Used by the leader to bookkeep information about replicas. - */ -class replica_info -{ - public: - /** port is the replica's listen port, not the port bound to the fd socket. */ - replica_info(st_netfd_t fd, uint16_t port) : fd_(fd), port_(port) {} - st_netfd_t fd() const { return fd_; } - /** The port on which the replica is listening. */ - uint16_t port() const { return port_; } - /** The port on which the replica connected to us. */ - uint16_t local_port() const { return sockaddr().sin_port; } - uint32_t host() const { return sockaddr().sin_addr.s_addr; } - sockaddr_in sockaddr() const { sockaddr_in sa; sockaddr(sa); return sa; } - void sockaddr(sockaddr_in &sa) const { - socklen_t salen = sizeof sa; - check0x(getpeername(st_netfd_fileno(fd_), - reinterpret_cast<struct sockaddr*>(&sa), - &salen)); - } - private: - st_netfd_t fd_; - uint16_t port_; -}; - -/** - * RAII to close all contained netfds. - */ -class st_closing_all_infos -{ - public: - st_closing_all_infos(const vector<replica_info>& rs) : rs_(rs) {} - ~st_closing_all_infos() { - cout << "closing all conns to replicas (replica_infos)" << endl; - foreach (replica_info r, rs_) - check0x(st_netfd_close(r.fd())); - } - private: - const vector<replica_info> &rs_; -}; - -/** - * RAII to close all contained netfds. - */ -class st_closing_all -{ - public: - st_closing_all(const vector<st_netfd_t>& rs) : rs_(rs) {} - ~st_closing_all() { - foreach (st_netfd_t r, rs_) - check0x(st_netfd_close(r)); - } - private: - const vector<st_netfd_t> &rs_; -}; - -#if 0 -st_channel<pair<st_netfd_t, shared_ptr<string> > > msgs; - -/** - * The worker that performs the actual broadcasting. - */ -void -bcaster() -{ - while (!kill_hub) { - pair<st_netfd_t, shared_ptr<string> > pr; - { - st_intr intr(kill_hub); - pr = msgs.take(); - } - st_netfd_t dst = pr.first; - shared_ptr<string> &p = pr.second; - if (p.get() == nullptr) break; - string &s = *p.get(); - - if (!fake_bcast) - st_timed_write(dst, s.data(), s.size()); - } -} - -/** - * Asynchronous version of the broadcaster. - */ -void -bcastbuf_async(const vector<st_netfd_t> &dsts, const ser_t &msg) -{ - shared_ptr<string> p(new string); - ser(*p.get(), msg); - foreach (st_netfd_t dst, dsts) msgs.push(make_pair(dst, p)); -} -#endif - -enum { op_del, op_write, op_commit }; - -/** - * ARIES write-ahead log. No undo logging necessary (no steal). - */ -class wal -{ -public: - wal(const string &fname) : - of_(fname.c_str()), - ar_(new binary_oarchive(of())) - {} - ~wal() { delete ar_; } - template <typename T> - void log(const T &msg) { ser(of(), msg); } - void logbuf(const ser_t &s) { logbuf(s.data(), s.size()); } - void logbuf(const void *buf, size_t len) { - of().write(reinterpret_cast<const char*>(buf), len); - } - void logdel(int key) { - int op = op_del; // TODO: is this really necessary? - ar() & op & key; - } - void logwrite(int key, int val) { - int op = op_write; - ar() & op & key & val; - } - void logcommit() { - int op = op_commit; - ar() & op; - } - void flush() { of().flush(); } -private: - ofstream of_; - //unique_ptr<binary_oarchive> ar_; - binary_oarchive *ar_; - ofstream &of() { return of_; } - binary_oarchive &ar() { return *ar_; }; -}; - -// TODO? -class txn_wal { -public: - txn_wal(const string &fname) : of(fname.c_str()) {} - void logbuf(const ser_t &s) { logbuf(s.data(), s.size()); } - void logbuf(const void *buf, size_t len) { - of.write(reinterpret_cast<const char*>(buf), len); - } - void flush() { of.flush(); } -private: - ofstream of; -}; - -// Globals -wal *g_wal; -txn_wal *g_twal; -//tpcc_wal *g_tpcc_wal; - -struct recreq { - int start_seqno, end_seqno; -}; - -/** - * Help the recovering node. - * - * \param[in] listener The connection on which we're listening for connections - * from recovering joiners. - * - * \param[in] map The database state. - * - * \param[in] seqno The sequence number. Always starts at 0. - * - * \param[in] send_states Channel of snapshots of the database state to receive - * from process_txns. - */ -void -recover_joiner(st_netfd_t listener, - st_channel<recovery_t> &send_states) -{ - cout << "waiting for joiner" << endl; - recovery_t recovery; - st_netfd_t joiner; - if (ship_log) { - { - st_intr intr(stop_hub); - joiner = checkerr(st_accept(listener, nullptr, nullptr, - ST_UTIME_NO_TIMEOUT)); - } - st_closing closing(joiner); - recreq r; - st_read(joiner, r); - commons::array<char> wbuf(buf_size); - writer writer(lambda(const void *buf, size_t len) { - st_write(__ref(joiner), buf, len); - }, wbuf.get(), wbuf.size()); - cout << "got joiner's connection, sending log from seqnos " - << r.start_seqno << " to " << r.end_seqno << endl; - - g_twal->flush(); - sync(); - ifstream inf("twal"); - long long start_time = current_time_millis(); - for (int seqno = 0; seqno < r.start_seqno; ++seqno) { - ASSERT(inf.good()); - inf.seekg(readlen(inf), ios::cur); - } - long long mid_time = current_time_millis(); - streamoff mid_off = inf.tellg(); - showdatarate("scanned log", mid_off, mid_time - start_time); - for (int seqno = r.start_seqno; seqno < r.end_seqno; ++seqno) { - ASSERT(inf.good()); - uint32_t len = readlen(inf); - inf.read(writer.reserve(len), len); - writer.mark(); - cout << seqno << ' ' << len << endl; - if (check_interval(seqno, yield_interval)) st_sleep(0); - } - writer.mark_and_flush(); - long long end_time = current_time_millis(); - streamoff end_off = inf.tellg(); - showdatarate("shipped log", end_off - mid_off, end_time - mid_time); - } else { - { - st_intr intr(stop_hub); - // Wait for the snapshot. - recovery = send_states.take(); - if (recovery == nullptr) { - return; - } - // Wait for the new joiner. - joiner = checkerr(st_accept(listener, nullptr, nullptr, - ST_UTIME_NO_TIMEOUT)); - } - - st_closing closing(joiner); - cout << "got joiner's connection, sending recovery of " - << recovery.size() << " bytes" << endl; - long long start_time = current_time_millis(); - st_write(joiner, recovery.get(), recovery.size()); - long long diff = current_time_millis() - start_time; - showdatarate("sent recovery", recovery.size(), diff); - } -} - -void -threadfunc() -{ - while (true) { - sleep(3); - cout << "AAAAAAAAAAAAAAAAAAAAAA" << endl; - } -} Modified: ydb/trunk/src/mkdeps.py =================================================================== --- ydb/trunk/src/mkdeps.py 2009-03-24 07:54:47 UTC (rev 1331) +++ ydb/trunk/src/mkdeps.py 2009-03-24 09:11:22 UTC (rev 1332) @@ -24,7 +24,7 @@ @memoized def src(i): if i.endswith('.hh'): - clamp = path(i[:-3] + '.lzz.clamp') + clamp = path(i[:-3] + '.clamp.lzz') lzz = path(i[:-2] + '.lzz') if clamp.isfile(): return clamp if lzz.isfile(): return lzz @@ -39,8 +39,8 @@ for dep in deps(src(hdr)): yield dep -for i in pwd.glob('*.lzz.clamp'): - print sub(r'\.lzz\.clamp', '.o', i), ':', sub(r'\.lzz\.clamp', '.hh', i), ' '.join(deps(i)) +for i in pwd.glob('*.clamp.lzz'): + print sub(r'\.clamp\.lzz', '.o', i), ':', sub(r'\.clamp\.lzz', '.hh', i), ' '.join(deps(i)) for i in pwd.glob('*.d'): with file(i) as f: @@ -52,4 +52,4 @@ elif '_cc_lambda_' in word: print sub(r'(\.clamp/(.+)_cc_lambda_.+\.clamp_h)', r'\1: \2.cc.clamp', word) else: - print sub(r'(\.clamp/(.+)_lambda_.+\.clamp_h)', r'\1: \2.lzz.clamp', word) + print sub(r'(\.clamp/(.+)_lambda_.+\.clamp_h)', r'\1: \2.clamp.lzz', word) Copied: ydb/trunk/src/rectpcc.clamp.lzz (from rev 1331, ydb/trunk/src/rectpcc.lzz.clamp) =================================================================== --- ydb/trunk/src/rectpcc.clamp.lzz (rev 0) +++ ydb/trunk/src/rectpcc.clamp.lzz 2009-03-24 09:11:22 UTC (rev 1332) @@ -0,0 +1,175 @@ +#hdr +#include "tpcc.hh" +namespace ydb { namespace pb { class Init; } } +using namespace ydb::pb; +#end + +#src +#include "unsetprefs.h" +#include <commons/time.h> +#include <commons/st/io.h> +#include <commons/st/threads.h> +#include <commons/st/reader.h> +#include "tpcc/tpcctables.h" +#include "ydb.pb.h" +#include "setprefs.h" +#end + +void +rec_tpcc(int &seqno, int mypos, const Init &init, + const vector<st_netfd_t> &replicas, recovery_t &orig, + st_channel<chunk> &backlog) +{ + commons::array<char> recarr(0); + + function<void()> rec_twal_fn = lambda() { + int &seqno = __ref(seqno); + cout << "recovering from twal" << endl; + long long start_time = current_time_millis(); + g_twal->flush(); + sync(); + ifstream inf("twal"); + TpccReq req; + while (inf.peek() != ifstream::traits_type::eof()) { + ASSERT(inf.good()); + readmsg(inf, req); + process_tpcc(req, seqno, nullptr); + if (check_interval(seqno, yield_interval)) st_sleep(0); + } + showdatarate("recovered from twal", inf.tellg(), + current_time_millis() - start_time); + cout << "now at seqno " << seqno << endl; + }; + + function<void()> recv_log_fn = lambda() { + st_netfd_t src = __ref(replicas[0]); + int &seqno = __ref(seqno); + ASSERT(fail_seqno == seqno); + recreq r = { fail_seqno + 1, resume.take() }; + st_write(src, r); + sized_array<char> rbuf(new char[read_buf_size], read_buf_size); + function<void(anchored_stream_reader &reader)> overflow_fn = + lambda(anchored_stream_reader &reader) { + shift_reader(reader); + }; + anchored_stream_reader reader(st_read_fn(src), + st_read_fully_fn(src), + overflow_fn, rbuf.get(), rbuf.size()); + TpccReq req; + while (seqno < r.end_seqno) { + { st_intr intr(stop_hub); readmsg(reader, req); } + process_tpcc(req, seqno, nullptr); + reader.set_anchor(); + if (check_interval(seqno, yield_interval)) st_sleep(0); + } + }; + + if (rec_twal) { + failed.waitset(); + g_tables.reset(new TPCCTables); + tpcc_recovery_header &hdr = *reinterpret_cast<tpcc_recovery_header*>(orig.begin()); + commons::array<char> body(orig.begin() + sizeof(tpcc_recovery_header), + orig.size() - sizeof(tpcc_recovery_header)); + g_tables->deser(mypos, init.node_size(), hdr, body); + body.release(); + rec_twal_fn(); + failed.reset(); + recv_log_fn(); + } + +#if 0 + st_thread_t rec_twal_thread = my_spawn(rec_twal_fn, "rec_twal"); + st_thread_t recv_log_thread = my_spawn(recv_log_fn, "recv_log"); + + st_join(rec_twal_thread); + st_join(recv_log_thread); +#endif + + if (rec_pwal) { + // Recover from phy log. + } else if (rec_twal) { + // Recover from txn log. + } else { + + g_tables.reset(new TPCCTables); + + // + // Build-up + // + + if (ship_log) { + } else { + // XXX indent + + cout << "waiting for recovery message" << (multirecover ? "s" : "") + << endl; + long long before_recv = current_time_millis(); + + vector<st_thread_t> recovery_builders; + ASSERT(seqno == -1); + for (int i = 0; i < (multirecover ? init.node_size() : 1); ++i) { + recovery_builders.push_back(my_spawn(lambda() { + // Read the recovery message length and header. + tpcc_recovery_header hdr; + checkeqnneg(st_read_fully(__ref(replicas[i]), + &hdr, sizeof hdr, + ST_UTIME_NO_TIMEOUT), + ssize_t(sizeof hdr)); + check(hdr.seqno >= 0); + + cout << "receiving recovery of " << hdr.len << " bytes" << endl; + + long long start_time = current_time_millis(); + __ref(recarr).reset(new char[hdr.len], hdr.len); + checkeqnneg(st_read_fully(__ref(replicas[i]), + __ref(recarr).get(), hdr.len, + ST_UTIME_NO_TIMEOUT), + ssize_t(hdr.len)); + + long long before_deser = current_time_millis(); + showdatarate("received recovery message", size_t(hdr.len), before_deser - start_time); + + if (__ref(seqno) == -1) + __ref(seqno) = hdr.seqno; + else + checkeq(__ref(seqno), hdr.seqno); + + g_tables->deser(__ctx(i), __ref(init).node_size(), hdr, __ref(recarr)); + + long long end_time = current_time_millis(); + showdatarate("deserialized recovery message", size_t(hdr.len), end_time - before_deser); + cout << "receive & deserialize took " << end_time - __ref(before_recv) + << " ms total; now at seqno " << hdr.seqno << endl; + cout << "after deserialize, db state is now at seqno " + << hdr.seqno << ":" << endl; + g_tables->show(); + + }, "recovery_builder" + lexical_cast<string>(i))); + } + foreach (st_thread_t t, recovery_builders) { + st_join(t); + } + + } + } + + // + // Catch-up + // + + long long mid_time = current_time_millis(); + int mid_seqno = seqno; + TpccReq req; + while (!backlog.empty()) { + chunk chunk = backlog.take(); + cout << "took from backlog, now has " << backlog.queue().size() + << " chunks" << endl; + sized_array<char> &buf = chunk.get<0>(); + char *begin = chunk.get<1>(), *end = chunk.get<2>(); + ASSERT(buf.get() <= begin && begin < buf.end()); + ASSERT(buf.get() < end && end < buf.end()); + process_buf(begin, end, req, seqno); + } + showtput("replayer caught up; from backlog replayed", + current_time_millis(), mid_time, seqno, mid_seqno); +} Property changes on: ydb/trunk/src/rectpcc.clamp.lzz ___________________________________________________________________ Added: svn:mergeinfo + Deleted: ydb/trunk/src/rectpcc.lzz.clamp =================================================================== --- ydb/trunk/src/rectpcc.lzz.clamp 2009-03-24 07:54:47 UTC (rev 1331) +++ ydb/trunk/src/rectpcc.lzz.clamp 2009-03-24 09:11:22 UTC (rev 1332) @@ -1,175 +0,0 @@ -#hdr -#include "tpcc.hh" -namespace ydb { namespace pb { class Init; } } -using namespace ydb::pb; -#end - -#src -#include "unsetprefs.h" -#include <commons/time.h> -#include <commons/st/io.h> -#include <commons/st/threads.h> -#include <commons/st/reader.h> -#include "tpcc/tpcctables.h" -#include "ydb.pb.h" -#include "setprefs.h" -#end - -void -rec_tpcc(int &seqno, int mypos, const Init &init, - const vector<st_netfd_t> &replicas, recovery_t &orig, - st_channel<chunk> &backlog) -{ - commons::array<char> recarr(0); - - function<void()> rec_twal_fn = lambda() { - int &seqno = __ref(seqno); - cout << "recovering from twal" << endl; - long long start_time = current_time_millis(); - g_twal->flush(); - sync(); - ifstream inf("twal"); - TpccReq req; - while (inf.peek() != ifstream::traits_type::eof()) { - ASSERT(inf.good()); - readmsg(inf, req); - process_tpcc(req, seqno, nullptr); - if (check_interval(seqno, yield_interval)) st_sleep(0); - } - showdatarate("recovered from twal", inf.tellg(), - current_time_millis() - start_time); - cout << "now at seqno " << seqno << endl; - }; - - function<void()> recv_log_fn = lambda() { - st_netfd_t src = __ref(replicas[0]); - int &seqno = __ref(seqno); - ASSERT(fail_seqno == seqno); - recreq r = { fail_seqno + 1, resume.take() }; - st_write(src, r); - sized_array<char> rbuf(new char[read_buf_size], read_buf_size); - function<void(anchored_stream_reader &reader)> overflow_fn = - lambda(anchored_stream_reader &reader) { - shift_reader(reader); - }; - anchored_stream_reader reader(st_read_fn(src), - st_read_fully_fn(src), - overflow_fn, rbuf.get(), rbuf.size()); - TpccReq req; - while (seqno < r.end_seqno) { - { st_intr intr(stop_hub); readmsg(reader, req); } - process_tpcc(req, seqno, nullptr); - reader.set_anchor(); - if (check_interval(seqno, yield_interval)) st_sleep(0); - } - }; - - if (rec_twal) { - failed.waitset(); - g_tables.reset(new TPCCTables); - tpcc_recovery_header &hdr = *reinterpret_cast<tpcc_recovery_header*>(orig.begin()); - commons::array<char> body(orig.begin() + sizeof(tpcc_recovery_header), - orig.size() - sizeof(tpcc_recovery_header)); - g_tables->deser(mypos, init.node_size(), hdr, body); - body.release(); - rec_twal_fn(); - failed.reset(); - recv_log_fn(); - } - -#if 0 - st_thread_t rec_twal_thread = my_spawn(rec_twal_fn, "rec_twal"); - st_thread_t recv_log_thread = my_spawn(recv_log_fn, "recv_log"); - - st_join(rec_twal_thread); - st_join(recv_log_thread); -#endif - - if (rec_pwal) { - // Recover from phy log. - } else if (rec_twal) { - // Recover from txn log. - } else { - - g_tables.reset(new TPCCTables); - - // - // Build-up - // - - if (ship_log) { - } else { - // XXX indent - - cout << "waiting for recovery message" << (multirecover ? "s" : "") - << endl; - long long before_recv = current_time_millis(); - - vector<st_thread_t> recovery_builders; - ASSERT(seqno == -1); - for (int i = 0; i < (multirecover ? init.node_size() : 1); ++i) { - recovery_builders.push_back(my_spawn(lambda() { - // Read the recovery message length and header. - tpcc_recovery_header hdr; - checkeqnneg(st_read_fully(__ref(replicas[i]), - &hdr, sizeof hdr, - ST_UTIME_NO_TIMEOUT), - ssize_t(sizeof hdr)); - check(hdr.seqno >= 0); - - cout << "receiving recovery of " << hdr.len << " bytes" << endl; - - long long start_time = current_time_millis(); - __ref(recarr).reset(new char[hdr.len], hdr.len); - checkeqnneg(st_read_fully(__ref(replicas[i]), - __ref(recarr).get(), hdr.len, - ST_UTIME_NO_TIMEOUT), - ssize_t(hdr.len)); - - long long before_deser = current_time_millis(); - showdatarate("received recovery message", size_t(hdr.len), before_deser - start_time); - - if (__ref(seqno) == -1) - __ref(seqno) = hdr.seqno; - else - checkeq(__ref(seqno), hdr.seqno); - - g_tables->deser(__ctx(i), __ref(init).node_size(), hdr, __ref(recarr)); - - long long end_time = current_time_millis(); - showdatarate("deserialized recovery message", size_t(hdr.len), end_time - before_deser); - cout << "receive & deserialize took " << end_time - __ref(before_recv) - << " ms total; now at seqno " << hdr.seqno << endl; - cout << "after deserialize, db state is now at seqno " - << hdr.seqno << ":" << endl; - g_tables->show(); - - }, "recovery_builder" + lexical_cast<string>(i))); - } - foreach (st_thread_t t, recovery_builders) { - st_join(t); - } - - } - } - - // - // Catch-up - // - - long long mid_time = current_time_millis(); - int mid_seqno = seqno; - TpccReq req; - while (!backlog.empty()) { - chunk chunk = backlog.take(); - cout << "took from backlog, now has " << backlog.queue().size() - << " chunks" << endl; - sized_array<char> &buf = chunk.get<0>(); - char *begin = chunk.get<1>(), *end = chunk.get<2>(); - ASSERT(buf.get() <= begin && begin < buf.end()); - ASSERT(buf.get() < end && end < buf.end()); - process_buf(begin, end, req, seqno); - } - showtput("replayer caught up; from backlog replayed", - current_time_millis(), mid_time, seqno, mid_seqno); -} Copied: ydb/trunk/src/replica.clamp.lzz (from rev 1331, ydb/trunk/src/replica.lzz.clamp) =================================================================== --- ydb/trunk/src/replica.clamp.lzz (rev 0) +++ ydb/trunk/src/replica.clamp.lzz 2009-03-24 09:11:22 UTC (rev 1332) @@ -0,0 +1,362 @@ +#hdr +#include "unsetprefs.h" +#include <string> +#end + +#src +#include "unsetprefs.h" +#include <boost/archive/binary_iarchive.hpp> +#include <commons/st/sockets.h> +#include <commons/st/threads.h> +#include "tpcc/clock.h" +#include "tpcc/randomgenerator.h" +#include "tpcc/tpccclient.h" +#include "tpcc/tpccgenerator.h" +#include "tpcc/tpcctables.h" +#include "rectpcc.hh" +#include "run.hh" +#include "stxn.hh" +#include "tpcc.hh" +#end + +/** + * Run a replica. + */ +void +run_replica(std::string leader_host, uint16_t leader_port, uint16_t listen_port) +{ + if (disk) { + // Disk IO threads. + for (int i = 0; i < 5; ++i) { + //thread somethread(threadfunc); + } + } + + // Initialize database state. + int seqno = -1; + mii &map = g_map; + if (do_tpcc) { + TPCCTables *tables = new TPCCTables(); + g_tables.reset(tables); + SystemClock* clock = new SystemClock(); + + // Create a generator for filling the database. + RealRandomGenerator* random = new RealRandomGenerator(); + NURandC cLoad = NURandC::makeRandom(random); + random->setC(cLoad); + + // Generate the data + cout << "loading " << nwarehouses << " warehouses" << endl; + char now[Clock::DATETIME_SIZE+1]; + clock->getDateTimestamp(now); + TPCCGenerator generator(random, now, Item::NUM_ITEMS, + District::NUM_PER_WAREHOUSE, + Customer::NUM_PER_DISTRICT, + NewOrder::INITIAL_NUM_PER_DISTRICT); + long long start_time = current_time_millis(); + generator.makeItemsTable(tables); + for (int i = 0; i < nwarehouses; ++i) { + generator.makeWarehouse(tables, i+1); + } + cout << "loaded " << nwarehouses << " warehouses in " + << current_time_millis() - start_time << " ms" << endl; + tables->show(); + } + recovery_t orig = rec_twal ? g_tables->ser(0, 0, seqno) : recovery_t(); + + finally f(bind(summarize, "REPLICA", ref(seqno))); + st_channel<recovery_t> send_states; + + cout << "starting as replica on port " << listen_port << endl; + + // Listen for connections from other replicas. + st_netfd_t listener = st_tcp_listen(listen_port); + + // Connect to the leader and join the system. + st_netfd_t leader = st_tcp_connect(leader_host.c_str(), leader_port, + timeout); + st_closing closing(leader); + Join join; + join.set_port(listen_port); + sendmsg(leader, join); + Init init; + { + st_intr intr(stop_hub); + readmsg(leader, init); + } + uint32_t listen_host = init.yourhost(); + multirecover = init.multirecover(); + + // Display the info. + cout << "got init msg with txn seqno " << init.txnseqno() + << " and hosts:" << endl; + vector<st_netfd_t> replicas; + st_closing_all close_replicas(replicas); + int mypos = -1; + for (int i = 0; i < init.node_size(); ++i) { + const SockAddr &sa = init.node(i); + char buf[INET_ADDRSTRLEN]; + in_addr host = { sa.host() }; + bool is_self = sa.host() == listen_host && sa.port() == listen_port; + cout << "- " << checkerr(inet_ntop(AF_INET, &host, buf, + INET_ADDRSTRLEN)) + << ':' << sa.port() << (is_self ? " (self)" : "") << endl; + if (is_self) mypos = i; + if (!is_self && (init.txnseqno() > 0 || rec_twal)) { + replicas.push_back(st_tcp_connect(host, + static_cast<uint16_t>(sa.port()), + timeout)); + } + } + + // Initialize physical or txn log. + scoped_ptr<txn_wal> twal(new txn_wal(use_twal ? "twal" : "/dev/null")); + g_twal = twal.get(); + scoped_ptr<wal> pwal(new wal(use_pwal ? "pwal" : "/dev/null")); + g_wal = pwal.get(); + + // Process txns. + st_channel<chunk> backlog; + function<void()> process_fn; + if (do_tpcc) + process_fn = bind(process_tpccs, leader, ref(seqno), ref(send_states), + ref(backlog), init.txnseqno(), mypos, init.node_size()); + else + process_fn = bind(process_txns, leader, ref(map), ref(seqno), + ref(send_states), ref(backlog), init.txnseqno(), mypos, + init.node_size()); + st_joining join_proc(my_spawn(process_fn, "process_txns")); + st_joining join_rec(init.txnseqno() == 0 && (multirecover || mypos == 0) ? + my_spawn(bind(recover_joiner, listener, ref(send_states)), + "recover_joiner") : + nullptr); + + try { + // If there's anything to recover. + if (init.txnseqno() > 0 || fail_seqno > 0) { + if (do_tpcc) { + + rec_tpcc(seqno, mypos, init, replicas, orig, backlog); + + } else { + + // + // Simple txns + // + + if (rec_pwal) { + // Recover from physical log. + cout << "recovering from pwal" << endl; + long long start_time = current_time_millis(); + ifstream inf("pwal"); + binary_iarchive in(inf); + int rseqno = -1; + while (inf.peek() != ifstream::traits_type::eof()) { + int op; + in & op; + switch (op) { + case op_del: + { + int key; + in & key; + mii::iterator it = map.find(key); + map.erase(it); + break; + } + case op_write: + { + int key, val; + in & key & val; + map[key] = val; + break; + } + case op_commit: + ++rseqno; + break; + } + if (check_interval(rseqno, yield_interval)) st_sleep(0); + } + seqno = init.txnseqno() - 1; + showdatarate("recovered from pwal", inf.tellg(), current_time_millis() - start_time); + cout << "now at seqno " << rseqno << " (really: " << seqno << ")" << endl; + } else { + + // + // Build-up + // + + cout << "waiting for recovery message" << (multirecover ? "s" : "") + << endl; + long long before_recv = current_time_millis(); + + vector<st_thread_t> recovery_builders; + ASSERT(seqno == -1); + bool first = true; + for (int i = 0; i < (multirecover ? init.node_size() : 1); ++i) { + recovery_builders.push_back(my_spawn(lambda() { + // Read the recovery message length and header. + size_t len; + recovery_header hdr; + char buf[sizeof len + sizeof hdr]; + //try { + checkeqnneg(st_read_fully(__ref(replicas[i]), + buf, sizeof len + sizeof hdr, + ST_UTIME_NO_TIMEOUT), + ssize_t(sizeof len + sizeof hdr)); + //} catch (...) { // TODO just catch "Connection reset by peer" + //return; + //} + raw_reader rdr(buf); + rdr.read(len); + rdr.read(hdr); + check(hdr.seqno >= 0); + + // Resize the table if necessary. + commons::array<entry> &table = __ref(map).get_table(); + if (!__ref(first)) { + checkeq(table.size(), hdr.total); + checkeq(__ref(map).size(), hdr.size); + } else { + __ref(first) = false; + __ref(map).set_size(hdr.size); + if (table.size() != hdr.total) { + table.reset(new entry[hdr.total], hdr.total); + } + } + + // Receive straight into the table. + pair<size_t, size_t> range = + recovery_range(table.size(), __ctx(i), __ref(init).node_size()); + // Check that we agree on the number of entries. + checkeq(range.second - range.first, hdr.count); + // Check that the count is a power of two. + checkeq(hdr.count & (hdr.count - 1), size_t(0)); + size_t rangelen = sizeof(entry) * hdr.count; + // Read an extra char to ensure that we're at the EOF. + long long start_time = current_time_millis(); + checkeqnneg(st_read_fully(__ref(replicas[i]), + table.begin() + range.first, rangelen + 1, + ST_UTIME_NO_TIMEOUT), + ssize_t(rangelen)); + long long end_time = current_time_millis(); + + if (__ref(seqno) != -1) + checkeq(__ref(seqno), hdr.seqno); + __ref(seqno) = hdr.seqno; + showdatarate("got recovery message", len, end_time - start_time); + cout << "receive took " << end_time - __ref(before_recv) + << " ms total; now at seqno " << hdr.seqno << endl; +#if 0 + Recovery recovery; + long long receive_start = 0, receive_end = 0; + size_t len = 0; + { + st_intr intr(stop_hub); + len = readmsg(__ref(replicas)[__ctx(i)], recovery, &receive_start, + &receive_end); + } + long long build_start = current_time_millis(); + cout << "got recovery message of " << len << " bytes in " + << build_start - __ref(before_recv) << " ms: xfer took " + << receive_end - receive_start << " ms, deserialization took " + << build_start - receive_end << " ms" << endl; + for (int i = 0; i < recovery.pair_size(); ++i) { + const Recovery_Pair &p = recovery.pair(i); + __ref(map)[p.key()] = p.value(); + if (i % yield_interval == 0) { + if (yield_during_build_up) st_sleep(0); + } + } + check(recovery.seqno() >= 0); + int seqno = __ref(seqno) = recovery.seqno(); + long long build_end = current_time_millis(); + cout << "receive and build-up took " + << build_end - __ref(before_recv) + << " ms; built up map of " << recovery.pair_size() + << " records in " << build_end - build_start + << " ms; now at seqno " << seqno << endl; +#endif + }, "recovery_builder" + lexical_cast<string>(i))); + } + foreach (st_thread_t t, recovery_builders) { + st_join(t); + } + } + + // + // Catch-up + // + + long long mid_time = current_time_millis(); + int mid_seqno = seqno; + // XXX + using msg::TxnBatch; + using msg::Txn; + commons::array<char> rbuf(0), wbuf(buf_size); + reader reader(nullptr, rbuf.get(), rbuf.size()); + writer writer(lambda(const void*, size_t) { + throw not_supported_exception("should not be writing responses during catch-up phase"); + }, wbuf.get(), wbuf.size()); + stream s(reader, writer); + TxnBatch batch(s); + while (!backlog.empty()) { + chunk chunk = backlog.take(); + sized_array<char> &buf = chunk.get<0>(); + ASSERT(buf.get() <= chunk.get<1>() && chunk.get<1>() < buf.end()); + ASSERT(buf.get() < chunk.get<2>() && chunk.get<2>() < buf.end()); + ASSERT(chunk.get<1>() < chunk.get<2>()); + swap(buf, reader.buf()); + reader.reset_range(chunk.get<1>(), chunk.get<2>()); + while (reader.start() < reader.end()) { + char *start = reader.start(); + uint32_t prefix = ntohl(reader.read<uint32_t>()); + ASSERT(prefix < 10000); + ASSERT(start + sizeof(uint32_t) + prefix <= reader.end()); + batch.Clear(); + for (int t = 0; t < batch.txn_size(); ++t) { + const Txn &txn = batch.txn(t); + if (rec_pwal) seqno = txn.seqno() - 1; + process_txn(map, txn, seqno); + if (fake_exec && !use_pb) { + reader.skip(txn.op_size() * Op_Size); + } + + if (check_interval(txn.seqno(), yield_interval)) st_sleep(0); + if (check_interval(txn.seqno(), process_display)) { + cout << "caught up txn " << txn.seqno() + << "; db size = " << map.size() + << "; seqno = " << seqno + << "; backlog.size = " << backlog.queue().size() << endl; + } + } + ASSERT(start + sizeof(uint32_t) + prefix == reader.start()); + } + } + g_caught_up = true; +#if 0 + while (!backlog.empty()) { + using pb::Txn; + shared_ptr<Txn> p = backlog.take(); + process_txn<pb_traits, pb_traits>(map, *p, seqno, nullptr); + if (check_interval(p->seqno(), catch_up_display)) { + cout << "processed txn " << p->seqno() << " off the backlog; " + << "backlog.size = " << backlog.queue().size() << endl; + } + if (check_interval(p->seqno(), yield_interval)) { + // Explicitly yield. (Note that yielding does still effectively + // happen anyway because process_txn is a yield point.) + st_sleep(0); + } + } +#endif + showtput("replayer caught up; from backlog replayed", + current_time_millis(), mid_time, seqno, mid_seqno); + } + } + } catch (std::exception &ex) { + cerr_thread_ex(ex) << endl; + throw; + } + + stop_hub.insert(st_thread_self()); +} Property changes on: ydb/trunk/src/replica.clamp.lzz ___________________________________________________________________ Added: svn:mergeinfo + Deleted: ydb/trunk/src/replica.lzz.clamp =================================================================== --- ydb/trunk/src/replica.lzz.clamp 2009-03-24 07:54:47 UTC (rev 1331) +++ ydb/trunk/src/replica.lzz.clamp 2009-03-24 09:11:22 UTC (rev 1332) @@ -1,362 +0,0 @@ -#hdr -#include "unsetprefs.h" -#include <string> -#end - -#src -#include "unsetprefs.h" -#include <boost/archive/binary_iarchive.hpp> -#include <commons/st/sockets.h> -#include <commons/st/threads.h> -#include "tpcc/clock.h" -#include "tpcc/randomgenerator.h" -#include "tpcc/tpccclient.h" -#include "tpcc/tpccgenerator.h" -#include "tpcc/tpcctables.h" -#include "rectpcc.hh" -#include "run.hh" -#include "stxn.hh" -#include "tpcc.hh" -#end - -/** - * Run a replica. - */ -void -run_replica(std::string leader_host, uint16_t leader_port, uint16_t listen_port) -{ - if (disk) { - // Disk IO threads. - ... [truncated message content] |