[Assorted-commits] SF.net SVN: assorted:[1098] ydb/trunk
From: <yan...@us...> - 2008-12-11 19:16:42
Revision: 1098
          http://assorted.svn.sourceforge.net/assorted/?rev=1098&view=rev
Author:   yangzhang
Date:     2008-12-11 19:16:31 +0000 (Thu, 11 Dec 2008)

Log Message:
-----------
- started using clamp
- added benchmark performance summary output
- fixed unnecessary premature spawn and final join with recover_joiner
- fixed issue where stop_hub wasn't being interrupted
- added optional time limit as an option, causing issue_txns to stop after some time
- added default value for listen port (since nodes are frequently run on different hosts)
- added some comment documentation
- my_spawn doesn't interrupt other threads by default on exceptions
- reworked the communication/synchronization between process_txns and recover_joiner
- made channels safer
- added full test/benchmark setup & deployment scripts
- added patch to get clamp to build

Modified Paths:
--------------
    ydb/trunk/README
    ydb/trunk/src/Makefile

Added Paths:
-----------
    ydb/trunk/src/main.lzz.clamp
    ydb/trunk/tools/
    ydb/trunk/tools/clamp.patch
    ydb/trunk/tools/test.bash

Removed Paths:
-------------
    ydb/trunk/src/main.lzz

Modified: ydb/trunk/README
===================================================================
--- ydb/trunk/README 2008-12-08 07:57:26 UTC (rev 1097)
+++ ydb/trunk/README 2008-12-11 19:16:31 UTC (rev 1098)
@@ -29,6 +29,7 @@
 
 - [boost] 1.37
 - [C++ Commons] svn r1082
+- [clamp] 153
 - [GCC] 4.3.2
 - [Lazy C++] 2.8.0
 - [Protocol Buffers] 2.0.0
@@ -36,6 +37,7 @@
 
 [boost]: http://www.boost.org/
 [C++ Commons]: http://assorted.sourceforge.net/cpp-commons/
+[clamp]: http://home.clara.net/raoulgough/clamp/
 [GCC]: http://gcc.gnu.org/
 [Lazy C++]: http://www.lazycplusplus.com/
 [Protocol Buffers]: http://code.google.com/p/protobuf/
@@ -67,8 +69,28 @@
 
 To terminate the system, send a sigint (ctrl-c) to the leader, and a clean
 shutdown should take place. The replicas dump their DB state to a tmp file,
-which you can then verify to be identical.
+which you can then verify to be identical. You can also send a sigint to a
+replica to stop just that node. If something goes awry, you can send a second
+sigint to try to force all working threads to shut down (any node, including
+replicas, respond to ctrl-c).
 
+Full System Test
+----------------
+
+    ./test.bash full
+
+will configure all the farm machines to (1) have my proper initial environment,
+(2) have all the prerequisite software, and (3) build ydb. This may take a
+long time (particularly the boost-building phase).
+
+    range='10 13' wait=5 ./test.bash run
+
+will run a leader on farm10, replicas on farm11 and farm12, and a recovering
+replica on farm13 after 5 seconds. Pipe several runs of this to some files
+(`*.out`), and plot the results with
+
+    ./test.bash plot *.out
+
 Recovery Mechanisms
 -------------------
 
@@ -117,11 +139,7 @@
 Todo
 ----
 
-- Expose program options.
-
-- Add test suite.
-
-- Add benchmarking hooks, e.g.:
+- Add benchmarking/testing hooks, e.g.:
 
   - start the recovering joiner at a well-defined time (after a certain # txns
     or after the DB reaches a certain size)
@@ -136,6 +154,9 @@
 
 - Figure out why things are running so slowly with >2 replicas.
 
+- Add a network recovery scheme that grabs state partitions in parallel from
+  all other replicas.
+
 - Add a variant of the recovery scheme so that the standing replicas can just
   send any snapshot of their DB beyond a certain seqno. The joiner can simply
   discard from its leader-populated backlog any txns before the seqno of the

Modified: ydb/trunk/src/Makefile
===================================================================
--- ydb/trunk/src/Makefile 2008-12-08 07:57:26 UTC (rev 1097)
+++ ydb/trunk/src/Makefile 2008-12-11 19:16:31 UTC (rev 1098)
@@ -1,7 +1,7 @@
 TARGET := ydb
 WTF := wtf
 
-LZZS := $(wildcard *.lzz)
+LZZS := $(patsubst %.clamp,%,$(wildcard *.lzz.clamp))
 LZZHDRS := $(foreach lzz,$(LZZS),$(patsubst %.lzz,%.hh,$(lzz)))
 LZZSRCS := $(foreach lzz,$(LZZS),$(patsubst %.lzz,%.cc,$(lzz)))
 LZZOBJS := $(foreach lzz,$(LZZS),$(patsubst %.lzz,%.o,$(lzz)))
@@ -51,12 +51,15 @@
 %.pb.h: %.proto
 	protoc --cpp_out=. $<
 
+%.lzz: %.lzz.clamp
+	clamp < $< | sed "`echo -e '1i#src\n1a#end'`" > $@
+
 clean:
-	rm -f $(GENSRCS) $(GENHDRS) $(OBJS) $(TARGET)
+	rm -f $(GENSRCS) $(GENHDRS) $(OBJS) $(TARGET) main.lzz *.clamp_h
 
 doc: $(SRCS) $(HDRS)
 	doxygen
 
 .PHONY: clean
 
-.SECONDARY: $(SRCS) $(HDRS) $(OBJS)
+.SECONDARY: $(SRCS) $(HDRS) $(OBJS) main.lzz

Deleted: ydb/trunk/src/main.lzz
===================================================================
--- ydb/trunk/src/main.lzz 2008-12-08 07:57:26 UTC (rev 1097)
+++ ydb/trunk/src/main.lzz 2008-12-11 19:16:31 UTC (rev 1098)
@@ -1,712 +0,0 @@
-#hdr
-#include <boost/bind.hpp>
-#include <boost/foreach.hpp>
-#include <boost/program_options.hpp>
-#include <boost/scoped_array.hpp>
-#include <commons/nullptr.h>
-#include <commons/rand.h>
-#include <commons/st/st.h>
-#include <commons/time.h>
-#include <csignal> // sigaction etc.
-#include <cstdio>
-#include <cstring> // strsignal
-#include <iostream>
-#include <fstream>
-#include <map>
-#include <netinet/in.h> // in_addr etc.
-#include <set> -#include <sys/socket.h> // getpeername -#include <sys/types.h> // ssize_t -#include <unistd.h> // pipe, write -#include <vector> -#include "ydb.pb.h" -#define foreach BOOST_FOREACH -using namespace boost; -using namespace commons; -using namespace std; -#end - -typedef pair<int, int> pii; -st_utime_t timeout; -int chkpt; -bool verbose; -bool yield_during_build_up; -bool yield_during_catch_up; -st_intr_bool stop_hub, kill_hub; - -/** - * The list of all threads. Keep track of these so that we may cleanly shut - * down all threads. - */ -set<st_thread_t> threads; - -class thread_eraser -{ - public: - thread_eraser() { threads.insert(st_thread_self()); } - ~thread_eraser() { threads.erase(st_thread_self()); } -}; - -/** - * Delegate for running thread targets. - * \param[in] f The function to execute. - * \param[in] intr Whether to signal stop_hub on an exception. - */ -void -my_spawn_helper(const function0<void> f, bool intr) -{ - thread_eraser eraser; - try { - f(); - } catch (const std::exception &ex) { - cerr << "thread " << st_thread_self() << ": " << ex.what() << endl; - if (intr) stop_hub.set(); - } -} - -/** - * Spawn a thread using ST but wrap it in an exception handler that interrupts - * all other threads (hopefully causing them to unwind). - */ -st_thread_t -my_spawn(const function0<void> &f, bool intr = true) -{ - st_thread_t t = st_spawn(bind(my_spawn_helper, f, intr)); - threads.insert(t); - return t; -} - -/** - * Used by the leader to bookkeep information about replicas. - */ -class replica_info -{ - public: - replica_info(st_netfd_t fd, uint16_t port) : fd_(fd), port_(port) {} - st_netfd_t fd() const { return fd_; } - /** The port on which the replica is listening. */ - uint16_t port() const { return port_; } -#hdr -#define GETSA sockaddr_in sa; sockaddr(sa); return sa -#end - /** The port on which the replica connected to us. 
*/ - uint16_t local_port() const { GETSA.sin_port; } - uint32_t host() const { GETSA.sin_addr.s_addr; } - sockaddr_in sockaddr() const { GETSA; } - void sockaddr(sockaddr_in &sa) const { - socklen_t salen = sizeof sa; - check0x(getpeername(st_netfd_fileno(fd_), - reinterpret_cast<struct sockaddr*>(&sa), - &salen)); - } - private: - st_netfd_t fd_; - uint16_t port_; -}; - -/** - * RAII to close all contained netfds. - */ -class st_closing_all_infos -{ - public: - st_closing_all_infos(const vector<replica_info>& rs) : rs_(rs) {} - ~st_closing_all_infos() { - foreach (replica_info r, rs_) - check0x(st_netfd_close(r.fd())); - } - private: - const vector<replica_info> &rs_; -}; - -/** - * RAII to close all contained netfds. - */ -class st_closing_all -{ - public: - st_closing_all(const vector<st_netfd_t>& rs) : rs_(rs) {} - ~st_closing_all() { - foreach (st_netfd_t r, rs_) - check0x(st_netfd_close(r)); - } - private: - const vector<st_netfd_t> &rs_; -}; - -/** - * RAII for dumping the final state of the DB to a file on disk. - */ -class dump_state -{ - public: - dump_state(const map<int, int> &map, const int &seqno) - : map_(map), seqno_(seqno) {} - ~dump_state() { - string fname = string("/tmp/ydb") + lexical_cast<string>(getpid()); - cout << "dumping DB state (" << seqno_ << ") to " << fname << endl; - ofstream of(fname.c_str()); - of << "seqno: " << seqno_ << endl; - foreach (const pii &p, map_) { - of << p.first << ": " << p.second << endl; - } - } - private: - const map<int, int> &map_; - const int &seqno_; -}; - -/** - * Send a message to some destinations (sequentially). - */ -template<typename T> -void -bcastmsg(const vector<st_netfd_t> &dsts, const T & msg) -{ - // Serialize message to a buffer. - string s; - check(msg.SerializeToString(&s)); - const char *buf = s.c_str(); - - // Prefix the message with a four-byte length. - uint32_t len = htonl(static_cast<uint32_t>(s.size())); - - // Broadcast the length-prefixed message to replicas. 
- foreach (st_netfd_t dst, dsts) { - checkeqnneg(st_write(dst, static_cast<void*>(&len), sizeof len, timeout), - static_cast<ssize_t>(sizeof len)); - checkeqnneg(st_write(dst, buf, s.size(), ST_UTIME_NO_TIMEOUT), - static_cast<ssize_t>(s.size())); - } -} - -/** - * Send a message to a single recipient. - */ -template<typename T> -void -sendmsg(st_netfd_t dst, const T &msg) -{ - vector<st_netfd_t> dsts(1, dst); - bcastmsg(dsts, msg); -} - -/** - * Read a message. - */ -template <typename T> -void -readmsg(st_netfd_t src, T & msg, st_utime_t timeout = ST_UTIME_NO_TIMEOUT) -{ - // Read the message length. - uint32_t len; - checkeqnneg(st_read_fully(src, static_cast<void*>(&len), sizeof len, - timeout), - static_cast<ssize_t>(sizeof len)); - len = ntohl(len); - -#define GETMSG(buf) \ - checkeqnneg(st_read_fully(src, buf, len, timeout), (int) len); \ - check(msg.ParseFromArray(buf, len)); - - // Parse the message body. - if (len < 4096) { - char buf[len]; - GETMSG(buf); - } else { - cout << "receiving large msg; heap-allocating " << len << " bytes" << endl; - scoped_array<char> buf(new char[len]); - GETMSG(buf.get()); - } -} - -/** - * Same as the above readmsg(), but returns an internally constructed message. - * This is a "higher-level" readmsg() that relies on return-value optimization - * for avoiding unnecessary copies. - */ -template <typename T> -T -readmsg(st_netfd_t src, st_utime_t timeout = ST_UTIME_NO_TIMEOUT) -{ - T msg; - readmsg(src, msg, timeout); - return msg; -} - -/** - * Keep issuing transactions to the replicas. - */ -void -issue_txns(st_channel<replica_info> &newreps, int &seqno) -{ - Op_OpType types[] = {Op::read, Op::write, Op::del}; - vector<st_netfd_t> fds; - - while (!stop_hub) { - // Did we get a new member? - if (!newreps.empty() && seqno > 0) { - sendmsg(fds[0], Txn()); - } - while (!newreps.empty()) { - fds.push_back(newreps.take().fd()); - } - - // Generate a random transaction. 
- Txn txn; - txn.set_seqno(seqno++); - int count = randint(5) + 1; - for (int o = 0; o < count; o++) { - Op *op = txn.add_op(); - int rtype = randint(3), rkey = randint(), rvalue = randint(); - op->set_type(types[rtype]); - op->set_key(rkey); - op->set_value(rvalue); - } - - // Broadcast. - bcastmsg(fds, txn); - - // Checkpoint. - if (txn.seqno() % chkpt == 0) { - if (verbose) cout << "issued txn " << txn.seqno() << endl; - st_sleep(0); - } - } -} - -/** - * Process a transaction: update DB state (incl. seqno) and send response to - * leader. - */ -void -process_txn(st_netfd_t leader, map<int, int> &map, const Txn &txn, int &seqno, - bool caught_up) -{ - checkeq(txn.seqno(), seqno + 1); - Response res; - res.set_seqno(txn.seqno()); - res.set_caught_up(caught_up); - seqno = txn.seqno(); - for (int o = 0; o < txn.op_size(); o++) { - const Op &op = txn.op(o); - switch (op.type()) { - case Op::read: - res.add_result(map[op.key()]); - break; - case Op::write: - map[op.key()] = op.value(); - break; - case Op::del: - map.erase(op.key()); - break; - } - } - sendmsg(leader, res); -} - -/** - * Actually do the work of executing a transaction and sending back the reply. - */ -void -process_txns(st_netfd_t leader, map<int, int> &map, int &seqno, - st_bool &send_state, st_bool &sent_state, - st_channel<Txn*> &backlog) -{ - while (true) { - Txn txn; - { - st_intr intr(kill_hub); - readmsg(leader, txn); - } - - if (txn.has_seqno()) { - if (txn.seqno() == seqno + 1) { - process_txn(leader, map, txn, seqno, true); - } else { - // Queue up for later processing once a snapshot has been received. - backlog.push(new Txn(txn)); - } - - if (txn.seqno() % chkpt == 0) { - if (verbose) cout << "processed txn " << txn.seqno() << endl; - st_sleep(0); - } - } else { - // Wait for the snapshot to be generated. 
- send_state.set(); - cout << "waiting for state to be sent" << endl; - sent_state.waitset(); - sent_state.reset(); - cout << "state sent" << endl; - } - } -} - -/** - * Keep swallowing replica responses. - */ -void -handle_responses(st_netfd_t replica, const int &seqno, bool caught_up) -{ - long long start_time = current_time_millis(); - while (true) { - Response res; - { - st_intr intr(kill_hub); - readmsg(replica, res); - } - if (!caught_up && res.caught_up()) { - caught_up = true; - cout << "recovering node caught up; took " - << current_time_millis() - start_time << "ms" << endl; - } - if (res.seqno() % chkpt == 0) { - if (verbose) - cout << "got response " << res.seqno() << " from " << replica << endl; - st_sleep(0); - } - if (stop_hub && res.seqno() + 1 == seqno) { - cout << "seqno = " << res.seqno() << endl; - break; - } - } -} - -/** - * Help the recovering node. - */ -void -recover_joiner(st_netfd_t listener, const map<int, int> &map, const int &seqno, - st_bool &send_state, st_bool &sent_state) -{ - // Wait for the right time to generate the snapshot. - { - st_intr intr(stop_hub); - send_state.waitset(); - } - send_state.reset(); - - cout << "snapshotting state for recovery" << endl; - Recovery recovery; - foreach (const pii &p, map) { - Recovery_Pair *pair = recovery.add_pair(); - pair->set_key(p.first); - pair->set_value(p.second); - } - recovery.set_seqno(seqno); - - // Notify process_txns that it may continue processing. - sent_state.set(); - - // Wait for the new joiner. - st_netfd_t joiner; - { - st_intr intr(stop_hub); - joiner = checkerr(st_accept(listener, nullptr, nullptr, - ST_UTIME_NO_TIMEOUT)); - } - st_closing closing(joiner); - - cout << "got joiner's connection, sending recovery" << endl; - sendmsg(joiner, recovery); - cout << "sent" << endl; -} - -/** - * Run the leader. 
- */ -void -run_leader(int minreps, uint16_t leader_port) -{ - cout << "starting as leader" << endl; - cout << "waiting for at least " << minreps << " replicas to join" << endl; - - // Wait until all replicas have joined. - st_netfd_t listener = st_tcp_listen(leader_port); - st_closing close_listener(listener); - // TODO rename these - vector<replica_info> replicas; - st_closing_all_infos close_replicas(replicas); - for (int i = 0; i < minreps; i++) { - st_netfd_t fd; - { - st_intr intr(stop_hub); - fd = checkerr(st_accept(listener, nullptr, nullptr, - ST_UTIME_NO_TIMEOUT)); - } - Join join = readmsg<Join>(fd); - replicas.push_back(replica_info(fd, static_cast<uint16_t>(join.port()))); - } - - // Construct the initialization message. - Init init; - init.set_txnseqno(0); - foreach (replica_info r, replicas) { - SockAddr *psa = init.add_node(); - psa->set_host(r.host()); - psa->set_port(r.port()); - } - - // Send init to each initial replica. - foreach (replica_info r, replicas) { - init.set_yourhost(r.host()); - sendmsg(r.fd(), init); - } - - // Start dispatching queries. - int seqno = 0; - st_channel<replica_info> newreps; - const function0<void> f = bind(issue_txns, ref(newreps), ref(seqno)); - st_thread_t swallower = my_spawn(bind(swallow, f)); - foreach (const replica_info &r, replicas) newreps.push(r); - st_joining join_swallower(swallower); - - // Start handling responses. - st_thread_group handlers; - foreach (replica_info r, replicas) { - handlers.insert(my_spawn(bind(handle_responses, r.fd(), ref(seqno), true))); - } - - // Accept the recovering node, and tell it about the online replicas. - st_netfd_t joiner; - { - st_intr intr(stop_hub); - joiner = checkerr(st_accept(listener, nullptr, nullptr, - ST_UTIME_NO_TIMEOUT)); - } - Join join = readmsg<Join>(joiner); - cout << "setting seqno to " << seqno << endl; - init.set_txnseqno(seqno); - sendmsg(joiner, init); - - // Start streaming txns to joiner. 
- cout << "start streaming txns to joiner" << endl; - replicas.push_back(replica_info(joiner, static_cast<uint16_t>(join.port()))); - newreps.push(replicas.back()); - handlers.insert(my_spawn(bind(handle_responses, joiner, ref(seqno), false))); -} - -/** - * Run a replica. - */ -void -run_replica(string leader_host, uint16_t leader_port, uint16_t listen_port) -{ - // Initialize database state. - map<int, int> map; - int seqno = -1; - dump_state ds(map, seqno); - st_bool send_state, sent_state; - - cout << "starting as replica on port " << listen_port << endl; - - // Listen for connections from other replicas. - st_netfd_t listener = - st_tcp_listen(listen_port); - st_joining join_rec(my_spawn(bind(recover_joiner, listener, ref(map), - ref(seqno), ref(send_state), - ref(sent_state)))); - - // Connect to the leader and join the system. - st_netfd_t leader = st_tcp_connect(leader_host.c_str(), leader_port, - timeout); - st_closing closing(leader); - Join join; - join.set_port(listen_port); - sendmsg(leader, join); - Init init = readmsg<Init>(leader); - uint32_t listen_host = init.yourhost(); - - // Display the info. - cout << "got init msg with txn seqno " << init.txnseqno() - << " and hosts:" << endl; - vector<st_netfd_t> replicas; - st_closing_all close_replicas(replicas); - for (uint16_t i = 0; i < init.node_size(); i++) { - const SockAddr &sa = init.node(i); - char buf[INET_ADDRSTRLEN]; - in_addr host = { sa.host() }; - bool is_self = sa.host() == listen_host && sa.port() == listen_port; - cout << "- " << checkerr(inet_ntop(AF_INET, &host, buf, - INET_ADDRSTRLEN)) - << ':' << sa.port() << (is_self ? " (self)" : "") << endl; - if (!is_self && init.txnseqno() > 0) { - replicas.push_back(st_tcp_connect(host, - static_cast<uint16_t>(sa.port()), - ST_UTIME_NO_TIMEOUT)); - } - } - - // Process txns. 
- st_channel<Txn*> backlog; - st_joining join_proc(my_spawn(bind(process_txns, leader, ref(map), - ref(seqno), ref(send_state), - ref(sent_state), ref(backlog)))); - - // If there's anything to recover. - if (init.txnseqno() > 0) { - cout << "waiting for recovery from " << replicas[0] << endl; - - // Read the recovery message. - Recovery recovery; - { - st_intr intr(stop_hub); - readmsg(replicas[0], recovery); - } - for (int i = 0; i < recovery.pair_size(); i++) { - const Recovery_Pair &p = recovery.pair(i); - map[p.key()] = p.value(); - if (i % chkpt == 0) { - if (yield_during_build_up) st_sleep(0); - } - } - assert(seqno == -1 && - static_cast<typeof(seqno)>(recovery.seqno()) > seqno); - seqno = recovery.seqno(); - cout << "recovered." << endl; - - while (!backlog.empty()) { - Txn *p = backlog.take(); - process_txn(leader, map, *p, seqno, false); - if (p->seqno() % chkpt == 0) { - cout << "processed txn " << p->seqno() << " off the backlog" << endl; - if (yield_during_catch_up) st_sleep(0); - } - delete p; - } - cout << "caught up." << endl; - } -} - -int sig_pipe[2]; - -/** - * Raw signal handler that triggers the (synchronous) handler. - */ -void handle_sig(int sig) { - int err = errno; - cerr << "got signal: " << strsignal(sig) << " (" << sig << ")" << endl; - checkeqnneg(write(sig_pipe[1], &sig, sizeof sig), - static_cast<ssize_t>(sizeof sig)); - errno = err; -} - -/** - * Synchronous part of the signal handler; cleanly interrrupts any threads that - * have marked themselves as interruptible. - */ -void handle_sig_sync() { - stfd fd = checkerr(st_netfd_open(sig_pipe[0])); - while (true) { - int sig; - checkeqnneg(st_read(fd, &sig, sizeof sig, ST_UTIME_NO_TIMEOUT), - static_cast<ssize_t>(sizeof sig)); - if (sig == SIGINT) { - if (!stop_hub) stop_hub.set(); - else kill_hub.set(); - } else if (sig == SIGTERM) { - foreach (st_thread_t t, threads) { - st_thread_interrupt(t); - } - } - break; - } -} - -/** - * Initialization and command-line parsing. 
- */ -int -main(int argc, char **argv) -{ - namespace po = boost::program_options; - try { - GOOGLE_PROTOBUF_VERIFY_VERSION; - - bool is_leader, use_epoll; - int minreps; - uint16_t leader_port, listen_port; - string leader_host; - - // Parse options. - po::options_description desc("Allowed options"); - desc.add_options() - ("help,h", "show this help message") - ("verbose,v", "enable periodic printing of txn processing progress") - ("epoll,e", po::bool_switch(&use_epoll), - "use epoll (select is used by default)") - ("yield-build-up", po::bool_switch(&yield_during_build_up), - "yield periodically during build-up phase of recovery") - ("yield-catch-up", po::bool_switch(&yield_during_catch_up), - "yield periodically during catch-up phase of recovery") - ("leader,l", po::bool_switch(&is_leader), - "run the leader (run replica by default)") - ("leader-host,H", - po::value<string>(&leader_host)->default_value(string("localhost")), - "hostname or address of the leader") - ("leader-port,P", - po::value<uint16_t>(&leader_port)->default_value(7654), - "port the leader listens on") - ("chkpt,c", po::value<int>(&chkpt)->default_value(10000), - "number of txns before yielding/verbose printing") - ("listen-port,p", po::value<uint16_t>(&listen_port), - "port to listen on (replicas only)") - ("timeout,t", po::value<st_utime_t>(&timeout)->default_value(1000000), - "timeout for IO operations (in microseconds)") - ("minreps,n", po::value<int>(&minreps)->default_value(2), - "minimum number of replicas the system is willing to process txns on"); - - po::variables_map vm; - try { - po::store(po::parse_command_line(argc, argv, desc), vm); - po::notify(vm); - - if (vm.count("help")) { - cout << desc << endl; - return 0; - } - if (!is_leader && !vm.count("listen-port")) { - class parse_exception : public std::exception { - virtual const char *what() const throw() { - return "running replica requires listen port to be specified"; - } - }; - throw parse_exception(); - } - } catch 
(std::exception &ex) { - cerr << ex.what() << endl << endl << desc << endl; - return 1; - } - - // Initialize support for ST working with asynchronous signals. - check0x(pipe(sig_pipe)); - struct sigaction sa; - sa.sa_handler = handle_sig; - check0x(sigemptyset(&sa.sa_mask)); - sa.sa_flags = 0; - check0x(sigaction(SIGINT, &sa, nullptr)); - - // Initialize ST. - if (use_epoll) check0x(st_set_eventsys(ST_EVENTSYS_ALT)); - check0x(st_init()); - st_spawn(bind(handle_sig_sync)); - - // Initialize thread manager for clean shutdown of all threads. - thread_eraser eraser; - threads.insert(st_thread_self()); - - // Which role are we? - if (is_leader) { - run_leader(minreps, leader_port); - } else { - run_replica(leader_host, leader_port, listen_port); - } - - return 0; - } catch (const std::exception &ex) { - // Must catch all exceptions at the top to make the stack unwind. - cerr << "thread " << st_thread_self() << ": " << ex.what() << endl; - return 1; - } -} Copied: ydb/trunk/src/main.lzz.clamp (from rev 1093, ydb/trunk/src/main.lzz) =================================================================== --- ydb/trunk/src/main.lzz.clamp (rev 0) +++ ydb/trunk/src/main.lzz.clamp 2008-12-11 19:16:31 UTC (rev 1098) @@ -0,0 +1,787 @@ +#hdr +#include <boost/bind.hpp> +#include <boost/foreach.hpp> +#include <boost/program_options.hpp> +#include <boost/scoped_array.hpp> +#include <boost/shared_ptr.hpp> +#include <commons/nullptr.h> +#include <commons/rand.h> +#include <commons/st/st.h> +#include <commons/time.h> +#include <csignal> // sigaction etc. +#include <cstdio> +#include <cstring> // strsignal +#include <iostream> +#include <fstream> +#include <map> +#include <netinet/in.h> // in_addr etc. 
+#include <set> +#include <sys/socket.h> // getpeername +#include <sys/types.h> // ssize_t +#include <unistd.h> // pipe, write +#include <vector> +#include "ydb.pb.h" +#define foreach BOOST_FOREACH +using namespace boost; +using namespace commons; +using namespace std; +#end + +typedef pair<int, int> pii; +st_utime_t timeout; +int chkpt; +bool verbose, yield_during_build_up, yield_during_catch_up; +long long timelim; +st_intr_bool stop_hub, kill_hub; + +/** + * The list of all threads. Keep track of these so that we may cleanly shut + * down all threads. + */ +set<st_thread_t> threads; + +/** + * RAII for adding/removing the current thread from the global threads set. + */ +class thread_eraser +{ + public: + thread_eraser() { threads.insert(st_thread_self()); } + ~thread_eraser() { threads.erase(st_thread_self()); } +}; + +/** + * Delegate for running thread targets. + * \param[in] f The function to execute. + * \param[in] intr Whether to signal stop_hub on an exception. + */ +void +my_spawn_helper(const function0<void> f, bool intr) +{ + thread_eraser eraser; + try { + f(); + } catch (const std::exception &ex) { + cerr << "thread " << st_thread_self() << ": " << ex.what() + << (intr ? "; interrupting!" : "") << endl; + if (intr) stop_hub.set(); + } +} + +/** + * Spawn a thread using ST but wrap it in an exception handler that interrupts + * all other threads (hopefully causing them to unwind). + * \param[in] f The function to execute. + * \param[in] intr Whether to signal stop_hub on an exception. Not actually + * used anywhere. + */ +st_thread_t +my_spawn(const function0<void> &f, bool intr = false) +{ + st_thread_t t = st_spawn(bind(my_spawn_helper, f, intr)); + threads.insert(t); + return t; +} + +/** + * Used by the leader to bookkeep information about replicas. + */ +class replica_info +{ + public: + replica_info(st_netfd_t fd, uint16_t port) : fd_(fd), port_(port) {} + st_netfd_t fd() const { return fd_; } + /** The port on which the replica is listening. 
*/ + uint16_t port() const { return port_; } +#hdr +#define GETSA sockaddr_in sa; sockaddr(sa); return sa +#end + /** The port on which the replica connected to us. */ + uint16_t local_port() const { GETSA.sin_port; } + uint32_t host() const { GETSA.sin_addr.s_addr; } + sockaddr_in sockaddr() const { GETSA; } + void sockaddr(sockaddr_in &sa) const { + socklen_t salen = sizeof sa; + check0x(getpeername(st_netfd_fileno(fd_), + reinterpret_cast<struct sockaddr*>(&sa), + &salen)); + } + private: + st_netfd_t fd_; + uint16_t port_; +}; + +/** + * RAII to close all contained netfds. + */ +class st_closing_all_infos +{ + public: + st_closing_all_infos(const vector<replica_info>& rs) : rs_(rs) {} + ~st_closing_all_infos() { + foreach (replica_info r, rs_) + check0x(st_netfd_close(r.fd())); + } + private: + const vector<replica_info> &rs_; +}; + +/** + * RAII to close all contained netfds. + */ +class st_closing_all +{ + public: + st_closing_all(const vector<st_netfd_t>& rs) : rs_(rs) {} + ~st_closing_all() { + foreach (st_netfd_t r, rs_) + check0x(st_netfd_close(r)); + } + private: + const vector<st_netfd_t> &rs_; +}; + +/** + * RAII for dumping the final state of the DB to a file on disk. + */ +class dump_state +{ + public: + dump_state(const map<int, int> &map, const int &seqno) + : map_(map), seqno_(seqno) {} + ~dump_state() { + string fname = string("/tmp/ydb") + lexical_cast<string>(getpid()); + cout << "dumping DB state (" << seqno_ << ") to " << fname << endl; + ofstream of(fname.c_str()); + of << "seqno: " << seqno_ << endl; + foreach (const pii &p, map_) { + of << p.first << ": " << p.second << endl; + } + } + private: + const map<int, int> &map_; + const int &seqno_; +}; + +/** + * Send a message to some destinations (sequentially). + */ +template<typename T> +void +bcastmsg(const vector<st_netfd_t> &dsts, const T & msg) +{ + // Serialize message to a buffer. 
+ string s; + check(msg.SerializeToString(&s)); + const char *buf = s.c_str(); + + // Prefix the message with a four-byte length. + uint32_t len = htonl(static_cast<uint32_t>(s.size())); + + // Broadcast the length-prefixed message to replicas. + foreach (st_netfd_t dst, dsts) { + checkeqnneg(st_write(dst, static_cast<void*>(&len), sizeof len, timeout), + static_cast<ssize_t>(sizeof len)); + checkeqnneg(st_write(dst, buf, s.size(), ST_UTIME_NO_TIMEOUT), + static_cast<ssize_t>(s.size())); + } +} + +/** + * Send a message to a single recipient. + */ +template<typename T> +void +sendmsg(st_netfd_t dst, const T &msg) +{ + vector<st_netfd_t> dsts(1, dst); + bcastmsg(dsts, msg); +} + +/** + * Read a message. + */ +template <typename T> +void +readmsg(st_netfd_t src, T & msg, st_utime_t timeout = ST_UTIME_NO_TIMEOUT) +{ + // Read the message length. + uint32_t len; + checkeqnneg(st_read_fully(src, static_cast<void*>(&len), sizeof len, + timeout), + static_cast<ssize_t>(sizeof len)); + len = ntohl(len); + +#define GETMSG(buf) \ + checkeqnneg(st_read_fully(src, buf, len, timeout), (int) len); \ + check(msg.ParseFromArray(buf, len)); + + // Parse the message body. + if (len < 4096) { + char buf[len]; + GETMSG(buf); + } else { + cout << "receiving large msg; heap-allocating " << len << " bytes" << endl; + scoped_array<char> buf(new char[len]); + GETMSG(buf.get()); + } +} + +/** + * Same as the above readmsg(), but returns an internally constructed message. + * This is a "higher-level" readmsg() that relies on return-value optimization + * for avoiding unnecessary copies. + */ +template <typename T> +T +readmsg(st_netfd_t src, st_utime_t timeout = ST_UTIME_NO_TIMEOUT) +{ + T msg; + readmsg(src, msg, timeout); + return msg; +} + +/** + * Keep issuing transactions to the replicas. 
+ */ +void +issue_txns(st_channel<replica_info> &newreps, int &seqno) +{ + Op_OpType types[] = {Op::read, Op::write, Op::del}; + vector<st_netfd_t> fds; + long long start_time = current_time_millis(); + + finally f(lambda () { + showtput("issued", current_time_millis(), __ref(start_time), __ref(seqno), + 0); + }); + + while (!stop_hub) { + // Did we get a new member? + if (!newreps.empty() && seqno > 0) { + sendmsg(fds[0], Txn()); + } + while (!newreps.empty()) { + fds.push_back(newreps.take().fd()); + } + + // Generate a random transaction. + Txn txn; + txn.set_seqno(seqno++); + int count = randint(5) + 1; + for (int o = 0; o < count; o++) { + Op *op = txn.add_op(); + int rtype = randint(3), rkey = randint(), rvalue = randint(); + op->set_type(types[rtype]); + op->set_key(rkey); + op->set_value(rvalue); + } + + // Broadcast. + bcastmsg(fds, txn); + + // Checkpoint. + if (txn.seqno() % chkpt == 0) { + if (verbose) + cout << "issued txn " << txn.seqno() << endl; + if (timelim > 0 && current_time_millis() - start_time > timelim) { + cout << "time's up; issued " << txn.seqno() << " txns in " << timelim + << " ms" << endl; + stop_hub.set(); + } + st_sleep(0); + } + } +} + +/** + * Process a transaction: update DB state (incl. seqno) and send response to + * leader. 
+ */ +void +process_txn(st_netfd_t leader, map<int, int> &map, const Txn &txn, int &seqno, + bool caught_up) +{ + checkeq(txn.seqno(), seqno + 1); + Response res; + res.set_seqno(txn.seqno()); + res.set_caught_up(caught_up); + seqno = txn.seqno(); + for (int o = 0; o < txn.op_size(); o++) { + const Op &op = txn.op(o); + switch (op.type()) { + case Op::read: + res.add_result(map[op.key()]); + break; + case Op::write: + map[op.key()] = op.value(); + break; + case Op::del: + map.erase(op.key()); + break; + } + } + sendmsg(leader, res); +} + +void +showtput(const string &action, long long stop_time, long long start_time, + int stop_count, int start_count) +{ + long long time_diff = stop_time - start_time; + int count_diff = stop_count - start_count; + double rate = double(count_diff) * 1000 / time_diff; + cout << action << " " << count_diff << " txns in " << time_diff << " ms (" + << rate << "tps)" << endl; +} + +/** + * Actually do the work of executing a transaction and sending back the reply. + * + * \param[in] leader The connection to the leader. + * + * \param[in] map The data store. + * + * \param[in] seqno The sequence number last seen. This always starts at 0, + * but may be bumped up by the recovery procedure. + * + * \param[in] send_states Channel of snapshots of the database state to send to + * recovering nodes (sent to recover_joiner). + * + * \param[in] backlog The backlog of txns that need to be processed. + * + * \param[in] init_seqno The seqno that was sent in the Init message from the + * leader. Not entirely clear that this is necessary; could probably just go + * with seqno. + */ +void +process_txns(st_netfd_t leader, map<int, int> &map, int &seqno, + st_channel<shared_ptr<Recovery> > &send_states, + st_channel<shared_ptr<Txn> > &backlog, int init_seqno) +{ + bool caught_up = init_seqno == 0; + long long start_time = current_time_millis(), + time_caught_up = caught_up ? start_time : -1; + int seqno_caught_up = caught_up ? 
seqno : -1; + + finally f(lambda () { + long long now = current_time_millis(); + showtput("processed", now, __ref(start_time), __ref(seqno), + __ref(init_seqno)); + if (!__ref(caught_up)) { + cout << "live-processing: never entered this phase (never caught up)" << + endl; + } else { + showtput("live-processed", now, __ref(time_caught_up), __ref(seqno), + __ref(seqno_caught_up)); + } + __ref(send_states).push(shared_ptr<Recovery>()); + }); + + while (true) { + Txn txn; + { + st_intr intr(stop_hub); + readmsg(leader, txn); + } + + if (txn.has_seqno()) { + if (txn.seqno() == seqno + 1) { + if (!caught_up) { + time_caught_up = current_time_millis(); + seqno_caught_up = seqno; + showtput("backlogged", time_caught_up, start_time, seqno_caught_up, + init_seqno); + caught_up = true; + } + process_txn(leader, map, txn, seqno, true); + } else { + // Queue up for later processing once a snapshot has been received. + backlog.push(shared_ptr<Txn>(new Txn(txn))); + } + + if (txn.seqno() % chkpt == 0) { + if (verbose) + cout << "processed txn " << txn.seqno() + << "; db size = " << map.size() << endl; + st_sleep(0); + } + } else { + // Generate a snapshot. + shared_ptr<Recovery> recovery(new Recovery); + foreach (const pii &p, map) { + Recovery_Pair *pair = recovery->add_pair(); + pair->set_key(p.first); + pair->set_value(p.second); + } + recovery->set_seqno(seqno); + send_states.push(recovery); + } + } + +} + +/** + * Keep swallowing replica responses. 
+ */ +void +handle_responses(st_netfd_t replica, const int &seqno, bool caught_up) +{ + long long start_time = current_time_millis(); + while (true) { + Response res; + { + st_intr intr(kill_hub); + readmsg(replica, res); + } + if (!caught_up && res.caught_up()) { + caught_up = true; + long long timediff = current_time_millis() - start_time; + cout << "recovering node caught up; took " + << timediff << "ms" << endl; + } + if (res.seqno() % chkpt == 0) { + if (verbose) + cout << "got response " << res.seqno() << " from " << replica << endl; + st_sleep(0); + } + if (stop_hub && res.seqno() + 1 == seqno) { + cout << "seqno = " << res.seqno() << endl; + break; + } + } +} + +/** + * Help the recovering node. + * + * \param[in] listener The connection on which we're listening for connections + * from recovering joiners. + * + * \param[in] map The database state. + * + * \param[in] seqno The sequence number. Always starts at 0. + * + * \param[in] send_states Channel of snapshots of the database state to receive + * from process_txns. + */ +void +recover_joiner(st_netfd_t listener, const map<int, int> &map, const int &seqno, + st_channel<shared_ptr<Recovery> > &send_states) +{ + st_netfd_t joiner; + shared_ptr<Recovery> recovery; + { + st_intr intr(stop_hub); + // Wait for the snapshot. + recovery = send_states.take(); + if (recovery == nullptr) { + return; + } + // Wait for the new joiner. + joiner = checkerr(st_accept(listener, nullptr, nullptr, + ST_UTIME_NO_TIMEOUT)); + } + + st_closing closing(joiner); + cout << "got joiner's connection, sending recovery" << endl; + sendmsg(joiner, *recovery); + cout << "sent" << endl; +} + +/** + * Run the leader. + */ +void +run_leader(int minreps, uint16_t leader_port) +{ + cout << "starting as leader" << endl; + + // Wait until all replicas have joined. 
+ st_netfd_t listener = st_tcp_listen(leader_port); + st_closing close_listener(listener); + vector<replica_info> replicas; + st_closing_all_infos close_replicas(replicas); + cout << "waiting for at least " << minreps << " replicas to join" << endl; + for (int i = 0; i < minreps; i++) { + st_netfd_t fd; + { + st_intr intr(stop_hub); + fd = checkerr(st_accept(listener, nullptr, nullptr, + ST_UTIME_NO_TIMEOUT)); + } + Join join = readmsg<Join>(fd); + replicas.push_back(replica_info(fd, static_cast<uint16_t>(join.port()))); + } + cout << "got all " << minreps << " replicas" << endl; + + // Construct the initialization message. + Init init; + init.set_txnseqno(0); + foreach (replica_info r, replicas) { + SockAddr *psa = init.add_node(); + psa->set_host(r.host()); + psa->set_port(r.port()); + } + + // Send init to each initial replica. + foreach (replica_info r, replicas) { + init.set_yourhost(r.host()); + sendmsg(r.fd(), init); + } + + // Start dispatching queries. + int seqno = 0; + st_channel<replica_info> newreps; + const function0<void> f = bind(issue_txns, ref(newreps), ref(seqno)); + st_thread_t swallower = my_spawn(bind(swallow, f)); + foreach (const replica_info &r, replicas) newreps.push(r); + st_joining join_swallower(swallower); + + // Start handling responses. + st_thread_group handlers; + foreach (replica_info r, replicas) { + handlers.insert(my_spawn(bind(handle_responses, r.fd(), ref(seqno), true))); + } + + // Accept the recovering node, and tell it about the online replicas. + st_netfd_t joiner; + { + st_intr intr(stop_hub); + joiner = checkerr(st_accept(listener, nullptr, nullptr, + ST_UTIME_NO_TIMEOUT)); + } + Join join = readmsg<Join>(joiner); + cout << "setting seqno to " << seqno << endl; + init.set_txnseqno(seqno); + sendmsg(joiner, init); + + // Start streaming txns to joiner. 
+ cout << "start streaming txns to joiner" << endl; + replicas.push_back(replica_info(joiner, static_cast<uint16_t>(join.port()))); + newreps.push(replicas.back()); + handlers.insert(my_spawn(bind(handle_responses, joiner, ref(seqno), false))); +} + +/** + * Run a replica. + */ +void +run_replica(string leader_host, uint16_t leader_port, uint16_t listen_port) +{ + // Initialize database state. + map<int, int> map; + int seqno = -1; + dump_state ds(map, seqno); + st_channel<shared_ptr<Recovery> > send_states; + + cout << "starting as replica on port " << listen_port << endl; + + // Listen for connections from other replicas. + st_netfd_t listener = st_tcp_listen(listen_port); + + // Connect to the leader and join the system. + st_netfd_t leader = st_tcp_connect(leader_host.c_str(), leader_port, + timeout); + st_closing closing(leader); + Join join; + join.set_port(listen_port); + sendmsg(leader, join); + Init init; + { + st_intr intr(stop_hub); + readmsg(leader, init); + } + uint32_t listen_host = init.yourhost(); + + // Display the info. + cout << "got init msg with txn seqno " << init.txnseqno() + << " and hosts:" << endl; + vector<st_netfd_t> replicas; + st_closing_all close_replicas(replicas); + for (uint16_t i = 0; i < init.node_size(); i++) { + const SockAddr &sa = init.node(i); + char buf[INET_ADDRSTRLEN]; + in_addr host = { sa.host() }; + bool is_self = sa.host() == listen_host && sa.port() == listen_port; + cout << "- " << checkerr(inet_ntop(AF_INET, &host, buf, + INET_ADDRSTRLEN)) + << ':' << sa.port() << (is_self ? " (self)" : "") << endl; + if (!is_self && init.txnseqno() > 0) { + replicas.push_back(st_tcp_connect(host, + static_cast<uint16_t>(sa.port()), + timeout)); + } + } + + // Process txns. 
+ st_channel<shared_ptr<Txn> > backlog; + st_joining join_proc(my_spawn(bind(process_txns, leader, ref(map), + ref(seqno), ref(send_states), + ref(backlog), init.txnseqno()))); + st_joining join_rec(my_spawn(bind(recover_joiner, listener, ref(map), + ref(seqno), ref(send_states)))); + + // If there's anything to recover. + if (init.txnseqno() > 0) { + cout << "waiting for recovery from " << replicas[0] << endl; + + // Read the recovery message. + Recovery recovery; + { + st_intr intr(stop_hub); + readmsg(replicas[0], recovery); + } + for (int i = 0; i < recovery.pair_size(); i++) { + const Recovery_Pair &p = recovery.pair(i); + map[p.key()] = p.value(); + if (i % chkpt == 0) { + if (yield_during_build_up) st_sleep(0); + } + } + assert(seqno == -1 && + static_cast<typeof(seqno)>(recovery.seqno()) > seqno); + seqno = recovery.seqno(); + cout << "recovered." << endl; + + while (!backlog.empty()) { + shared_ptr<Txn> p = backlog.take(); + process_txn(leader, map, *p, seqno, false); + if (p->seqno() % chkpt == 0) { + if (verbose) + cout << "processed txn " << p->seqno() << " off the backlog" << endl; + if (yield_during_catch_up) + st_sleep(0); + } + } + cout << "caught up." << endl; + } + + stop_hub.insert(st_thread_self()); +} + +int sig_pipe[2]; + +/** + * Raw signal handler that triggers the (synchronous) handler. + */ +void handle_sig(int sig) { + int err = errno; + cerr << "got signal: " << strsignal(sig) << " (" << sig << ")" << endl; + checkeqnneg(write(sig_pipe[1], &sig, sizeof sig), + static_cast<ssize_t>(sizeof sig)); + errno = err; +} + +/** + * Synchronous part of the signal handler; cleanly interrupts any threads that + * have marked themselves as interruptible. 
+ */ +void handle_sig_sync() { + stfd fd = checkerr(st_netfd_open(sig_pipe[0])); + while (true) { + int sig; + checkeqnneg(st_read(fd, &sig, sizeof sig, ST_UTIME_NO_TIMEOUT), + static_cast<ssize_t>(sizeof sig)); + if (sig == SIGINT) { + if (!stop_hub) stop_hub.set(); + else kill_hub.set(); + } else if (sig == SIGTERM) { + foreach (st_thread_t t, threads) { + st_thread_interrupt(t); + } + } + break; + } +} + +/** + * Initialization and command-line parsing. + */ +int +main(int argc, char **argv) +{ + namespace po = boost::program_options; + try { + GOOGLE_PROTOBUF_VERIFY_VERSION; + + bool is_leader, use_epoll; + int minreps; + uint16_t leader_port, listen_port; + string leader_host; + + // Parse options. + po::options_description desc("Allowed options"); + desc.add_options() + ("help,h", "show this help message") + ("verbose,v", "enable periodic printing of txn processing progress") + ("epoll,e", po::bool_switch(&use_epoll), + "use epoll (select is used by default)") + ("yield-build-up", po::bool_switch(&yield_during_build_up), + "yield periodically during build-up phase of recovery") + ("yield-catch-up", po::bool_switch(&yield_during_catch_up), + "yield periodically during catch-up phase of recovery") + ("leader,l", po::bool_switch(&is_leader), + "run the leader (run replica by default)") + ("leader-host,H", + po::value<string>(&leader_host)->default_value(string("localhost")), + "hostname or address of the leader") + ("leader-port,P", + po::value<uint16_t>(&leader_port)->default_value(7654), + "port the leader listens on") + ("chkpt,c", po::value<int>(&chkpt)->default_value(10000), + "number of txns before yielding/verbose printing") + ("timelim,T", po::value<long long>(&timelim)->default_value(0), + "time limit in milliseconds, or 0 for none") + ("listen-port,p", po::value<uint16_t>(&listen_port)->default_value(7654), + "port to listen on (replicas only)") + ("timeout,t", po::value<st_utime_t>(&timeout)->default_value(1000000), + "timeout for IO operations (in 
microseconds)") + ("minreps,n", po::value<int>(&minreps)->default_value(2), + "minimum number of replicas the system is willing to process txns on"); + + po::variables_map vm; + try { + po::store(po::parse_command_line(argc, argv, desc), vm); + po::notify(vm); + + if (vm.count("help")) { + cout << desc << endl; + return 0; + } + } catch (std::exception &ex) { + cerr << ex.what() << endl << endl << desc << endl; + return 1; + } + + // Initialize support for ST working with asynchronous signals. + check0x(pipe(sig_pipe)); + struct sigaction sa; + sa.sa_handler = handle_sig; + check0x(sigemptyset(&sa.sa_mask)); + sa.sa_flags = 0; + check0x(sigaction(SIGINT, &sa, nullptr)); + + // Initialize ST. + if (use_epoll) check0x(st_set_eventsys(ST_EVENTSYS_ALT)); + check0x(st_init()); + st_spawn(bind(handle_sig_sync)); + + // Initialize thread manager for clean shutdown of all threads. + thread_eraser eraser; + threads.insert(st_thread_self()); + + // Which role are we? + if (is_leader) { + run_leader(minreps, leader_port); + } else { + run_replica(leader_host, leader_port, listen_port); + } + + return 0; + } catch (const std::exception &ex) { + // Must catch all exceptions at the top to make the stack unwind. 
+ cerr << "thread " << st_thread_self() << ": " << ex.what() << endl; + return 1; + } +} Property changes on: ydb/trunk/src/main.lzz.clamp ___________________________________________________________________ Added: svn:mergeinfo + Added: ydb/trunk/tools/clamp.patch =================================================================== --- ydb/trunk/tools/clamp.patch (rev 0) +++ ydb/trunk/tools/clamp.patch 2008-12-11 19:16:31 UTC (rev 1098) @@ -0,0 +1,29 @@ +Only in clamp_053_new/: clamp +diff -u -r clamp_053/CodeGen.cc clamp_053_new/CodeGen.cc +--- clamp_053/CodeGen.cc 2003-09-30 18:44:04.000000000 -0400 ++++ clamp_053_new/CodeGen.cc 2008-12-11 01:25:30.000000000 -0500 +@@ -20,6 +20,7 @@ + + #include "CodeGen.hh" + ++#include <climits> + #include <sstream> + #include <cassert> + #include <iostream> +Binary files clamp_053/CodeGen.o and clamp_053_new/CodeGen.o differ +Only in clamp_053_new/: lambda_impl.clamp_h +diff -u -r clamp_053/Makefile clamp_053_new/Makefile +--- clamp_053/Makefile 2003-09-30 18:44:05.000000000 -0400 ++++ clamp_053_new/Makefile 2008-12-11 03:48:32.000000000 -0500 +@@ -27,9 +27,7 @@ + # pieces, depending on your set-up: CXX, CC, LEX and -I ...boost... + # + +-CXX = f:/mingw/bin/g++ +-CC = f:/mingw/bin/gcc +-LEX = f:/mingw/bin/flex ++LEX = flex + + CXXFLAGS = -g -Wall -Wno-unused -I d:/CVS/boost/boost + # Use -Wno-unused because lex.yy.c contains some unused labels and functions +Only in clamp_053_new/: test.cc Added: ydb/trunk/tools/test.bash =================================================================== --- ydb/trunk/tools/test.bash (rev 0) +++ ydb/trunk/tools/test.bash 2008-12-11 19:16:31 UTC (rev 1098) @@ -0,0 +1,274 @@ +#!/usr/bin/env bash + +set -o errexit -o nounset +if [[ "$1" != node-init-setup ]] +then . 
common.bash || exit 1 +fi + +script="$(basename "$0")" + +tagssh() { + ssh "$@" 2>&1 | sed "s/^/$1: /" +} + +check-remote() { + if [[ ${force:-asdf} != asdf && `hostname` == yang-xps410 ]] + then echo 'running a remote command on your pc!' 1>&2 && exit 1 + fi +} + +node-init-setup() { + check-remote + mkdir -p work + cd work + if [[ ! -d assorted ]] + then svn -q co https://assorted.svn.sourceforge.net/svnroot/assorted/ + fi + cd assorted/configs/trunk/ + ./bootstrap.bash local +} + +node-setup-lzz() { + check-remote + mkdir -p ~/.local/pkg/lzz/bin/ + mv /tmp/lzz.static ~/.local/pkg/lzz/bin/lzz + refresh-local +} + +node-setup-st() { + check-remote + mkdir -p ~/.local/pkg/st/{include,lib}/ + cd /tmp/ + tar xzf st-1.8.tar.gz + cd st-1.8 + CONFIG_GUESS_PATH=/tmp make -s default + make -C extensions -s + cp -f obj/st.h ~/.local/pkg/st/include/ + cp -f extensions/stx.h ~/.local/pkg/st/include/ + cp -f obj/{libst.{a,so*},libstx.a} ~/.local/pkg/st/lib/ + refresh-local +} + +node-setup-pb() { + check-remote + toast --quiet arm /tmp/protobuf-2.0.2.tar.bz2 +} + +node-setup-boost() { + check-remote + cd /tmp/ + tar xjf /tmp/boost_1_37_0.tar.bz2 + cd boost_1_37_0/ + ./configure --prefix=$HOME/.local/pkg/boost-1.37.0 + make -s install + ln -s ~/.local/pkg/boost-1.37.0/include/boost-1_37/boost/ ~/.local/pkg/boost-1.37.0/include/ + refresh-local +} + +node-setup-m4() { + check-remote + toast --quiet arm 'http://ftp.gnu.org/gnu/m4/m4-1.4.12.tar.bz2' +} + +node-setup-bison() { + check-remote + toast --quiet arm 'http://ftp.gnu.org/gnu/bison/bison-2.4.tar.bz2' +} + +node-setup-flex() { + check-remote + toast --quiet arm 'http://prdownloads.sourceforge.net/flex/flex-2.5.35.tar.bz2' +} + +node-setup-clamp() { + check-remote + cd /tmp/ + tar xzf clamp_053_src.tar.gz + cd clamp_053/ + chmod u+w * + patch -p1 < /tmp/clamp.patch + make -s clamp + mkdir -p ~/.local/pkg/clamp/bin/ + mv clamp ~/.local/pkg/clamp/bin/ + refresh-local +} + +node-setup-ydb-1() { + check-remote + if [[ ! 
-L ~/ydb ]] + then ln -s ~/work/assorted/ydb/trunk ~/ydb + fi + if [[ ! -L ~/ccom ]] + then ln -s ~/work/assorted/cpp-commons/trunk ~/ccom + fi +} + +node-setup-ydb-2() { + check-remote + cd ~/ccom/ + ./setup.bash -d -p ~/.local/pkg/cpp-commons + refresh-local + cd ~/ydb/src + make clean + make WTF= +} + +remote() { + local host="$1" + shift + scp -q "$(dirname "$0")/$script" "$host:" + tagssh "$host" "./$script" "$@" +} + +allhosts() { + if [[ ${host:-} ]] ; then + echo $host + elif [[ ${range:-} ]] ; then + seq $range | sed 's/^/farm/; s/$/.csail/' + else + cat << EOF +farm1.csail +farm2.csail +farm3.csail +farm4.csail +farm5.csail +farm6.csail +farm7.csail +farm8.csail +farm9.csail +farm10.csail +farm11.csail +farm12.csail +farm13.csail +farm14.csail +EOF + fi | xargs ${xargs--P9} -I^ "$@" +} + +allssh() { + allhosts ssh ^ "set -o errexit -o nounset; $@" +} + +allscp() { + allhosts scp -q "$@" +} + +allremote() { + allhosts "./$script" remote ^ "$@" +} + +init-setup() { + allremote node-init-setup +} + +get-deps() { + xargs -I_ -P9 wget -nv -P /tmp/ _ << EOF +http://www.lazycplusplus.com/lzz_2_8_0_linux.zip +http://downloads.sourceforge.net/state-threads/st-1.8.tar.gz +http://protobuf.googlecode.com/files/protobuf-2.0.2.tar.bz2 +http://downloads.sourceforge.net/boost/boost_1_37_0.tar.bz2 +http://home.clara.net/raoulgough/clamp/clamp_053_src.tar.gz +EOF + cd /tmp/ + unzip lzz_2_8_0_linux.zip lzz.static +} + +setup-deps() { + allscp \ + /usr/share/misc/config.guess \ + /tmp/lzz.static \ + /tmp/st-1.8.tar.gz \ + /tmp/protobuf-2.0.2.tar.bz2 \ + /tmp/boost_1_37_0.tar.bz2 \ + clamp.patch \ + ^:/tmp/ + + allremote node-setup-lzz + allremote node-setup-st + allremote node-setup-pb + allremote node-setup-boost + allremote node-setup-m4 + allremote node-setup-bison + allremote node-setup-clamp +} + +setup-ydb() { + allremote node-setup-ydb-1 + rm -r /tmp/{ydb,ccom}-src/ + svn export ~/ydb/src /tmp/ydb-src/ + svn export ~/ccom/src /tmp/ccom-src/ + allscp -r /tmp/ydb-src/* 
^:ydb/src/ + allscp -r /tmp/ccom-src/* ^:ccom/src/ + allremote node-setup-ydb-2 +} + +full() { + init-setup + setup-deps + setup-ydb +} + +hostinfos() { + xargs= allssh " + echo + hostname + echo ===== + fgrep 'model name' /proc/cpuinfo + head -2 /proc/meminfo + " +} + +hosttops() { + xargs= allssh " + echo + hostname + echo ===== + top -b -n 1 | fgrep -A3 COMMAND + " +} + +run-helper() { + tagssh $1 "ydb/src/ydb -l" & + sleep .1 + tagssh $2 "ydb/src/ydb -H $1" & + tagssh $3 "ydb/src/ydb -H $1" & + sleep ${wait:-10} + tagssh $4 "ydb/src/ydb -H $1" & + read + kill %1 +} + +range2args() { + "$@" $(seq $range | sed 's/^/farm/; s/$/.csail/') +} + +run() { + range2args run-helper +} + +stop-helper() { + tagssh $1 'pkill ydb' +} + +stop() { + range2args stop-helper +} + +kill-helper() { + tagssh $1 'pkill ydb' + tagssh $2 'pkill ydb' + tagssh $3 'pkill ydb' + tagssh $4 'pkill ydb' +} + +kill() { + range2args kill-helper +} + +#plot() { +# for i in "$@" ; do +# sed "s/farm$i.csail//" < "$i" +# done +#} + +"$@" Property changes on: ydb/trunk/tools/test.bash ___________________________________________________________________ Added: svn:executable + *
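
For readers skimming the diff, the "benchmark performance summary output" from the log message comes down to the arithmetic in showtput in main.lzz.clamp: a transaction-count difference divided by an elapsed time in milliseconds, scaled to transactions per second. The following is a minimal standalone sketch of that calculation, not the committed code. The names txns_per_second and format_tput are hypothetical, and the guard against a zero-length interval is an added assumption (showtput itself divides unconditionally).

```cpp
#include <cassert>
#include <sstream>
#include <string>

// Hypothetical helper (not in the commit): transactions per second over an
// interval measured in milliseconds. Returning 0 for an empty or negative
// interval is an assumption made for this sketch.
double txns_per_second(int count_diff, long long time_diff_ms) {
  if (time_diff_ms <= 0) return 0.0;
  return static_cast<double>(count_diff) * 1000.0 /
         static_cast<double>(time_diff_ms);
}

// Hypothetical formatter that mirrors the shape of showtput's summary line,
// returning the text instead of writing it to cout.
std::string format_tput(const std::string &action, long long stop_time,
                        long long start_time, int stop_count,
                        int start_count) {
  long long time_diff = stop_time - start_time;   // elapsed milliseconds
  int count_diff = stop_count - start_count;      // txns handled in between
  std::ostringstream out;
  out << action << " " << count_diff << " txns in " << time_diff << " ms ("
      << txns_per_second(count_diff, time_diff) << " tps)";
  return out.str();
}
```

For instance, `format_tput("issued", 3000, 1000, 5000, 0)` describes 5000 txns over 2000 ms and yields `issued 5000 txns in 2000 ms (2500 tps)`. Returning a string rather than printing keeps the sketch easy to check in isolation.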