[Assorted-commits] SF.net SVN: assorted:[1093] ydb/trunk
Brought to you by:
yangzhang
From: <yan...@us...> - 2008-12-04 10:24:43
|
Revision: 1093 http://assorted.svn.sourceforge.net/assorted/?rev=1093&view=rev Author: yangzhang Date: 2008-12-04 10:24:36 +0000 (Thu, 04 Dec 2008) Log Message: ----------- - added command-line options - added a "higher-level" readmsg() that relies on RVO - fixed and lifted random number generator - cleaned up includes, RAII - updated doc Modified Paths: -------------- ydb/trunk/README ydb/trunk/src/Makefile ydb/trunk/src/main.lzz Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2008-12-04 10:24:21 UTC (rev 1092) +++ ydb/trunk/README 2008-12-04 10:24:36 UTC (rev 1093) @@ -10,12 +10,11 @@ Currently, the only implemented recovery mechanism is to have the first-joining replica serialize its entire database state and send that to the joining node. -If you start a system of $n$ replicas, then the leader will wait for $n-1$ of -them to join before it starts issuing transactions. (Think of $n-1$ as the -minimum number of replicas the system requires before it is willing to process -transactions.) Then when replica $n$ joins, it will need to catch up to the -current state of the system, and it will do so by contacting that first replica -and receiving a complete dump of its DB state. +If you start a system with a minimum of $n$ replicas, then the leader will wait +for that many of them to join before it starts issuing transactions. Then when +replica $n+1$ joins, it will need to catch up to the current state of the +system; it will do so by contacting the first-joining replica and receiving a +complete dump of its DB state. The leader will report the current txn seqno to the joiner, and start streaming txns beyond that seqno to the joiner, which the joiner will push onto its @@ -45,22 +44,23 @@ Usage ----- -To start a leader to manage 3 replicas, run: +To start a leader, run: - ./ydb 3 + ./ydb -l -This will listen on port 7654. 
Then to start the first two replicas, run: +Then to start the first two replicas, run: - ./ydb localhost 7654 7655 - ./ydb localhost 7654 7656 + ./ydb -p 7655 + ./ydb -p 7656 -This means "connect to the leader at localhost:7654, and listen on port 7655." -The replicas have to listen for connections from other replicas (namely the -recovering replica). +This means "connect to the leader at localhost:7654, and listen on port +7655/7656." The replicas have to listen for connections from other replicas +(namely the recovering replica). The leader waits for the minimum number +(default of 2) of replicas to join before beginning to issue transactions. The recovering replica then joins: - ./ydb localhost 7654 7657 + ./ydb -p 7657 It will connect to the first replica (on port 7655) and receive a DB dump from it. Modified: ydb/trunk/src/Makefile =================================================================== --- ydb/trunk/src/Makefile 2008-12-04 10:24:21 UTC (rev 1092) +++ ydb/trunk/src/Makefile 2008-12-04 10:24:36 UTC (rev 1093) @@ -19,8 +19,9 @@ SRCS := $(GENSRCS) OBJS := $(GENOBJS) -LDFLAGS := -lstx -lst -lresolv -lpthread -lprotobuf -CXXFLAGS := -g3 -Wall -Werror -Wextra -Woverloaded-virtual -Wconversion \ +LDFLAGS := -lstx -lst -lresolv -lpthread -lprotobuf \ + -lboost_program_options-gcc43-mt +CXXFLAGS := -g3 -Wall -Werror -Wextra -Woverloaded-virtual -Wconversion -Wno-conversion -Wno-ignored-qualifiers \ -Wpointer-arith -Wcast-qual -Wcast-align -Wwrite-strings \ -Winit-self -Wsign-promo -Wno-unused-parameter -Wc++0x-compat \ -Wparentheses -Wmissing-format-attribute -Wfloat-equal \ Modified: ydb/trunk/src/main.lzz =================================================================== --- ydb/trunk/src/main.lzz 2008-12-04 10:24:21 UTC (rev 1092) +++ ydb/trunk/src/main.lzz 2008-12-04 10:24:36 UTC (rev 1093) @@ -1,22 +1,23 @@ #hdr #include <boost/bind.hpp> #include <boost/foreach.hpp> -#include <boost/lambda/lambda.hpp> +#include <boost/program_options.hpp> #include 
<boost/scoped_array.hpp> #include <commons/nullptr.h> +#include <commons/rand.h> #include <commons/st/st.h> #include <commons/time.h> -#include <csignal> +#include <csignal> // sigaction etc. #include <cstdio> -#include <cstdlib> -#include <cstring> +#include <cstring> // strsignal #include <iostream> #include <fstream> #include <map> +#include <netinet/in.h> // in_addr etc. #include <set> -#include <sstream> -#include <sys/types.h> -#include <unistd.h> +#include <sys/socket.h> // getpeername +#include <sys/types.h> // ssize_t +#include <unistd.h> // pipe, write #include <vector> #include "ydb.pb.h" #define foreach BOOST_FOREACH @@ -26,15 +27,11 @@ #end typedef pair<int, int> pii; - -// Why does just timeout require the `extern`? -extern const st_utime_t timeout = 1000000; -const int chkpt = 10000; -const bool verbose = true; -const bool yield_during_recovery = false; -const bool yield_during_catch_up = false; -const bool use_epoll = false; -const uint16_t base_port = 7654; +st_utime_t timeout; +int chkpt; +bool verbose; +bool yield_during_build_up; +bool yield_during_catch_up; st_intr_bool stop_hub, kill_hub; /** @@ -110,11 +107,11 @@ /** * RAII to close all contained netfds. */ -class st_closing_all +class st_closing_all_infos { public: - st_closing_all(const vector<replica_info>& rs) : rs_(rs) {} - ~st_closing_all() { + st_closing_all_infos(const vector<replica_info>& rs) : rs_(rs) {} + ~st_closing_all_infos() { foreach (replica_info r, rs_) check0x(st_netfd_close(r.fd())); } @@ -123,6 +120,21 @@ }; /** + * RAII to close all contained netfds. + */ +class st_closing_all +{ + public: + st_closing_all(const vector<st_netfd_t>& rs) : rs_(rs) {} + ~st_closing_all() { + foreach (st_netfd_t r, rs_) + check0x(st_netfd_close(r)); + } + private: + const vector<st_netfd_t> &rs_; +}; + +/** * RAII for dumping the final state of the DB to a file on disk. 
*/ class dump_state @@ -131,10 +143,9 @@ dump_state(const map<int, int> &map, const int &seqno) : map_(map), seqno_(seqno) {} ~dump_state() { - stringstream fname; - fname << "/tmp/ydb" << getpid(); - cout << "dumping DB state (" << seqno_ << ") to " << fname.str() << endl; - ofstream of(fname.str().c_str()); + string fname = string("/tmp/ydb") + lexical_cast<string>(getpid()); + cout << "dumping DB state (" << seqno_ << ") to " << fname << endl; + ofstream of(fname.c_str()); of << "seqno: " << seqno_ << endl; foreach (const pii &p, map_) { of << p.first << ": " << p.second << endl; @@ -209,10 +220,18 @@ } } -inline int -rand32(int max = RAND_MAX) +/** + * Same as the above readmsg(), but returns an internally constructed message. + * This is a "higher-level" readmsg() that relies on return-value optimization + * for avoiding unnecessary copies. + */ +template <typename T> +T +readmsg(st_netfd_t src, st_utime_t timeout = ST_UTIME_NO_TIMEOUT) { - return static_cast<int>( random() / ( RAND_MAX / max ) ); + T msg; + readmsg(src, msg, timeout); + return msg; } /** @@ -236,10 +255,10 @@ // Generate a random transaction. Txn txn; txn.set_seqno(seqno++); - int count = rand32(5) + 1; + int count = randint(5) + 1; for (int o = 0; o < count; o++) { Op *op = txn.add_op(); - int rtype = rand32(3), rkey = rand32(), rvalue = rand32(); + int rtype = randint(3), rkey = randint(), rvalue = randint(); op->set_type(types[rtype]); op->set_key(rkey); op->set_value(rvalue); @@ -339,7 +358,8 @@ } if (!caught_up && res.caught_up()) { caught_up = true; - cout << "recovering node caught up; took " << current_time_millis() - start_time << "ms" << endl; + cout << "recovering node caught up; took " + << current_time_millis() - start_time << "ms" << endl; } if (res.seqno() % chkpt == 0) { if (verbose) @@ -397,26 +417,25 @@ * Run the leader. 
*/ void -run_leader(int nreps) +run_leader(int minreps, uint16_t leader_port) { cout << "starting as leader" << endl; + cout << "waiting for at least " << minreps << " replicas to join" << endl; // Wait until all replicas have joined. - st_netfd_t listener = st_tcp_listen(base_port); + st_netfd_t listener = st_tcp_listen(leader_port); st_closing close_listener(listener); // TODO rename these - int min_reps = nreps - 1; vector<replica_info> replicas; - st_closing_all close_replicas(replicas); - for (int i = 0; i < min_reps; i++) { + st_closing_all_infos close_replicas(replicas); + for (int i = 0; i < minreps; i++) { st_netfd_t fd; { st_intr intr(stop_hub); fd = checkerr(st_accept(listener, nullptr, nullptr, ST_UTIME_NO_TIMEOUT)); } - Join join; - readmsg(fd, join); + Join join = readmsg<Join>(fd); replicas.push_back(replica_info(fd, static_cast<uint16_t>(join.port()))); } @@ -456,8 +475,7 @@ joiner = checkerr(st_accept(listener, nullptr, nullptr, ST_UTIME_NO_TIMEOUT)); } - Join join; - readmsg(joiner, join); + Join join = readmsg<Join>(joiner); cout << "setting seqno to " << seqno << endl; init.set_txnseqno(seqno); sendmsg(joiner, init); @@ -473,7 +491,7 @@ * Run a replica. */ void -run_replica(char *leader_host, uint16_t leader_port, uint16_t listen_port) +run_replica(string leader_host, uint16_t leader_port, uint16_t listen_port) { // Initialize database state. map<int, int> map; @@ -481,28 +499,30 @@ dump_state ds(map, seqno); st_bool send_state, sent_state; - cout << "starting as replica" << endl; + cout << "starting as replica on port " << listen_port << endl; // Listen for connections from other replicas. st_netfd_t listener = st_tcp_listen(listen_port); - st_thread_t rec = my_spawn(bind(recover_joiner, listener, ref(map), - ref(seqno), ref(send_state), - ref(sent_state))); + st_joining join_rec(my_spawn(bind(recover_joiner, listener, ref(map), + ref(seqno), ref(send_state), + ref(sent_state)))); // Connect to the leader and join the system. 
- st_netfd_t leader = st_tcp_connect(leader_host, leader_port, timeout); + st_netfd_t leader = st_tcp_connect(leader_host.c_str(), leader_port, + timeout); + st_closing closing(leader); Join join; join.set_port(listen_port); sendmsg(leader, join); - Init init; - readmsg(leader, init); + Init init = readmsg<Init>(leader); uint32_t listen_host = init.yourhost(); // Display the info. cout << "got init msg with txn seqno " << init.txnseqno() << " and hosts:" << endl; vector<st_netfd_t> replicas; + st_closing_all close_replicas(replicas); for (uint16_t i = 0; i < init.node_size(); i++) { const SockAddr &sa = init.node(i); char buf[INET_ADDRSTRLEN]; @@ -520,9 +540,9 @@ // Process txns. st_channel<Txn*> backlog; - st_thread_t proc = my_spawn(bind(process_txns, leader, ref(map), ref(seqno), - ref(send_state), ref(sent_state), - ref(backlog))); + st_joining join_proc(my_spawn(bind(process_txns, leader, ref(map), + ref(seqno), ref(send_state), + ref(sent_state), ref(backlog)))); // If there's anything to recover. if (init.txnseqno() > 0) { @@ -530,12 +550,15 @@ // Read the recovery message. Recovery recovery; - readmsg(replicas[0], recovery); + { + st_intr intr(stop_hub); + readmsg(replicas[0], recovery); + } for (int i = 0; i < recovery.pair_size(); i++) { const Recovery_Pair &p = recovery.pair(i); map[p.key()] = p.value(); if (i % chkpt == 0) { - if (yield_during_recovery) st_sleep(0); + if (yield_during_build_up) st_sleep(0); } } assert(seqno == -1 && @@ -554,13 +577,6 @@ } cout << "caught up." << endl; } - - st_join(proc); - st_join(rec); - foreach (st_netfd_t r, replicas) { - check0x(st_netfd_close(r)); - } - check0x(st_netfd_close(leader)); } int sig_pipe[2]; @@ -604,9 +620,65 @@ int main(int argc, char **argv) { + namespace po = boost::program_options; try { GOOGLE_PROTOBUF_VERIFY_VERSION; + bool is_leader, use_epoll; + int minreps; + uint16_t leader_port, listen_port; + string leader_host; + + // Parse options. 
+ po::options_description desc("Allowed options"); + desc.add_options() + ("help,h", "show this help message") + ("verbose,v", "enable periodic printing of txn processing progress") + ("epoll,e", po::bool_switch(&use_epoll), + "use epoll (select is used by default)") + ("yield-build-up", po::bool_switch(&yield_during_build_up), + "yield periodically during build-up phase of recovery") + ("yield-catch-up", po::bool_switch(&yield_during_catch_up), + "yield periodically during catch-up phase of recovery") + ("leader,l", po::bool_switch(&is_leader), + "run the leader (run replica by default)") + ("leader-host,H", + po::value<string>(&leader_host)->default_value(string("localhost")), + "hostname or address of the leader") + ("leader-port,P", + po::value<uint16_t>(&leader_port)->default_value(7654), + "port the leader listens on") + ("chkpt,c", po::value<int>(&chkpt)->default_value(10000), + "number of txns before yielding/verbose printing") + ("listen-port,p", po::value<uint16_t>(&listen_port), + "port to listen on (replicas only)") + ("timeout,t", po::value<st_utime_t>(&timeout)->default_value(1000000), + "timeout for IO operations (in microseconds)") + ("minreps,n", po::value<int>(&minreps)->default_value(2), + "minimum number of replicas the system is willing to process txns on"); + + po::variables_map vm; + try { + po::store(po::parse_command_line(argc, argv, desc), vm); + po::notify(vm); + + if (vm.count("help")) { + cout << desc << endl; + return 0; + } + if (!is_leader && !vm.count("listen-port")) { + class parse_exception : public std::exception { + virtual const char *what() const throw() { + return "running replica requires listen port to be specified"; + } + }; + throw parse_exception(); + } + } catch (std::exception &ex) { + cerr << ex.what() << endl << endl << desc << endl; + return 1; + } + // Initialize support for ST working with asynchronous signals. 
check0x(pipe(sig_pipe)); struct sigaction sa; @@ -624,19 +696,11 @@ thread_eraser eraser; threads.insert(st_thread_self()); - // Parse command-line arguments. - if (argc != 2 && argc != 4) - die("leader: ydb <nreplicas>\n" - "replica: ydb <leaderhost> <leaderport> <listenport>\n"); - bool is_leader = argc == 2; - // Which role are we? if (is_leader) { - run_leader(atoi(argv[1])); + run_leader(minreps, leader_port); } else { - run_replica(argv[1], - static_cast<uint16_t>(atoi(argv[2])), - static_cast<uint16_t>(atoi(argv[3]))); + run_replica(leader_host, leader_port, listen_port); } return 0; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |