[Assorted-commits] SF.net SVN: assorted:[1081] ydb/trunk
From: <yan...@us...> - 2008-11-30 23:46:33
Revision: 1081
          http://assorted.svn.sourceforge.net/assorted/?rev=1081&view=rev
Author:   yangzhang
Date:     2008-11-30 23:46:31 +0000 (Sun, 30 Nov 2008)

Log Message:
-----------
- added simplest state-sending recovery
- verifiably produce (dump) the same state on both machines
- general clean up
- filled out the README

Modified Paths:
--------------
    ydb/trunk/src/Makefile
    ydb/trunk/src/main.lzz
    ydb/trunk/src/ydb.proto

Added Paths:
-----------
    ydb/trunk/README
    ydb/trunk/publish.bash

Removed Paths:
-------------
    ydb/trunk/src/ydb.thrift

Added: ydb/trunk/README
===================================================================
--- ydb/trunk/README                            (rev 0)
+++ ydb/trunk/README    2008-11-30 23:46:31 UTC (rev 1081)
@@ -0,0 +1,120 @@
+Overview
+--------
+
+YDB (Yang's Database) is a simple replicated memory store, developed for
+researching various approaches to recovery in OLTP-optimized databases such
+as [VOLTDB] (formerly H-Store/Horizontica).
+
+[VOLTDB]: http://db.cs.yale.edu/hstore/
+
+Currently, the only recovery mechanism implemented is to have one of the
+replicas serialize the entire database state and send it to the joining node.
+
+If you start a system of $n$ replicas, then the leader will wait for $n-1$ of
+them to join before it starts issuing transactions. (Think of $n-1$ as the
+minimum number of replicas the system requires before it is willing to process
+transactions.) Then when replica $n$ joins, it will need to catch up to the
+current state of the system, and it will do so by contacting one of the other
+replicas and requesting a complete dump of its DB state.
+
+The leader will report the current txn seqno to the joiner and start streaming
+txns beyond that seqno, which the joiner will push onto its backlog. It will
+also instruct the standing replicas to snapshot and send their DB state at
+this txn seqno. As a result, the standing replicas will pause once they get
+this message until they can send their state to the joiner.
+
+Setup
+-----
+
+Requirements:
+
+- [boost] 1.35
+- [C++ Commons] svn r1074
+- [GCC] 4.3.2
+- [Lazy C++] 2.8.0
+- [Protocol Buffers] 2.0.0
+- [State Threads] 1.8
+
+[boost]: http://www.boost.org/
+[C++ Commons]: http://assorted.sourceforge.net/cpp-commons/
+[GCC]: http://gcc.gnu.org/
+[Lazy C++]: http://www.lazycplusplus.com/
+[Protocol Buffers]: http://code.google.com/p/protobuf/
+[State Threads]: http://state-threads.sourceforge.net/
+
+Usage
+-----
+
+To start a leader that manages 2 replicas, run:
+
+    ./ydb 2
+
+The leader will listen on port 7654. Then, to start a replica, run:
+
+    ./ydb localhost 7654 7655
+
+This means "connect to the leader at localhost:7654, and listen on port 7655."
+The replicas have to listen for connections from other replicas.
+
+The system currently handles only 2 replicas.
+
+Pseudo-code
+-----------
+
+### Leader
+
+    foreach event
+      if event == departure
+        remove replica
+      if event == join
+        add replica
+        send init msg to new replica
+          who else is in the system
+          which txn we're on
+        start sending txns to new replica
+        start handling responses from new replica
+          read responses up till the current seqno
+
+### Replica
+
+    start listening for conns from new replicas
+      generate recovery msg from map
+      send recovery msg to new replica
+    send join msg to leader
+    recv init msg from leader
+    start recving txns from leader
+      if map is caught up
+        apply txn directly
+      else
+        push onto backlog
+    foreach replica
+      connect to replica
+      recv recovery msg from replica
+    apply the state
+    apply backlog
+
+Todo
+----
+
+- Add benchmarking information, e.g.:
+  - txns/second normally
+  - txns/second during recovery
+  - time to recover
+  - bytes used to recover
+
+- Run some benchmarks, esp. on multiple physical hosts.
+
+- Figure out why things are running so slowly with >2 replicas.
+
+- Add a variant of the recovery scheme so that the standing replicas can just
+  send any snapshot of their DB beyond a certain seqno. The joiner can simply
+  discard from its leader-populated backlog any txns before the seqno of the
+  actual state it receives. This way, no communication between the leader and
+  the standing replicas needs to take place, and the replicas don't need to
+  wait for the new guy to join before they can continue processing txns.
+
+- Add a recovery scheme to recover from multiple replicas simultaneously.
+
+- Add richer transactions/queries/operations.
+
+- Add disk-based recovery methods.

Copied: ydb/trunk/publish.bash (from rev 1067, hash-join/trunk/publish.bash)
===================================================================
--- ydb/trunk/publish.bash                              (rev 0)
+++ ydb/trunk/publish.bash      2008-11-30 23:46:31 UTC (rev 1081)
@@ -0,0 +1,9 @@
+#!/usr/bin/env bash
+
+fullname='YDB'
+version=0.1
+license=gpl3
+websrcs=( README )
+rels=( src-tgz: )
+nodl=true
+. 
assorted.bash "$@" Property changes on: ydb/trunk/publish.bash ___________________________________________________________________ Added: svn:executable + * Added: svn:mergeinfo + Modified: ydb/trunk/src/Makefile =================================================================== --- ydb/trunk/src/Makefile 2008-11-30 23:45:26 UTC (rev 1080) +++ ydb/trunk/src/Makefile 2008-11-30 23:46:31 UTC (rev 1081) @@ -1,4 +1,5 @@ TARGET := ydb +WTF := wtf LZZS := $(wildcard *.lzz) LZZHDRS := $(foreach lzz,$(LZZS),$(patsubst %.lzz,%.hh,$(lzz))) @@ -31,10 +32,10 @@ $(CXX) -o $@ $^ $(LDFLAGS) %.o: %.cc $(PBHDRS) - wtf $(CXX) $(CXXFLAGS) -c -o $@ $< + $(WTF) $(CXX) $(CXXFLAGS) -c -o $@ $< %.o: %.pb.cc %.pb.h - wtf $(CXX) $(PBCXXFLAGS) -c -o $@ $< + $(WTF) $(CXX) $(PBCXXFLAGS) -c -o $@ $< %.cc: %.lzz lzz -hx hh -sx cc -hl -sl -hd -sd $< Modified: ydb/trunk/src/main.lzz =================================================================== --- ydb/trunk/src/main.lzz 2008-11-30 23:45:26 UTC (rev 1080) +++ ydb/trunk/src/main.lzz 2008-11-30 23:46:31 UTC (rev 1081) @@ -6,13 +6,20 @@ #hdr #include <boost/bind.hpp> #include <boost/foreach.hpp> +#include <boost/lambda/lambda.hpp> +#include <boost/scoped_array.hpp> #include <commons/nullptr.h> #include <commons/st/st.h> +#include <csignal> #include <cstdio> #include <cstdlib> #include <iostream> +#include <fstream> #include <map> +#include <set> #include <sstream> +#include <sys/types.h> +#include <unistd.h> #include <vector> #include "ydb.pb.h" #define foreach BOOST_FOREACH @@ -21,11 +28,123 @@ using namespace std; #end -extern int chkpt = 1000; +typedef pair<int, int> pii; + +// Why does just timeout require the `extern`? extern const st_utime_t timeout = 1000000; -extern const bool verbose = false; +const int chkpt = 1000; +const bool verbose = true; +const uint16_t base_port = 7654; +st_intr_bool stop_hub, kill_hub; /** + * The list of threads. + */ +set<st_thread_t> threads; + +class thread_eraser +{ + public: + thread_eraser() { threads.insert(st_thread_self()); } + ~thread_eraser() { threads.erase(st_thread_self()); } +}; + +/** + * Delegate for running thread targets. + * \param[in] f The function to execute. + * \param[in] intr Whether to signal stop_hub on an exception. + */ +void +my_spawn_helper(const function0<void> f, bool intr) +{ + thread_eraser eraser; + try { + f(); + } catch (const exception &ex) { + cerr << "thread " << st_thread_self() << ": " << ex.what() << endl; + if (intr) stop_hub.set(); + } +} + +/** + * Spawn a thread using ST but wrap it in an exception handler that interrupts + * all other threads (hopefully causing them to unwind). + */ +st_thread_t +my_spawn(const function0<void> &f, bool intr = true) +{ + st_thread_t t = st_spawn(bind(my_spawn_helper, f, intr)); + threads.insert(t); + return t; +} + +/** + * Used by the leader to bookkeep information about replicas. + */ +class replica_info +{ + public: + replica_info(st_netfd_t fd, uint16_t port) : fd_(fd), port_(port) {} + st_netfd_t fd() const { return fd_; } + /** The port on which the replica is listening. */ + uint16_t port() const { return port_; } +#hdr +#define GETSA sockaddr_in sa; sockaddr(sa); return sa +#end + /** The port on which the replica connected to us. 
*/ + uint16_t local_port() const { GETSA.sin_port; } + uint32_t host() const { GETSA.sin_addr.s_addr; } + sockaddr_in sockaddr() const { GETSA; } + void sockaddr(sockaddr_in &sa) const { + socklen_t salen = sizeof sa; + check0x(getpeername(st_netfd_fileno(fd_), + reinterpret_cast<struct sockaddr*>(&sa), + &salen)); + } + private: + st_netfd_t fd_; + uint16_t port_; +}; + +/** + * RAII to close all contained netfds. + */ +class st_closing_all +{ + public: + st_closing_all(const vector<replica_info>& rs) : rs_(rs) {} + ~st_closing_all() { + foreach (replica_info r, rs_) + check0x(st_netfd_close(r.fd())); + } + private: + const vector<replica_info> &rs_; +}; + +/** + * RAII for dumping the final state of the DB to a file on disk. + */ +class dump_state +{ + public: + dump_state(const map<int, int> &map, const int &seqno) + : map_(map), seqno_(seqno) {} + ~dump_state() { + stringstream fname; + fname << "/tmp/ydb" << getpid(); + cout << "dumping DB state (" << seqno_ << ") to " << fname.str() << endl; + ofstream of(fname.str().c_str()); + of << "seqno: " << seqno_ << endl; + foreach (const pii &p, map_) { + of << p.first << ": " << p.second << endl; + } + } + private: + const map<int, int> &map_; + const int &seqno_; +}; + +/** * Send a message to some destinations (sequentially). */ template<typename T> @@ -44,7 +163,7 @@ foreach (st_netfd_t dst, dsts) { checkeqnneg(st_write(dst, static_cast<void*>(&len), sizeof len, timeout), static_cast<int>(sizeof len)); - checkeqnneg(st_write(dst, buf, s.size(), timeout), + checkeqnneg(st_write(dst, buf, s.size(), ST_UTIME_NO_TIMEOUT), static_cast<int>(s.size())); } } @@ -52,7 +171,7 @@ /** * Send a message to a single recipient. */ -template<typename T> +template<typename T> void sendmsg(st_netfd_t dst, const T &msg) { @@ -65,19 +184,28 @@ */ template <typename T> void -readmsg(st_netfd_t src, T & msg) +readmsg(st_netfd_t src, T & msg, st_utime_t timeout = ST_UTIME_NO_TIMEOUT) { // Read the message length. uint32_t len; checkeqnneg(st_read_fully(src, static_cast<void*>(&len), sizeof len, - ST_UTIME_NO_TIMEOUT), + timeout), static_cast<int>(sizeof len)); len = ntohl(len); +#define GETMSG(buf) \ + checkeqnneg(st_read_fully(src, buf, len, timeout), (int) len); \ + check(msg.ParseFromArray(buf, len)); + // Parse the message body. - char buf[len]; - checkeqnneg(st_read_fully(src, buf, len, timeout), (int) len); - check(msg.ParseFromArray(buf, len)); + if (len < 4096) { + char buf[len]; + GETMSG(buf); + } else { + cout << "receiving large msg; heap-allocating " << len << " bytes" << endl; + scoped_array<char> buf(new char[len]); + GETMSG(buf.get()); + } } inline int @@ -90,239 +218,403 @@ * Keep issuing transactions to the replicas. */ void -issue_txns(const vector<st_netfd_t> &replicas) +issue_txns(st_channel<replica_info> &newreps, int &seqno) { Op_OpType types[] = {Op::read, Op::write, Op::del}; - size_t lastsize = replicas.size(); - cout << "replicas = " << &replicas << endl; - int i = 0; - while (true) { - if (replicas.size() != lastsize) { - cout << "size changed from " << lastsize << " to " << replicas.size() - << endl; - lastsize = replicas.size(); + vector<st_netfd_t> fds; + + while (!stop_hub) { + // Did we get a new member? + while (!newreps.empty()) { + if (seqno > 0) { + Txn txn; + bcastmsg(fds, txn); + } + fds.push_back(newreps.take().fd()); } + // Generate a random transaction. 
Txn txn; + txn.set_seqno(seqno++); int count = rand32(5) + 1; for (int o = 0; o < count; o++) { Op *op = txn.add_op(); - int rtype = rand32(3), rkey = rand32(), rvalue = rand32(); + int rtype = rand32(3), rkey = rand32(), rvalue = rand32(); op->set_type(types[rtype]); op->set_key(rkey); op->set_value(rvalue); } - bcastmsg(replicas, txn); - if (++i % chkpt == 0) { - if (verbose) cout << "issued txn " << i << endl; + + // Broadcast. + bcastmsg(fds, txn); + + // Checkpoint. + if (txn.seqno() % chkpt == 0) { + if (verbose) cout << "issued txn " << txn.seqno() << endl; st_sleep(0); } } } /** - * Keep swallowing replica responses. + * Process a transaction: update DB state (incl. seqno) and send response to + * leader. */ void -handle_responses(st_netfd_t replica) +process_txn(st_netfd_t leader, map<int, int> &map, const Txn &txn, int &seqno) { - int i = 0; - while (true) { - Response res; - readmsg(replica, res); - if (++i % chkpt == 0) { - if (verbose) - cout << "got response " << i << " from " << replica << " of size " - << res.result_size() << endl; - st_sleep(0); + checkeq(txn.seqno(), seqno + 1); + Response res; + res.set_seqno(txn.seqno()); + seqno = txn.seqno(); + for (int o = 0; o < txn.op_size(); o++) { + const Op &op = txn.op(o); + switch (op.type()) { + case Op::read: + res.add_result(map[op.key()]); + break; + case Op::write: + map[op.key()] = op.value(); + break; + case Op::del: + map.erase(op.key()); + break; } } + sendmsg(leader, res); } /** * Actually do the work of executing a transaction and sending back the reply. */ void -process_txns(st_netfd_t leader, map<int, int> &map) +process_txns(st_netfd_t leader, map<int, int> &map, int &seqno, + st_bool &send_state, st_bool &sent_state, + st_channel<Txn*> &backlog) { - int i = 0; while (true) { Txn txn; - readmsg(leader, txn); + //{ + //st_intr intr(stop_hub); + readmsg(leader, txn); + //} - Response res; - for (int o = 0; o < txn.op_size(); o++) { - const Op &op = txn.op(o); - switch (op.type()) { - case Op::read: - res.add_result(map[op.key()]); - break; - case Op::write: - map[op.key()] = op.value(); - break; - case Op::del: - map.erase(op.key()); - break; + if (txn.has_seqno()) { + if (txn.seqno() == seqno + 1) { + process_txn(leader, map, txn, seqno); + } else { + // Queue up for later processing once a snapshot has been received. + backlog.push(new Txn(txn)); } + } else { + // Wait for the snapshot to be generated. + send_state.set(); + cout << "waiting for state to be sent" << endl; + sent_state.waitset(); + sent_state.reset(); } - sendmsg(leader, res); - if (++i % chkpt == 0) { - if (verbose) cout << "processed txn " << i << endl; + if (txn.seqno() % chkpt == 0) { + if (verbose) cout << "processed txn " << txn.seqno() << endl; st_sleep(0); } } } /** + * Keep swallowing replica responses. + */ +void +handle_responses(st_netfd_t replica, const int &seqno) +{ + while (true) { + Response res; + { + st_intr intr(kill_hub); + readmsg(replica, res); + } + if (res.seqno() % chkpt == 0) { + if (verbose) + cout << "got response " << res.seqno() << " from " << replica << endl; + st_sleep(0); + } + if (stop_hub && res.seqno() == seqno - 1) { + cout << "seqno = " << seqno - 1 << endl; + break; + } + } +} + +/** * Help the recovering node. 
*/ void -recover_joiner(st_netfd_t listener, const map<int, int> &map) +recover_joiner(st_netfd_t listener, const map<int, int> &map, const int &seqno, + st_bool &send_state, st_bool &sent_state) { - st_netfd_t joiner = checkpass(st_accept(listener, nullptr, nullptr, - ST_UTIME_NO_TIMEOUT)); - cout << "got the joiner! " << joiner << endl; + st_netfd_t joiner; + { + st_intr intr(stop_hub); + joiner = checkerr(st_accept(listener, nullptr, nullptr, + ST_UTIME_NO_TIMEOUT)); + } + st_closing closing(joiner); + cout << "got recoverer's connection" << endl; + + // Wait for the right time to generate the snapshot. + send_state.waitset(); + send_state.reset(); + + cout << "snapshotting state for recovery" << endl; Recovery recovery; - typedef pair<int, int> pii; - foreach (pii p, map) { + foreach (const pii &p, map) { Recovery_Pair *pair = recovery.add_pair(); pair->set_key(p.first); pair->set_value(p.second); } + recovery.set_seqno(seqno); + + // Notify process_txns that it may continue processing. + sent_state.set(); + + cout << "sending recovery" << endl; sendmsg(joiner, recovery); + cout << "sent" << endl; } -int -main(int argc, char **argv) +/** + * Run the leader. + */ +void +run_leader(int nreps) { - check0x(st_init()); - if (argc < 2) - die("leader: ydb <nreplicas>\n" - "replica: ydb <leaderhost> <leaderport> <listenport>\n" - "joiner: ydb <leaderhost> <leaderport>\n"); - bool is_leader = argc == 2; - bool is_joiner = argc == 3; - if (is_leader) { - cout << "starting as leader" << endl; + cout << "starting as leader" << endl; - // Wait until all replicas have joined. - st_netfd_t listener = st_tcp_listen(7654); - vector<st_netfd_t> replicas; - for (int i = 1; i < atoi(argv[1]); i++) { - replicas.push_back(checkpass( - st_accept(listener, nullptr, nullptr, ST_UTIME_NO_TIMEOUT))); + // Wait until all replicas have joined. + st_netfd_t listener = st_tcp_listen(base_port); + st_closing close_listener(listener); + // TODO rename these + int min_reps = nreps - 1; + vector<replica_info> replicas; + st_closing_all close_replicas(replicas); + for (int i = 0; i < min_reps; i++) { + st_netfd_t fd; + { + st_intr intr(stop_hub); + fd = checkerr(st_accept(listener, nullptr, nullptr, + ST_UTIME_NO_TIMEOUT)); } + Join join; + readmsg(fd, join); + replicas.push_back(replica_info(fd, static_cast<uint16_t>(join.port()))); + } - // Construct the initialization message. - Init init; - foreach (st_netfd_t r, replicas) { - // Get socket addresses. + // Construct the initialization message. + Init init; + init.set_txnseqno(0); + foreach (replica_info r, replicas) { + SockAddr *psa = init.add_node(); + psa->set_host(r.host()); + psa->set_port(r.port()); + } - sockaddr_in sa; - socklen_t salen = sizeof sa; - check0x(getpeername(st_netfd_fileno(r), reinterpret_cast<sockaddr*>(&sa), &salen)); + // Send init to each initial replica. + foreach (replica_info r, replicas) { + init.set_yourhost(r.host()); + sendmsg(r.fd(), init); + } - SockAddr *psa = init.add_node(); - psa->set_host(sa.sin_addr.s_addr); - psa->set_port(sa.sin_port); - } + // Start dispatching queries. + int seqno = 0; + st_channel<replica_info> newreps; + const function0<void> f = bind(issue_txns, ref(newreps), ref(seqno)); + st_thread_t swallower = my_spawn(bind(swallow, f)); + foreach (const replica_info &r, replicas) newreps.push(r); + st_joining join_swallower(swallower); - bcastmsg(replicas, init); + // Start handling responses. 
+ st_thread_group handlers; + foreach (replica_info r, replicas) { + handlers.insert(my_spawn(bind(handle_responses, r.fd(), ref(seqno)))); + } - // Start dispatching queries. - const function0<void> f = bind(issue_txns, ref(replicas)); - st_thread_t t = st_spawn(bind(swallow, f)); + // Accept the recovering node, and tell it about the online replicas. + st_netfd_t joiner; + { + st_intr intr(stop_hub); + joiner = checkerr(st_accept(listener, nullptr, nullptr, + ST_UTIME_NO_TIMEOUT)); + } + Join join; + readmsg(joiner, join); + cout << "setting seqno to " << seqno << endl; + init.set_txnseqno(seqno); + sendmsg(joiner, init); - // Start handling responses. - vector<st_thread_t> handlers(replicas.size()); - foreach (st_netfd_t r, replicas) { - handlers.push_back(st_spawn(bind(handle_responses, r))); - } + // Start streaming txns to joiner. + cout << "start streaming txns to joiner" << endl; + replicas.push_back(replica_info(joiner, static_cast<uint16_t>(join.port()))); + newreps.push(replicas.back()); + handlers.insert(my_spawn(bind(handle_responses, joiner, ref(seqno)))); +} - // Accept the recovering node, and tell it about the online replicas. - st_netfd_t joiner = checkpass(st_accept(listener, nullptr, nullptr, - ST_UTIME_NO_TIMEOUT)); - sendmsg(joiner, init); +/** + * Run a replica. + */ +void +run_replica(char *leader_host, uint16_t leader_port, uint16_t listen_port) +{ + // Initialize database state. + map<int, int> map; + int seqno = -1; + dump_state ds(map, seqno); + st_bool send_state, sent_state; - // Bring the new guy "back" into action. - Ready ready; - readmsg(joiner, ready); - cout << "the prodigal son has returned" << endl; - cout << "replicas = " << &replicas << endl; - replicas.push_back(joiner); - handlers.push_back(st_spawn(bind(handle_responses, joiner))); + cout << "starting as replica" << endl; - // Wait on other threads. - check0x(st_thread_join(t, nullptr)); + // Listen for connections from other replicas. + st_netfd_t listener = + st_tcp_listen(listen_port); + st_thread_t rec = my_spawn(bind(recover_joiner, listener, ref(map), + ref(seqno), ref(send_state), + ref(sent_state))); - // Cleanly close all connections. - foreach (st_netfd_t r, replicas) { - check0x(st_netfd_close(r)); + // Connect to the leader and join the system. + st_netfd_t leader = st_tcp_connect(leader_host, leader_port, timeout); + Join join; + join.set_port(listen_port); + sendmsg(leader, join); + Init init; + readmsg(leader, init); + uint32_t listen_host = init.yourhost(); + + // Display the info. + cout << "got init msg with txn seqno " << init.txnseqno() + << " and hosts:" << endl; + vector<st_netfd_t> replicas; + for (uint16_t i = 0; i < init.node_size(); i++) { + const SockAddr &sa = init.node(i); + char buf[INET_ADDRSTRLEN]; + in_addr host = { sa.host() }; + bool is_self = sa.host() == listen_host && sa.port() == listen_port; + cout << "- " << checkerr(inet_ntop(AF_INET, &host, buf, + INET_ADDRSTRLEN)) + << ':' << sa.port() << (is_self ? " (self)" : "") << endl; + if (!is_self) { + replicas.push_back(st_tcp_connect(host, + static_cast<uint16_t>(sa.port()), + timeout)); } - check0x(st_netfd_close(listener)); - } else { - map<int, int> map; + } - // Connect to the leader. - char *host = argv[1]; - uint16_t port = static_cast<uint16_t>(atoi(argv[2])); + // Process txns. + st_channel<Txn*> backlog; + st_thread_t proc = my_spawn(bind(process_txns, leader, ref(map), ref(seqno), + ref(send_state), ref(sent_state), + ref(backlog))); - if (!is_joiner) { - // Listen for then talk to the joiner. 
- st_netfd_t listener = - st_tcp_listen(static_cast<uint16_t>(atoi(argv[3]))); - st_spawn(bind(recover_joiner, listener, ref(map))); + // If there's anything to recover. + if (init.txnseqno() > 0) { + cout << "waiting for recovery" << endl; + + // Read the recovery message. + Recovery recovery; + foreach (st_netfd_t r, replicas) { + readmsg(r, recovery); } + for (int i = 0; i < recovery.pair_size(); i++) { + const Recovery_Pair &p = recovery.pair(i); + map[p.key()] = p.value(); + } + assert(seqno == -1 && + static_cast<typeof(seqno)>(recovery.seqno()) > seqno); + seqno = recovery.seqno(); + cout << "recovered." << endl; - st_sleep(0); - cout << "here" << endl; - st_netfd_t leader = st_tcp_connect(host, port, timeout); - cout << "there" << endl; + while (!backlog.empty()) { + Txn *p = backlog.take(); + process_txn(leader, map, *p, seqno); + delete p; + } + cout << "caught up." << endl; + } - // Read the initialization message. - Init init; - readmsg(leader, init); + st_join(proc); + st_join(rec); + foreach (st_netfd_t r, replicas) { + check0x(st_netfd_close(r)); + } + check0x(st_netfd_close(leader)); +} - // Display the info. - cout << "hosts:" << endl; - vector<st_netfd_t> replicas; - for (uint16_t i = 0; i < init.node_size(); i++) { - const SockAddr &sa = init.node(i); - char buf[INET_ADDRSTRLEN]; - in_addr host; - host.s_addr = sa.host(); - cout << checkpass(inet_ntop(AF_INET, &host, buf, INET_ADDRSTRLEN)) - << ':' << sa.port() << endl; - if (is_joiner) - replicas.push_back(st_tcp_connect(host, static_cast<uint16_t>(7655+i), - timeout)); - } +int sig_pipe[2]; - if (is_joiner) { - // Read the recovery message. - Recovery recovery; - readmsg(replicas[0], recovery); - for (int i = 0; i < recovery.pair_size(); i++) { - const Recovery_Pair &p = recovery.pair(i); - map[p.key()] = p.value(); +/** + * Raw signal handler that triggers the (synchronous) handler. + */ +void handle_sig(int sig) { + int err = errno; + cerr << "got signal: " << sig << endl; + checkeqnneg(write(sig_pipe[1], &sig, sizeof sig), sizeof sig); + errno = err; +} + +/** + * Synchronous part of the signal handler; cleanly interrrupts any threads that + * have marked themselves as interruptible. + */ +void handle_sig_sync() { + stfd fd = checkerr(st_netfd_open(sig_pipe[0])); + while (true) { + int sig; + checkeqnneg(st_read(fd, &sig, sizeof sig, ST_UTIME_NO_TIMEOUT), + sizeof sig); + if (sig == SIGINT) { + if (!stop_hub) stop_hub.set(); + else kill_hub.set(); + } else if (sig == SIGTERM) { + foreach (st_thread_t t, threads) { + st_thread_interrupt(t); } - cout << "recovered." << endl; - - // Notify the leader. - Ready ready; - sendmsg(leader, ready); } + break; + } +} - // Process txns. 
- st_thread_t t = st_spawn(bind(process_txns, leader, ref(map))); - check0x(st_thread_join(t, nullptr)); +int +main(int argc, char **argv) +{ + try { + GOOGLE_PROTOBUF_VERIFY_VERSION; - foreach (st_netfd_t r, replicas) { - check0x(st_netfd_close(r)); + check0x(pipe(sig_pipe)); + struct sigaction sa; + sa.sa_handler = handle_sig; + check0x(sigemptyset(&sa.sa_mask)); + sa.sa_flags = 0; + check0x(sigaction(SIGINT, &sa, nullptr)); + + //check0x(st_set_eventsys(ST_EVENTSYS_ALT)); + check0x(st_init()); + thread_eraser eraser; + st_spawn(bind(handle_sig_sync)); + threads.insert(st_thread_self()); + if (argc != 2 && argc != 4) + die("leader: ydb <nreplicas>\n" + "replica: ydb <leaderhost> <leaderport> <listenport>\n"); + bool is_leader = argc == 2; + + if (is_leader) { + run_leader(atoi(argv[1])); + } else { + run_replica(argv[1], + static_cast<uint16_t>(atoi(argv[2])), + static_cast<uint16_t>(atoi(argv[3]))); } - check0x(st_netfd_close(leader)); + + return 0; + } catch (const exception &ex) { + cerr << "thread " << st_thread_self() << ": " << ex.what() << endl; + return 1; } - - return 0; } Modified: ydb/trunk/src/ydb.proto =================================================================== --- ydb/trunk/src/ydb.proto 2008-11-30 23:45:26 UTC (rev 1080) +++ ydb/trunk/src/ydb.proto 2008-11-30 23:46:31 UTC (rev 1081) @@ -1,10 +1,34 @@ +option optimize_for = SPEED; + +// A socket address (host:port). message SockAddr { - required int32 host = 1; - required int32 port = 2; + required uint32 host = 1; + required uint32 port = 2; } + +// Join request sent from nodes to leader. +message Join { + // The port on which the joining replica will listen for connections. + required uint32 port = 1; +} + +// Initialization message sent to a nodes when it joins. message Init { - repeated SockAddr node = 1; + // The current seqno that the server is on. + required uint32 txnseqno = 1; + // What the leader perceives to be the joining replica's IP address. + required uint32 yourhost = 2; + // The nodes that have joined (including the joining node); the ports here + // are the ports on which the nodes are listening. + repeated SockAddr node = 3; } + +// Sent to already-joined nodes to inform them of a newly joining node. +message Joined { + required SockAddr node = 1; +} + +// A single operation in a transaction. message Op { enum OpType { read = 0; @@ -15,19 +39,35 @@ required int32 key = 2; optional int32 value = 3; } + +// A transaction. Currently just a simple sequence of Ops. message Txn { - repeated Op op = 1; + optional uint32 seqno = 1; + repeated Op op = 2; } + +// Response to a transaction, containing a list of results. message Response { - repeated int32 result = 1; + // The txn that this is a response for. + required uint32 seqno = 1; + // The list of answers to read operations. + repeated int32 result = 2; } -message Ready { - optional int32 ready = 1; -} + +// Message from a running node to a joining node to bring it up to speed. message Recovery { message Pair { required int32 key = 1; required int32 value = 2; } - repeated Pair pair = 1; + // The seqno that this recovery message will bring us up through (the last + // txn seqno before the snapshot was generated). + required uint32 seqno = 1; + // The data map. + repeated Pair pair = 2; } + +// Message from a joining node to the leader to inform it that it is fully back +// into action. 
+message Ready { +} Deleted: ydb/trunk/src/ydb.thrift =================================================================== --- ydb/trunk/src/ydb.thrift 2008-11-30 23:45:26 UTC (rev 1080) +++ ydb/trunk/src/ydb.thrift 2008-11-30 23:46:31 UTC (rev 1081) @@ -1,9 +0,0 @@ -enum op_type { read, write, rm } -struct sock_addr { i32 host, i16 port } -struct init { list<sock_addr> node } -struct op { op_type type, i32 key, optional i32 value } -struct txn { list<op> op } -struct response { list<i32> results } -struct ready {} -struct pair { i32 key, i32 value } -struct recovery { list<pair> pairs }
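The recovery flow this revision adds is easiest to see end to end in miniature: the joiner backlogs the txns the leader streams at it, installs the snapshot a standing replica sends (a Recovery message carrying the data map and its seqno), and then replays the backlog. The following standalone sketch models that catch-up; it is not code from this commit. Plain std containers stand in for the protobuf Txn and Recovery messages, the helpers apply_txn and catch_up are invented names, it uses C++11 for brevity (the tree itself targets GCC 4.3), and it folds in the Todo variant of discarding backlogged txns at or below the snapshot seqno.

    // Standalone, simplified model (not code from this commit) of the joiner
    // catch-up: install a snapshot, then replay the backlogged txns that the
    // snapshot does not already cover.  Plain std containers stand in for the
    // protobuf Txn and Recovery messages.
    #include <deque>
    #include <iostream>
    #include <map>
    #include <vector>

    struct Op  { enum Type { read, write, del } type; int key; int value; };
    struct Txn { int seqno; std::vector<Op> ops; };
    struct Recovery { int seqno; std::map<int, int> state; };  // snapshot

    // Apply one txn to the database map and advance the seqno.
    void apply_txn(std::map<int, int> &db, int &seqno, const Txn &txn) {
      for (const Op &op : txn.ops) {
        switch (op.type) {
          case Op::read:  /* would append db[op.key] to a Response */ break;
          case Op::write: db[op.key] = op.value; break;
          case Op::del:   db.erase(op.key); break;
        }
      }
      seqno = txn.seqno;
    }

    // Install the snapshot, then drain the leader-populated backlog in order,
    // discarding txns the snapshot already includes (the Todo variant).
    void catch_up(std::map<int, int> &db, int &seqno,
                  const Recovery &snap, std::deque<Txn> &backlog) {
      db = snap.state;
      seqno = snap.seqno;
      while (!backlog.empty()) {
        Txn txn = backlog.front();
        backlog.pop_front();
        if (txn.seqno <= seqno) continue;  // already reflected in the snapshot
        apply_txn(db, seqno, txn);
      }
    }

    int main() {
      std::map<int, int> db;
      int seqno = -1;
      // Txns streamed by the leader while the snapshot was being taken.
      std::deque<Txn> backlog = {
        {5, {{Op::write, 1, 10}}},
        {6, {{Op::write, 2, 20}}},
        {7, {{Op::del, 1, 0}}},
      };
      Recovery snap = {5, {{1, 10}}};  // snapshot taken at txn seqno 5
      catch_up(db, seqno, snap, backlog);
      std::cout << "caught up to seqno " << seqno << ", db size " << db.size()
                << std::endl;  // caught up to seqno 7, db size 1
      return 0;
    }

In the committed code the standing replica pauses so that the snapshot lines up exactly with the start of the backlog, and process_txn() asserts txn.seqno() == seqno + 1; the skip above is what the Todo variant would use instead.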
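All traffic between nodes in main.lzz uses one framing convention: sendmsg()/bcastmsg() write a 4-byte network-order length followed by the serialized protobuf body, and readmsg() reads the length, then exactly that many bytes, before parsing (switching from a stack buffer to a heap allocation once a message reaches 4096 bytes, as a Recovery snapshot can). The standalone sketch below mirrors only that convention, with plain POSIX sockets and string payloads; send_frame, recv_frame, and read_fully are illustrative names that do not appear in the source.

    // Standalone sketch (not from this commit) of the length-prefixed framing
    // used between ydb nodes: a 4-byte big-endian length, then the message
    // body.  Plain POSIX sockets and std::string payloads stand in for State
    // Threads netfds and serialized protobuf messages.
    #include <arpa/inet.h>   // htonl, ntohl
    #include <sys/socket.h>  // socketpair
    #include <unistd.h>      // read, write, close
    #include <stdint.h>
    #include <iostream>
    #include <string>

    // Write one frame: the length prefix, then the body.  (A production
    // version would also loop over short writes.)
    static bool send_frame(int fd, const std::string &body) {
      uint32_t len = htonl(static_cast<uint32_t>(body.size()));
      if (write(fd, &len, sizeof len) != static_cast<ssize_t>(sizeof len))
        return false;
      return write(fd, body.data(), body.size()) ==
             static_cast<ssize_t>(body.size());
    }

    // Read exactly n bytes; read() may legally return short counts.
    static bool read_fully(int fd, char *buf, size_t n) {
      while (n > 0) {
        ssize_t r = read(fd, buf, n);
        if (r <= 0) return false;
        buf += r;
        n -= static_cast<size_t>(r);
      }
      return true;
    }

    // Read one frame: length prefix first, then exactly that many body bytes.
    static bool recv_frame(int fd, std::string &body) {
      uint32_t len;
      if (!read_fully(fd, reinterpret_cast<char *>(&len), sizeof len))
        return false;
      len = ntohl(len);
      body.resize(len);
      return len == 0 || read_fully(fd, &body[0], len);
    }

    int main() {
      int sp[2];
      if (socketpair(AF_UNIX, SOCK_STREAM, 0, sp) != 0) return 1;
      // In ydb the body would be a serialized Txn, Response, or Recovery.
      if (!send_frame(sp[0], "txn #42")) return 1;
      std::string body;
      if (!recv_frame(sp[1], body)) return 1;
      std::cout << "received " << body.size() << " bytes: " << body << std::endl;
      close(sp[0]);
      close(sp[1]);
      return 0;
    }

Reading the exact body length up front is what lets readmsg() size its parse buffer before handing it to ParseFromArray().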