assorted-commits Mailing List for Assorted projects (Page 33)
Brought to you by:
yangzhang
From: <yan...@us...> - 2008-12-19 23:20:43

Revision: 1102
          http://assorted.svn.sourceforge.net/assorted/?rev=1102&view=rev
Author:   yangzhang
Date:     2008-12-19 23:20:40 +0000 (Fri, 19 Dec 2008)

Log Message:
-----------
added farm machines, synced up home/zs

Modified Paths:
--------------
    configs/trunk/src/ssh/config

Modified: configs/trunk/src/ssh/config
===================================================================
--- configs/trunk/src/ssh/config 2008-12-19 23:18:49 UTC (rev 1101)
+++ configs/trunk/src/ssh/config 2008-12-19 23:20:40 UTC (rev 1102)
@@ -5,6 +5,66 @@

 # StrictHostKeyChecking no

+Host farm1
+  HostName farm1.csail.mit.edu
+  User yang
+
+Host farm2
+  HostName farm2.csail.mit.edu
+  User yang
+
+Host farm3
+  HostName farm3.csail.mit.edu
+  User yang
+
+Host farm4
+  HostName farm4.csail.mit.edu
+  User yang
+
+Host farm5
+  HostName farm5.csail.mit.edu
+  User yang
+
+Host farm6
+  HostName farm6.csail.mit.edu
+  User yang
+
+Host farm7
+  HostName farm7.csail.mit.edu
+  User yang
+
+Host farm8
+  HostName farm8.csail.mit.edu
+  User yang
+
+Host farm9
+  HostName farm9.csail.mit.edu
+  User yang
+
+Host farm10
+  HostName farm10.csail.mit.edu
+  User yang
+
+Host farm11
+  HostName farm11.csail.mit.edu
+  User yang
+
+Host farm12
+  HostName farm12.csail.mit.edu
+  User yang
+
+Host farm13
+  HostName farm13.csail.mit.edu
+  User yang
+
+Host farm14
+  HostName farm14.csail.mit.edu
+  User yang
+
+Host farm15
+  HostName farm15.csail.mit.edu
+  User yang
+
 Host hz
   HostName hzproject.com
   User yzhang
@@ -78,6 +138,7 @@
   HostName zs.ath.cx
   User yang
   LocalForward 5901 localhost:5900
+  ForwardX11 yes

 # Rohan K
 Host kr
@@ -135,6 +196,7 @@
 Host home
   HostName zs.ath.cx
   User yang
+  LocalForward 5901 localhost:5900
   ForwardX11 yes

 Host ocf

This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
From: <yan...@us...> - 2008-12-19 23:18:58

Revision: 1101
          http://assorted.svn.sourceforge.net/assorted/?rev=1101&view=rev
Author:   yangzhang
Date:     2008-12-19 23:18:49 +0000 (Fri, 19 Dec 2008)

Log Message:
-----------
updated personal website

Modified Paths:
--------------
    personal-site/trunk/src/index.txt
    personal-site/trunk/src/links.txt

Modified: personal-site/trunk/src/index.txt
===================================================================
--- personal-site/trunk/src/index.txt 2008-12-11 21:21:42 UTC (rev 1100)
+++ personal-site/trunk/src/index.txt 2008-12-19 23:18:49 UTC (rev 1101)
@@ -60,7 +60,7 @@
   management system for on-line transaction processing workloads.
 - Invirt: [SIPB]'s virtual machine management system. The name of the MIT
   instance of this service is [XVM].
-- Corey: an exokernel operating system for many-core architectures.
+- [Corey]: an exokernel operating system for many-core architectures.
 - [WebTables]: web-scale information retrieval over structured data.
 - [Scalable Resilient Overlay Networks]: distributed algorithms and systems to
   enable RONs to scale to thousands of nodes.
@@ -95,6 +95,7 @@
 [SIPB]: http://sipb.mit.edu/
 [XVM]: http://xvm.mit.edu/
+[Corey]: http://pdos.csail.mit.edu/~sbw/corey/
 [VoltDB]: http://db.cs.yale.edu/hstore/
 [WebTables]: slides/webtables-presentation-google07.pdf
@@ -162,6 +163,7 @@
   Ming Wu (Microsoft Research Asia), Yuehua Dai (Xi'an Jiaotong University),
   Yang Zhang (MIT), Zheng Zhang (Microsoft Research Asia).
   _Corey: an operating system for many cores_. OSDI 2008.
+  [PDF](http://pdos.csail.mit.edu/papers/corey:osdi08.pdf)

 - Michael Cafarella, Alon Halevy, Daisy Wang, Eugene Wu, Yang Zhang.
   _WebTables: Exploring the Power of Tables on the Web_. VLDB 2008.

Modified: personal-site/trunk/src/links.txt
===================================================================
--- personal-site/trunk/src/links.txt 2008-12-11 21:21:42 UTC (rev 1100)
+++ personal-site/trunk/src/links.txt 2008-12-19 23:18:49 UTC (rev 1101)
@@ -29,18 +29,19 @@
 Selected Blogroll
 -----------------

-- [A Neighborhood of Infinity]: Lightweight theoretical topics on everything
-  from category and type theory to quantum computing. Occasional Literate
-  Haskell posts. Low frequency.
 - [All Things Distributed]: Werner Vogels' blog (Amazon CTO and former CMU
   prof). Focus on distributed systems. Low frequency.
 - [Ars Technica]: Jon "Hannibal" Stokes' great explanatory articles on
   computer architecture topics. Low frequency.
+- [C++ Reference Guide]: Details, details, details. Also covers C++0x. Low
+  frequency.
 - [Evan Jones' Scratch Pad]: Labmate's blog on systems topics with occasional
   low-level articles on programming. Low frequency.
 - [Geeking with Greg]: A blog by Greg Linden, the guy behind Amazon's
   recommendation and personalization systems, now at MS Live Labs. Lots of
   good discussion on IR research. Low frequency.
+- [Haskell Weekly News]: Briefs about updates in the wonderful world of
+  Haskell.
 - [High Scalability]: Blog on systems and scalability, particularly in the web
   applications/datacenter universe. The author frequently provides useful
   summarizing notes. Medium frequency.
@@ -56,15 +57,15 @@
 - [p2p-hackers]: Discussion forum on anything related to networking and
   distributed systems, with no shortage of discussion on research in these
   fields. Medium frequency.
-- [PBF Comics]: Sick, twisted humor. Now on hiatus, sadly.
 - [research!rsc]: Labmate Russ Cox's blog. Lots of focus on history of
   computing. Low frequency.

-[A Neighborhood of Infinity]: http://sigfpe.blogspot.com/
 [All Things Distributed]: http://www.allthingsdistributed.com/
 [Ars Technica]: http://arstechnica.com/index.ars
+[C++ Reference Guide]: http://www.informit.com/guides/guide.aspx?g=cplusplus
 [Evan Jones' Scratch Pad]: http://evanjones.ca/
 [Geeking with Greg]: http://glinden.blogspot.com/
+[Haskell Weekly News]: http://sequence.complete.org/hwn/
 [High Scalability]: http://www.highscalability.com/
 [Kernel Trap]: http://kerneltrap.org/
 [Linux kernel mailing list]: http://lkml.org/
@@ -72,7 +73,6 @@
 [Linux Weekly News]: http://www.lwn.net/
 [The Monad Reader]: http://www.haskell.org/haskellwiki/The_Monad.Reader
 [p2p-hackers]: http://lists.zooko.com/mailman/listinfo/p2p-hackers
-[PBF Comics]: http://www.pbfcomics.com/
 [research!rsc]: http://research.swtch.com/

 Favorite Conferences
From: <yan...@us...> - 2008-12-11 21:21:55

Revision: 1100
          http://assorted.svn.sourceforge.net/assorted/?rev=1100&view=rev
Author:   yangzhang
Date:     2008-12-11 21:21:42 +0000 (Thu, 11 Dec 2008)

Log Message:
-----------
- added --debug-threads
- added leader throughput measurements for before, during, and after recovery

Modified Paths:
--------------
    ydb/trunk/src/main.lzz.clamp

Modified: ydb/trunk/src/main.lzz.clamp
===================================================================
--- ydb/trunk/src/main.lzz.clamp 2008-12-11 21:21:13 UTC (rev 1099)
+++ ydb/trunk/src/main.lzz.clamp 2008-12-11 21:21:42 UTC (rev 1100)
@@ -76,10 +76,11 @@
  * used anywhere.
  */
 st_thread_t
-my_spawn(const function0<void> &f, bool intr = false)
+my_spawn(const function0<void> &f, string name, bool intr = false)
 {
   st_thread_t t = st_spawn(bind(my_spawn_helper, f, intr));
   threads.insert(t);
+  threadnames[t] = name;
   return t;
 }

@@ -425,18 +426,41 @@
  * Keep swallowing replica responses.
  */
 void
-handle_responses(st_netfd_t replica, const int &seqno, bool caught_up)
+handle_responses(st_netfd_t replica, const int &seqno,
+                 st_multichannel<long long> &recover_signals, bool caught_up)
 {
-  long long start_time = current_time_millis();
+  st_channel<long long> &sub = recover_signals.subscribe();
+  long long start_time = current_time_millis(),
+            recovery_start_time = caught_up ? -1 : start_time,
+            recovery_end_time = -1;
+  int recovery_start_seqno = caught_up ? -1 : seqno,
+      recovery_end_seqno = -1;
+  finally f(lambda () {
+    long long end_time = current_time_millis();
+    showtput("after recovery, finished", end_time, __ref(recovery_end_time),
+             __ref(seqno), __ref(recovery_end_seqno));
+  });
   while (true) {
     Response res;
     {
       st_intr intr(kill_hub);
       readmsg(replica, res);
     }
+    if (recovery_start_time == -1 && !sub.empty()) {
+      recovery_start_time = sub.take();
+      recovery_start_seqno = seqno;
+      showtput("before recovery, finished", recovery_start_time, start_time,
+               recovery_start_seqno, 0);
+    } else if (recovery_end_time == -1 && !sub.empty()) {
+      recovery_end_time = sub.take();
+      recovery_end_seqno = seqno;
+      showtput("during recovery, finished", recovery_end_time,
+               recovery_start_time, recovery_end_seqno, recovery_start_seqno);
+    }
     if (!caught_up && res.caught_up()) {
+      long long t = current_time_millis(), timediff = t - start_time;
       caught_up = true;
-      long long timediff = current_time_millis() - start_time;
+      recover_signals.push(t);
       cout << "recovering node caught up; took "
            << timediff << "ms" << endl;
     }
@@ -445,8 +469,9 @@
         cout << "got response " << res.seqno() << " from " << replica << endl;
       st_sleep(0);
     }
+    // This is OK since the seqno will never grow again if stop_hub is set.
     if (stop_hub && res.seqno() + 1 == seqno) {
-      cout << "seqno = " << res.seqno() << endl;
+      cout << "stopping seqno = " << res.seqno() << endl;
       break;
     }
   }
@@ -496,6 +521,7 @@
 run_leader(int minreps, uint16_t leader_port)
 {
   cout << "starting as leader" << endl;
+  st_multichannel<long long> recover_signals;

   // Wait until all replicas have joined.
   st_netfd_t listener = st_tcp_listen(leader_port);
@@ -534,14 +560,16 @@
   int seqno = 0;
   st_channel<replica_info> newreps;
   const function0<void> f = bind(issue_txns, ref(newreps), ref(seqno));
-  st_thread_t swallower = my_spawn(bind(swallow, f));
+  st_thread_t swallower = my_spawn(bind(swallow, f), "issue_txns");
   foreach (const replica_info &r, replicas) newreps.push(r);
   st_joining join_swallower(swallower);

   // Start handling responses.
   st_thread_group handlers;
   foreach (replica_info r, replicas) {
-    handlers.insert(my_spawn(bind(handle_responses, r.fd(), ref(seqno), true)));
+    handlers.insert(my_spawn(bind(handle_responses, r.fd(), ref(seqno),
+                                  ref(recover_signals), true),
+                             "handle_responses"));
   }

   // Accept the recovering node, and tell it about the online replicas.
@@ -555,12 +583,15 @@
   cout << "setting seqno to " << seqno << endl;
   init.set_txnseqno(seqno);
   sendmsg(joiner, init);
+  recover_signals.push(current_time_millis());

   // Start streaming txns to joiner.
   cout << "start streaming txns to joiner" << endl;
   replicas.push_back(replica_info(joiner, static_cast<uint16_t>(join.port())));
   newreps.push(replicas.back());
-  handlers.insert(my_spawn(bind(handle_responses, joiner, ref(seqno), false)));
+  handlers.insert(my_spawn(bind(handle_responses, joiner, ref(seqno),
+                                ref(recover_signals), false),
+                           "handle_responses"));
 }

 /**
@@ -618,9 +649,11 @@
   st_channel<shared_ptr<Txn> > backlog;
   st_joining join_proc(my_spawn(bind(process_txns, leader, ref(map),
                                      ref(seqno), ref(send_states),
-                                     ref(backlog), init.txnseqno())));
+                                     ref(backlog), init.txnseqno()),
+                                "process_txns"));
   st_joining join_rec(my_spawn(bind(recover_joiner, listener, ref(map),
-                                    ref(seqno), ref(send_states))));
+                                    ref(seqno), ref(send_states)),
+                               "recover_joiner"));

   // If there's anything to recover.
   if (init.txnseqno() > 0) {
@@ -695,6 +728,17 @@
   }
 }

+map<st_thread_t, string> threadnames;
+
+void cb()
+{
+  if (threadnames.find(st_thread_self()) != threadnames.end()) {
+    cout << "switched to: " << threadnames[st_thread_self()] << endl;
+  } else {
+    cout << "switched to: " << st_thread_self() << endl;
+  }
+}
+
 /**
  * Initialization and command-line parsing.
  */
@@ -705,7 +749,7 @@
   try {
     GOOGLE_PROTOBUF_VERIFY_VERSION;

-    bool is_leader, use_epoll;
+    bool is_leader, use_epoll, debug_threads;
     int minreps;
     uint16_t leader_port, listen_port;
     string leader_host;
@@ -714,6 +758,8 @@
     po::options_description desc("Allowed options");
     desc.add_options()
       ("help,h", "show this help message")
+      ("debug-threads,d",po::bool_switch(&debug_threads),
+       "enable context switch debug outputs")
       ("verbose,v", "enable periodic printing of txn processing progress")
       ("epoll,e", po::bool_switch(&use_epoll),
        "use epoll (select is used by default)")
@@ -766,10 +812,14 @@
     if (use_epoll) check0x(st_set_eventsys(ST_EVENTSYS_ALT));
     check0x(st_init());
     st_spawn(bind(handle_sig_sync));
+    if (debug_threads) {
+      st_set_switch_in_cb(cb);
+    }

     // Initialize thread manager for clean shutdown of all threads.
     thread_eraser eraser;
     threads.insert(st_thread_self());
+    threadnames[st_thread_self()] = "main";

     // Which role are we?
     if (is_leader) {
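
Editor's note: revision 1100 reports throughput through calls to showtput, whose definition is not part of the diff above. As a rough illustration only, the sketch below assumes it divides the number of transactions issued in an interval by the elapsed wall-clock time. The name tput_per_sec is hypothetical, and the argument order simply mirrors the showtput call sites (end time, start time, end seqno, start seqno), with times in milliseconds as returned by current_time_millis().

```cpp
#include <cassert>

// Hypothetical helper, NOT the project's showtput (which the diff does not
// show). Returns transactions per second over one measurement interval.
// Times are in milliseconds; seqnos count transactions issued so far.
double tput_per_sec(long long end_ms, long long start_ms,
                    int end_seqno, int start_seqno)
{
  // A zero-length or negative interval has no meaningful throughput.
  assert(end_ms > start_ms);
  const long long elapsed_ms = end_ms - start_ms;
  const int txns = end_seqno - start_seqno;
  // Scale from per-millisecond to per-second.
  return 1000.0 * txns / static_cast<double>(elapsed_ms);
}
```

Under that assumption, the "before", "during", and "after" recovery phases are three calls over adjacent intervals, each starting where the previous one ended.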
From: <yan...@us...> - 2008-12-11 21:21:19

Revision: 1099
          http://assorted.svn.sourceforge.net/assorted/?rev=1099&view=rev
Author:   yangzhang
Date:     2008-12-11 21:21:13 +0000 (Thu, 11 Dec 2008)

Log Message:
-----------
- added st_multichannel
- fixed namespace issues

Modified Paths:
--------------
    cpp-commons/trunk/src/commons/st/st.h

Modified: cpp-commons/trunk/src/commons/st/st.h
===================================================================
--- cpp-commons/trunk/src/commons/st/st.h 2008-12-11 19:16:31 UTC (rev 1098)
+++ cpp-commons/trunk/src/commons/st/st.h 2008-12-11 21:21:13 UTC (rev 1099)
@@ -20,6 +20,7 @@
 namespace commons
 {
   using namespace boost;
+  using namespace std;

   enum { default_stack_size = 65536 };

@@ -239,6 +240,28 @@
   };

   /**
+   * An unbounded FIFO multi-cast channel, i.e. publish-subscribe.
+   * \todo Use shared_ptr.
+   */
+  template <typename T>
+  class st_multichannel
+  {
+    public:
+      void push(const T &x) {
+        foreach (st_channel<T> *q, qs) {
+          q->push(x);
+        }
+      }
+      st_channel<T> &subscribe() {
+        st_channel<T>* q = new st_channel<T>;
+        qs.push_back(q);
+        return *q;
+      }
+    private:
+      vector<st_channel<T>*> qs;
+  };
+
+  /**
    * A hub is a single point to signal to wake up a set of threads. Threads
    * join the hub before calling a blocking operation if they want to make
    * themselves interruptible on this hub.
@@ -266,7 +289,7 @@
         threads.clear();
       }
     private:
-      set<st_thread_t> threads;
+      std::set<st_thread_t> threads;
   };

   /**
@@ -366,7 +389,7 @@
       }
       void insert(st_thread_t t) { ts.insert(t); }
     private:
-      set<st_thread_t> ts;
+      std::set<st_thread_t> ts;
   };

 }
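
Editor's note: the st_multichannel added in revision 1099 allocates each subscriber queue with new and never frees it, which is what its "\todo Use shared_ptr" comment points at. The sketch below shows one way that could look. It is not the project's code: channel and multichannel are hypothetical names, a plain std::deque stands in for st_channel (so take() asserts rather than blocking, as the real channel presumably does), and it assumes a C++11 compiler for std::shared_ptr.

```cpp
#include <cassert>
#include <deque>
#include <memory>
#include <vector>

// Hypothetical single-threaded stand-in for st_channel<T>: an unbounded FIFO.
template <typename T>
class channel
{
  public:
    void push(const T &x) { q_.push_back(x); }
    bool empty() const { return q_.empty(); }
    // Precondition: the channel is not empty.
    T take() {
      assert(!q_.empty());
      T x = q_.front();
      q_.pop_front();
      return x;
    }
  private:
    std::deque<T> q_;
};

// Publish-subscribe: each pushed value is copied into every subscriber queue.
// Queues are shared between the multichannel and the subscriber, so they are
// released once both sides are done with them.
template <typename T>
class multichannel
{
  public:
    void push(const T &x) {
      for (auto &q : qs_) q->push(x);
    }
    std::shared_ptr<channel<T>> subscribe() {
      auto q = std::make_shared<channel<T>>();
      qs_.push_back(q);
      return q;
    }
  private:
    std::vector<std::shared_ptr<channel<T>>> qs_;
};
```

As in the original, a subscriber only receives values pushed after it subscribed; earlier values are not replayed.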
From: <yan...@us...> - 2008-12-11 19:16:42

Revision: 1098
          http://assorted.svn.sourceforge.net/assorted/?rev=1098&view=rev
Author:   yangzhang
Date:     2008-12-11 19:16:31 +0000 (Thu, 11 Dec 2008)

Log Message:
-----------
- started using clamp
- added benchmark performance summary output
- fixed unnecessary premature spawn and final join with recover_joiner
- fixed issue where stop_hub wasn't being interrupted
- added optional time limit as an option, causing issue_txns to stop after some time
- added default value for listen port (since nodes are frequently run on different hosts)
- added some comment documentation
- my_spawn doesn't interrupt other threads by default on exceptions
- reworked the communication/synchronization between process_txns and recover_joiner
- made channels safer
- added full test/benchmark setup & deployment scripts
- added patch to get clamp to build

Modified Paths:
--------------
    ydb/trunk/README
    ydb/trunk/src/Makefile

Added Paths:
-----------
    ydb/trunk/src/main.lzz.clamp
    ydb/trunk/tools/
    ydb/trunk/tools/clamp.patch
    ydb/trunk/tools/test.bash

Removed Paths:
-------------
    ydb/trunk/src/main.lzz

Modified: ydb/trunk/README
===================================================================
--- ydb/trunk/README 2008-12-08 07:57:26 UTC (rev 1097)
+++ ydb/trunk/README 2008-12-11 19:16:31 UTC (rev 1098)
@@ -29,6 +29,7 @@

 - [boost] 1.37
 - [C++ Commons] svn r1082
+- [clamp] 153
 - [GCC] 4.3.2
 - [Lazy C++] 2.8.0
 - [Protocol Buffers] 2.0.0
@@ -36,6 +37,7 @@

 [boost]: http://www.boost.org/
 [C++ Commons]: http://assorted.sourceforge.net/cpp-commons/
+[clamp]: http://home.clara.net/raoulgough/clamp/
 [GCC]: http://gcc.gnu.org/
 [Lazy C++]: http://www.lazycplusplus.com/
 [Protocol Buffers]: http://code.google.com/p/protobuf/
@@ -67,8 +69,28 @@

 To terminate the system, send a sigint (ctrl-c) to the leader, and a clean
 shutdown should take place. The replicas dump their DB state to a tmp file,
-which you can then verify to be identical.
+which you can then verify to be identical. You can also send a sigint to a
+replica to stop just that node. If something goes awry, you can send a second
+sigint to try to force all working threads to shut down (any node, including
+replicas, respond to ctrl-c).

+Full System Test
+----------------
+
+    ./test.bash full
+
+will configure all the farm machines to (1) have my proper initial environment,
+(2) have all the prerequisite software, and (3) build ydb. This may take a
+long time (particularly the boost-building phase).
+
+    range='10 13' wait=5 ./test.bash run
+
+will run a leader on farm10, replicas on farm11 and farm12, and a recovering
+replica on farm13 after 5 seconds. Pipe several runs of this to some files
+(`*.out`), and plot the results with
+
+    ./test.bash plot *.out
+
 Recovery Mechanisms
 -------------------

@@ -117,11 +139,7 @@
 Todo
 ----

-- Expose program options.
-
-- Add test suite.
-
-- Add benchmarking hooks, e.g.:
+- Add benchmarking/testing hooks, e.g.:

   - start the recovering joiner at a well-defined time (after a certain # txns
     or after the DB reaches a certain size)
@@ -136,6 +154,9 @@

 - Figure out why things are running so slowly with >2 replicas.

+- Add a network recovery scheme that grabs state partitions in parallel from
+  all other replicas.
+
 - Add a variant of the recovery scheme so that the standing replicas can just
   send any snapshot of their DB beyond a certain seqno. The joiner can simply
   discard from its leader-populated backlog any txns before the seqno of the

Modified: ydb/trunk/src/Makefile
===================================================================
--- ydb/trunk/src/Makefile 2008-12-08 07:57:26 UTC (rev 1097)
+++ ydb/trunk/src/Makefile 2008-12-11 19:16:31 UTC (rev 1098)
@@ -1,7 +1,7 @@
 TARGET := ydb
 WTF := wtf

-LZZS := $(wildcard *.lzz)
+LZZS := $(patsubst %.clamp,%,$(wildcard *.lzz.clamp))
 LZZHDRS := $(foreach lzz,$(LZZS),$(patsubst %.lzz,%.hh,$(lzz)))
 LZZSRCS := $(foreach lzz,$(LZZS),$(patsubst %.lzz,%.cc,$(lzz)))
 LZZOBJS := $(foreach lzz,$(LZZS),$(patsubst %.lzz,%.o,$(lzz)))
@@ -51,12 +51,15 @@
 %.pb.h: %.proto
 	protoc --cpp_out=. $<

+%.lzz: %.lzz.clamp
+	clamp < $< | sed "`echo -e '1i#src\n1a#end'`" > $@
+
 clean:
-	rm -f $(GENSRCS) $(GENHDRS) $(OBJS) $(TARGET)
+	rm -f $(GENSRCS) $(GENHDRS) $(OBJS) $(TARGET) main.lzz *.clamp_h

 doc: $(SRCS) $(HDRS)
 	doxygen

 .PHONY: clean

-.SECONDARY: $(SRCS) $(HDRS) $(OBJS)
+.SECONDARY: $(SRCS) $(HDRS) $(OBJS) main.lzz

Deleted: ydb/trunk/src/main.lzz
===================================================================
--- ydb/trunk/src/main.lzz 2008-12-08 07:57:26 UTC (rev 1097)
+++ ydb/trunk/src/main.lzz 2008-12-11 19:16:31 UTC (rev 1098)
@@ -1,712 +0,0 @@
-#hdr
-#include <boost/bind.hpp>
-#include <boost/foreach.hpp>
-#include <boost/program_options.hpp>
-#include <boost/scoped_array.hpp>
-#include <commons/nullptr.h>
-#include <commons/rand.h>
-#include <commons/st/st.h>
-#include <commons/time.h>
-#include <csignal> // sigaction etc.
-#include <cstdio>
-#include <cstring> // strsignal
-#include <iostream>
-#include <fstream>
-#include <map>
-#include <netinet/in.h> // in_addr etc.
-#include <set>
-#include <sys/socket.h> // getpeername
-#include <sys/types.h> // ssize_t
-#include <unistd.h> // pipe, write
-#include <vector>
-#include "ydb.pb.h"
-#define foreach BOOST_FOREACH
-using namespace boost;
-using namespace commons;
-using namespace std;
-#end
-
-typedef pair<int, int> pii;
-st_utime_t timeout;
-int chkpt;
-bool verbose;
-bool yield_during_build_up;
-bool yield_during_catch_up;
-st_intr_bool stop_hub, kill_hub;
-
-/**
- * The list of all threads. Keep track of these so that we may cleanly shut
- * down all threads.
- */
-set<st_thread_t> threads;
-
-class thread_eraser
-{
-  public:
-    thread_eraser() { threads.insert(st_thread_self()); }
-    ~thread_eraser() { threads.erase(st_thread_self()); }
-};
-
-/**
- * Delegate for running thread targets.
- * \param[in] f The function to execute.
- * \param[in] intr Whether to signal stop_hub on an exception.
- */
-void
-my_spawn_helper(const function0<void> f, bool intr)
-{
-  thread_eraser eraser;
-  try {
-    f();
-  } catch (const std::exception &ex) {
-    cerr << "thread " << st_thread_self() << ": " << ex.what() << endl;
-    if (intr) stop_hub.set();
-  }
-}
-
-/**
- * Spawn a thread using ST but wrap it in an exception handler that interrupts
- * all other threads (hopefully causing them to unwind).
- */
-st_thread_t
-my_spawn(const function0<void> &f, bool intr = true)
-{
-  st_thread_t t = st_spawn(bind(my_spawn_helper, f, intr));
-  threads.insert(t);
-  return t;
-}
-
-/**
- * Used by the leader to bookkeep information about replicas.
- */
-class replica_info
-{
-  public:
-    replica_info(st_netfd_t fd, uint16_t port) : fd_(fd), port_(port) {}
-    st_netfd_t fd() const { return fd_; }
-    /** The port on which the replica is listening. */
-    uint16_t port() const { return port_; }
-#hdr
-#define GETSA sockaddr_in sa; sockaddr(sa); return sa
-#end
-    /** The port on which the replica connected to us. */
-    uint16_t local_port() const { GETSA.sin_port; }
-    uint32_t host() const { GETSA.sin_addr.s_addr; }
-    sockaddr_in sockaddr() const { GETSA; }
-    void sockaddr(sockaddr_in &sa) const {
-      socklen_t salen = sizeof sa;
-      check0x(getpeername(st_netfd_fileno(fd_),
-                          reinterpret_cast<struct sockaddr*>(&sa),
-                          &salen));
-    }
-  private:
-    st_netfd_t fd_;
-    uint16_t port_;
-};
-
-/**
- * RAII to close all contained netfds.
- */
-class st_closing_all_infos
-{
-  public:
-    st_closing_all_infos(const vector<replica_info>& rs) : rs_(rs) {}
-    ~st_closing_all_infos() {
-      foreach (replica_info r, rs_)
-        check0x(st_netfd_close(r.fd()));
-    }
-  private:
-    const vector<replica_info> &rs_;
-};
-
-/**
- * RAII to close all contained netfds.
- */
-class st_closing_all
-{
-  public:
-    st_closing_all(const vector<st_netfd_t>& rs) : rs_(rs) {}
-    ~st_closing_all() {
-      foreach (st_netfd_t r, rs_)
-        check0x(st_netfd_close(r));
-    }
-  private:
-    const vector<st_netfd_t> &rs_;
-};
-
-/**
- * RAII for dumping the final state of the DB to a file on disk.
- */
-class dump_state
-{
-  public:
-    dump_state(const map<int, int> &map, const int &seqno)
-      : map_(map), seqno_(seqno) {}
-    ~dump_state() {
-      string fname = string("/tmp/ydb") + lexical_cast<string>(getpid());
-      cout << "dumping DB state (" << seqno_ << ") to " << fname << endl;
-      ofstream of(fname.c_str());
-      of << "seqno: " << seqno_ << endl;
-      foreach (const pii &p, map_) {
-        of << p.first << ": " << p.second << endl;
-      }
-    }
-  private:
-    const map<int, int> &map_;
-    const int &seqno_;
-};
-
-/**
- * Send a message to some destinations (sequentially).
- */
-template<typename T>
-void
-bcastmsg(const vector<st_netfd_t> &dsts, const T & msg)
-{
-  // Serialize message to a buffer.
-  string s;
-  check(msg.SerializeToString(&s));
-  const char *buf = s.c_str();
-
-  // Prefix the message with a four-byte length.
-  uint32_t len = htonl(static_cast<uint32_t>(s.size()));
-
-  // Broadcast the length-prefixed message to replicas.
-  foreach (st_netfd_t dst, dsts) {
-    checkeqnneg(st_write(dst, static_cast<void*>(&len), sizeof len, timeout),
-                static_cast<ssize_t>(sizeof len));
-    checkeqnneg(st_write(dst, buf, s.size(), ST_UTIME_NO_TIMEOUT),
-                static_cast<ssize_t>(s.size()));
-  }
-}
-
-/**
- * Send a message to a single recipient.
- */
-template<typename T>
-void
-sendmsg(st_netfd_t dst, const T &msg)
-{
-  vector<st_netfd_t> dsts(1, dst);
-  bcastmsg(dsts, msg);
-}
-
-/**
- * Read a message.
- */
-template <typename T>
-void
-readmsg(st_netfd_t src, T & msg, st_utime_t timeout = ST_UTIME_NO_TIMEOUT)
-{
-  // Read the message length.
-  uint32_t len;
-  checkeqnneg(st_read_fully(src, static_cast<void*>(&len), sizeof len,
-                            timeout),
-              static_cast<ssize_t>(sizeof len));
-  len = ntohl(len);
-
-#define GETMSG(buf) \
-  checkeqnneg(st_read_fully(src, buf, len, timeout), (int) len); \
-  check(msg.ParseFromArray(buf, len));
-
-  // Parse the message body.
-  if (len < 4096) {
-    char buf[len];
-    GETMSG(buf);
-  } else {
-    cout << "receiving large msg; heap-allocating " << len << " bytes" << endl;
-    scoped_array<char> buf(new char[len]);
-    GETMSG(buf.get());
-  }
-}
-
-/**
- * Same as the above readmsg(), but returns an internally constructed message.
- * This is a "higher-level" readmsg() that relies on return-value optimization
- * for avoiding unnecessary copies.
- */
-template <typename T>
-T
-readmsg(st_netfd_t src, st_utime_t timeout = ST_UTIME_NO_TIMEOUT)
-{
-  T msg;
-  readmsg(src, msg, timeout);
-  return msg;
-}
-
-/**
- * Keep issuing transactions to the replicas.
- */
-void
-issue_txns(st_channel<replica_info> &newreps, int &seqno)
-{
-  Op_OpType types[] = {Op::read, Op::write, Op::del};
-  vector<st_netfd_t> fds;
-
-  while (!stop_hub) {
-    // Did we get a new member?
-    if (!newreps.empty() && seqno > 0) {
-      sendmsg(fds[0], Txn());
-    }
-    while (!newreps.empty()) {
-      fds.push_back(newreps.take().fd());
-    }
-
-    // Generate a random transaction.
-    Txn txn;
-    txn.set_seqno(seqno++);
-    int count = randint(5) + 1;
-    for (int o = 0; o < count; o++) {
-      Op *op = txn.add_op();
-      int rtype = randint(3), rkey = randint(), rvalue = randint();
-      op->set_type(types[rtype]);
-      op->set_key(rkey);
-      op->set_value(rvalue);
-    }
-
-    // Broadcast.
-    bcastmsg(fds, txn);
-
-    // Checkpoint.
-    if (txn.seqno() % chkpt == 0) {
-      if (verbose) cout << "issued txn " << txn.seqno() << endl;
-      st_sleep(0);
-    }
-  }
-}
-
-/**
- * Process a transaction: update DB state (incl. seqno) and send response to
- * leader.
- */
-void
-process_txn(st_netfd_t leader, map<int, int> &map, const Txn &txn, int &seqno,
-            bool caught_up)
-{
-  checkeq(txn.seqno(), seqno + 1);
-  Response res;
-  res.set_seqno(txn.seqno());
-  res.set_caught_up(caught_up);
-  seqno = txn.seqno();
-  for (int o = 0; o < txn.op_size(); o++) {
-    const Op &op = txn.op(o);
-    switch (op.type()) {
-      case Op::read:
-        res.add_result(map[op.key()]);
-        break;
-      case Op::write:
-        map[op.key()] = op.value();
-        break;
-      case Op::del:
-        map.erase(op.key());
-        break;
-    }
-  }
-  sendmsg(leader, res);
-}
-
-/**
- * Actually do the work of executing a transaction and sending back the reply.
- */
-void
-process_txns(st_netfd_t leader, map<int, int> &map, int &seqno,
-             st_bool &send_state, st_bool &sent_state,
-             st_channel<Txn*> &backlog)
-{
-  while (true) {
-    Txn txn;
-    {
-      st_intr intr(kill_hub);
-      readmsg(leader, txn);
-    }
-
-    if (txn.has_seqno()) {
-      if (txn.seqno() == seqno + 1) {
-        process_txn(leader, map, txn, seqno, true);
-      } else {
-        // Queue up for later processing once a snapshot has been received.
-        backlog.push(new Txn(txn));
-      }
-
-      if (txn.seqno() % chkpt == 0) {
-        if (verbose) cout << "processed txn " << txn.seqno() << endl;
-        st_sleep(0);
-      }
-    } else {
-      // Wait for the snapshot to be generated.
-      send_state.set();
-      cout << "waiting for state to be sent" << endl;
-      sent_state.waitset();
-      sent_state.reset();
-      cout << "state sent" << endl;
-    }
-  }
-}
-
-/**
- * Keep swallowing replica responses.
- */
-void
-handle_responses(st_netfd_t replica, const int &seqno, bool caught_up)
-{
-  long long start_time = current_time_millis();
-  while (true) {
-    Response res;
-    {
-      st_intr intr(kill_hub);
-      readmsg(replica, res);
-    }
-    if (!caught_up && res.caught_up()) {
-      caught_up = true;
-      cout << "recovering node caught up; took "
-           << current_time_millis() - start_time << "ms" << endl;
-    }
-    if (res.seqno() % chkpt == 0) {
-      if (verbose)
-        cout << "got response " << res.seqno() << " from " << replica << endl;
-      st_sleep(0);
-    }
-    if (stop_hub && res.seqno() + 1 == seqno) {
-      cout << "seqno = " << res.seqno() << endl;
-      break;
-    }
-  }
-}
-
-/**
- * Help the recovering node.
- */
-void
-recover_joiner(st_netfd_t listener, const map<int, int> &map, const int &seqno,
-               st_bool &send_state, st_bool &sent_state)
-{
-  // Wait for the right time to generate the snapshot.
-  {
-    st_intr intr(stop_hub);
-    send_state.waitset();
-  }
-  send_state.reset();
-
-  cout << "snapshotting state for recovery" << endl;
-  Recovery recovery;
-  foreach (const pii &p, map) {
-    Recovery_Pair *pair = recovery.add_pair();
-    pair->set_key(p.first);
-    pair->set_value(p.second);
-  }
-  recovery.set_seqno(seqno);
-
-  // Notify process_txns that it may continue processing.
-  sent_state.set();
-
-  // Wait for the new joiner.
-  st_netfd_t joiner;
-  {
-    st_intr intr(stop_hub);
-    joiner = checkerr(st_accept(listener, nullptr, nullptr,
-                                ST_UTIME_NO_TIMEOUT));
-  }
-  st_closing closing(joiner);
-
-  cout << "got joiner's connection, sending recovery" << endl;
-  sendmsg(joiner, recovery);
-  cout << "sent" << endl;
-}
-
-/**
- * Run the leader.
- */
-void
-run_leader(int minreps, uint16_t leader_port)
-{
-  cout << "starting as leader" << endl;
-  cout << "waiting for at least " << minreps << " replicas to join" << endl;
-
-  // Wait until all replicas have joined.
-  st_netfd_t listener = st_tcp_listen(leader_port);
-  st_closing close_listener(listener);
-  // TODO rename these
-  vector<replica_info> replicas;
-  st_closing_all_infos close_replicas(replicas);
-  for (int i = 0; i < minreps; i++) {
-    st_netfd_t fd;
-    {
-      st_intr intr(stop_hub);
-      fd = checkerr(st_accept(listener, nullptr, nullptr,
-                              ST_UTIME_NO_TIMEOUT));
-    }
-    Join join = readmsg<Join>(fd);
-    replicas.push_back(replica_info(fd, static_cast<uint16_t>(join.port())));
-  }
-
-  // Construct the initialization message.
-  Init init;
-  init.set_txnseqno(0);
-  foreach (replica_info r, replicas) {
-    SockAddr *psa = init.add_node();
-    psa->set_host(r.host());
-    psa->set_port(r.port());
-  }
-
-  // Send init to each initial replica.
-  foreach (replica_info r, replicas) {
-    init.set_yourhost(r.host());
-    sendmsg(r.fd(), init);
-  }
-
-  // Start dispatching queries.
-  int seqno = 0;
-  st_channel<replica_info> newreps;
-  const function0<void> f = bind(issue_txns, ref(newreps), ref(seqno));
-  st_thread_t swallower = my_spawn(bind(swallow, f));
-  foreach (const replica_info &r, replicas) newreps.push(r);
-  st_joining join_swallower(swallower);
-
-  // Start handling responses.
-  st_thread_group handlers;
-  foreach (replica_info r, replicas) {
-    handlers.insert(my_spawn(bind(handle_responses, r.fd(), ref(seqno), true)));
-  }
-
-  // Accept the recovering node, and tell it about the online replicas.
-  st_netfd_t joiner;
-  {
-    st_intr intr(stop_hub);
-    joiner = checkerr(st_accept(listener, nullptr, nullptr,
-                                ST_UTIME_NO_TIMEOUT));
-  }
-  Join join = readmsg<Join>(joiner);
-  cout << "setting seqno to " << seqno << endl;
-  init.set_txnseqno(seqno);
-  sendmsg(joiner, init);
-
-  // Start streaming txns to joiner.
- cout << "start streaming txns to joiner" << endl; - replicas.push_back(replica_info(joiner, static_cast<uint16_t>(join.port()))); - newreps.push(replicas.back()); - handlers.insert(my_spawn(bind(handle_responses, joiner, ref(seqno), false))); -} - -/** - * Run a replica. - */ -void -run_replica(string leader_host, uint16_t leader_port, uint16_t listen_port) -{ - // Initialize database state. - map<int, int> map; - int seqno = -1; - dump_state ds(map, seqno); - st_bool send_state, sent_state; - - cout << "starting as replica on port " << listen_port << endl; - - // Listen for connections from other replicas. - st_netfd_t listener = - st_tcp_listen(listen_port); - st_joining join_rec(my_spawn(bind(recover_joiner, listener, ref(map), - ref(seqno), ref(send_state), - ref(sent_state)))); - - // Connect to the leader and join the system. - st_netfd_t leader = st_tcp_connect(leader_host.c_str(), leader_port, - timeout); - st_closing closing(leader); - Join join; - join.set_port(listen_port); - sendmsg(leader, join); - Init init = readmsg<Init>(leader); - uint32_t listen_host = init.yourhost(); - - // Display the info. - cout << "got init msg with txn seqno " << init.txnseqno() - << " and hosts:" << endl; - vector<st_netfd_t> replicas; - st_closing_all close_replicas(replicas); - for (uint16_t i = 0; i < init.node_size(); i++) { - const SockAddr &sa = init.node(i); - char buf[INET_ADDRSTRLEN]; - in_addr host = { sa.host() }; - bool is_self = sa.host() == listen_host && sa.port() == listen_port; - cout << "- " << checkerr(inet_ntop(AF_INET, &host, buf, - INET_ADDRSTRLEN)) - << ':' << sa.port() << (is_self ? " (self)" : "") << endl; - if (!is_self && init.txnseqno() > 0) { - replicas.push_back(st_tcp_connect(host, - static_cast<uint16_t>(sa.port()), - ST_UTIME_NO_TIMEOUT)); - } - } - - // Process txns. 
- st_channel<Txn*> backlog; - st_joining join_proc(my_spawn(bind(process_txns, leader, ref(map), - ref(seqno), ref(send_state), - ref(sent_state), ref(backlog)))); - - // If there's anything to recover. - if (init.txnseqno() > 0) { - cout << "waiting for recovery from " << replicas[0] << endl; - - // Read the recovery message. - Recovery recovery; - { - st_intr intr(stop_hub); - readmsg(replicas[0], recovery); - } - for (int i = 0; i < recovery.pair_size(); i++) { - const Recovery_Pair &p = recovery.pair(i); - map[p.key()] = p.value(); - if (i % chkpt == 0) { - if (yield_during_build_up) st_sleep(0); - } - } - assert(seqno == -1 && - static_cast<typeof(seqno)>(recovery.seqno()) > seqno); - seqno = recovery.seqno(); - cout << "recovered." << endl; - - while (!backlog.empty()) { - Txn *p = backlog.take(); - process_txn(leader, map, *p, seqno, false); - if (p->seqno() % chkpt == 0) { - cout << "processed txn " << p->seqno() << " off the backlog" << endl; - if (yield_during_catch_up) st_sleep(0); - } - delete p; - } - cout << "caught up." << endl; - } -} - -int sig_pipe[2]; - -/** - * Raw signal handler that triggers the (synchronous) handler. - */ -void handle_sig(int sig) { - int err = errno; - cerr << "got signal: " << strsignal(sig) << " (" << sig << ")" << endl; - checkeqnneg(write(sig_pipe[1], &sig, sizeof sig), - static_cast<ssize_t>(sizeof sig)); - errno = err; -} - -/** - * Synchronous part of the signal handler; cleanly interrrupts any threads that - * have marked themselves as interruptible. - */ -void handle_sig_sync() { - stfd fd = checkerr(st_netfd_open(sig_pipe[0])); - while (true) { - int sig; - checkeqnneg(st_read(fd, &sig, sizeof sig, ST_UTIME_NO_TIMEOUT), - static_cast<ssize_t>(sizeof sig)); - if (sig == SIGINT) { - if (!stop_hub) stop_hub.set(); - else kill_hub.set(); - } else if (sig == SIGTERM) { - foreach (st_thread_t t, threads) { - st_thread_interrupt(t); - } - } - break; - } -} - -/** - * Initialization and command-line parsing. 
- */ -int -main(int argc, char **argv) -{ - namespace po = boost::program_options; - try { - GOOGLE_PROTOBUF_VERIFY_VERSION; - - bool is_leader, use_epoll; - int minreps; - uint16_t leader_port, listen_port; - string leader_host; - - // Parse options. - po::options_description desc("Allowed options"); - desc.add_options() - ("help,h", "show this help message") - ("verbose,v", "enable periodic printing of txn processing progress") - ("epoll,e", po::bool_switch(&use_epoll), - "use epoll (select is used by default)") - ("yield-build-up", po::bool_switch(&yield_during_build_up), - "yield periodically during build-up phase of recovery") - ("yield-catch-up", po::bool_switch(&yield_during_catch_up), - "yield periodically during catch-up phase of recovery") - ("leader,l", po::bool_switch(&is_leader), - "run the leader (run replica by default)") - ("leader-host,H", - po::value<string>(&leader_host)->default_value(string("localhost")), - "hostname or address of the leader") - ("leader-port,P", - po::value<uint16_t>(&leader_port)->default_value(7654), - "port the leader listens on") - ("chkpt,c", po::value<int>(&chkpt)->default_value(10000), - "number of txns before yielding/verbose printing") - ("listen-port,p", po::value<uint16_t>(&listen_port), - "port to listen on (replicas only)") - ("timeout,t", po::value<st_utime_t>(&timeout)->default_value(1000000), - "timeout for IO operations (in microseconds)") - ("minreps,n", po::value<int>(&minreps)->default_value(2), - "minimum number of replicas the system is willing to process txns on"); - - po::variables_map vm; - try { - po::store(po::parse_command_line(argc, argv, desc), vm); - po::notify(vm); - - if (vm.count("help")) { - cout << desc << endl; - return 0; - } - if (!is_leader && !vm.count("listen-port")) { - class parse_exception : public std::exception { - virtual const char *what() const throw() { - return "running replica requires listen port to be specified"; - } - }; - throw parse_exception(); - } - } catch 
(std::exception &ex) { - cerr << ex.what() << endl << endl << desc << endl; - return 1; - } - - // Initialize support for ST working with asynchronous signals. - check0x(pipe(sig_pipe)); - struct sigaction sa; - sa.sa_handler = handle_sig; - check0x(sigemptyset(&sa.sa_mask)); - sa.sa_flags = 0; - check0x(sigaction(SIGINT, &sa, nullptr)); - - // Initialize ST. - if (use_epoll) check0x(st_set_eventsys(ST_EVENTSYS_ALT)); - check0x(st_init()); - st_spawn(bind(handle_sig_sync)); - - // Initialize thread manager for clean shutdown of all threads. - thread_eraser eraser; - threads.insert(st_thread_self()); - - // Which role are we? - if (is_leader) { - run_leader(minreps, leader_port); - } else { - run_replica(leader_host, leader_port, listen_port); - } - - return 0; - } catch (const std::exception &ex) { - // Must catch all exceptions at the top to make the stack unwind. - cerr << "thread " << st_thread_self() << ": " << ex.what() << endl; - return 1; - } -} Copied: ydb/trunk/src/main.lzz.clamp (from rev 1093, ydb/trunk/src/main.lzz) =================================================================== --- ydb/trunk/src/main.lzz.clamp (rev 0) +++ ydb/trunk/src/main.lzz.clamp 2008-12-11 19:16:31 UTC (rev 1098) @@ -0,0 +1,787 @@ +#hdr +#include <boost/bind.hpp> +#include <boost/foreach.hpp> +#include <boost/program_options.hpp> +#include <boost/scoped_array.hpp> +#include <boost/shared_ptr.hpp> +#include <commons/nullptr.h> +#include <commons/rand.h> +#include <commons/st/st.h> +#include <commons/time.h> +#include <csignal> // sigaction etc. +#include <cstdio> +#include <cstring> // strsignal +#include <iostream> +#include <fstream> +#include <map> +#include <netinet/in.h> // in_addr etc. 
+#include <set> +#include <sys/socket.h> // getpeername +#include <sys/types.h> // ssize_t +#include <unistd.h> // pipe, write +#include <vector> +#include "ydb.pb.h" +#define foreach BOOST_FOREACH +using namespace boost; +using namespace commons; +using namespace std; +#end + +typedef pair<int, int> pii; +st_utime_t timeout; +int chkpt; +bool verbose, yield_during_build_up, yield_during_catch_up; +long long timelim; +st_intr_bool stop_hub, kill_hub; + +/** + * The list of all threads. Keep track of these so that we may cleanly shut + * down all threads. + */ +set<st_thread_t> threads; + +/** + * RAII for adding/removing the current thread from the global threads set. + */ +class thread_eraser +{ + public: + thread_eraser() { threads.insert(st_thread_self()); } + ~thread_eraser() { threads.erase(st_thread_self()); } +}; + +/** + * Delegate for running thread targets. + * \param[in] f The function to execute. + * \param[in] intr Whether to signal stop_hub on an exception. + */ +void +my_spawn_helper(const function0<void> f, bool intr) +{ + thread_eraser eraser; + try { + f(); + } catch (const std::exception &ex) { + cerr << "thread " << st_thread_self() << ": " << ex.what() + << (intr ? "; interrupting!" : "") << endl; + if (intr) stop_hub.set(); + } +} + +/** + * Spawn a thread using ST but wrap it in an exception handler that interrupts + * all other threads (hopefully causing them to unwind). + * \param[in] f The function to execute. + * \param[in] intr Whether to signal stop_hub on an exception. Not actually + * used anywhere. + */ +st_thread_t +my_spawn(const function0<void> &f, bool intr = false) +{ + st_thread_t t = st_spawn(bind(my_spawn_helper, f, intr)); + threads.insert(t); + return t; +} + +/** + * Used by the leader to bookkeep information about replicas. + */ +class replica_info +{ + public: + replica_info(st_netfd_t fd, uint16_t port) : fd_(fd), port_(port) {} + st_netfd_t fd() const { return fd_; } + /** The port on which the replica is listening. 
*/ + uint16_t port() const { return port_; } +#hdr +#define GETSA sockaddr_in sa; sockaddr(sa); return sa +#end + /** The port on which the replica connected to us. */ + uint16_t local_port() const { GETSA.sin_port; } + uint32_t host() const { GETSA.sin_addr.s_addr; } + sockaddr_in sockaddr() const { GETSA; } + void sockaddr(sockaddr_in &sa) const { + socklen_t salen = sizeof sa; + check0x(getpeername(st_netfd_fileno(fd_), + reinterpret_cast<struct sockaddr*>(&sa), + &salen)); + } + private: + st_netfd_t fd_; + uint16_t port_; +}; + +/** + * RAII to close all contained netfds. + */ +class st_closing_all_infos +{ + public: + st_closing_all_infos(const vector<replica_info>& rs) : rs_(rs) {} + ~st_closing_all_infos() { + foreach (replica_info r, rs_) + check0x(st_netfd_close(r.fd())); + } + private: + const vector<replica_info> &rs_; +}; + +/** + * RAII to close all contained netfds. + */ +class st_closing_all +{ + public: + st_closing_all(const vector<st_netfd_t>& rs) : rs_(rs) {} + ~st_closing_all() { + foreach (st_netfd_t r, rs_) + check0x(st_netfd_close(r)); + } + private: + const vector<st_netfd_t> &rs_; +}; + +/** + * RAII for dumping the final state of the DB to a file on disk. + */ +class dump_state +{ + public: + dump_state(const map<int, int> &map, const int &seqno) + : map_(map), seqno_(seqno) {} + ~dump_state() { + string fname = string("/tmp/ydb") + lexical_cast<string>(getpid()); + cout << "dumping DB state (" << seqno_ << ") to " << fname << endl; + ofstream of(fname.c_str()); + of << "seqno: " << seqno_ << endl; + foreach (const pii &p, map_) { + of << p.first << ": " << p.second << endl; + } + } + private: + const map<int, int> &map_; + const int &seqno_; +}; + +/** + * Send a message to some destinations (sequentially). + */ +template<typename T> +void +bcastmsg(const vector<st_netfd_t> &dsts, const T & msg) +{ + // Serialize message to a buffer. 
+ string s; + check(msg.SerializeToString(&s)); + const char *buf = s.c_str(); + + // Prefix the message with a four-byte length. + uint32_t len = htonl(static_cast<uint32_t>(s.size())); + + // Broadcast the length-prefixed message to replicas. + foreach (st_netfd_t dst, dsts) { + checkeqnneg(st_write(dst, static_cast<void*>(&len), sizeof len, timeout), + static_cast<ssize_t>(sizeof len)); + checkeqnneg(st_write(dst, buf, s.size(), ST_UTIME_NO_TIMEOUT), + static_cast<ssize_t>(s.size())); + } +} + +/** + * Send a message to a single recipient. + */ +template<typename T> +void +sendmsg(st_netfd_t dst, const T &msg) +{ + vector<st_netfd_t> dsts(1, dst); + bcastmsg(dsts, msg); +} + +/** + * Read a message. + */ +template <typename T> +void +readmsg(st_netfd_t src, T & msg, st_utime_t timeout = ST_UTIME_NO_TIMEOUT) +{ + // Read the message length. + uint32_t len; + checkeqnneg(st_read_fully(src, static_cast<void*>(&len), sizeof len, + timeout), + static_cast<ssize_t>(sizeof len)); + len = ntohl(len); + +#define GETMSG(buf) \ + checkeqnneg(st_read_fully(src, buf, len, timeout), (int) len); \ + check(msg.ParseFromArray(buf, len)); + + // Parse the message body. + if (len < 4096) { + char buf[len]; + GETMSG(buf); + } else { + cout << "receiving large msg; heap-allocating " << len << " bytes" << endl; + scoped_array<char> buf(new char[len]); + GETMSG(buf.get()); + } +} + +/** + * Same as the above readmsg(), but returns an internally constructed message. + * This is a "higher-level" readmsg() that relies on return-value optimization + * for avoiding unnecessary copies. + */ +template <typename T> +T +readmsg(st_netfd_t src, st_utime_t timeout = ST_UTIME_NO_TIMEOUT) +{ + T msg; + readmsg(src, msg, timeout); + return msg; +} + +/** + * Keep issuing transactions to the replicas. 
+ */ +void +issue_txns(st_channel<replica_info> &newreps, int &seqno) +{ + Op_OpType types[] = {Op::read, Op::write, Op::del}; + vector<st_netfd_t> fds; + long long start_time = current_time_millis(); + + finally f(lambda () { + showtput("issued", current_time_millis(), __ref(start_time), __ref(seqno), + 0); + }); + + while (!stop_hub) { + // Did we get a new member? + if (!newreps.empty() && seqno > 0) { + sendmsg(fds[0], Txn()); + } + while (!newreps.empty()) { + fds.push_back(newreps.take().fd()); + } + + // Generate a random transaction. + Txn txn; + txn.set_seqno(seqno++); + int count = randint(5) + 1; + for (int o = 0; o < count; o++) { + Op *op = txn.add_op(); + int rtype = randint(3), rkey = randint(), rvalue = randint(); + op->set_type(types[rtype]); + op->set_key(rkey); + op->set_value(rvalue); + } + + // Broadcast. + bcastmsg(fds, txn); + + // Checkpoint. + if (txn.seqno() % chkpt == 0) { + if (verbose) + cout << "issued txn " << txn.seqno() << endl; + if (timelim > 0 && current_time_millis() - start_time > timelim) { + cout << "time's up; issued " << txn.seqno() << " txns in " << timelim + << " ms" << endl; + stop_hub.set(); + } + st_sleep(0); + } + } +} + +/** + * Process a transaction: update DB state (incl. seqno) and send response to + * leader. 
+ */ +void +process_txn(st_netfd_t leader, map<int, int> &map, const Txn &txn, int &seqno, + bool caught_up) +{ + checkeq(txn.seqno(), seqno + 1); + Response res; + res.set_seqno(txn.seqno()); + res.set_caught_up(caught_up); + seqno = txn.seqno(); + for (int o = 0; o < txn.op_size(); o++) { + const Op &op = txn.op(o); + switch (op.type()) { + case Op::read: + res.add_result(map[op.key()]); + break; + case Op::write: + map[op.key()] = op.value(); + break; + case Op::del: + map.erase(op.key()); + break; + } + } + sendmsg(leader, res); +} + +void +showtput(const string &action, long long stop_time, long long start_time, + int stop_count, int start_count) +{ + long long time_diff = stop_time - start_time; + int count_diff = stop_count - start_count; + double rate = double(count_diff) * 1000 / time_diff; + cout << action << " " << count_diff << " txns in " << time_diff << " ms (" + << rate << "tps)" << endl; +} + +/** + * Actually do the work of executing a transaction and sending back the reply. + * + * \param[in] leader The connection to the leader. + * + * \param[in] map The data store. + * + * \param[in] seqno The sequence number last seen. This always starts at 0, + * but may be bumped up by the recovery procedure. + * + * \param[in] send_states Channel of snapshots of the database state to send to + * recovering nodes (sent to recover_joiner). + * + * \param[in] backlog The backlog of txns that need to be processed. + * + * \param[in] init_seqno The seqno that was sent in the Init message from the + * leader. Not entirely clear that this is necessary; could probably just go + * with seqno. + */ +void +process_txns(st_netfd_t leader, map<int, int> &map, int &seqno, + st_channel<shared_ptr<Recovery> > &send_states, + st_channel<shared_ptr<Txn> > &backlog, int init_seqno) +{ + bool caught_up = init_seqno == 0; + long long start_time = current_time_millis(), + time_caught_up = caught_up ? start_time : -1; + int seqno_caught_up = caught_up ? 
seqno : -1; + + finally f(lambda () { + long long now = current_time_millis(); + showtput("processed", now, __ref(start_time), __ref(seqno), + __ref(init_seqno)); + if (!__ref(caught_up)) { + cout << "live-processing: never entered this phase (never caught up)" << + endl; + } else { + showtput("live-processed", now, __ref(time_caught_up), __ref(seqno), + __ref(seqno_caught_up)); + } + __ref(send_states).push(shared_ptr<Recovery>()); + }); + + while (true) { + Txn txn; + { + st_intr intr(stop_hub); + readmsg(leader, txn); + } + + if (txn.has_seqno()) { + if (txn.seqno() == seqno + 1) { + if (!caught_up) { + time_caught_up = current_time_millis(); + seqno_caught_up = seqno; + showtput("backlogged", time_caught_up, start_time, seqno_caught_up, + init_seqno); + caught_up = true; + } + process_txn(leader, map, txn, seqno, true); + } else { + // Queue up for later processing once a snapshot has been received. + backlog.push(shared_ptr<Txn>(new Txn(txn))); + } + + if (txn.seqno() % chkpt == 0) { + if (verbose) + cout << "processed txn " << txn.seqno() + << "; db size = " << map.size() << endl; + st_sleep(0); + } + } else { + // Generate a snapshot. + shared_ptr<Recovery> recovery(new Recovery); + foreach (const pii &p, map) { + Recovery_Pair *pair = recovery->add_pair(); + pair->set_key(p.first); + pair->set_value(p.second); + } + recovery->set_seqno(seqno); + send_states.push(recovery); + } + } + +} + +/** + * Keep swallowing replica responses. 
+ */ +void +handle_responses(st_netfd_t replica, const int &seqno, bool caught_up) +{ + long long start_time = current_time_millis(); + while (true) { + Response res; + { + st_intr intr(kill_hub); + readmsg(replica, res); + } + if (!caught_up && res.caught_up()) { + caught_up = true; + long long timediff = current_time_millis() - start_time; + cout << "recovering node caught up; took " + << timediff << "ms" << endl; + } + if (res.seqno() % chkpt == 0) { + if (verbose) + cout << "got response " << res.seqno() << " from " << replica << endl; + st_sleep(0); + } + if (stop_hub && res.seqno() + 1 == seqno) { + cout << "seqno = " << res.seqno() << endl; + break; + } + } +} + +/** + * Help the recovering node. + * + * \param[in] listener The connection on which we're listening for connections + * from recovering joiners. + * + * \param[in] map The database state. + * + * \param[in] seqno The sequence number. Always starts at 0. + * + * \param[in] send_states Channel of snapshots of the database state to receive + * from process_txns. + */ +void +recover_joiner(st_netfd_t listener, const map<int, int> &map, const int &seqno, + st_channel<shared_ptr<Recovery> > &send_states) +{ + st_netfd_t joiner; + shared_ptr<Recovery> recovery; + { + st_intr intr(stop_hub); + // Wait for the snapshot. + recovery = send_states.take(); + if (recovery == nullptr) { + return; + } + // Wait for the new joiner. + joiner = checkerr(st_accept(listener, nullptr, nullptr, + ST_UTIME_NO_TIMEOUT)); + } + + st_closing closing(joiner); + cout << "got joiner's connection, sending recovery" << endl; + sendmsg(joiner, *recovery); + cout << "sent" << endl; +} + +/** + * Run the leader. + */ +void +run_leader(int minreps, uint16_t leader_port) +{ + cout << "starting as leader" << endl; + + // Wait until all replicas have joined. 
+ st_netfd_t listener = st_tcp_listen(leader_port); + st_closing close_listener(listener); + vector<replica_info> replicas; + st_closing_all_infos close_replicas(replicas); + cout << "waiting for at least " << minreps << " replicas to join" << endl; + for (int i = 0; i < minreps; i++) { + st_netfd_t fd; + { + st_intr intr(stop_hub); + fd = checkerr(st_accept(listener, nullptr, nullptr, + ST_UTIME_NO_TIMEOUT)); + } + Join join = readmsg<Join>(fd); + replicas.push_back(replica_info(fd, static_cast<uint16_t>(join.port()))); + } + cout << "got all " << minreps << " replicas" << endl; + + // Construct the initialization message. + Init init; + init.set_txnseqno(0); + foreach (replica_info r, replicas) { + SockAddr *psa = init.add_node(); + psa->set_host(r.host()); + psa->set_port(r.port()); + } + + // Send init to each initial replica. + foreach (replica_info r, replicas) { + init.set_yourhost(r.host()); + sendmsg(r.fd(), init); + } + + // Start dispatching queries. + int seqno = 0; + st_channel<replica_info> newreps; + const function0<void> f = bind(issue_txns, ref(newreps), ref(seqno)); + st_thread_t swallower = my_spawn(bind(swallow, f)); + foreach (const replica_info &r, replicas) newreps.push(r); + st_joining join_swallower(swallower); + + // Start handling responses. + st_thread_group handlers; + foreach (replica_info r, replicas) { + handlers.insert(my_spawn(bind(handle_responses, r.fd(), ref(seqno), true))); + } + + // Accept the recovering node, and tell it about the online replicas. + st_netfd_t joiner; + { + st_intr intr(stop_hub); + joiner = checkerr(st_accept(listener, nullptr, nullptr, + ST_UTIME_NO_TIMEOUT)); + } + Join join = readmsg<Join>(joiner); + cout << "setting seqno to " << seqno << endl; + init.set_txnseqno(seqno); + sendmsg(joiner, init); + + // Start streaming txns to joiner. 
+ cout << "start streaming txns to joiner" << endl; + replicas.push_back(replica_info(joiner, static_cast<uint16_t>(join.port()))); + newreps.push(replicas.back()); + handlers.insert(my_spawn(bind(handle_responses, joiner, ref(seqno), false))); +} + +/** + * Run a replica. + */ +void +run_replica(string leader_host, uint16_t leader_port, uint16_t listen_port) +{ + // Initialize database state. + map<int, int> map; + int seqno = -1; + dump_state ds(map, seqno); + st_channel<shared_ptr<Recovery> > send_states; + + cout << "starting as replica on port " << listen_port << endl; + + // Listen for connections from other replicas. + st_netfd_t listener = st_tcp_listen(listen_port); + + // Connect to the leader and join the system. + st_netfd_t leader = st_tcp_connect(leader_host.c_str(), leader_port, + timeout); + st_closing closing(leader); + Join join; + join.set_port(listen_port); + sendmsg(leader, join); + Init init; + { + st_intr intr(stop_hub); + readmsg(leader, init); + } + uint32_t listen_host = init.yourhost(); + + // Display the info. + cout << "got init msg with txn seqno " << init.txnseqno() + << " and hosts:" << endl; + vector<st_netfd_t> replicas; + st_closing_all close_replicas(replicas); + for (uint16_t i = 0; i < init.node_size(); i++) { + const SockAddr &sa = init.node(i); + char buf[INET_ADDRSTRLEN]; + in_addr host = { sa.host() }; + bool is_self = sa.host() == listen_host && sa.port() == listen_port; + cout << "- " << checkerr(inet_ntop(AF_INET, &host, buf, + INET_ADDRSTRLEN)) + << ':' << sa.port() << (is_self ? " (self)" : "") << endl; + if (!is_self && init.txnseqno() > 0) { + replicas.push_back(st_tcp_connect(host, + static_cast<uint16_t>(sa.port()), + timeout)); + } + } + + // Process txns. 
+ st_channel<shared_ptr<Txn> > backlog; + st_joining join_proc(my_spawn(bind(process_txns, leader, ref(map), + ref(seqno), ref(send_states), + ref(backlog), init.txnseqno()))); + st_joining join_rec(my_spawn(bind(recover_joiner, listener, ref(map), + ref(seqno), ref(send_states)))); + + // If there's anything to recover. + if (init.txnseqno() > 0) { + cout << "waiting for recovery from " << replicas[0] << endl; + + // Read the recovery message. + Recovery recovery; + { + st_intr intr(stop_hub); + readmsg(replicas[0], recovery); + } + for (int i = 0; i < recovery.pair_size(); i++) { + const Recovery_Pair &p = recovery.pair(i); + map[p.key()] = p.value(); + if (i % chkpt == 0) { + if (yield_during_build_up) st_sleep(0); + } + } + assert(seqno == -1 && + static_cast<typeof(seqno)>(recovery.seqno()) > seqno); + seqno = recovery.seqno(); + cout << "recovered." << endl; + + while (!backlog.empty()) { + shared_ptr<Txn> p = backlog.take(); + process_txn(leader, map, *p, seqno, false); + if (p->seqno() % chkpt == 0) { + if (verbose) + cout << "processed txn " << p->seqno() << " off the backlog" << endl; + if (yield_during_catch_up) + st_sleep(0); + } + } + cout << "caught up." << endl; + } + + stop_hub.insert(st_thread_self()); +} + +int sig_pipe[2]; + +/** + * Raw signal handler that triggers the (synchronous) handler. + */ +void handle_sig(int sig) { + int err = errno; + cerr << "got signal: " << strsignal(sig) << " (" << sig << ")" << endl; + checkeqnneg(write(sig_pipe[1], &sig, sizeof sig), + static_cast<ssize_t>(sizeof sig)); + errno = err; +} + +/** + * Synchronous part of the signal handler; cleanly interrrupts any threads that + * have marked themselves as interruptible. 
+ */ +void handle_sig_sync() { + stfd fd = checkerr(st_netfd_open(sig_pipe[0])); + while (true) { + int sig; + checkeqnneg(st_read(fd, &sig, sizeof sig, ST_UTIME_NO_TIMEOUT), + static_cast<ssize_t>(sizeof sig)); + if (sig == SIGINT) { + if (!stop_hub) stop_hub.set(); + else kill_hub.set(); + } else if (sig == SIGTERM) { + foreach (st_thread_t t, threads) { + st_thread_interrupt(t); + } + } + break; + } +} + +/** + * Initialization and command-line parsing. + */ +int +main(int argc, char **argv) +{ + namespace po = boost::program_options; + try { + GOOGLE_PROTOBUF_VERIFY_VERSION; + + bool is_leader, use_epoll; + int minreps; + uint16_t leader_port, listen_port; + string leader_host; + + // Parse options. + po::options_description desc("Allowed options"); + desc.add_options() + ("help,h", "show this help message") + ("verbose,v", "enable periodic printing of txn processing progress") + ("epoll,e", po::bool_switch(&use_epoll), + "use epoll (select is used by default)") + ("yield-build-up", po::bool_switch(&yield_during_build_up), + "yield periodically during build-up phase of recovery") + ("yield-catch-up", po::bool_switch(&yield_during_catch_up), + "yield periodically during catch-up phase of recovery") + ("leader,l", po::bool_switch(&is_leader), + "run the leader (run replica by default)") + ("leader-host,H", + po::value<string>(&leader_host)->default_value(string("localhost")), + "hostname or address of the leader") + ("leader-port,P", + po::value<uint16_t>(&leader_port)->default_value(7654), + "port the leader listens on") + ("chkpt,c", po::value<int>(&chkpt)->default_value(10000), + "number of txns before yielding/verbose printing") + ("timelim,T", po::value<long long>(&timelim)->default_value(0), + "time limit in milliseconds, or 0 for none") + ("listen-port,p", po::value<uint16_t>(&listen_port)->default_value(7654), + "port to listen on (replicas only)") + ("timeout,t", po::value<st_utime_t>(&timeout)->default_value(1000000), + "timeout for IO operations (in 
microseconds)") + ("minreps,n", po::value<int>(&minreps)->default_value(2), + "minimum number of replicas the system is willing to process txns on"); + + po::variables_map vm; + try { + po::store(po::parse_command_line(argc, argv, desc), vm); + po::notify(vm); + + if (vm.count("help")) { + cout << desc << endl; + return 0; + } + } catch (std::exception &ex) { + cerr << ex.what() << endl << endl << desc << endl; + return 1; + } + + // Initialize support for ST working with asynchronous signals. + check0x(pipe(sig_pipe)); + struct sigaction sa; + sa.sa_handler = handle_sig; + check0x(sigemptyset(&sa.sa_mask)); + sa.sa_flags = 0; + check0x(sigaction(SIGINT, &sa, nullptr)); + + // Initialize ST. + if (use_epoll) check0x(st_set_eventsys(ST_EVENTSYS_ALT)); + check0x(st_init()); + st_spawn(bind(handle_sig_sync)); + + // Initialize thread manager for clean shutdown of all threads. + thread_eraser eraser; + threads.insert(st_thread_self()); + + // Which role are we? + if (is_leader) { + run_leader(minreps, leader_port); + } else { + run_replica(leader_host, leader_port, listen_port); + } + + return 0; + } catch (const std::exception &ex) { + // Must catch all exceptions at the top to make the stack unwind. 
+ cerr << "thread " << st_thread_self() << ": " << ex.what() << endl; + return 1; + } +} Property changes on: ydb/trunk/src/main.lzz.clamp ___________________________________________________________________ Added: svn:mergeinfo + Added: ydb/trunk/tools/clamp.patch =================================================================== --- ydb/trunk/tools/clamp.patch (rev 0) +++ ydb/trunk/tools/clamp.patch 2008-12-11 19:16:31 UTC (rev 1098) @@ -0,0 +1,29 @@ +Only in clamp_053_new/: clamp +diff -u -r clamp_053/CodeGen.cc clamp_053_new/CodeGen.cc +--- clamp_053/CodeGen.cc 2003-09-30 18:44:04.000000000 -0400 ++++ clamp_053_new/CodeGen.cc 2008-12-11 01:25:30.000000000 -0500 +@@ -20,6 +20,7 @@ + + #include "CodeGen.hh" + ++#include <climits> + #include <sstream> + #include <cassert> + #include <iostream> +Binary files clamp_053/CodeGen.o and clamp_053_new/CodeGen.o differ +Only in clamp_053_new/: lambda_impl.clamp_h +diff -u -r clamp_053/Makefile clamp_053_new/Makefile +--- clamp_053/Makefile 2003-09-30 18:44:05.000000000 -0400 ++++ clamp_053_new/Makefile 2008-12-11 03:48:32.000000000 -0500 +@@ -27,9 +27,7 @@ + # pieces, depending on your set-up: CXX, CC, LEX and -I ...boost... + # + +-CXX = f:/mingw/bin/g++ +-CC = f:/mingw/bin/gcc +-LEX = f:/mingw/bin/flex ++LEX = flex + + CXXFLAGS = -g -Wall -Wno-unused -I d:/CVS/boost/boost + # Use -Wno-unused because lex.yy.c contains some unused labels and functions +Only in clamp_053_new/: test.cc Added: ydb/trunk/tools/test.bash =================================================================== --- ydb/trunk/tools/test.bash (rev 0) +++ ydb/trunk/tools/test.bash 2008-12-11 19:16:31 UTC (rev 1098) @@ -0,0 +1,274 @@ +#!/usr/bin/env bash + +set -o errexit -o nounset +if [[ "$1" != node-init-setup ]] +then . 
common.bash || exit 1 +fi + +script="$(basename "$0")" + +tagssh() { + ssh "$@" 2>&1 | sed "s/^/$1: /" +} + +check-remote() { + if [[ ${force:-asdf} != asdf && `hostname` == yang-xps410 ]] + then echo 'running a remote command on your pc!' 1>&2 && exit 1 + fi +} + +node-init-setup() { + check-remote + mkdir -p work + cd work + if [[ ! -d assorted ]] + then svn -q co https://assorted.svn.sourceforge.net/svnroot/assorted/ + fi + cd assorted/configs/trunk/ + ./bootstrap.bash local +} + +node-setup-lzz() { + check-remote + mkdir -p ~/.local/pkg/lzz/bin/ + mv /tmp/lzz.static ~/.local/pkg/lzz/bin/lzz + refresh-local +} + +node-setup-st() { + check-remote + mkdir -p ~/.local/pkg/st/{include,lib}/ + cd /tmp/ + tar xzf st-1.8.tar.gz + cd st-1.8 + CONFIG_GUESS_PATH=/tmp make -s default + make -C extensions -s + cp -f obj/st.h ~/.local/pkg/st/include/ + cp -f extensions/stx.h ~/.local/pkg/st/include/ + cp -f obj/{libst.{a,so*},libstx.a} ~/.local/pkg/st/lib/ + refresh-local +} + +node-setup-pb() { + check-remote + toast --quiet arm /tmp/protobuf-2.0.2.tar.bz2 +} + +node-setup-boost() { + check-remote + cd /tmp/ + tar xjf /tmp/boost_1_37_0.tar.bz2 + cd boost_1_37_0/ + ./configure --prefix=$HOME/.local/pkg/boost-1.37.0 + make -s install + ln -s ~/.local/pkg/boost-1.37.0/include/boost-1_37/boost/ ~/.local/pkg/boost-1.37.0/include/ + refresh-local +} + +node-setup-m4() { + check-remote + toast --quiet arm 'http://ftp.gnu.org/gnu/m4/m4-1.4.12.tar.bz2' +} + +node-setup-bison() { + check-remote + toast --quiet arm 'http://ftp.gnu.org/gnu/bison/bison-2.4.tar.bz2' +} + +node-setup-flex() { + check-remote + toast --quiet arm 'http://prdownloads.sourceforge.net/flex/flex-2.5.35.tar.bz2' +} + +node-setup-clamp() { + check-remote + cd /tmp/ + tar xzf clamp_053_src.tar.gz + cd clamp_053/ + chmod u+w * + patch -p1 < /tmp/clamp.patch + make -s clamp + mkdir -p ~/.local/pkg/clamp/bin/ + mv clamp ~/.local/pkg/clamp/bin/ + refresh-local +} + +node-setup-ydb-1() { + check-remote + if [[ ! 
-L ~/ydb ]] + then ln -s ~/work/assorted/ydb/trunk ~/ydb + fi + if [[ ! -L ~/ccom ]] + then ln -s ~/work/assorted/cpp-commons/trunk ~/ccom + fi +} + +node-setup-ydb-2() { + check-remote + cd ~/ccom/ + ./setup.bash -d -p ~/.local/pkg/cpp-commons + refresh-local + cd ~/ydb/src + make clean + make WTF= +} + +remote() { + local host="$1" + shift + scp -q "$(dirname "$0")/$script" "$host:" + tagssh "$host" "./$script" "$@" +} + +allhosts() { + if [[ ${host:-} ]] ; then + echo $host + elif [[ ${range:-} ]] ; then + seq $range | sed 's/^/farm/; s/$/.csail/' + else + cat << EOF +farm1.csail +farm2.csail +farm3.csail +farm4.csail +farm5.csail +farm6.csail +farm7.csail +farm8.csail +farm9.csail +farm10.csail +farm11.csail +farm12.csail +farm13.csail +farm14.csail +EOF + fi | xargs ${xargs--P9} -I^ "$@" +} + +allssh() { + allhosts ssh ^ "set -o errexit -o nounset; $@" +} + +allscp() { + allhosts scp -q "$@" +} + +allremote() { + allhosts "./$script" remote ^ "$@" +} + +init-setup() { + allremote node-init-setup +} + +get-deps() { + xargs -I_ -P9 wget -nv -P /tmp/ _ << EOF +http://www.lazycplusplus.com/lzz_2_8_0_linux.zip +http://downloads.sourceforge.net/state-threads/st-1.8.tar.gz +http://protobuf.googlecode.com/files/protobuf-2.0.2.tar.bz2 +http://downloads.sourceforge.net/boost/boost_1_37_0.tar.bz2 +http://home.clara.net/raoulgough/clamp/clamp_053_src.tar.gz +EOF + cd /tmp/ + unzip lzz_2_8_0_linux.zip lzz.static +} + +setup-deps() { + allscp \ + /usr/share/misc/config.guess \ + /tmp/lzz.static \ + /tmp/st-1.8.tar.gz \ + /tmp/protobuf-2.0.2.tar.bz2 \ + /tmp/boost_1_37_0.tar.bz2 \ + clamp.patch \ + ^:/tmp/ + + allremote node-setup-lzz + allremote node-setup-st + allremote node-setup-pb + allremote node-setup-boost + allremote node-setup-m4 + allremote node-setup-bison + allremote node-setup-clamp +} + +setup-ydb() { + allremote node-setup-ydb-1 + rm -r /tmp/{ydb,ccom}-src/ + svn export ~/ydb/src /tmp/ydb-src/ + svn export ~/ccom/src /tmp/ccom-src/ + allscp -r /tmp/ydb-src/* 
^:ydb/src/ + allscp -r /tmp/ccom-src/* ^:ccom/src/ + allremote node-setup-ydb-2 +} + +full() { + init-setup + setup-deps + setup-ydb +} + +hostinfos() { + xargs= allssh " + echo + hostname + echo ===== + fgrep 'model name' /proc/cpuinfo + head -2 /proc/meminfo + " +} + +hosttops() { + xargs= allssh " + echo + hostname + echo ===== + top -b -n 1 | fgrep -A3 COMMAND + " +} + +run-helper() { + tagssh $1 "ydb/src/ydb -l" & + sleep .1 + tagssh $2 "ydb/src/ydb -H $1" & + tagssh $3 "ydb/src/ydb -H $1" & + sleep ${wait:-10} + tagssh $4 "ydb/src/ydb -H $1" & + read + kill %1 +} + +range2args() { + "$@" $(seq $range | sed 's/^/farm/; s/$/.csail/') +} + +run() { + range2args run-helper +} + +stop-helper() { + tagssh $1 'pkill ydb' +} + +stop() { + range2args stop-helper +} + +kill-helper() { + tagssh $1 'pkill ydb' + tagssh $2 'pkill ydb' + tagssh $3 'pkill ydb' + tagssh $4 'pkill ydb' +} + +kill() { + range2args kill-helper +} + +#plot() { +# for i in "$@" ; do +# sed "s/farm$i.csail//" < "$i" +# done +#} + +"$@" Property changes on: ydb/trunk/tools/test.bash ___________________________________________________________________ Added: svn:executable + * This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-12-08 07:57:34
|
Revision: 1097 http://assorted.svn.sourceforge.net/assorted/?rev=1097&view=rev Author: yangzhang Date: 2008-12-08 07:57:26 +0000 (Mon, 08 Dec 2008) Log Message: ----------- removed bootstrap; added bootstrap-cabal Modified Paths: -------------- shell-tools/trunk/README Added Paths: ----------- shell-tools/trunk/src/bootstrap-cabal.bash Removed Paths: ------------- shell-tools/trunk/src/bootstrap.bash Modified: shell-tools/trunk/README =================================================================== --- shell-tools/trunk/README 2008-12-07 08:18:08 UTC (rev 1096) +++ shell-tools/trunk/README 2008-12-08 07:57:26 UTC (rev 1097) @@ -98,6 +98,9 @@ `refresh-links` Kept solely as a reference, since it uses a bash bunch of interesting pipe redirection. + +`bootstrap- Fetch, build, and install cabal. bash, ghc6 +cabal` -------------------------------------------------------------------------------- [HSH]: http://software.complete.org/hsh/ Added: shell-tools/trunk/src/bootstrap-cabal.bash =================================================================== --- shell-tools/trunk/src/bootstrap-cabal.bash (rev 0) +++ shell-tools/trunk/src/bootstrap-cabal.bash 2008-12-08 07:57:26 UTC (rev 1097) @@ -0,0 +1,56 @@ +#!/usr/bin/env bash + +# From +# <http://ghcmutterings.wordpress.com/2008/11/10/bootstrapping-cabal-install/> + +set -e + +for i in $TMPDIR /tmp c:/temp; do + if test -d $i; then + dir=$i + break + fi + dir=$HOME +done + +cd $dir +wget http://hackage.haskell.org/packages/archive/zlib/0.5.0.0/zlib-0.5.0.0.tar.gz +tar xzf zlib-0.5.0.0.tar.gz +cd zlib-0.5.0.0 +ghc --make Setup +./Setup configure --user +./Setup build +./Setup register --inplace --user + +cd $dir +wget http://hackage.haskell.org/packages/archive/HTTP/3001.1.4/HTTP-3001.1.4.tar.gz +tar xzf HTTP-3001.1.4.tar.gz +cd HTTP-3001.1.4 +ghc --make Setup +./Setup configure --user +./Setup build +./Setup register --inplace --user + +cd $dir +wget 
http://hackage.haskell.org/packages/archive/cabal-install/0.6.0/cabal-install-0.6.0.tar.gz +tar xzf cabal-install-0.6.0.tar.gz +cd cabal-install-0.6.0 +ghc --make Setup +./Setup configure --user +./Setup build + +# Don't need these libs any more... +ghc-pkg unregister zlib-0.5.0.0 +ghc-pkg unregister HTTP-3001.1.4 + +# Now use cabal-install to install itself and its dependencies +./dist/build/cabal/cabal update +./dist/build/cabal/cabal install cabal-install + +# Clean up... +cd $dir +rm -rf zlib-0.5.0.0* +rm -rf HTTP-3001.1.4* +rm -rf cabal-install-0.6.0* + +echo "ALL DONE!" Deleted: shell-tools/trunk/src/bootstrap.bash =================================================================== --- shell-tools/trunk/src/bootstrap.bash 2008-12-07 08:18:08 UTC (rev 1096) +++ shell-tools/trunk/src/bootstrap.bash 2008-12-08 07:57:26 UTC (rev 1097) @@ -1,57 +0,0 @@ -#!/usr/bin/env bash -# vim:et:sw=4 - -# Sets up the current user account for synchronizing with your home directory. - -set -o errexit - -link_dir=~/local -pkg_dir=~/local -do_unison=1 -usage="$cmd_name [OPTIONS] -Options: - -g use /opt/links as link dir and /opt as crawl dir - -h display this help message - -l dir set the link dir - -p dir set the pkg dir - -r set the ssh target that serves as the central repo - -u do not deploy or use unison" - -while getopts 'ghl:p:r:u' opt ; do - case "$opt" in - g ) link_dir=/opt/links ; pkg_dir=/opt ;; - h ) echo "$usage" ; exit ;; - l ) link_dir="$OPTARG" ;; - p ) pkg_dir="$OPTARG" ;; - r ) home_ssh="$OPTARG" ;; - u ) do_unison= ;; - \? ) errexit ;; - esac -done - -if [[ $do_unison ]] ; then -# if [[ ! -f common.bash ]] ; then -# pushd "$( dirname "$0" )" -# scp "$home_ssh:local/assorted/src/released/bash/common.bash" . -# . common.bash -# rm common.bash -# popd -# else -# . 
common.bash -# fi - mkdir -p "$pkg_dir/unison/bin" ~/.unison - wget -O "$pkg_dir/unison/bin/unison" 'http://www.cis.upenn.edu/~bcpierce/unison/download/stable/unison-2.9.1/unison.linux-static-textui' - chmod +x "$pkg_dir/unison/bin/unison" - scp -q "$home_ssh:.unison/common.prf .unison/apps.prf .unison/home.prf" ~/.unison - ln -s ~/.unison/home.prf ~/.unison/default.prf 2> /dev/null || true - "$pkg_dir/unison/bin/unison" -root "$HOME" -root "ssh://$home_ssh/" -logfile "$HOME/.unison/unison.log" - chmod +x "$pkg_dir/assorted/src/released/bash/"*.bash - ln -s ~/vimfiles ~/.vim || true -else - mkdir -p "$pkg_dir" - scp -C -o CompressionLevel=9 -r "$home_ssh:local/assorted" "$pkg_dir" -fi - -export PATH="$pkg_dir/assorted/src/released/bash/:$PATH" -refresh-assorted.bash "$pkg_dir/assorted" -refresh-links.bash -l "$link_dir" "$pkg_dir" This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-12-07 08:18:15
|
Revision: 1096
          http://assorted.svn.sourceforge.net/assorted/?rev=1096&view=rev
Author:   yangzhang
Date:     2008-12-07 08:18:08 +0000 (Sun, 07 Dec 2008)

Log Message:
-----------
added finally

Modified Paths:
--------------
    cpp-commons/trunk/src/commons/boost/delegates.h

Modified: cpp-commons/trunk/src/commons/boost/delegates.h
===================================================================
--- cpp-commons/trunk/src/commons/boost/delegates.h 2008-12-06 23:04:40 UTC (rev 1095)
+++ cpp-commons/trunk/src/commons/boost/delegates.h 2008-12-07 08:18:08 UTC (rev 1096)
@@ -48,6 +48,20 @@
   //   catch (std::exception &ex) { return std::exception(ex); }
   // }
 
+  /**
+   * A class that takes a functor and calls it on destruction, effectively
+   * giving you a `finally` clause. Particularly useful with clamp.
+   */
+  //template<typename T>
+  typedef function0<void> T;
+  class finally {
+    public:
+      finally(T f): f_(f) {}
+      ~finally() { f_(); }
+    private:
+      T f_;
+  };
+
 }
 
 #endif
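The `finally` class added in revision 1096 calls a functor from its destructor. The following is a minimal sketch of how such a guard behaves, not the committed code: it assumes C++11, swaps `boost::function0<void>` for `std::function<void()>`, and `trace_scope` is a hypothetical helper that exists only to make the ordering visible.

```cpp
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Scope guard in the spirit of commons' `finally`; std::function stands in
// for boost::function0<void> (an assumption, not the original typedef).
class finally {
 public:
  explicit finally(std::function<void()> f) : f_(std::move(f)) {}
  ~finally() { f_(); }  // runs when the enclosing scope exits
  finally(const finally&) = delete;
  finally& operator=(const finally&) = delete;

 private:
  std::function<void()> f_;
};

// Hypothetical helper: records the order in which things happen.
std::vector<std::string> trace_scope() {
  std::vector<std::string> log;
  {
    finally cleanup([&log] { log.push_back("cleanup"); });
    log.push_back("body");
  }  // cleanup's destructor fires here
  log.push_back("after");
  return log;
}
```

The guard runs on every exit path from the scope, including early returns and exceptions. Since the functor is invoked from a destructor, it should not throw.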
From: <yan...@us...> - 2008-12-06 23:04:48
|
Revision: 1095
          http://assorted.svn.sourceforge.net/assorted/?rev=1095&view=rev
Author:   yangzhang
Date:     2008-12-06 23:04:40 +0000 (Sat, 06 Dec 2008)

Log Message:
-----------
new style

Modified Paths:
--------------
    personal-site/trunk/static/plain.css

Modified: personal-site/trunk/static/plain.css
===================================================================
--- personal-site/trunk/static/plain.css 2008-12-04 21:54:06 UTC (rev 1094)
+++ personal-site/trunk/static/plain.css 2008-12-06 23:04:40 UTC (rev 1095)
@@ -6,18 +6,20 @@
 }
 
 body {
-  margin:1em 5% 1em 5%;
-  padding:0;
-  background-color: white;
-  color: black;
-  font-family: Georgia, Verdana, sans-serif;
-  font-size: medium;
-  line-height: 1.3em;
-  color: #333;
+  margin:1em 5% 1em 5%;
+  padding:0;
+  background-color: white;
+  color: black;
+  font-family: arial;
+  font-size: 10pt;
+  /*font-size: medium;*/
+  line-height: 1.3em;
+  color: #333;
+  margin-left: auto;
+  margin-right: auto;
+  min-width: 200px;
+  max-width: 600px;
 }
-/*body {
-  margin: 1em 5% 1em 5%;
-}*/
 
 a {
   color: #0000bb;
@@ -41,6 +43,8 @@
 
 h1, h2, h3, h4, h5, h6, h1 a, h2 a {
   color: gray; /* #527bbd; */
+  color: gray;
+  font-size: 18pt;
   font-weight: normal;
   font-family: arial;
   text-transform: uppercase;
@@ -212,7 +216,9 @@
 }
 
 ul, ol {
-  list-style-position: outside;
+  list-style-position: outside;
+  margin-left: 1.5em;
+  padding-left: 0px;
 }
 ol.olist2 {
   list-style-type: lower-alpha;
From: <yan...@us...> - 2008-12-04 22:22:20
|
Revision: 1094
          http://assorted.svn.sourceforge.net/assorted/?rev=1094&view=rev
Author:   yangzhang
Date:     2008-12-04 21:54:06 +0000 (Thu, 04 Dec 2008)

Log Message:
-----------
added zppw; moved refresh-* to common.bash; added WTF option to zpp

Modified Paths:
--------------
    shell-tools/trunk/src/bash-commons/bashrc.bash
    shell-tools/trunk/src/bash-commons/common.bash

Modified: shell-tools/trunk/src/bash-commons/bashrc.bash
===================================================================
--- shell-tools/trunk/src/bash-commons/bashrc.bash 2008-12-04 10:24:36 UTC (rev 1093)
+++ shell-tools/trunk/src/bash-commons/bashrc.bash 2008-12-04 21:54:06 UTC (rev 1094)
@@ -318,18 +318,6 @@
     apg -M SNCL -n 3 -m 8 -x 8
 }
 
-function refresh-local {
-    ( mkdir -p "$USER_PREFIX" &&
-      cd "$USER_PKG" &&
-      stow -t "$USER_PREFIX" "$@" * )
-}
-
-function refresh-opt {
-    ( sudo -u pkg mkdir -p "$GLOBAL_PREFIX" &&
-      cd "$GLOBAL_PKG" &&
-      sudo -u pkg stow -t "$GLOBAL_PREFIX" "$@" !(armed) )
-}
-
 # TODO fix
 function g {
     #grep --color=always -r "$@" | sed "$( echo -e "s/\([^:]*\):/$bright_blue\1$normal_color:/" )" | less -F
@@ -638,9 +626,19 @@
 zpp() {
     local file="$1"
     shift
-    wtf g++ -Wall -g3 -o "${file%.*}" "$file" "$@"
+    ${WTF-wtf} g++ -Wall -g3 -o "${file%.*}" "$file" "$@"
 }
 
+zppw() {
+    local file="$1"
+    shift
+    zpp "$file" -Wextra -Woverloaded-virtual -Wconversion \
+        -Wpointer-arith -Wcast-qual -Wcast-align -Wwrite-strings \
+        -Winit-self -Wsign-promo -Wno-unused-parameter -Wc++0x-compat \
+        -Wparentheses -Wmissing-format-attribute -Wfloat-equal \
+        -Winline -Wsynth "$@"
+}
+
 #function set_title() {
 #    if [ $# -eq 0 ] ; then
 #        eval set -- "$PWD"

Modified: shell-tools/trunk/src/bash-commons/common.bash
===================================================================
--- shell-tools/trunk/src/bash-commons/common.bash 2008-12-04 10:24:36 UTC (rev 1093)
+++ shell-tools/trunk/src/bash-commons/common.bash 2008-12-04 21:54:06 UTC (rev 1094)
@@ -7,6 +7,7 @@
 if [[ "${1:-}" != '-n' ]] ; then
     set -o errexit
     set -o nounset
+    shopt -s extglob
 fi
 
 if [[ "${xtrace:-}" ]] ; then
@@ -473,6 +474,18 @@
     find "${1:-.}" -type d -empty -print0 | xargs -0r rmdir -p
 }
 
+refresh-local() {
+    ( mkdir -p "$USER_PREFIX" &&
+      cd "$USER_PKG" &&
+      stow -t "$USER_PREFIX" "$@" * )
+}
+
+refresh-opt() {
+    ( sudo -u pkg mkdir -p "$GLOBAL_PREFIX" &&
+      cd "$GLOBAL_PKG" &&
+      sudo -u pkg stow -t "$GLOBAL_PREFIX" "$@" !(armed) )
+}
+
 cabal-install() {
     for i in "${@:-}" ; do
         pushd "$i" &&
From: <yan...@us...> - 2008-12-04 10:24:43
|
Revision: 1093
          http://assorted.svn.sourceforge.net/assorted/?rev=1093&view=rev
Author:   yangzhang
Date:     2008-12-04 10:24:36 +0000 (Thu, 04 Dec 2008)

Log Message:
-----------
- added command-line options
- added a "higher-level" readmsg() that relies on RVO
- fixed and lifted random number generator
- cleaned up includes, RAII
- updated doc

Modified Paths:
--------------
    ydb/trunk/README
    ydb/trunk/src/Makefile
    ydb/trunk/src/main.lzz

Modified: ydb/trunk/README
===================================================================
--- ydb/trunk/README 2008-12-04 10:24:21 UTC (rev 1092)
+++ ydb/trunk/README 2008-12-04 10:24:36 UTC (rev 1093)
@@ -10,12 +10,11 @@
 Currently, the only recovery implemented mechanism is to have the first-joining
 replica serialize its entire database state and send that to the joining node.
 
-If you start a system of $n$ replicas, then the leader will wait for $n-1$ of
-them to join before it starts issuing transactions. (Think of $n-1$ as the
-minimum number of replicas the system requires before it is willing to process
-transactions.) Then when replica $n$ joins, it will need to catch up to the
-current state of the system, and it will do so by contacting that first replica
-and receiving a complete dump of its DB state.
+If you start a system with a minimum of $n$ replicas, then the leader will wait
+for that many of them to join before it starts issuing transactions. Then when
+replica $n+1$ joins, it will need to catch up to the current state of the
+system; it will do so by contacting the first-joining replica and receiving a
+complete dump of its DB state.
 
 The leader will report the current txn seqno to the joiner, and start streaming
 txns beyond that seqno to the joiner, which the joiner will push onto its
@@ -45,22 +44,23 @@
 Usage
 -----
 
-To start a leader to manage 3 replicas, run:
+To start a leader, run:
 
-    ./ydb 3
+    ./ydb -l
 
-This will listen on port 7654. Then to start the first two replicas, run:
+Then to start the first two replicas, run:
 
-    ./ydb localhost 7654 7655
-    ./ydb localhost 7654 7656
+    ./ydb -p 7655
+    ./ydb -p 7656
 
-This means "connect to the leader at localhost:7654, and listen on port 7655."
-The replicas have to listen for connections from other replicas (namely the
-recovering replica).
+This means "connect to the leader at localhost:7654, and listen on port
+7655/7656." The replicas have to listen for connections from other replicas
+(namely the recovering replica). The leader waits for the minimum number
+(default of 2) of replicas to join before beginning to issue transactions.
 
 The recovering replica then joins:
 
-    ./ydb localhost 7654 7657
+    ./ydb -p 7657
 
 It will connect to the first replica (on port 7655) and receive a DB dump from
 it.

Modified: ydb/trunk/src/Makefile
===================================================================
--- ydb/trunk/src/Makefile 2008-12-04 10:24:21 UTC (rev 1092)
+++ ydb/trunk/src/Makefile 2008-12-04 10:24:36 UTC (rev 1093)
@@ -19,8 +19,9 @@
 SRCS := $(GENSRCS)
 OBJS := $(GENOBJS)
 
-LDFLAGS := -lstx -lst -lresolv -lpthread -lprotobuf
-CXXFLAGS := -g3 -Wall -Werror -Wextra -Woverloaded-virtual -Wconversion \
+LDFLAGS := -lstx -lst -lresolv -lpthread -lprotobuf \
+	-lboost_program_options-gcc43-mt
+CXXFLAGS := -g3 -Wall -Werror -Wextra -Woverloaded-virtual -Wconversion -Wno-conversion -Wno-ignored-qualifiers \
 	-Wpointer-arith -Wcast-qual -Wcast-align -Wwrite-strings \
 	-Winit-self -Wsign-promo -Wno-unused-parameter -Wc++0x-compat \
 	-Wparentheses -Wmissing-format-attribute -Wfloat-equal \

Modified: ydb/trunk/src/main.lzz
===================================================================
--- ydb/trunk/src/main.lzz 2008-12-04 10:24:21 UTC (rev 1092)
+++ ydb/trunk/src/main.lzz 2008-12-04 10:24:36 UTC (rev 1093)
@@ -1,22 +1,23 @@
 #hdr
 #include <boost/bind.hpp>
 #include <boost/foreach.hpp>
-#include <boost/lambda/lambda.hpp>
+#include <boost/program_options.hpp>
 #include
<boost/scoped_array.hpp> #include <commons/nullptr.h> +#include <commons/rand.h> #include <commons/st/st.h> #include <commons/time.h> -#include <csignal> +#include <csignal> // sigaction etc. #include <cstdio> -#include <cstdlib> -#include <cstring> +#include <cstring> // strsignal #include <iostream> #include <fstream> #include <map> +#include <netinet/in.h> // in_addr etc. #include <set> -#include <sstream> -#include <sys/types.h> -#include <unistd.h> +#include <sys/socket.h> // getpeername +#include <sys/types.h> // ssize_t +#include <unistd.h> // pipe, write #include <vector> #include "ydb.pb.h" #define foreach BOOST_FOREACH @@ -26,15 +27,11 @@ #end typedef pair<int, int> pii; - -// Why does just timeout require the `extern`? -extern const st_utime_t timeout = 1000000; -const int chkpt = 10000; -const bool verbose = true; -const bool yield_during_recovery = false; -const bool yield_during_catch_up = false; -const bool use_epoll = false; -const uint16_t base_port = 7654; +st_utime_t timeout; +int chkpt; +bool verbose; +bool yield_during_build_up; +bool yield_during_catch_up; st_intr_bool stop_hub, kill_hub; /** @@ -110,11 +107,11 @@ /** * RAII to close all contained netfds. */ -class st_closing_all +class st_closing_all_infos { public: - st_closing_all(const vector<replica_info>& rs) : rs_(rs) {} - ~st_closing_all() { + st_closing_all_infos(const vector<replica_info>& rs) : rs_(rs) {} + ~st_closing_all_infos() { foreach (replica_info r, rs_) check0x(st_netfd_close(r.fd())); } @@ -123,6 +120,21 @@ }; /** + * RAII to close all contained netfds. + */ +class st_closing_all +{ + public: + st_closing_all(const vector<st_netfd_t>& rs) : rs_(rs) {} + ~st_closing_all() { + foreach (st_netfd_t r, rs_) + check0x(st_netfd_close(r)); + } + private: + const vector<st_netfd_t> &rs_; +}; + +/** * RAII for dumping the final state of the DB to a file on disk. 
*/ class dump_state @@ -131,10 +143,9 @@ dump_state(const map<int, int> &map, const int &seqno) : map_(map), seqno_(seqno) {} ~dump_state() { - stringstream fname; - fname << "/tmp/ydb" << getpid(); - cout << "dumping DB state (" << seqno_ << ") to " << fname.str() << endl; - ofstream of(fname.str().c_str()); + string fname = string("/tmp/ydb") + lexical_cast<string>(getpid()); + cout << "dumping DB state (" << seqno_ << ") to " << fname << endl; + ofstream of(fname.c_str()); of << "seqno: " << seqno_ << endl; foreach (const pii &p, map_) { of << p.first << ": " << p.second << endl; @@ -209,10 +220,18 @@ } } -inline int -rand32(int max = RAND_MAX) +/** + * Same as the above readmsg(), but returns an internally constructed message. + * This is a "higher-level" readmsg() that relies on return-value optimization + * for avoiding unnecessary copies. + */ +template <typename T> +T +readmsg(st_netfd_t src, st_utime_t timeout = ST_UTIME_NO_TIMEOUT) { - return static_cast<int>( random() / ( RAND_MAX / max ) ); + T msg; + readmsg(src, msg, timeout); + return msg; } /** @@ -236,10 +255,10 @@ // Generate a random transaction. Txn txn; txn.set_seqno(seqno++); - int count = rand32(5) + 1; + int count = randint(5) + 1; for (int o = 0; o < count; o++) { Op *op = txn.add_op(); - int rtype = rand32(3), rkey = rand32(), rvalue = rand32(); + int rtype = randint(3), rkey = randint(), rvalue = randint(); op->set_type(types[rtype]); op->set_key(rkey); op->set_value(rvalue); @@ -339,7 +358,8 @@ } if (!caught_up && res.caught_up()) { caught_up = true; - cout << "recovering node caught up; took " << current_time_millis() - start_time << "ms" << endl; + cout << "recovering node caught up; took " + << current_time_millis() - start_time << "ms" << endl; } if (res.seqno() % chkpt == 0) { if (verbose) @@ -397,26 +417,25 @@ * Run the leader. 
*/ void -run_leader(int nreps) +run_leader(int minreps, uint16_t leader_port) { cout << "starting as leader" << endl; + cout << "waiting for at least " << minreps << " replicas to join" << endl; // Wait until all replicas have joined. - st_netfd_t listener = st_tcp_listen(base_port); + st_netfd_t listener = st_tcp_listen(leader_port); st_closing close_listener(listener); // TODO rename these - int min_reps = nreps - 1; vector<replica_info> replicas; - st_closing_all close_replicas(replicas); - for (int i = 0; i < min_reps; i++) { + st_closing_all_infos close_replicas(replicas); + for (int i = 0; i < minreps; i++) { st_netfd_t fd; { st_intr intr(stop_hub); fd = checkerr(st_accept(listener, nullptr, nullptr, ST_UTIME_NO_TIMEOUT)); } - Join join; - readmsg(fd, join); + Join join = readmsg<Join>(fd); replicas.push_back(replica_info(fd, static_cast<uint16_t>(join.port()))); } @@ -456,8 +475,7 @@ joiner = checkerr(st_accept(listener, nullptr, nullptr, ST_UTIME_NO_TIMEOUT)); } - Join join; - readmsg(joiner, join); + Join join = readmsg<Join>(joiner); cout << "setting seqno to " << seqno << endl; init.set_txnseqno(seqno); sendmsg(joiner, init); @@ -473,7 +491,7 @@ * Run a replica. */ void -run_replica(char *leader_host, uint16_t leader_port, uint16_t listen_port) +run_replica(string leader_host, uint16_t leader_port, uint16_t listen_port) { // Initialize database state. map<int, int> map; @@ -481,28 +499,30 @@ dump_state ds(map, seqno); st_bool send_state, sent_state; - cout << "starting as replica" << endl; + cout << "starting as replica on port " << listen_port << endl; // Listen for connections from other replicas. st_netfd_t listener = st_tcp_listen(listen_port); - st_thread_t rec = my_spawn(bind(recover_joiner, listener, ref(map), - ref(seqno), ref(send_state), - ref(sent_state))); + st_joining join_rec(my_spawn(bind(recover_joiner, listener, ref(map), + ref(seqno), ref(send_state), + ref(sent_state)))); // Connect to the leader and join the system. 
- st_netfd_t leader = st_tcp_connect(leader_host, leader_port, timeout); + st_netfd_t leader = st_tcp_connect(leader_host.c_str(), leader_port, + timeout); + st_closing closing(leader); Join join; join.set_port(listen_port); sendmsg(leader, join); - Init init; - readmsg(leader, init); + Init init = readmsg<Init>(leader); uint32_t listen_host = init.yourhost(); // Display the info. cout << "got init msg with txn seqno " << init.txnseqno() << " and hosts:" << endl; vector<st_netfd_t> replicas; + st_closing_all close_replicas(replicas); for (uint16_t i = 0; i < init.node_size(); i++) { const SockAddr &sa = init.node(i); char buf[INET_ADDRSTRLEN]; @@ -520,9 +540,9 @@ // Process txns. st_channel<Txn*> backlog; - st_thread_t proc = my_spawn(bind(process_txns, leader, ref(map), ref(seqno), - ref(send_state), ref(sent_state), - ref(backlog))); + st_joining join_proc(my_spawn(bind(process_txns, leader, ref(map), + ref(seqno), ref(send_state), + ref(sent_state), ref(backlog)))); // If there's anything to recover. if (init.txnseqno() > 0) { @@ -530,12 +550,15 @@ // Read the recovery message. Recovery recovery; - readmsg(replicas[0], recovery); + { + st_intr intr(stop_hub); + readmsg(replicas[0], recovery); + } for (int i = 0; i < recovery.pair_size(); i++) { const Recovery_Pair &p = recovery.pair(i); map[p.key()] = p.value(); if (i % chkpt == 0) { - if (yield_during_recovery) st_sleep(0); + if (yield_during_build_up) st_sleep(0); } } assert(seqno == -1 && @@ -554,13 +577,6 @@ } cout << "caught up." << endl; } - - st_join(proc); - st_join(rec); - foreach (st_netfd_t r, replicas) { - check0x(st_netfd_close(r)); - } - check0x(st_netfd_close(leader)); } int sig_pipe[2]; @@ -604,9 +620,65 @@ int main(int argc, char **argv) { + namespace po = boost::program_options; try { GOOGLE_PROTOBUF_VERIFY_VERSION; + bool is_leader, use_epoll; + int minreps; + uint16_t leader_port, listen_port; + string leader_host; + + // Parse options. 
+ po::options_description desc("Allowed options"); + desc.add_options() + ("help,h", "show this help message") + ("verbose,v", "enable periodic printing of txn processing progress") + ("epoll,e", po::bool_switch(&use_epoll), + "use epoll (select is used by default)") + ("yield-build-up", po::bool_switch(&yield_during_build_up), + "yield periodically during build-up phase of recovery") + ("yield-catch-up", po::bool_switch(&yield_during_catch_up), + "yield periodically during catch-up phase of recovery") + ("leader,l", po::bool_switch(&is_leader), + "run the leader (run replica by default)") + ("leader-host,H", + po::value<string>(&leader_host)->default_value(string("localhost")), + "hostname or address of the leader") + ("leader-port,P", + po::value<uint16_t>(&leader_port)->default_value(7654), + "port the leader listens on") + ("chkpt,c", po::value<int>(&chkpt)->default_value(10000), + "number of txns before yielding/verbose printing") + ("listen-port,p", po::value<uint16_t>(&listen_port), + "port to listen on (replicas only)") + ("timeout,t", po::value<st_utime_t>(&timeout)->default_value(1000000), + "timeout for IO operations (in microseconds)") + ("minreps,n", po::value<int>(&minreps)->default_value(2), + "minimum number of replicas the system is willing to process txns on"); + + po::variables_map vm; + try { + po::store(po::parse_command_line(argc, argv, desc), vm); + po::notify(vm); + + if (vm.count("help")) { + cout << desc << endl; + return 0; + } + if (!is_leader && !vm.count("listen-port")) { + class parse_exception : public std::exception { + virtual const char *what() const throw() { + return "running replica requires listen port to be specified"; + } + }; + throw parse_exception(); + } + } catch (std::exception &ex) { + cerr << ex.what() << endl << endl << desc << endl; + return 1; + } + // Initialize support for ST working with asynchronous signals. 
check0x(pipe(sig_pipe)); struct sigaction sa; @@ -624,19 +696,11 @@ thread_eraser eraser; threads.insert(st_thread_self()); - // Parse command-line arguments. - if (argc != 2 && argc != 4) - die("leader: ydb <nreplicas>\n" - "replica: ydb <leaderhost> <leaderport> <listenport>\n"); - bool is_leader = argc == 2; - // Which role are we? if (is_leader) { - run_leader(atoi(argv[1])); + run_leader(minreps, leader_port); } else { - run_replica(argv[1], - static_cast<uint16_t>(atoi(argv[2])), - static_cast<uint16_t>(atoi(argv[3]))); + run_replica(leader_host, leader_port, listen_port); } return 0; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
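Revision 1093 describes its new `readmsg<T>()` as a "higher-level" overload that relies on return-value optimization. The shape of that pattern can be sketched without ST or protobuf. Everything below is a stand-in chosen for illustration: `Join` is a hypothetical one-field struct rather than the generated protobuf class, and `std::istream` replaces `st_netfd_t`.

```cpp
#include <istream>
#include <sstream>
#include <string>

// Hypothetical stand-in for the protobuf Join message.
struct Join {
  int port = 0;
};

// Low-level form: the caller supplies the message to fill in.
inline void readmsg(std::istream& src, Join& msg) {
  src >> msg.port;
}

// Higher-level form: construct the message locally and return it by value.
// Compilers typically elide the copy of the named local (NRVO), so this
// need not cost more than the out-parameter version.
template <typename T>
T readmsg(std::istream& src) {
  T msg;
  readmsg(src, msg);
  return msg;
}
```

With this in place a call site collapses from two statements to one, for example `Join join = readmsg<Join>(in);`, which mirrors the change the diff makes in `run_leader` and `run_replica`.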
From: <yan...@us...> - 2008-12-04 10:24:24
|
Revision: 1092
          http://assorted.svn.sourceforge.net/assorted/?rev=1092&view=rev
Author:   yangzhang
Date:     2008-12-04 10:24:21 +0000 (Thu, 04 Dec 2008)

Log Message:
-----------
added randint()

Modified Paths:
--------------
    cpp-commons/trunk/src/commons/rand.h

Modified: cpp-commons/trunk/src/commons/rand.h
===================================================================
--- cpp-commons/trunk/src/commons/rand.h 2008-12-04 08:13:56 UTC (rev 1091)
+++ cpp-commons/trunk/src/commons/rand.h 2008-12-04 10:24:21 UTC (rev 1092)
@@ -1,6 +1,9 @@
 #ifndef COMMONS_RAND_H_
 #define COMMONS_RAND_H_
 
+#include <cassert>
+#include <cstdlib> // random, RAND_MAX
+
 namespace commons {
 
   /**
@@ -19,6 +22,86 @@
     inline long draw() { return randx = randx * 1103515245 + 12345; }
     inline long operator()() { return abs(draw()); }
   };
+
+  /**
+   * Generate a random int up to but excluding max.
+   * \param[in] max Must be greater than one.
+   */
+  inline int
+  randint(int max = RAND_MAX)
+  {
+    // My flow of thought:
+    //
+    // It seems simplest to just mod. But what if not each bit was
+    // identically random? What if the randomness was only a uniform spread
+    // over RAND_MAX, but all the integers are even (and we only ended up
+    // selecting the lowest bit because max = 2)?
+    //
+    // We want to solve for x:
+    //
+    //   random() ./ RAND_MAX = x ./ max
+    //   x = max * random() ./ RAND_MAX
+    //
+    // But the multiplication could overflow if RAND_MAX is near the max of
+    // long int (random()'s return type), so re-write:
+    //
+    //   random() / (RAND_MAX / max)
+    //
+    // Now nothing overflows, but information is lost. Is that OK? No!
+    // If RAND_MAX = 5 and max = 3 and random() = 4, then
+    //
+    //   4 / (5 / 3 = 1) = 4 which is >= 3!
+    //
+    // Makes sense, since:
+    //
+    //   RAND_MAX / max <= RAND_MAX ./ max
+    //
+    // so
+    //
+    //   random() / quotient >= random() / real_quotient
+    //
+    // More "spacious" example:
+    //
+    //   (random() = 20000) / ( ( (RAND_MAX = 32768) / (max = 16637) ) = 1 ) =
+    //   20000 which is > max.
+    //
+    // Let's tweak that formula by incrementing the denominator so that it's
+    // impossible for the quotient to exceed max.
+    //
+    //   random() / (RAND_MAX / max + 1)
+    //
+    // How does this fare?
+    //
+    //   4 / ((5 / 2 = 2) + 1 = 3) = 1, which is < 2
+    //   4 / ((5 / 5 = 1) + 1 = 2) = 2, which is < 5, but too much less
+    //   32767 / ((32768 / 32760 = 1) + 1 = 2) = 16383,
+    //     which is < 32760, but too much less
+    //
+    // Think of the problem in terms of pixel line drawing algorithms; we can
+    // only take steps up at fixed periods, so it's impossible to choose a
+    // period that will span enough of the space up to max (in the worst case,
+    // you'll span nearly half the space).
+    //
+    //   0 / ((5 / 4 = 1) + 1 = 2) = 0, which is < 4
+    //   1 / ((5 / 4 = 1) + 1 = 2) = 0, which is < 4
+    //   2 / ((5 / 4 = 1) + 1 = 2) = 1, which is < 4
+    //   3 / ((5 / 4 = 1) + 1 = 2) = 1, which is < 4
+    //   4 / ((5 / 4 = 1) + 1 = 2) = 2, which is < 4
+    //   4 / ((5 / 5 = 1) + 1 = 2) = 2, which is < 5
+    //
+    // random() / (RAND_MAX / (max - 1)) doesn't work either (it's even worse):
+    //
+    //   4 / (5 / (3 - 1 = 2) = 2) = 2, which is < 3
+    //   4 / (5 / (5 - 1 = 4) = 1) = 4, which is < 5
+    //   4 / (5 / (2 - 1 = 1) = 5) = 0, which is < 2, but too much less
+    //   5 / (6 / (3 - 1 = 2) = 3) = 1, which is < 3, but too much less
+    //
+    // OK, this path isn't taking us anywhere. Let's go back to the original
+    // formula. Sure, it exceeds max. But what if we just wrap it back around
+    // when it does? We can do so using %max. Success!
+
+    return static_cast<int>( random() / ( RAND_MAX / max ) % max );
+  }
 }
 
 #endif
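The reasoning in the `randint()` comment can be checked mechanically by pulling the scaling step out into pure functions, so the raw value and the range are supplied by the caller instead of coming from `random()` and `RAND_MAX`. This is only a sketch under that assumption: the names `scale_unwrapped`, `scale_wrapped` and `always_in_range` are hypothetical and do not appear in cpp-commons, and the functions assume `1 <= max <= rand_max` (otherwise the integer quotient is zero).

```cpp
// Scaling step from the comment, before the final wrap:
//   r / (rand_max / max)
// Integer division makes the denominator too small, so this can overshoot.
inline long scale_unwrapped(long r, long rand_max, long max) {
  return r / (rand_max / max);
}

// The committed formula: the same quotient, wrapped back into [0, max).
inline long scale_wrapped(long r, long rand_max, long max) {
  return r / (rand_max / max) % max;
}

// Exhaustively checks that every raw value in [0, rand_max] lands in
// [0, max) once the wrap is applied.
inline bool always_in_range(long rand_max, long max) {
  for (long r = 0; r <= rand_max; ++r) {
    long x = scale_wrapped(r, rand_max, max);
    if (x < 0 || x >= max) return false;
  }
  return true;
}
```

For the comment's own example, `scale_unwrapped(4, 5, 3)` yields 4, which is out of range, while `scale_wrapped(4, 5, 3)` yields 1. The wrap guarantees the range but not uniformity: values that overshoot are folded back onto the low end of the range.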
From: <yan...@us...> - 2008-12-04 08:14:00
|
Revision: 1091
          http://assorted.svn.sourceforge.net/assorted/?rev=1091&view=rev
Author:   yangzhang
Date:     2008-12-04 08:13:56 +0000 (Thu, 04 Dec 2008)

Log Message:
-----------
added small useless benchmark

Added Paths:
-----------
    sandbox/trunk/src/cc/st/bench.cc
    sandbox/trunk/src/cc/st/bench.mk

Added: sandbox/trunk/src/cc/st/bench.cc
===================================================================
--- sandbox/trunk/src/cc/st/bench.cc (rev 0)
+++ sandbox/trunk/src/cc/st/bench.cc 2008-12-04 08:13:56 UTC (rev 1091)
@@ -0,0 +1,37 @@
+// Results on 1.8G Core 2 Duo:
+// 2892
+// 5142
+//
+// ST thread creation scales linearly. But still no feel for just how
+// practically expensive/cheap it is.
+
+#include <st.h>
+#include <commons/time.h>
+#include <iostream>
+using namespace commons;
+using namespace std;
+
+void *f(void *p) { return p; }
+
+int main() {
+  st_init();
+  {
+    long long start = current_time_millis();
+    for (int i = 0; i < 9999999; i++) {
+      st_thread_t t = st_thread_create(f, NULL, 1, 0);
+      st_thread_join(t, NULL);
+    }
+    cout << current_time_millis() - start << endl;
+  }
+  {
+    long long start = current_time_millis();
+    for (int i = 0; i < 9999999; i++) {
+      st_thread_t t1 = st_thread_create(f, NULL, 1, 0);
+      st_thread_t t2 = st_thread_create(f, NULL, 1, 0);
+      st_thread_join(t1, NULL);
+      st_thread_join(t2, NULL);
+    }
+    cout << current_time_millis() - start << endl;
+  }
+  return 0;
+}

Added: sandbox/trunk/src/cc/st/bench.mk
===================================================================
--- sandbox/trunk/src/cc/st/bench.mk (rev 0)
+++ sandbox/trunk/src/cc/st/bench.mk 2008-12-04 08:13:56 UTC (rev 1091)
@@ -0,0 +1,2 @@
+all:
+	g++ -Wall -o bench bench.cc -lst
From: <yan...@us...> - 2008-12-04 06:40:25
|
Revision: 1090
          http://assorted.svn.sourceforge.net/assorted/?rev=1090&view=rev
Author:   yangzhang
Date:     2008-12-04 06:40:22 +0000 (Thu, 04 Dec 2008)

Log Message:
-----------
added demos of Boost.Program_options

Added Paths:
-----------
    sandbox/trunk/src/cc/boost_program_options.cc
    sandbox/trunk/src/cc/boost_program_options_warning.cc

Added: sandbox/trunk/src/cc/boost_program_options.cc
===================================================================
--- sandbox/trunk/src/cc/boost_program_options.cc (rev 0)
+++ sandbox/trunk/src/cc/boost_program_options.cc 2008-12-04 06:40:22 UTC (rev 1090)
@@ -0,0 +1,48 @@
+#include <boost/program_options.hpp>
+#include <iostream>
+using namespace std;
+namespace po = boost::program_options;
+
+int main(int argc, char **argv) {
+  string leader_host;
+  uint16_t leader_port;
+  bool epoll;
+
+  po::options_description desc("Allowed options");
+  desc.add_options()
+    ("help,h", "show this help message")
+    ("verbose,v", "enable periodic printing of txn processing progress")
+    ("epoll,e",
+     po::bool_switch(&epoll),
+     "use epoll (select is used by default)")
+    ("leader-host,H",
+     po::value<string>(&leader_host)->default_value(string("localhost")),
+     "hostname or address of the leader")
+    ("leader-port,P",
+     po::value<uint16_t>(&leader_port)->default_value(7654),
+     "port the leader listens on");
+
+  po::variables_map vm;
+  try {
+    po::store(po::parse_command_line(argc, argv, desc), vm);
+  } catch (exception &ex) {
+    cerr << ex.what() << endl << desc << endl;
+    return 1;
+  }
+  po::notify(vm);
+
+  if (vm.count("help")) {
+    cout << desc << endl;
+    return 0;
+  }
+
+  if (vm.count("verbose")) {
+    cout << "verbose" << endl;
+  }
+  if (epoll) {
+    cout << "epoll" << endl;
+  }
+  cout << "leader host " << leader_host << endl;
+  cout << "leader port " << leader_port << endl;
+  return 0;
+}

Added: sandbox/trunk/src/cc/boost_program_options_warning.cc
===================================================================
--- sandbox/trunk/src/cc/boost_program_options_warning.cc (rev 0)
+++ sandbox/trunk/src/cc/boost_program_options_warning.cc 2008-12-04 06:40:22 UTC (rev 1090)
@@ -0,0 +1,10 @@
+// https://svn.boost.org/trac/boost/ticket/2562
+// With -Wignored-qualifiers I get: warning: type qualifiers ignored on function return type
+#include <boost/program_options.hpp>
+namespace po = boost::program_options;
+int main() {
+  int x;
+  po::options_description desc("");
+  desc.add_options()("x,x", po::value<int>(&x)->default_value(2), "x");
+  return 0;
+}

This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
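The diagnostic quoted in boost_program_options_warning.cc can be seen without Boost. The following is a minimal sketch, assuming GCC with -Wignored-qualifiers (or -Wextra); it shows one common way to get that warning, a const-qualified scalar return type, and does not claim to be the exact construct inside Boost that the ticket refers to. The names answer_const, answer_plain and check_same_result are hypothetical.

```cpp
// Hypothetical names; nothing here is taken from Boost.
#include <cassert>
#include <type_traits>

// GCC warns on this declaration under -Wignored-qualifiers: a top-level
// const on a scalar return type has no effect for callers.
const int answer_const() { return 2; }

// Same behavior without the qualifier, and without the warning.
int answer_plain() { return 2; }

// The const does not survive into the type of the call expression,
// which is why the qualifier is reported as ignored.
static_assert(std::is_same<decltype(answer_const()), int>::value,
              "const is dropped from the scalar return value");

// Both spellings yield the same value at run time.
void check_same_result() { assert(answer_const() == answer_plain()); }
```

Compiling this with and without -Wignored-qualifiers should show the warning on answer_const only; the program's behavior is the same either way.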
From: <yan...@us...> - 2008-12-03 18:52:46
|
Revision: 1089 http://assorted.svn.sourceforge.net/assorted/?rev=1089&view=rev Author: yangzhang Date: 2008-12-03 18:52:39 +0000 (Wed, 03 Dec 2008) Log Message: ----------- added syntax files: thrift, proto, pandoc, javafx Modified Paths: -------------- configs/trunk/src/vim/plugin/_yang.vim Added Paths: ----------- configs/trunk/src/vim/ftplugin/proto.vim configs/trunk/src/vim/syntax/fx.vim configs/trunk/src/vim/syntax/pdc.vim configs/trunk/src/vim/syntax/proto.vim configs/trunk/src/vim/syntax/thrift.vim Added: configs/trunk/src/vim/ftplugin/proto.vim =================================================================== --- configs/trunk/src/vim/ftplugin/proto.vim (rev 0) +++ configs/trunk/src/vim/ftplugin/proto.vim 2008-12-03 18:52:39 UTC (rev 1089) @@ -0,0 +1,4 @@ +augroup filetype + au BufRead,BufNewFile *.proto setfiletype proto +augroup end + Modified: configs/trunk/src/vim/plugin/_yang.vim =================================================================== --- configs/trunk/src/vim/plugin/_yang.vim 2008-12-03 18:39:47 UTC (rev 1088) +++ configs/trunk/src/vim/plugin/_yang.vim 2008-12-03 18:52:39 UTC (rev 1089) @@ -186,6 +186,12 @@ au BufWritePost *.bin set nomod | endif augroup END +augroup filetype + au BufRead,BufNewFile *.proto setf proto + au BufRead,BufNewFile *.thrift set ft=thrift + au BufRead,BufNewFile *.pandoc setf pdc +augroup end + au BufNewFile,BufRead *.fx setf fx " Java/Eclim "au BufNewFile,BufRead *.java iunmap <tab> Added: configs/trunk/src/vim/syntax/fx.vim =================================================================== --- configs/trunk/src/vim/syntax/fx.vim (rev 0) +++ configs/trunk/src/vim/syntax/fx.vim 2008-12-03 18:52:39 UTC (rev 1089) @@ -0,0 +1,90 @@ +" Vim syntax file +" Language: JavaFX Script +" Maintainer: Yang Zhang <com.gmail@yaaang> +" URL: http://assorted.sf.net/jfx-vim +" Last Change: 2007 Jun 03 + +" Quit when a syntax file was already loaded +if version < 600 + syntax clear +elseif exists("b:current_syntax") + finish 
+endif + +syn case match +syn sync minlines=50 + +" most JFX keywords +syn keyword jfxKeyword after as before bind class else extends first for foreach from if in into last new on private protected return select switch super then trigger var where while insert delete +syn keyword jfxImport import nextgroup=scalaFqn skipwhite +syn match jfxFqn "\<[._$a-zA-Z0-9,*]*" contained nextgroup=jfxFqnSet + +" booleans +syn keyword jfxBoolean true false + +" null +syn keyword jfxNull null + +" package and import statements +syn keyword jfxPackage package nextgroup=jfxFqn skipwhite +syn keyword jfxImport import nextgroup=jfxFqn skipwhite +syn match jfxFqn "\<[._$a-zA-Z0-9,]*" contained + +"" type declarations in val/var/def +syn match jfxType ":\s*[._$a-zA-Z0-9]\+[+*?]\?" contained +":\s*\(=>\s*\)\?[._$a-zA-Z0-9]\+\(\[[^]]*\]\+\)\?\(\s*\(<:\|>:\|#\|=>\)\s*[._$a-zA-Z0-9]\+\(\[[^]]*\]\+\)*\)*"ms=s+1 contained + +" definitions +syn keyword jfxDef function operation nextgroup=jfxDefName skipwhite +syn keyword jfxVar var nextgroup=jfxVarName skipwhite +syn keyword jfxClass class nextgroup=jfxClassName skipwhite +syn keyword jfxAttribute attribute nextgroup=jfxAttributeName skipwhite +syn match jfxAttributeName "[^ =:;([]\+" contained nextgroup=jfxType skipwhite +syn match jfxDefName "[^ =:;([]\+" contained nextgroup=jfxDefArgs nextgroup=jfxType skipwhite +syn match jfxVarName "[^ =:;([]\+" contained nextgroup=jfxType skipwhite +syn match jfxClassName "[^ =:;(\[]\+" contained skipwhite +syn region jfxDefArgs start="(" end=")" contained contains=jfxDefArg skipwhite +" TODO fixme +"syn match jfxDefArg "[^ =:;([]\+" contained nextgroup=jfxType skipwhite + +" comments +syn match jfxTodo "[tT][oO][dD][oO]\|[xX][xX][xX]" contained +syn match jfxLineComment "//.*" contains=jfxTodo +syn region jfxComment start="/\*" end="\*/" contains=jfxTodo + +" string literals with escapes +syn region jfxString start="\"" skip="\\\"" end="\"" contains=jfxStringEscape " TODO end \n or not? 
+syn match jfxStringEscape "\\u[0-9a-fA-F]\{4}" contained +syn match jfxStringEscape "\\[nrfvb\\\"]" contained +syn match jfxString "'[_a-zA-Z][_a-zA-Z0-9]*\>" +syn match jfxString "'[^'\\]'\|'\\.'" + +" number literals +syn match jfxNumber "\<\(0[0-7]*\|0[xX]\x\+\|\d\+\)[lL]\=\>" +syn match jfxNumber "\(\<\d\+\.\d*\|\.\d\+\)\([eE][-+]\=\d\+\)\=[fFdD]\=" +syn match jfxNumber "\<\d\+[eE][-+]\=\d\+[fFdD]\=\>" +syn match jfxNumber "\<\d\+\([eE][-+]\=\d\+\)\=[fFdD]\>" + +syn sync fromstart + +hi link jfxAttribute StorageClass +hi link jfxKeyword Keyword +hi link jfxPackage Include +hi link jfxImport Include +hi link jfxBoolean Boolean +hi link jfxNull Constant +hi link jfxNumber Number +hi link jfxString String +hi link jfxStringEscape Special +hi link jfxComment Comment +hi link jfxLineComment Comment +hi link jfxTodo Todo +hi link jfxDef Keyword +hi link jfxVar Keyword +hi link jfxClass Keyword +hi link jfxDefName Function +hi link jfxDefSpecializer Function +hi link jfxClassName Special +hi link jfxType Type + +let b:current_syntax = "jfx" Added: configs/trunk/src/vim/syntax/pdc.vim =================================================================== --- configs/trunk/src/vim/syntax/pdc.vim (rev 0) +++ configs/trunk/src/vim/syntax/pdc.vim 2008-12-03 18:52:39 UTC (rev 1089) @@ -0,0 +1,330 @@ +" Vim syntax file +" Language: Pandoc (superset of Markdown) +" Maintainer: Jeremy Schultz <ta...@gm...> +" URL: +" Version: 2 +" Changes: 2008-11-04 +" - Fixed an issue with Block elements (header) not being highlighted when +" placed on the first or second line of the file +" - Fixed multi line HTML comment block +" - Fixed lowercase list items +" - Fixed list items gobbling to many empty lines +" - Added highlight support to identify newline (2 spaces) +" - Fixed HTML highlight, ignore if the first character in the +" angle brackets is not a letter +" - Fixed Emphasis highlighting when it contained multiple +" spaces +" Remark: Uses HTML and TeX syntax file + +if version < 600 
+ syntax clear +elseif exists("b:current_syntax") + finish +endif + +syn spell toplevel +syn case ignore +syn sync linebreaks=1 + +"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""" +" Set embedded HTML highlighting +syn include @HTML syntax/html.vim +syn match pdcHTML /<\a[^>]\+>/ contains=@HTML + +" Support HTML multi line comments +syn region pdcHTMLComment start=/<!--/ end=/-->/ + + +"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""" +" Set embedded LaTex (pandox extension) highlighting +" Unset current_syntax so the 2nd include will work +unlet b:current_syntax +syn include @LATEX syntax/tex.vim + +" Single Tex command +syn match pdcLatex /\\\w\+{[^}]\+}/ contains=@LATEX + +" Tex Block (begin-end) +syn region pdcLatex start=/\\begin{[^}]\+}\ze/ end=/\ze\\end{[^}]\+}/ contains=@LATEX + +" Math Tex +syn match pdcLatex /$[^$]\+\$/ contains=@LATEX + + +"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""" +" Block Elements +"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""" + +" Needed by other elements +syn match pdcBlankLine /\(^\s*\n\|\%^\)/ nextgroup=pdcHeader,pdcCodeBlock,pdcListItem,pdcListItem1,pdcHRule,pdcTableHeader,pdcTableMultiStart,pdcBlockquote transparent + + +""""""""""""""""""""""""""""""""""""""" +" Title Block: +syn match pandocTitleBlock /\%^\(%.*\n\)\{1,3}$/ + + +""""""""""""""""""""""""""""""""""""""" +" Headers: + +" Underlined, using == or -- +syn match pdcHeader /^.\+\n[=-]\+$/ contains=@Spell nextgroup=pdcHeader contained skipnl + +" Atx-style, Hash marks +syn region pdcHeader start="^\s*#\{1,6}[^#]*" end="\($\|#\+\)" contains=@Spell contained nextgroup=pdcHeader skipnl + + +""""""""""""""""""""""""""""""""""""""" +" Blockquotes: + +syn match pdcBlockquote /\s*>.*$/ nextgroup=pdcBlockquote,pdcBlockquote2 contained skipnl +syn match pdcBlockquote2 /[^>].*/ nextgroup=pdcBlockquote2 skipnl contained + + 
+""""""""""""""""""""""""""""""""""""""" +" Code Blocks: + +" Indent with at least 4 space or 1 tab +" This rule must appear for pdcListItem, or highlighting gets messed up +syn match pdcCodeBlock /\(\s\{2,}\|\t\{1,}\).*\n/ contained nextgroup=pdcCodeBlock + +" HTML code blocks, pre and code +syn match pdcCodeStartPre /<pre>/ nextgroup=pdcCodeHTMLPre skipnl transparent +syn match pdcCodeHTMLPre /.*/ contained nextgroup=pdcCodeHTMLPre,pdcCodeEndPre skipnl +syn match pdcCodeEndPre /\s*<\/pre>/ contained transparent + +" HTML code blocks, code +syn match pdcCodeStartCode /<code>/ nextgroup=pdcCodeHTMLCode skipnl transparent +syn match pdcCodeHTMLCode /.*/ contained nextgroup=pdcCodeHTMLCode,pdcCodeEndCode skipnl +syn match pdcCodeEndCode /\s*<\/code>/ contained transparent + + +""""""""""""""""""""""""""""""""""""""" +" Lists: + +" These first two rules need to be first or the highlighting will be +" incorrect + +" Continue a list on the next line +syn match pdcListCont /\s*[^-+*].*\n/ contained nextgroup=pdcListCont,pdcListItem,pdcListSkipNL transparent + +" Skip empty lines +syn match pdcListSkipNL /\s*\n/ contained nextgroup=pdcListItem,pdcListSkipNL + +" Unorder list +syn match pdcListItem /\s*[-*+]\s\+/ contained nextgroup=pdcListSkipNL,pdcListCont skipnl + +" Order list, numeric +syn match pdcListItem /\s*(\?\(\d\+\|#\)[\.)]\s\+/ contained nextgroup=pdcListSkipNL,pdcListCont skipnl + +" Order list, roman numerals (does not guarantee correct roman numerals) +syn match pdcListItem /\s*(\?[ivxlcdm]\+[\.)]\s\+/ contained nextgroup=pdcListSkipNL,pdcListCont skipnl + +" Order list, lowercase letters +syn match pdcListItem /\s*(\?\l[\.)]\s\+/ contained nextgroup=pdcListSkipNL,pdcListCont skipnl + +" Order list, uppercase letters, does not include '.' +syn match pdcListItem /\s*(\?\u[\)]\s\+/ contained nextgroup=pdcListSkipNL,pdcListCont skipnl + +" Order list, uppercase letters, special case using '.' 
and two or more spaces +syn match pdcListItem /\s*\u\.\([ ]\{2,}\|\t\+\)/ contained nextgroup=pdcListSkipNL,pdcListCont skipnl + + +""""""""""""""""""""""""""""""""""""""" +" Horizontal Rules: + +" 3 or more * on a line +syn match pdcHRule /\s\{0,3}\(-\s*\)\{3,}\n/ contained nextgroup=pdcHRule + +" 3 or more - on a line +syn match pdcHRule /\s\{0,3}\(\*\s*\)\{3,}\n/ contained nextgroup=pdcHRule + + +"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""" +" Span Elements +"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""" + +""""""""""""""""""""""""""""""""""""""" +" Links: + +" Link Text +syn match pdcLinkText /\[\zs[^\]]*\ze\]/ contains=@Spell + +" Link ID +syn match pdcLinkID /\][ ]\{0,1}\[\zs[^\]]*\ze\]/ + +" Skip [ so we do not highlight it +syn match pdcSkip /^[ ]\{0,3}\[/ nextgroup=pdcLinkID + +" Link ID - definition +syn match pdcLinkID /[^\]]*\ze\]:/ nextgroup=pdcSkip skipwhite contained + +" Skip ]: so we do not highlight it +syn match pdcSkip /\]:/ contained nextgroup=pdcLinkURL skipwhite + +" Link URL +syn region pdcLinkURL start=/\](\zs/ end=/)/me=e-1 + +" Link URL on ID definition line +syn match pdcLinkURL /\s\+.*\s\+\ze[("']/ nextgroup=pdcLinkTitle skipwhite contained +syn match pdcLinkURL /\s*.*\s*[^)"']\s*$/ contained +syn match pdcLinkURL /\s*.*\s*[^)"']\s*\n\s*\ze[("']/ contained nextgroup=pdcLinkTitle skipwhite + +" Link URL for inline <> links +syn match pdcLinkURL /<http[^>]*>/ +syn match pdcLinkURL /<[^>]*@[^>]*.[^>]*>/ + +" Link Title +syn match pdcLinkTitle /\s*[("'].*[)"']/ contained contains=@Spell + + +""""""""""""""""""""""""""""""""""""""" +" Emphasis: + +" Using underscores +syn match pdcEmphasis / \(_\|__\)\([^_ ]\|[^_]\( [^_]\)\+\)\+\1/ contains=@Spell + +" Using Asterisks +syn match pdcEmphasis / \(\*\|\*\*\)\([^\* ]\|[^\*]\( [^\*]\)\+\)\+\1/ contains=@Spell + + +""""""""""""""""""""""""""""""""""""""" +" Inline Code: + +" Using single back ticks +syn region pdcCode start=/`/ 
end=/`\|^\s*$/ + +" Using double back ticks +syn region pdcCode start=/``[^`]*/ end=/``\|^\s*$/ + + +""""""""""""""""""""""""""""""""""""""" +" Images: +" Handled by link syntax + + +""""""""""""""""""""""""""""""""""""""" +" Misc: + +" Pandoc escapes all characters after a backslash +syn match NONE /\\\W/ + + +"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""" +" Span Elements +"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""" + +""""""""""""""""""""""""""""""""""""""" +" Subscripts: +syn match pdcSubscript /\~\([^\~\\ ]\|\(\\ \)\)\+\~/ contains=@Spell + +""""""""""""""""""""""""""""""""""""""" +" Superscript: +syn match pdcSuperscript /\^\([^\^\\ ]\|\(\\ \)\)\+\^/ contains=@Spell + +""""""""""""""""""""""""""""""""""""""" +" Strikeout: +syn match pdcStrikeout /\~\~[^\~ ]\([^\~]\|\~ \)*\~\~/ contains=@Spell + + +""""""""""""""""""""""""""""""""""""""" +" Definitions: +syn match pdcDefinitions /:\(\t\|[ ]\{3,}\)/ nextgroup=pdcListItem,pdcCodeBlock,pdcBlockquote,pdcHRule + +""""""""""""""""""""""""""""""""""""""" +" Footnote: +syn match pdcFootnoteID /\[\^[^\]]\+\]/ nextgroup=pdcFootnoteDef + +" This does not work correctly +syn region pdcFootnoteDef start=/:/ end=/^\n\+\(\(\t\+\|[ ]\{4,}\)\S\)\@!/ contained contains=pdcFootnoteDef + +" Inline footnotes +syn region pdcFootnoteDef matchgroup=pdcFootnoteID start=/\^\[/ matchgroup=pdcFootnoteID end=/\]/ + + +""""""""""""""""""""""""""""""""""""""" +" Tables: +" +" Regular Table +syn match pdcTableHeader /\s*\w\+\(\s\+\w\+\)\+\s*\n\s*-\+\(\s\+-\+\)\+\s*\n/ contained nextgroup=pdcTableBody +syn match pdcTableBody /\s*\w\+\(\s\+\w\+\)\+\s*\n/ contained nextgroup=pdcTableBody,pdcTableCaption skipnl +syn match pdcTableCaption /\n\+\s*Table.*\n/ contained nextgroup=pdcTableCaptionCont +syn match pdcTableCaptionCont /\s*\S.\+\n/ contained nextgroup=pdcTableCaptionCont + +" Multi-line Table +syn match pdcTableMultiStart 
/^\s\{0,3}-\+\s*\n\ze\(\s*\w\+\(\s\+\w\+\)\+\s*\n\)\+\s*-\+\(\s\+-\+\)\+\s*\n/ contained nextgroup=pdcTableMultiHeader +syn match pdcTableMultiEnd /^\s\{0,3}-\+/ contained nextgroup=pdcTableMultiCaption skipnl +syn match pdcTableMultiHeader /\(\s*\w\+\(\s\+\w\+\)\+\s*\n\)\+\s*-\+\(\s\+-\+\)\+\s*\n/ contained nextgroup=pdcTableMultiBody +syn match pdcTableMultiBody /^\(\s\{3,}[^-]\|[^-\s]\).*$/ contained nextgroup=pdcTableMultiBody,pdcTableMultiSkipNL,pdcTableMultiEnd skipnl +syn match pdcTableMultiSkipNL /^\s*\n/ contained nextgroup=pdcTableMultiBody,pdcTableMultiEnd skipnl +syn match pdcTableMultiCaption /\n*\s*Table.*\n/ contained nextgroup=pdcTableCaptionCont + + + +""""""""""""""""""""""""""""""""""""""" +" Delimited Code Block: (added in 1.0) +syn region pdcCodeBlock matchgroup=pdcCodeStart start=/^\z(\~\{3,}\) \( {[^}]\+}\)\?/ matchgroup=pdcCodeEnd end=/^\z1\~*/ + + +""""""""""""""""""""""""""""""""""""""" +" Newline, 2 spaces at the end of line means newline +syn match pdcNewLine / $/ + + +"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""" +" Highlight groups +"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""" +hi link pdcHeader Title +hi link pdcBlockquote Comment +hi link pdcBlockquote2 Comment + +hi link pdcHTMLComment Comment + +hi link pdcHRule Underlined +"hi link pdcHRule Special + +hi link pdcListItem Operator +hi link pdcDefinitions Operator + +hi link pdcEmphasis Special +hi link pdcSubscript Special +hi link pdcSuperscript Special +hi link pdcStrikeout Special + +hi link pdcLinkText Underlined +hi link pdcLinkID Identifier +hi link pdcLinkURL Type +hi link pdcLinkTitle Comment + +hi link pdcFootnoteID Identifier +hi link pdcFootnoteDef Comment +hi link pandocFootnoteCont Error + +hi link pdcCodeBlock String +hi link pdcCodeHTMLPre String +hi link pdcCodeHTMLCode String +hi link pdcCode String +hi link pdcCodeStart Comment +hi link pdcCodeEnd Comment + +hi link pandocTitleBlock Comment + 
+hi link pdcTableMultiStart Comment +hi link pdcTableMultiEnd Comment +hi link pdcTableHeader Define +hi link pdcTableMultiHeader Define +hi link pdcTableBody Identifier +hi link pdcTableMultiBody Identifier +hi link pdcTableCaption Label +hi link pdcTableMultiCaption Label +hi link pdcTableCaptionCont Label + +hi link pdcNewLine Error + + +" For testing +hi link pdctest Error + + +let b:current_syntax = "pandoc" + Added: configs/trunk/src/vim/syntax/proto.vim =================================================================== --- configs/trunk/src/vim/syntax/proto.vim (rev 0) +++ configs/trunk/src/vim/syntax/proto.vim 2008-12-03 18:52:39 UTC (rev 1089) @@ -0,0 +1,106 @@ +" Protocol Buffers - Google's data interchange format +" Copyright 2008 Google Inc. All rights reserved. +" http://code.google.com/p/protobuf/ +" +" Redistribution and use in source and binary forms, with or without +" modification, are permitted provided that the following conditions are +" met: +" +" * Redistributions of source code must retain the above copyright +" notice, this list of conditions and the following disclaimer. +" * Redistributions in binary form must reproduce the above +" copyright notice, this list of conditions and the following disclaimer +" in the documentation and/or other materials provided with the +" distribution. +" * Neither the name of Google Inc. nor the names of its +" contributors may be used to endorse or promote products derived from +" this software without specific prior written permission. +" +" THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +" "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +" LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +" A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT +" OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +" SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +" LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +" DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +" THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +" (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +" OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +" This is the Vim syntax file for Google Protocol Buffers. +" +" Usage: +" +" 1. cp proto.vim ~/.vim/syntax/ +" 2. Add the following to ~/.vimrc: +" +" augroup filetype +" au! BufRead,BufNewFile *.proto setfiletype proto +" augroup end +" +" Or just create a new file called ~/.vim/ftdetect/proto.vim with the +" previous lines on it. + +if version < 600 + syntax clear +elseif exists("b:current_syntax") + finish +endif + +syn case match + +syn keyword pbTodo contained TODO FIXME XXX +syn cluster pbCommentGrp contains=pbTodo + +syn keyword pbSyntax syntax import option +syn keyword pbStructure package message group +syn keyword pbRepeat optional required repeated +syn keyword pbDefault default +syn keyword pbExtend extend extensions to max +syn keyword pbRPC service rpc returns + +syn keyword pbType int32 int64 uint32 uint64 sint32 sint64 +syn keyword pbType fixed32 fixed64 sfixed32 sfixed64 +syn keyword pbType float double bool string bytes +syn keyword pbTypedef enum +syn keyword pbBool true false + +syn match pbInt /-\?\<\d\+\>/ +syn match pbInt /\<0[xX]\x+\>/ +syn match pbFloat /\<-\?\d*\(\.\d*\)\?/ +" TODO: .proto also supports C-style block comments; +" see /usr/share/vim/vim70/syntax/c.vim for how it's done. 
+syn region pbComment start="//" skip="\\$" end="$" keepend contains=@pbCommentGrp +syn region pbString start=/"/ skip=/\\"/ end=/"/ +syn region pbString start=/'/ skip=/\\'/ end=/'/ + +if version >= 508 || !exists("did_proto_syn_inits") + if version < 508 + let did_proto_syn_inits = 1 + command -nargs=+ HiLink hi link <args> + else + command -nargs=+ HiLink hi def link <args> + endif + + HiLink pbTodo Todo + + HiLink pbSyntax Include + HiLink pbStructure Structure + HiLink pbRepeat Repeat + HiLink pbDefault Keyword + HiLink pbExtend Keyword + HiLink pbRPC Keyword + HiLink pbType Type + HiLink pbTypedef Typedef + HiLink pbBool Boolean + + HiLink pbInt Number + HiLink pbFloat Float + HiLink pbComment Comment + HiLink pbString String + + delcommand HiLink +endif + +let b:current_syntax = "proto" Added: configs/trunk/src/vim/syntax/thrift.vim =================================================================== --- configs/trunk/src/vim/syntax/thrift.vim (rev 0) +++ configs/trunk/src/vim/syntax/thrift.vim 2008-12-03 18:52:39 UTC (rev 1089) @@ -0,0 +1,74 @@ +" Vim syntax file +" Language: Thrift +" Maintainer: Martin Smith <ma...@fa...> +" Last Change: $Date: $ +" Copy to ~/.vim/ +" Add to ~/.vimrc +" au BufRead,BufNewFile *.thrift set filetype=thrift +" au! 
Syntax thrift source ~/.vim/thrift.vim +" +" $Id: $ + +if version < 600 + syntax clear +elseif exists("b:current_syntax") + finish +endif + +" Todo +syn keyword thriftTodo TODO todo FIXME fixme XXX xxx contained + +" Comments +syn match thriftComment "#.*" contains=thriftTodo +syn region thriftComment start="/\*" end="\*/" contains=thriftTodo +syn match thriftComment "//.\{-}\(?>\|$\)\@=" + +" String +syn region thriftStringDouble matchgroup=None start=+"+ end=+"+ + +" Number +syn match thriftNumber "-\=\<\d\+\>" contained + +" Keywords +syn keyword thriftKeyword namespace +syn keyword thriftKeyword php_namespace +syn keyword thriftKeyword xsd_all xsd_optional xsd_nillable xsd_namespace xsd_attrs +syn keyword thriftKeyword include cpp_include cpp_type const optional required +syn keyword thriftBasicTypes void bool byte i16 i32 i64 double string binary +syn keyword thriftStructure map list set struct typedef exception enum throws + +" Special +syn match thriftSpecial "\d\+:" + +" Structure +syn keyword thriftStructure service async extends +"async" { return tok_async; } +"exception" { return tok_xception; } +"extends" { return tok_extends; } +"throws" { return tok_throws; } +"service" { return tok_service; } +"enum" { return tok_enum; } +"const" { return tok_const; } + +if version >= 508 || !exists("did_thrift_syn_inits") + if version < 508 + let did_thrift_syn_inits = 1 + command! -nargs=+ HiLink hi link <args> + else + command! -nargs=+ HiLink hi def link <args> + endif + + HiLink thriftComment Comment + HiLink thriftKeyword Special + HiLink thriftBasicTypes Type + HiLink thriftStructure StorageClass + HiLink thriftTodo Todo + HiLink thriftString String + HiLink thriftNumber Number + HiLink thriftSpecial Special + HiLink thriftStructure Structure + + delcommand HiLink +endif + +let b:current_syntax = "thrift" This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-12-03 18:39:54
|
Revision: 1088
          http://assorted.svn.sourceforge.net/assorted/?rev=1088&view=rev
Author:   yangzhang
Date:     2008-12-03 18:39:47 +0000 (Wed, 03 Dec 2008)

Log Message:
-----------
fixed example

Modified Paths:
--------------
    wp-easy-filter/trunk/README

Modified: wp-easy-filter/trunk/README
===================================================================
--- wp-easy-filter/trunk/README 2008-12-03 11:21:30 UTC (rev 1087)
+++ wp-easy-filter/trunk/README 2008-12-03 18:39:47 UTC (rev 1088)
@@ -4,7 +4,7 @@
 This is a simple, general filter plugin for [WordPress]. You specify a mapping
 from tags to commands, such as:
 
-    $tag2cmd = array('pandoc' => '/usr/bin/pandoc -s --tab-stop=2');
+    $tag2cmd = array('pandoc' => '/usr/bin/pandoc -S --tab-stop=2');
 
 Then, for any posts which start with a shebang line containing that tag, as in:
 

This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
From: <yan...@us...> - 2008-12-03 11:21:36
|
Revision: 1087 http://assorted.svn.sourceforge.net/assorted/?rev=1087&view=rev Author: yangzhang Date: 2008-12-03 11:21:30 +0000 (Wed, 03 Dec 2008) Log Message: ----------- updated projects; updated & published style! Modified Paths: -------------- assorted-site/trunk/index.txt assorted-site/trunk/main.css Modified: assorted-site/trunk/index.txt =================================================================== --- assorted-site/trunk/index.txt 2008-12-03 07:19:15 UTC (rev 1086) +++ assorted-site/trunk/index.txt 2008-12-03 11:21:30 UTC (rev 1087) @@ -12,11 +12,11 @@ - ZDB: object database with an emphasis on semantics (passive) - General-purpose libraries ("commons") for various languages or platforms - - [Python Commons](python-commons) (active) + - [Python Commons](python-commons) (passive) - [Scala Commons](scala-commons) (passive) - [Java Reactor](java-reactor): simple event loop for single-threaded asynchronous IO and task scheduling (done) - - [C++ Commons](cpp-commons) (passive) + - [C++ Commons](cpp-commons) (active) - Haskell Commons (hiatus) - TeX Commons (hiatus) - [Shell Tools](shell-tools): programs written in a variety of languages and @@ -61,8 +61,8 @@ ratings on [Rotten Tomatoes], sort the movies by score, and aggregate the show times for those movies based on the schedule (done) - [Google File Search](http://y_z.scripts.mit.edu/gfs/): a simple web - frontend to Google Web Search for finding files in web directory listings - (done) + frontend to Google Web Search for finding files on services like Rapidshare + and Megaupload (done) - [WordPress EasyFilter](wp-easy-filter): a small, simple-to-setup plug-in to allow you to use any custom command-line text filter on your WordPress posts (done) @@ -74,6 +74,8 @@ - Wallpaper Tools: tools for managing wallpapers as they are being rotated through (done) - Exploration, experimentation, research + - [YDB](ydb): simple memory store that serves as a research testbed for approaches + to recovery in [VOLTDB] 
(H-Store) (active) - TCQ Wavelets: wavelet domain stream query processing for the Data Triage project in TelegraphCQ (done) - [Hash distribution](hash-dist): for observing the distribution of hash @@ -94,7 +96,7 @@ - [Picard Plugins](http://wiki.musicbrainz.org/PicardPlugins): simple tools for [Picard Tagger] (done) - [Sharing Gateway](sharing-gateway): tools for consolidating and - re-exporting file shares (active) + re-exporting file shares (passive) - Bibliography: my pan-paper BibTeX; i.e., stalling for ZDB (passive) - Subtitle adjuster: for time-shifting SRTs (done) - Javascript Beautifier: a thin [Tamarin] wrapper for [js_beautify]. @@ -134,6 +136,7 @@ [js_beautify]: http://elfz.laacz.lv/beautify/ [Rotten Tomatoes]: http://www.rottentomatoes.com/ [Picard Tagger]: http://wiki.musicbrainz.org/PicardTagger +[VOLTDB]: http://db.cs.yale.edu/hstore/ What the statuses mean: Modified: assorted-site/trunk/main.css =================================================================== --- assorted-site/trunk/main.css 2008-12-03 07:19:15 UTC (rev 1086) +++ assorted-site/trunk/main.css 2008-12-03 11:21:30 UTC (rev 1087) @@ -5,19 +5,25 @@ */ } +pre { + border-left: 2px dotted #bbbbbb; + padding-left: 12px; +} + body { - margin:1em 5% 1em 5%; - padding:0; - background-color: white; - color: black; - font-family: Verdana, sans-serif; - font-size: medium; - line-height: 1.3em; - color: #333; + margin: 1em 5% 1em 5%; + padding: 0; + background-color: white; + color: black; + font-family: arial, sans-serif; + font-size: 10pt; + line-height: 1.3em; + color: #333; + margin-left: auto; + margin-right: auto; + min-width: 200px; + max-width: 600px; } -/*body { - margin: 1em 5% 1em 5%; -}*/ a { color: #0000bb; @@ -40,12 +46,16 @@ } h1, h2, h3, h4, h5, h6, h1 a, h2 a { + color: #999999; + color: #8099bd; color: #527bbd; font-family: sans-serif; margin-top: 1.2em; margin-bottom: 0.5em; line-height: 1.3; text-decoration: none; + font-weight: normal; + /* text-transform: uppercase; */ } h1 
{ @@ -56,6 +66,11 @@ padding-top: 0.5em; } +h1.title { + border-bottom: 2px double gray; + font-size: 20pt; +} + div.sectionbody { font-family: serif; margin-left: 0; @@ -70,12 +85,6 @@ margin-bottom: 0.5em; } -/* TODO: make this larger? */ -pre { - padding: 0; - margin: 0; -} - span#author { color: #527bbd; font-family: sans-serif; @@ -209,7 +218,9 @@ } ul, ol { - list-style-position: outside; + list-style-position: outside; + margin-left: 1.5em; + padding-left: 0em; } ol.olist2 { list-style-type: lower-alpha; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-12-03 07:19:19
|
Revision: 1086
          http://assorted.svn.sourceforge.net/assorted/?rev=1086&view=rev
Author:   yangzhang
Date:     2008-12-03 07:19:15 +0000 (Wed, 03 Dec 2008)

Log Message:
-----------
publishing for blog post!

Modified Paths:
--------------
    sandbox/trunk/src/cc/copies.cc

Modified: sandbox/trunk/src/cc/copies.cc
===================================================================
--- sandbox/trunk/src/cc/copies.cc 2008-12-03 06:28:53 UTC (rev 1085)
+++ sandbox/trunk/src/cc/copies.cc 2008-12-03 07:19:15 UTC (rev 1086)
@@ -20,6 +20,7 @@
 A h() { return A(); }
 void i(A a) { a.x = 1; }
 A j(A a) { cout << "j()" << endl; return a; }
+A k() { A a; f(a); return a; }
 
 int main() {
   // A() is called once as expected.
@@ -152,5 +153,22 @@
   // ~A() this=0x7fff386527c0
   // ~A() this=0x7fff386527d0
 
+  // Sanity-check: k() should behave the same as g().
+  {
+    A a = k();
+  }
+  cout << endl;
+  // Prints:
+  // A() this=0x7fff17102280
+  // ~A() this=0x7fff17102280
+  //
+  // With -fno-elide-constructors:
+  // A() this=0x7fff85c43d30
+  // A(A) this=0x7fff85c43d70
+  // ~A() this=0x7fff85c43d30
+  // A(A) this=0x7fff85c43d80
+  // ~A() this=0x7fff85c43d70
+  // ~A() this=0x7fff85c43d80
+
   return 0;
 }

This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
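The k() added in this revision returns a named local, so whether A(A) runs depends on whether the compiler elides the copy. Below is a minimal sketch of how that could be counted instead of read off the printed addresses. The A and f here are hypothetical stand-ins (the real definitions are outside this diff), and the static counters and copies_made_by_k are not part of copies.cc.

```cpp
// Hypothetical stand-ins for the A, f and k of copies.cc; only the
// shape of k() is taken from the diff.
#include <cassert>

struct A {
  static int ctors, copies, dtors;  // how often each special member ran
  int x;
  A() : x(0) { ++ctors; }
  // Declaring a copy constructor suppresses the implicit move
  // constructor, so every non-elided transfer is counted here.
  A(const A &other) : x(other.x) { ++copies; }
  ~A() { ++dtors; }
};
int A::ctors = 0;
int A::copies = 0;
int A::dtors = 0;

// Assumed to take its argument by reference, like the f(a) call in k().
void f(A &a) { a.x = 1; }

// Same shape as the k() in the diff: modify a named local, return it.
A k() { A a; f(a); return a; }

// Runs k() once in its own scope and reports how many copy
// constructions happened during that call.
int copies_made_by_k() {
  int before = A::copies;
  {
    A a = k();
    assert(a.x == 1);
  }
  return A::copies - before;
}
```

With a default GCC build one would expect copies_made_by_k() to return 0, matching the two-line output in the diff. With -fno-elide-constructors it can be 1 or 2 depending on the language standard in use, so the sketch deliberately leaves the exact count to the caller to inspect.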
From: <yan...@us...> - 2008-12-03 06:29:00
|
Revision: 1085 http://assorted.svn.sourceforge.net/assorted/?rev=1085&view=rev Author: yangzhang Date: 2008-12-03 06:28:53 +0000 (Wed, 03 Dec 2008) Log Message: ----------- huge dump of c++ toys Modified Paths: -------------- sandbox/trunk/src/cc/boost_lexical_cast.cc sandbox/trunk/src/cc/boost_serialization.cc sandbox/trunk/src/cc/boost_serialization.mk sandbox/trunk/src/cc/exceptions.cc sandbox/trunk/src/cc/st/basics.cc sandbox/trunk/src/cc/streams.cc Added Paths: ----------- sandbox/trunk/src/cc/boost_bind_optargs.cc sandbox/trunk/src/cc/boost_bind_refs.cc sandbox/trunk/src/cc/boost_function.cc sandbox/trunk/src/cc/boost_lambda.cc sandbox/trunk/src/cc/copies.cc sandbox/trunk/src/cc/copies2.cc sandbox/trunk/src/cc/inline.cc sandbox/trunk/src/cc/names2.cc sandbox/trunk/src/cc/protobuf.cc sandbox/trunk/src/cc/protobuf.proto sandbox/trunk/src/cc/rtti.cc sandbox/trunk/src/cc/set.cc sandbox/trunk/src/cc/st/basics.mk sandbox/trunk/src/cc/st/exceptions.cc sandbox/trunk/src/cc/st/exceptions.mk sandbox/trunk/src/cc/st/exceptions2.cc sandbox/trunk/src/cc/st/exceptions2.mk sandbox/trunk/src/cc/strings.cc sandbox/trunk/src/cc/thrift.cc sandbox/trunk/src/cc/thrift.mk sandbox/trunk/src/cc/thrift.thrift sandbox/trunk/src/cc/unwinding.cc Added: sandbox/trunk/src/cc/boost_bind_optargs.cc =================================================================== --- sandbox/trunk/src/cc/boost_bind_optargs.cc (rev 0) +++ sandbox/trunk/src/cc/boost_bind_optargs.cc 2008-12-03 06:28:53 UTC (rev 1085) @@ -0,0 +1,16 @@ +// There doesn't seem to be a way to bind with default arguments. +#include <boost/bind.hpp> +#include <iostream> +using namespace boost; +using namespace std; +int f(int x, int y = 0) { return x + y; } +// Commented lines below don't compile. 
+int main() { + cout << bind(f, 1, 2)() << endl; + // cout << bind(f, 1)() << endl; + int (*p)(int, int) = f; + cout << bind(p, 1, 2)() << endl; + // int (*q)(int) = f; + // cout << bind(q, 1)() << endl; + return 0; +} Added: sandbox/trunk/src/cc/boost_bind_refs.cc =================================================================== --- sandbox/trunk/src/cc/boost_bind_refs.cc (rev 0) +++ sandbox/trunk/src/cc/boost_bind_refs.cc 2008-12-03 06:28:53 UTC (rev 1085) @@ -0,0 +1,20 @@ +#include <boost/bind.hpp> +#include <iostream> +#include <vector> + +using namespace std; +using namespace boost; + +void f(int &x) { + cout << "inside: x = " << &x << endl; +} + +int main() { + int x; + cout << "outside: x = " << &x << endl; + // This will create a copy. (No idea where, though.) + bind(f, x)(); + // This will display the same address. + bind(f, ref(x))(); + return 0; +} Added: sandbox/trunk/src/cc/boost_function.cc =================================================================== --- sandbox/trunk/src/cc/boost_function.cc (rev 0) +++ sandbox/trunk/src/cc/boost_function.cc 2008-12-03 06:28:53 UTC (rev 1085) @@ -0,0 +1,15 @@ +#include <boost/bind.hpp> +#include <boost/function.hpp> +#include <iostream> +using namespace boost; +using namespace std; +void show(int x) { cout << x << endl; } +void twice(function0<void> f) { f(); f(); } +int main() { + // This works... + function<void()> f = bind(show, 0); + bind(twice, f)(); + // ...but this does not compile. 
+ bind(twice, bind(show, 0))(); + return 0; +} Added: sandbox/trunk/src/cc/boost_lambda.cc =================================================================== --- sandbox/trunk/src/cc/boost_lambda.cc (rev 0) +++ sandbox/trunk/src/cc/boost_lambda.cc 2008-12-03 06:28:53 UTC (rev 1085) @@ -0,0 +1,17 @@ +#include <algorithm> +#include <boost/bind.hpp> +#include <boost/lambda/lambda.hpp> +#include <iostream> +#include <vector> +using namespace boost; +using namespace std; +class C { + public: int f() const { return 0; } +}; +int main() { + vector<C> cs(10); + vector<int> is; + transform(cs.begin(), cs.end(), back_inserter(is), bind(&C::f, _1)); + //for_each(is.begin(), is.end(), cout << _1 << constant(endl)); + return 0; +} Modified: sandbox/trunk/src/cc/boost_lexical_cast.cc =================================================================== --- sandbox/trunk/src/cc/boost_lexical_cast.cc 2008-12-02 21:05:08 UTC (rev 1084) +++ sandbox/trunk/src/cc/boost_lexical_cast.cc 2008-12-03 06:28:53 UTC (rev 1085) @@ -15,7 +15,7 @@ si = "asdf"; try { i = lexical_cast<int>(si); } - catch (exception& ex) { cout << "got an exception: " << ex.what() << endl; } + catch (std::exception& ex) { cout << "got an exception: " << ex.what() << endl; } //catch (...) { cout << "got an exception" << endl; } // Doesn't work. 
@@ -32,5 +32,12 @@ ss >> b; cout << b << endl; + // If you compile with -Wconversion against boost 1.35, you'll see: + // warning: conversion to ‘char’ from ‘int’ may alter its value + string s2 = lexical_cast<string>(3); + cout << s2 << endl; + const char *s = lexical_cast<string>(3).c_str(); + cout << s << endl; + return 0; } Modified: sandbox/trunk/src/cc/boost_serialization.cc =================================================================== --- sandbox/trunk/src/cc/boost_serialization.cc 2008-12-02 21:05:08 UTC (rev 1084) +++ sandbox/trunk/src/cc/boost_serialization.cc 2008-12-03 06:28:53 UTC (rev 1085) @@ -26,19 +26,51 @@ } int main() { - stringstream ss; long long x = currentTimeMillis(); + cout << x << endl; - //string s = ss.str(); - //s.c_str(); + { + stringstream ss; - binary_oarchive oa(ss); - oa & x; + // Using: + // + // stringstream ss(stringstream::binary); + // + // gives me: + // + // terminate called after throwing an instance of 'boost::archive::archive_exception' + // what(): stream error + // Aborted - binary_iarchive ia(ss); - ia & x; + //string s = ss.str(); + //s.c_str(); - cout << x << endl; + binary_oarchive oa(ss); + oa & x; + cout << ss.tellp() << endl; // something huge, like 47 bytes + + stringstream ss2(ss.str()); + binary_iarchive ia(ss2); + ia & x; + + cout << x << endl; + } + + { + stringbuf sb; + + // Same as above exception for: + // stringbuf sb(ios_base::binary); + + binary_oarchive oa(sb); + oa & x; + + binary_iarchive ia(sb); + ia & x; + + cout << x << endl; + } + return 0; } Modified: sandbox/trunk/src/cc/boost_serialization.mk =================================================================== --- sandbox/trunk/src/cc/boost_serialization.mk 2008-12-02 21:05:08 UTC (rev 1084) +++ sandbox/trunk/src/cc/boost_serialization.mk 2008-12-03 06:28:53 UTC (rev 1085) @@ -1,8 +1,8 @@ all: boost_serialization boost_serialization: boost_serialization.cc - g++ -O3 -Wall -pipe -o boost_serialization \ - 
-I/opt/boost-head-2007.03.25/include \ - -L/opt/boost-head-2007.03.25/lib \ - -lboost_serialization-gcc41-mt \ - boost_serialization.cc + g++ -O3 -Wall -pipe -o boost_serialization \ + -I/opt/boost-head-2007.03.25/include \ + -L/opt/boost-head-2007.03.25/lib \ + -lboost_serialization-gcc41-mt \ + boost_serialization.cc Added: sandbox/trunk/src/cc/copies.cc =================================================================== --- sandbox/trunk/src/cc/copies.cc (rev 0) +++ sandbox/trunk/src/cc/copies.cc 2008-12-03 06:28:53 UTC (rev 1085) @@ -0,0 +1,156 @@ +// When are copies optimized out? +// +// This is called Named Return Value Optimization (NRVO) and is described well +// here: <http://msdn.microsoft.com/en-us/library/ms364057(VS.80).aspx> + +#include <iostream> +using namespace std; + +class A { + public: + A() { cout << "A() this=" << this << endl; } + A(const A &a) { cout << "A(A) this=" << this << endl; } + ~A() { cout << "~A() this=" << this << endl; } + void operator=(const A& a) { cout << "A=A this=" << this << endl; } + int x; +}; + +void f(A &a) { a.x = 1; } +A g() { A a; a.x = 1; return a; } +A h() { return A(); } +void i(A a) { a.x = 1; } +A j(A a) { cout << "j()" << endl; return a; } + +int main() { + // A() is called once as expected. + { + A a; + f(a); + } + cout << endl; + // Prints: + // A() this=0x7fff82dbdfa0 + // ~A() this=0x7fff82dbdfa0 + + // A() is only called once! A(A) is not called. + { + A a = g(); + } + cout << endl; + // Prints: + // A() this=0x7fff82dbdfa0 + // ~A() this=0x7fff82dbdfa0 + // + // With -fno-elide-constructors: + // A() this=0x7fff997af930 + // A(A) this=0x7fff997af990 + // ~A() this=0x7fff997af930 + // A(A) this=0x7fff997af980 + // ~A() this=0x7fff997af990 + // ~A() this=0x7fff997af980 + + // NRVO is still happening, even though there are two objects created here. 
+ { + A a; + a = g(); + } + cout << endl; + // Prints: + // A() this=0x7fff82dbdfa0 + // A() this=0x7fff82dbdf90 + // A=A this=0x7fff82dbdfa0 + // ~A() this=0x7fff82dbdf90 + // ~A() this=0x7fff82dbdfa0 + // + // With -fno-elide-constructors: + // A() this=0x7fff997af980 + // A() this=0x7fff997af930 + // A(A) this=0x7fff997af970 This is the temporary object. + // ~A() this=0x7fff997af930 + // A=A this=0x7fff997af980 + // ~A() this=0x7fff997af970 + // ~A() this=0x7fff997af980 + + // Same as above. + { + A a; + a = h(); + } + cout << endl; + // Prints: + // A() this=0x7fff8bf1d0f0 + // A() this=0x7fff8bf1d0e0 + // A=A this=0x7fff8bf1d0f0 + // ~A() this=0x7fff8bf1d0e0 + // ~A() this=0x7fff8bf1d0f0 + // + // With -fno-elide-constructors: + // A() this=0x7fff788dbaa0 + // A() this=0x7fff788dba50 + // A(A) this=0x7fff788dba90 + // ~A() this=0x7fff788dba50 + // A=A this=0x7fff788dbaa0 + // ~A() this=0x7fff788dba90 + // ~A() this=0x7fff788dbaa0 + + // So far, only looked at return values. What about arguments? These can + // also be optimized! + { + i(A()); + } + cout << endl; + // Prints: + // A() this=0x7fffca8b7a70 + // ~A() this=0x7fffca8b7a70 + // + // With -fno-elide-constructors: + // A() this=0x7ffffaea7050 + // A(A) this=0x7ffffaea7040 + // ~A() this=0x7ffffaea7040 + // ~A() this=0x7ffffaea7050 + + // Again, cannot just separate the temp object creation from the expression. + { + A a; + i(a); + } + cout << endl; + // Prints: + // A() this=0x7fff3b6dc890 + // A(A) this=0x7fff3b6dc880 + // ~A() this=0x7fff3b6dc880 + // ~A() this=0x7fff3b6dc890 + // + // With -fno-elide-constructors: + // A() this=0x7fff4b27a410 + // A(A) this=0x7fff4b27a400 + // ~A() this=0x7fff4b27a400 + // ~A() this=0x7fff4b27a410 + + // Try both return values and arguments: there's a copy between the argument + // and return value (but again, with NRVO you get rid of the arg-to-arg copy + // and the ret-to-ret copy). 
+ { + A a = j(A()); + } + cout << endl; + // Prints: + // A() this=0x7fffc77108a0 + // j() + // A(A) this=0x7fffc77108b0 This copy is from argument to return value + // ~A() this=0x7fffc77108a0 + // ~A() this=0x7fffc77108b0 + // + // With -fno-elide-constructors: + // A() this=0x7fff386527c0 + // A(A) this=0x7fff386527b0 Arg-to-arg copy + // j() + // A(A) this=0x7fff386527a0 + // A(A) this=0x7fff386527d0 Ret-to-ret copy + // ~A() this=0x7fff386527a0 + // ~A() this=0x7fff386527b0 + // ~A() this=0x7fff386527c0 + // ~A() this=0x7fff386527d0 + + return 0; +} Added: sandbox/trunk/src/cc/copies2.cc =================================================================== --- sandbox/trunk/src/cc/copies2.cc (rev 0) +++ sandbox/trunk/src/cc/copies2.cc 2008-12-03 06:28:53 UTC (rev 1085) @@ -0,0 +1,39 @@ +// Puzzler involving return-value optimization. + +#include <iostream> +using namespace std; +class c { + public: + c() : c_(NULL) { cout << "c(" << c_ << ") this=" << this << endl; } + c(const c& c) : c_(&c) { cout << "c(" << c_ << ") this=" << this << endl; } + ~c() { cout << "~c(" << c_ << ") this=" << this << endl; } + int f() { cout << "c(" << c_ << ").f() this=" << this << endl; return 0; } + private: + const c* c_; +}; +template<typename T> void nop(const T &x) { cout << "nop()" << endl; } +int main() { + nop(c(c()).f()); + return 0; +} + +/* +I get: + + c(0) this=0x7fff13f56120 + c(0).f() this=0x7fff13f56120 + nop() + ~c(0) this=0x7fff13f56120 + +I'm expecting (where .... are non-zero values): + + c(0) this=... + c(...) this=... + c(...).f() this=... + nop() + ~c(...) this=... + ~c(0) this=... 
+ +What's happening is RVO: +<http://msdn.microsoft.com/en-us/library/ms364057(VS.80).aspx> +*/ Modified: sandbox/trunk/src/cc/exceptions.cc =================================================================== --- sandbox/trunk/src/cc/exceptions.cc 2008-12-02 21:05:08 UTC (rev 1084) +++ sandbox/trunk/src/cc/exceptions.cc 2008-12-03 06:28:53 UTC (rev 1085) @@ -1,3 +1,6 @@ +// Wrapping my head around exceptions. + +#include <cassert> #include <iostream> #include <stdexcept> @@ -5,13 +8,116 @@ class e1 : public exception {}; +class e2 : public exception { + public: + const char *what() const throw() { return "happy"; } +}; + +class e3 : public exception { + public: + e3() { cout << "e3()" << endl; } + e3(const e3 &e) { cout << "e3(e)" << endl; } + virtual ~e3() throw() { cout << "~e3()" << endl; } +}; + +void +test_scope() +{ + int x; + { + int y; + cout << "&x = " << &x << endl; + cout << "&y = " << &y << endl; + assert(&x > &y); + + cout << endl; + + // Exceptions are allocated at a special place on the heap. + try { throw e3(); } + catch (exception &e) { cout << "&e = " << &e << endl; } + + cout << endl; + + // This results in a copy being made onto the stack, below the above locals + // x,y. + try { throw e3(); } + catch (e3 e) { cout << "&e = " << &e << endl; } + } +} + int main() { + // + // Uncomment the following to see an exception thrown. + // + // $ ./exceptions + // terminate called after throwing an instance of 'e2' + // what(): happy + // Aborted + // + + // throw e2(); + + // + // Throwing e1: these all catch fine. + // + try { throw e1(); } catch (e1 & e) { cout << "got " << e.what() << endl; } - // This catches fine. try { throw e1(); } catch (exception & e) { cout << "got " << e.what() << endl; } - // This doesn't work. - try { throw e1(); } catch (exception * e) { cout << "got " << e->what() << endl; } + try { throw e1(); } catch (exception e) { cout << "got " << e.what() << endl; } + cout << endl; + + // + // Throwing e2. 
+ // + + // This doesn't catch (of course). + // try { throw e2(); } catch (e1 & e) { cout << "got " << e.what() << endl; } + + // This catches. + try { throw e2(); } catch (exception & e) { cout << "got " << e.what() << endl; } + + // This *converts* the e2 to std::exception! (Or perhaps a more appropriate + // word is *wraps*.) + try { throw e2(); } catch (exception e) { cout << "got " << e.what() << endl; } + + cout << endl; + + // + // Catching by ptr. + // + + // These don't catch. Think of the catch argument as parameters in a + // function signature. + // try { throw e2(); } catch (e2 * e) { cout << "got " << e->what() << endl; } + // try { throw e2(); } catch (exception * e) { cout << "got " << e->what() << endl; } + + // This doesn't catch. + // try { throw e2(); } catch (void * p) { cout << "got " << p << endl; } + + // This doesn't catch. + // try { throw "abc"; } catch (void * p) { cout << "got " << p << endl; } + + // This catches. + try { throw new e2(); } catch (exception *e) { cout << "got " << e->what() << endl; } + + // This catches. + try { throw new e2(); } catch (void *p) { cout << "got " << p << endl; } + + cout << endl; + + // + // C++ has no `finally`, and probably won't; this is a MS extension. + // + + // try {} finally {} + + // + // What is the scope/lifetime of an exception? Where are they allocated? + // + + test_scope(); + return 0; } Added: sandbox/trunk/src/cc/inline.cc =================================================================== --- sandbox/trunk/src/cc/inline.cc (rev 0) +++ sandbox/trunk/src/cc/inline.cc 2008-12-03 06:28:53 UTC (rev 1085) @@ -0,0 +1,20 @@ +// Play around with compiling inlined functions. +// +// g++ -g3 will leave the original inlined function in, so that you can do +// (e.g.) `print f()`. 
+// +// However, I can't interactively use the inlined functions of the STL; I get: +// +// (gdb) p xs.at(0) +// Cannot evaluate function -- may be inlined +// +// See relevant post: <http://ubuntuforums.org/showthread.php?p=6280135> + +#include <vector> +using namespace std; +inline int f() { return 0; } +int main() { + vector<int> xs; + xs.push_back(0); + return f(); +} Added: sandbox/trunk/src/cc/names2.cc =================================================================== --- sandbox/trunk/src/cc/names2.cc (rev 0) +++ sandbox/trunk/src/cc/names2.cc 2008-12-03 06:28:53 UTC (rev 1085) @@ -0,0 +1,28 @@ +template<typename T> class a { public: int f() { return 0; } }; +template<typename T> class b : public a<T> {}; +typedef a<int> c; + +class d { + void a() {} + + // cannot have this given the above, or you'll get: + // error: ISO C++ forbids declaration of ‘a’ with no type + // a<int> y; + b<int> y; + + c z; + + // more generally, types and values share the same namespace when inside a + // class, so this is not allowed, or you'll get: + // error: changes meaning of ‘c’ from ‘typedef class a<int> c’ + // c c; + + // but this works like a charm! 
+ ::a<int> w; +}; + +int main() { + // but it's allowed for local variables + c c; + return c.f(); +} Added: sandbox/trunk/src/cc/protobuf.cc =================================================================== --- sandbox/trunk/src/cc/protobuf.cc (rev 0) +++ sandbox/trunk/src/cc/protobuf.cc 2008-12-03 06:28:53 UTC (rev 1085) @@ -0,0 +1,43 @@ +#include <iostream> +#include <string> +#include <sstream> +#include "protobuf.pb.h" + +using namespace std; + +int +main() +{ + GOOGLE_PROTOBUF_VERIFY_VERSION; + + stringstream output; + { + Init init; + for (int i = 0; i < 3; i++) { + SockAddr * sa = init.add_node(); + sa->set_host(123); + sa->set_port(321); + } + + //fstream output("tmp", ios::out | ios::trunc | ios::binary); + if (!init.SerializeToOstream(&output)) { + cerr << "failed" << endl; + return -1; + } + } + + stringstream &input = output; + { + Init init; + if (!init.ParseFromIstream(&input)){ + cerr << "failed" << endl; + return -1; + } + for (int i = 0; i < init.node_size(); i++) { + const SockAddr& sa = init.node(i); + cout << sa.host() << ':' << sa.port() << endl; + } + } + + return 0; +} Added: sandbox/trunk/src/cc/protobuf.proto =================================================================== --- sandbox/trunk/src/cc/protobuf.proto (rev 0) +++ sandbox/trunk/src/cc/protobuf.proto 2008-12-03 06:28:53 UTC (rev 1085) @@ -0,0 +1,33 @@ +message SockAddr { + required int32 host = 1; + required int32 port = 2; +} +message Init { + repeated SockAddr node = 1; +} +message Op { + enum OpType { + read = 0; + write = 1; + del = 2; + } + required OpType type = 1; + required int32 key = 2; + optional int32 value = 3; +} +message Txn { + repeated Op op = 1; +} +message Response { + repeated int32 result = 1; +} +message Ready { + optional int32 ready = 1; +} +message Recovery { + message Pair { + required int32 key = 1; + required int32 value = 2; + } + repeated Pair pair = 1; +} Added: sandbox/trunk/src/cc/rtti.cc 
=================================================================== --- sandbox/trunk/src/cc/rtti.cc (rev 0) +++ sandbox/trunk/src/cc/rtti.cc 2008-12-03 06:28:53 UTC (rev 1085) @@ -0,0 +1,25 @@ +// names are prefixed with their length + +#include <typeinfo> +#include <iostream> +using namespace std; + +class MyClass {}; +class SomeOtherClass {}; + +int +main() +{ + MyClass c; + const type_info &info = typeid(c); + cout << info.name() << endl; + + SomeOtherClass x; + cout << typeid(x).name() << endl; // value + cout << typeid(SomeOtherClass).name() << endl; // type + cout << typeid(SomeOtherClass()).name() << endl; // weirdness + cout << typeid(new SomeOtherClass).name() << endl; // ptr + cout << typeid((SomeOtherClass&)*(new SomeOtherClass)).name() << endl; // ref + + return 0; +} Added: sandbox/trunk/src/cc/set.cc =================================================================== --- sandbox/trunk/src/cc/set.cc (rev 0) +++ sandbox/trunk/src/cc/set.cc 2008-12-03 06:28:53 UTC (rev 1085) @@ -0,0 +1,14 @@ +// Prints out: 0 1 1 +#include <set> +#include <iostream> +using namespace std; +int main() { + set<int> s; + s.insert(1); + s.insert(2); + s.end(); + cout << s.erase(3) << endl; + cout << s.erase(2) << endl; + cout << s.erase(1) << endl; + return 0; +} Modified: sandbox/trunk/src/cc/st/basics.cc =================================================================== --- sandbox/trunk/src/cc/st/basics.cc 2008-12-02 21:05:08 UTC (rev 1084) +++ sandbox/trunk/src/cc/st/basics.cc 2008-12-03 06:28:53 UTC (rev 1085) @@ -12,7 +12,7 @@ void* f(void *p) { - stfd s = tcp_connect("www.google.com", 80); + stfd s = st_tcp_connect("www.google.com", 80, ST_UTIME_NO_TIMEOUT); return NULL; } @@ -32,11 +32,11 @@ int main() { - check0(st_init()); - st_thread_t t1 = checkpass(st_thread_create(&f, (void*)"abc", 0, 0)); - checkpass(st_thread_create(&f, (void*)"abc", 0, 0)); - checkpass(st_thread_create(&g, (void*)t1, 0, 0)); - //check0(st_thread_join(t, NULL)); + check0x(st_init()); + 
st_thread_t t1 = checkerr(st_thread_create(&f, (void*)"abc", 0, 0)); + checkerr(st_thread_create(&f, (void*)"abc", 0, 0)); + checkerr(st_thread_create(&g, (void*)t1, 0, 0)); + //check0x(st_thread_join(t, NULL)); st_thread_exit(NULL); return 1; } Added: sandbox/trunk/src/cc/st/basics.mk =================================================================== --- sandbox/trunk/src/cc/st/basics.mk (rev 0) +++ sandbox/trunk/src/cc/st/basics.mk 2008-12-03 06:28:53 UTC (rev 1085) @@ -0,0 +1,2 @@ +basics: basics.cc + g++ -Wall -o basics basics.cc -lst -lstx -lresolv Added: sandbox/trunk/src/cc/st/exceptions.cc =================================================================== --- sandbox/trunk/src/cc/st/exceptions.cc (rev 0) +++ sandbox/trunk/src/cc/st/exceptions.cc 2008-12-03 06:28:53 UTC (rev 1085) @@ -0,0 +1,28 @@ +// By default, the stack is not unwound in the other threads, so the destructor +// here is not called. + +#include <commons/check.h> +#include <exception> +#include <iostream> +#include <st.h> + +using namespace commons; +using namespace std; + +class cleanup { + public: + ~cleanup() { cout << "clean up" << endl; } +}; + +void *f(void *) { + throw exception(); + return NULL; +} + +int main() { + cleanup c; + st_init(); + st_thread_t t = checkerr(st_thread_create(&f, (void*)"abc", 1, 0)); + check0x(st_thread_join(t, NULL)); + return 0; +} Added: sandbox/trunk/src/cc/st/exceptions.mk =================================================================== --- sandbox/trunk/src/cc/st/exceptions.mk (rev 0) +++ sandbox/trunk/src/cc/st/exceptions.mk 2008-12-03 06:28:53 UTC (rev 1085) @@ -0,0 +1,2 @@ +exceptions: exceptions.cc + g++ -o exceptions exceptions.cc -lst Added: sandbox/trunk/src/cc/st/exceptions2.cc =================================================================== --- sandbox/trunk/src/cc/st/exceptions2.cc (rev 0) +++ sandbox/trunk/src/cc/st/exceptions2.cc 2008-12-03 06:28:53 UTC (rev 1085) @@ -0,0 +1,26 @@ +#include <commons/st/st.h> +#include <exception> 
+#include <iostream> +#include <st.h> + +using namespace commons; +using namespace std; + +class cleanup { + public: + ~cleanup() { cout << "cleanup" << endl; } +}; + +void *f(void *) { + cout << "f called" << endl; + try { throw exception(); } + catch (...) { cout << "caught" << endl; } +} + +int main() { + cleanup c; + st_init(); + st_thread_t t = st_thread_create(&f, NULL, 1, 0); + st_thread_join(t, NULL); + return 0; +} Added: sandbox/trunk/src/cc/st/exceptions2.mk =================================================================== --- sandbox/trunk/src/cc/st/exceptions2.mk (rev 0) +++ sandbox/trunk/src/cc/st/exceptions2.mk 2008-12-03 06:28:53 UTC (rev 1085) @@ -0,0 +1,2 @@ +exceptions2: exceptions2.cc + g++ -o exceptions2 exceptions2.cc -lst -lstx -lresolv Modified: sandbox/trunk/src/cc/streams.cc =================================================================== --- sandbox/trunk/src/cc/streams.cc 2008-12-02 21:05:08 UTC (rev 1084) +++ sandbox/trunk/src/cc/streams.cc 2008-12-03 06:28:53 UTC (rev 1085) @@ -1,3 +1,5 @@ +// Playing with streams. + #include <fstream> #include <iostream> #include <sstream> @@ -6,32 +8,46 @@ using namespace std; int main() { -// string filename; -// getline(cin, filename); -// ifstream myFile(filename.c_str()); -// string x; -// myFile >> x; -// cout << x; + // Basic usage. + // + // Prompt the user for a filename and display its contents. +#if 0 + string filename; + getline(cin, filename); + ifstream myFile(filename.c_str()); + string x; + myFile >> x; + cout << x; +#endif - // TODO test out fread to make a simple case in which it doesn't block - //socket(); - //fopen(); - //fread(); - //fclose(); + // binary vs. text + // + // Stringstream does *not* replace \r\n with \n. This actually prints out 97 + // 13 10 97 on Linux. Unclear what the difference is between text and binary + // mode. TODO figure out the difference. 
+#if 0 + stringstream ss; + ss.write("a\r\na", 4); + cout << ss.str().size() << "characters: "; + char a[4]; + ss.read(a, 4); + for (int i = 0; i < 4; i++) + { + cout << static_cast<int>(a[i]) << " "; + } + cout << endl; +#endif - // TODO make sure that this does the same thing when compiled natively for - // Windows (i.e., are read/write affected by binary mode?) - stringstream ss; - ss.write("a\r\na", 4); - cout << ss.str().size() << endl; - char a[4]; - ss.read(a, 4); - for (int i = 0; i < 4; i++) - { - cout << static_cast<int>(a[i]) << " "; - } - cout << endl; - return 0; - - // TODO look for a lib that already does this stuff. what was that one? ___io___? + // Avoid stringstream(string(...)). + // + // TODO figure out why it's problematic. + // + const char *buf = "hello!"; + // The correct way: + string s(buf); + stringstream ss(s); + // The incorrect way: + // stringstream ss(string(buf)); + cout << ss.str() << endl; + return 0; } Added: sandbox/trunk/src/cc/strings.cc =================================================================== --- sandbox/trunk/src/cc/strings.cc (rev 0) +++ sandbox/trunk/src/cc/strings.cc 2008-12-03 06:28:53 UTC (rev 1085) @@ -0,0 +1,23 @@ +#include <string> +#include <iostream> +using namespace std; +int main() { + // Are the heap data backing strings shared? Yes! + { + string s("abc"); + string s2(s); + s[0]='A'; + cout << s2 << endl; // Prints Abc! + } + + const char *p; + { + string s = "hello!"; + // This is a pointer to the internal buffer maintained by the string + // object. Must use strdup() or some such. + p = s.c_str(); + } + // E.g., valgrind would flag this. 
+ cout << p << endl; + return 0; +} Added: sandbox/trunk/src/cc/thrift.cc =================================================================== --- sandbox/trunk/src/cc/thrift.cc (rev 0) +++ sandbox/trunk/src/cc/thrift.cc 2008-12-03 06:28:53 UTC (rev 1085) @@ -0,0 +1,33 @@ +#include "gen-cpp/thrift_types.h" +#include <boost/foreach.hpp> +#include <boost/shared_ptr.hpp> +#include <iostream> +#include <protocol/TBinaryProtocol.h> +#include <transport/TBufferTransports.h> +using namespace boost; +using namespace facebook::thrift::protocol; +using namespace facebook::thrift::transport; +using namespace std; +#define foreach BOOST_FOREACH + +int main() { + shared_ptr<TTransport> transport(new TMemoryBuffer()); + TBinaryProtocol prot(transport); + { + init init; + init.nodes.resize(5); + foreach (sock_addr &node, init.nodes) { + node.host = 16777343; + node.port = 7654; + } + init.write(&prot); + } + { + init init; + init.read(&prot); + foreach (sock_addr &node, init.nodes) { + cout << node.host << ':' << node.port << endl; + } + } + return 0; +} Added: sandbox/trunk/src/cc/thrift.mk =================================================================== --- sandbox/trunk/src/cc/thrift.mk (rev 0) +++ sandbox/trunk/src/cc/thrift.mk 2008-12-03 06:28:53 UTC (rev 1085) @@ -0,0 +1,8 @@ +all: thrift + +thrift: thrift.cc thrift.thrift + thrift -strict --gen cpp thrift.thrift + g++ -g3 -Wall -I/opt/thrift/include/thrift -lthrift -o thrift \ + thrift.cc \ + gen-cpp/thrift_types.cpp \ + gen-cpp/thrift_constants.cpp Added: sandbox/trunk/src/cc/thrift.thrift =================================================================== --- sandbox/trunk/src/cc/thrift.thrift (rev 0) +++ sandbox/trunk/src/cc/thrift.thrift 2008-12-03 06:28:53 UTC (rev 1085) @@ -0,0 +1,29 @@ +enum op_type { + op_read, op_write, op_del +} +struct sock_addr { + 1: i32 host, + 2: i16 port +} +struct init { + 1: list<sock_addr> nodes +} +struct op { + 1: op_type type, + 2: i32 key, + 3: optional i32 value +} +struct txn { + 
 1: list<op> ops +} +struct response { + 1: list<i32> results +} +struct ready {} +struct pair { + 1: i32 key, + 2: i32 value +} +struct recovery { + 1: list<pair> pairs +} Added: sandbox/trunk/src/cc/unwinding.cc =================================================================== --- sandbox/trunk/src/cc/unwinding.cc (rev 0) +++ sandbox/trunk/src/cc/unwinding.cc 2008-12-03 06:28:53 UTC (rev 1085) @@ -0,0 +1,14 @@ +// Uncaught exceptions do not result in stack unwinding! They call terminate() +// which calls abort(), immediately ending the program. +#include <iostream> +using namespace std; +class cleanup { + public: + ~cleanup() { cout << "~c()" << endl; } + void f() {} +}; +int main() { + cleanup c; + c.f(); + throw exception(); +}
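The TODO in streams.cc about `stringstream ss(string(buf));` appears to be a case of what is commonly called the most vexing parse: that line is parsed as a function declaration, not as an object definition, so a later `ss.str()` does not compile. A minimal sketch of two workarounds follows; the function names are made up for illustration and are not part of the commit.

```cpp
#include <cassert>
#include <sstream>
#include <string>

// `std::stringstream ss(std::string(buf));` declares a FUNCTION named ss that
// takes a std::string parameter called buf and returns a stringstream.

// Workaround 1: an extra pair of parentheses forces the argument to be
// parsed as an expression.
std::string via_extra_parens(const char *buf) {
    std::stringstream ss((std::string(buf)));
    return ss.str();
}

// Workaround 2: a named string, which is what streams.cc already does.
std::string via_named_string(const char *buf) {
    std::string s(buf);
    std::stringstream ss(s);
    return ss.str();
}

// Both workarounds should produce a stream with the same contents.
void check_workarounds() {
    assert(via_extra_parens("hello!") == via_named_string("hello!"));
}
```

The named-string form is arguably the easier one to read; the extra parentheses are mostly useful when a temporary is preferred.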
From: <yan...@us...> - 2008-12-02 21:05:16
Revision: 1084 http://assorted.svn.sourceforge.net/assorted/?rev=1084&view=rev Author: yangzhang Date: 2008-12-02 21:05:08 +0000 (Tue, 02 Dec 2008) Log Message: ----------- - added information on when the recovering joiner catches up (as measured by the leader) - made the optional recovery yields an option - more cleanup/comments, updated docs Modified Paths: -------------- ydb/trunk/README ydb/trunk/src/main.lzz ydb/trunk/src/ydb.proto Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2008-12-02 05:37:00 UTC (rev 1083) +++ ydb/trunk/README 2008-12-02 21:05:08 UTC (rev 1084) @@ -62,12 +62,23 @@ ./ydb localhost 7654 7657 -It will connect to the first replica (on port 7655) and receive a DB dump from it. +It will connect to the first replica (on port 7655) and receive a DB dump from +it. To terminate the system, send a sigint (ctrl-c) to the leader, and a clean shutdown should take place. The replicas dump their DB state to a tmp file, which you can then verify to be identical. +Recovery Mechanisms +------------------- + +The following are currently implemented: + +- Network recovery + - From a single node + - Interleave the state recovery/catch up with the backlogging of live txns + - Recover/catch up in one swoop, then backlog the live txns + Pseudo-code ----------- @@ -110,8 +121,13 @@ - Add test suite. 
+- Add benchmarking hooks, e.g.: + - start the recovering joiner at a well-defined time (after a certain # txns + or after the DB reaches a certain size) + - Add benchmarking information, e.g.: - txns/second normally + - txns during recovery - txns/second during recovery - time to recover - bytes used to recover Modified: ydb/trunk/src/main.lzz =================================================================== --- ydb/trunk/src/main.lzz 2008-12-02 05:37:00 UTC (rev 1083) +++ ydb/trunk/src/main.lzz 2008-12-02 21:05:08 UTC (rev 1084) @@ -5,6 +5,7 @@ #include <boost/scoped_array.hpp> #include <commons/nullptr.h> #include <commons/st/st.h> +#include <commons/time.h> #include <csignal> #include <cstdio> #include <cstdlib> @@ -30,11 +31,15 @@ extern const st_utime_t timeout = 1000000; const int chkpt = 10000; const bool verbose = true; +const bool yield_during_recovery = false; +const bool yield_during_catch_up = false; +const bool use_epoll = false; const uint16_t base_port = 7654; st_intr_bool stop_hub, kill_hub; /** - * The list of threads. + * The list of all threads. Keep track of these so that we may cleanly shut + * down all threads. */ set<st_thread_t> threads; @@ -256,11 +261,13 @@ * leader. */ void -process_txn(st_netfd_t leader, map<int, int> &map, const Txn &txn, int &seqno) +process_txn(st_netfd_t leader, map<int, int> &map, const Txn &txn, int &seqno, + bool caught_up) { checkeq(txn.seqno(), seqno + 1); Response res; res.set_seqno(txn.seqno()); + res.set_caught_up(caught_up); seqno = txn.seqno(); for (int o = 0; o < txn.op_size(); o++) { const Op &op = txn.op(o); @@ -296,7 +303,7 @@ if (txn.has_seqno()) { if (txn.seqno() == seqno + 1) { - process_txn(leader, map, txn, seqno); + process_txn(leader, map, txn, seqno, true); } else { // Queue up for later processing once a snapshot has been received. backlog.push(new Txn(txn)); @@ -321,14 +328,19 @@ * Keep swallowing replica responses. 
*/ void -handle_responses(st_netfd_t replica, const int &seqno) +handle_responses(st_netfd_t replica, const int &seqno, bool caught_up) { + long long start_time = current_time_millis(); while (true) { Response res; { st_intr intr(kill_hub); readmsg(replica, res); } + if (!caught_up && res.caught_up()) { + caught_up = true; + cout << "recovering node caught up; took " << current_time_millis() - start_time << "ms" << endl; + } if (res.seqno() % chkpt == 0) { if (verbose) cout << "got response " << res.seqno() << " from " << replica << endl; @@ -434,7 +446,7 @@ // Start handling responses. st_thread_group handlers; foreach (replica_info r, replicas) { - handlers.insert(my_spawn(bind(handle_responses, r.fd(), ref(seqno)))); + handlers.insert(my_spawn(bind(handle_responses, r.fd(), ref(seqno), true))); } // Accept the recovering node, and tell it about the online replicas. @@ -454,7 +466,7 @@ cout << "start streaming txns to joiner" << endl; replicas.push_back(replica_info(joiner, static_cast<uint16_t>(join.port()))); newreps.push(replicas.back()); - handlers.insert(my_spawn(bind(handle_responses, joiner, ref(seqno)))); + handlers.insert(my_spawn(bind(handle_responses, joiner, ref(seqno), false))); } /** @@ -522,6 +534,9 @@ for (int i = 0; i < recovery.pair_size(); i++) { const Recovery_Pair &p = recovery.pair(i); map[p.key()] = p.value(); + if (i % chkpt == 0) { + if (yield_during_recovery) st_sleep(0); + } } assert(seqno == -1 && static_cast<typeof(seqno)>(recovery.seqno()) > seqno); @@ -530,10 +545,10 @@ while (!backlog.empty()) { Txn *p = backlog.take(); - process_txn(leader, map, *p, seqno); + process_txn(leader, map, *p, seqno, false); if (p->seqno() % chkpt == 0) { cout << "processed txn " << p->seqno() << " off the backlog" << endl; - st_sleep(0); + if (yield_during_catch_up) st_sleep(0); } delete p; } @@ -583,12 +598,16 @@ } } +/** + * Initialization and command-line parsing. 
+ */ int main(int argc, char **argv) { try { GOOGLE_PROTOBUF_VERIFY_VERSION; + // Initialize support for ST working with asynchronous signals. check0x(pipe(sig_pipe)); struct sigaction sa; sa.sa_handler = handle_sig; @@ -596,16 +615,22 @@ sa.sa_flags = 0; check0x(sigaction(SIGINT, &sa, nullptr)); - //check0x(st_set_eventsys(ST_EVENTSYS_ALT)); + // Initialize ST. + if (use_epoll) check0x(st_set_eventsys(ST_EVENTSYS_ALT)); check0x(st_init()); + st_spawn(bind(handle_sig_sync)); + + // Initialize thread manager for clean shutdown of all threads. thread_eraser eraser; - st_spawn(bind(handle_sig_sync)); threads.insert(st_thread_self()); + + // Parse command-line arguments. if (argc != 2 && argc != 4) die("leader: ydb <nreplicas>\n" "replica: ydb <leaderhost> <leaderport> <listenport>\n"); bool is_leader = argc == 2; + // Which role are we? if (is_leader) { run_leader(atoi(argv[1])); } else { @@ -616,6 +641,7 @@ return 0; } catch (const std::exception &ex) { + // Must catch all exceptions at the top to make the stack unwind. cerr << "thread " << st_thread_self() << ": " << ex.what() << endl; return 1; } Modified: ydb/trunk/src/ydb.proto =================================================================== --- ydb/trunk/src/ydb.proto 2008-12-02 05:37:00 UTC (rev 1083) +++ ydb/trunk/src/ydb.proto 2008-12-02 21:05:08 UTC (rev 1084) @@ -52,6 +52,8 @@ required int32 seqno = 1; // The list of answers to read operations. repeated int32 result = 2; + // Whether the replica has caught_up. + required bool caught_up = 3; } // Message from a running node to a joining node to bring it up to speed. This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
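The change above has the leader time how long a joining replica takes to report `caught_up`. A minimal sketch of that bookkeeping follows, assuming a plain struct in place of the protobuf `Response` and timestamps passed in by the caller; the names `CatchUpTracker` and `observe` are hypothetical and do not appear in the commit.

```cpp
#include <cassert>

// Hypothetical stand-in for the protobuf Response message in ydb.proto.
struct Response {
  int seqno;
  bool caught_up;
};

// Tracks one replica's responses and reports how long catch-up took.
// Timestamps are passed in (milliseconds) so the logic is clock-independent.
class CatchUpTracker {
 public:
  CatchUpTracker(long long start_ms, bool already_caught_up)
      : start_ms_(start_ms), caught_up_(already_caught_up) {}

  // Returns the elapsed milliseconds the first time a response reports
  // caught_up for a replica that was still recovering; returns -1 otherwise.
  long long observe(const Response &res, long long now_ms) {
    assert(now_ms >= start_ms_);
    if (!caught_up_ && res.caught_up) {
      caught_up_ = true;
      return now_ms - start_ms_;
    }
    return -1;
  }

 private:
  long long start_ms_;
  bool caught_up_;
};
```

Passing the clock value in keeps the transition logic testable; in the commit itself the equivalent check sits inline in `handle_responses` and reads `current_time_millis()` directly.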
From: <yan...@us...> - 2008-12-02 05:37:05
Revision: 1083
http://assorted.svn.sourceforge.net/assorted/?rev=1083&view=rev
Author: yangzhang
Date: 2008-12-02 05:37:00 +0000 (Tue, 02 Dec 2008)

Log Message:
-----------
- updated code and docs so that the system works with >2 replicas
- only instruct a single node to help recover
- only the recoverer connects to the helping node, and only that node
- added another mode where the recoverer can process the backlog in concurrently with pushing live txns onto it
- re-enabled killing of process_txns
- bumped up the chkpt threshold
- print strerror of errors
- fixed signed/unsigned comparisons and other warnings (changed seqnos in pb msgs to be signed)
- more clean-up
- migrated to boost 1.37 (mainly had to avoid)

Modified Paths:
--------------
ydb/trunk/README
ydb/trunk/src/Makefile
ydb/trunk/src/main.lzz
ydb/trunk/src/ydb.proto

Modified: ydb/trunk/README
===================================================================
--- ydb/trunk/README 2008-12-02 05:34:16 UTC (rev 1082)
+++ ydb/trunk/README 2008-12-02 05:37:00 UTC (rev 1083)
@@ -7,29 +7,29 @@
 
 [VOLTDB]: http://db.cs.yale.edu/hstore/
 
-Currently, the only recovery implemented mechanism is to have one of the
-replicas serialize the entire database state and send that to the joining node.
+Currently, the only recovery implemented mechanism is to have the first-joining
+replica serialize its entire database state and send that to the joining node.
 
 If you start a system of $n$ replicas, then the leader will wait for $n-1$ of
 them to join before it starts issuing transactions. (Think of $n-1$ as the
 minimum number of replicas the system requires before it is willing to process
 transactions.) Then when replica $n$ joins, it will need to catch up to the
-current state of the system, and it will do so by contacting one of the other
-replicas and requesting a complete dump of its DB state.
+current state of the system, and it will do so by contacting that first replica
+and receiving a complete dump of its DB state.
 
 The leader will report the current txn seqno to the joiner, and start streaming
 txns beyond that seqno to the joiner, which the joiner will push onto its
-backlog. It will also instruct the standing replicas to snapshot and send
-their DB state at this txn seqno. As a result, the standing replicas will
-pause once they get this message until they can send their state to the joiner.
+backlog. It will also instruct that first replica to snapshot its DB state at
+this txn seqno and prepare to send it to the recovering node as soon as it
+connects.
 
 Setup
 -----
 
 Requirements:
 
-- [boost] 1.35
-- [C++ Commons] svn r1074
+- [boost] 1.37
+- [C++ Commons] svn r1082
 - [GCC] 4.3.2
 - [Lazy C++] 2.8.0
 - [Protocol Buffers] 2.0.0
@@ -45,19 +45,29 @@
 Usage
 -----
 
-To start a leader to manage 2 replicas, run:
+To start a leader to manage 3 replicas, run:
 
-    ./ydb 2
+    ./ydb 3
 
-This will listen on port 7654. Then to start a replica, run:
+This will listen on port 7654. Then to start the first two replicas, run:
 
     ./ydb localhost 7654 7655
+    ./ydb localhost 7654 7656
 
 This means "connect to the leader at localhost:7654, and listen on port 7655."
-The replicas have to listen for connections from other replicas.
+The replicas have to listen for connections from other replicas (namely the
+recovering replica).
 
-Currently handles only 2 replicas.
+The recovering replica then joins:
 
+    ./ydb localhost 7654 7657
+
+It will connect to the first replica (on port 7655) and receive a DB dump from it.
+
+To terminate the system, send a sigint (ctrl-c) to the leader, and a clean
+shutdown should take place. The replicas dump their DB state to a tmp file,
+which you can then verify to be identical.
+
 Pseudo-code
 -----------
 
@@ -96,6 +106,10 @@
 Todo
 ----
 
+- Expose program options.
+
+- Add test suite.
+ - Add benchmarking information, e.g.: - txns/second normally - txns/second during recovery Modified: ydb/trunk/src/Makefile =================================================================== --- ydb/trunk/src/Makefile 2008-12-02 05:34:16 UTC (rev 1082) +++ ydb/trunk/src/Makefile 2008-12-02 05:37:00 UTC (rev 1083) @@ -22,8 +22,9 @@ LDFLAGS := -lstx -lst -lresolv -lpthread -lprotobuf CXXFLAGS := -g3 -Wall -Werror -Wextra -Woverloaded-virtual -Wconversion \ -Wpointer-arith -Wcast-qual -Wcast-align -Wwrite-strings \ - -Winit-self -Wno-sign-compare -Wno-unused-parameter -Wc++0x-compat \ - -Wparentheses + -Winit-self -Wsign-promo -Wno-unused-parameter -Wc++0x-compat \ + -Wparentheses -Wmissing-format-attribute -Wfloat-equal \ + -Winline -Wsynth PBCXXFLAGS := -g3 -Wall -Werror all: $(TARGET) Modified: ydb/trunk/src/main.lzz =================================================================== --- ydb/trunk/src/main.lzz 2008-12-02 05:34:16 UTC (rev 1082) +++ ydb/trunk/src/main.lzz 2008-12-02 05:37:00 UTC (rev 1083) @@ -1,8 +1,3 @@ -// TODO -// - how does replication affect overhead? -// - implement other recovery schemes (disk-based) -// - verify correctness of the simple recovery scheme - #hdr #include <boost/bind.hpp> #include <boost/foreach.hpp> @@ -13,6 +8,7 @@ #include <csignal> #include <cstdio> #include <cstdlib> +#include <cstring> #include <iostream> #include <fstream> #include <map> @@ -32,7 +28,7 @@ // Why does just timeout require the `extern`? extern const st_utime_t timeout = 1000000; -const int chkpt = 1000; +const int chkpt = 10000; const bool verbose = true; const uint16_t base_port = 7654; st_intr_bool stop_hub, kill_hub; @@ -60,7 +56,7 @@ thread_eraser eraser; try { f(); - } catch (const exception &ex) { + } catch (const std::exception &ex) { cerr << "thread " << st_thread_self() << ": " << ex.what() << endl; if (intr) stop_hub.set(); } @@ -162,9 +158,9 @@ // Broadcast the length-prefixed message to replicas. 
foreach (st_netfd_t dst, dsts) { checkeqnneg(st_write(dst, static_cast<void*>(&len), sizeof len, timeout), - static_cast<int>(sizeof len)); + static_cast<ssize_t>(sizeof len)); checkeqnneg(st_write(dst, buf, s.size(), ST_UTIME_NO_TIMEOUT), - static_cast<int>(s.size())); + static_cast<ssize_t>(s.size())); } } @@ -190,7 +186,7 @@ uint32_t len; checkeqnneg(st_read_fully(src, static_cast<void*>(&len), sizeof len, timeout), - static_cast<int>(sizeof len)); + static_cast<ssize_t>(sizeof len)); len = ntohl(len); #define GETMSG(buf) \ @@ -225,11 +221,10 @@ while (!stop_hub) { // Did we get a new member? + if (!newreps.empty() && seqno > 0) { + sendmsg(fds[0], Txn()); + } while (!newreps.empty()) { - if (seqno > 0) { - Txn txn; - bcastmsg(fds, txn); - } fds.push_back(newreps.take().fd()); } @@ -294,10 +289,10 @@ { while (true) { Txn txn; - //{ - //st_intr intr(stop_hub); + { + st_intr intr(kill_hub); readmsg(leader, txn); - //} + } if (txn.has_seqno()) { if (txn.seqno() == seqno + 1) { @@ -306,18 +301,19 @@ // Queue up for later processing once a snapshot has been received. backlog.push(new Txn(txn)); } + + if (txn.seqno() % chkpt == 0) { + if (verbose) cout << "processed txn " << txn.seqno() << endl; + st_sleep(0); + } } else { // Wait for the snapshot to be generated. 
send_state.set(); cout << "waiting for state to be sent" << endl; sent_state.waitset(); sent_state.reset(); + cout << "state sent" << endl; } - - if (txn.seqno() % chkpt == 0) { - if (verbose) cout << "processed txn " << txn.seqno() << endl; - st_sleep(0); - } } } @@ -338,8 +334,8 @@ cout << "got response " << res.seqno() << " from " << replica << endl; st_sleep(0); } - if (stop_hub && res.seqno() == seqno - 1) { - cout << "seqno = " << seqno - 1 << endl; + if (stop_hub && res.seqno() + 1 == seqno) { + cout << "seqno = " << res.seqno() << endl; break; } } @@ -352,17 +348,11 @@ recover_joiner(st_netfd_t listener, const map<int, int> &map, const int &seqno, st_bool &send_state, st_bool &sent_state) { - st_netfd_t joiner; + // Wait for the right time to generate the snapshot. { st_intr intr(stop_hub); - joiner = checkerr(st_accept(listener, nullptr, nullptr, - ST_UTIME_NO_TIMEOUT)); + send_state.waitset(); } - st_closing closing(joiner); - cout << "got recoverer's connection" << endl; - - // Wait for the right time to generate the snapshot. - send_state.waitset(); send_state.reset(); cout << "snapshotting state for recovery" << endl; @@ -377,7 +367,16 @@ // Notify process_txns that it may continue processing. sent_state.set(); - cout << "sending recovery" << endl; + // Wait for the new joiner. + st_netfd_t joiner; + { + st_intr intr(stop_hub); + joiner = checkerr(st_accept(listener, nullptr, nullptr, + ST_UTIME_NO_TIMEOUT)); + } + st_closing closing(joiner); + + cout << "got joiner's connection, sending recovery" << endl; sendmsg(joiner, recovery); cout << "sent" << endl; } @@ -402,7 +401,7 @@ { st_intr intr(stop_hub); fd = checkerr(st_accept(listener, nullptr, nullptr, - ST_UTIME_NO_TIMEOUT)); + ST_UTIME_NO_TIMEOUT)); } Join join; readmsg(fd, join); @@ -500,10 +499,10 @@ cout << "- " << checkerr(inet_ntop(AF_INET, &host, buf, INET_ADDRSTRLEN)) << ':' << sa.port() << (is_self ? 
" (self)" : "") << endl; - if (!is_self) { + if (!is_self && init.txnseqno() > 0) { replicas.push_back(st_tcp_connect(host, static_cast<uint16_t>(sa.port()), - timeout)); + ST_UTIME_NO_TIMEOUT)); } } @@ -515,13 +514,11 @@ // If there's anything to recover. if (init.txnseqno() > 0) { - cout << "waiting for recovery" << endl; + cout << "waiting for recovery from " << replicas[0] << endl; // Read the recovery message. Recovery recovery; - foreach (st_netfd_t r, replicas) { - readmsg(r, recovery); - } + readmsg(replicas[0], recovery); for (int i = 0; i < recovery.pair_size(); i++) { const Recovery_Pair &p = recovery.pair(i); map[p.key()] = p.value(); @@ -534,6 +531,10 @@ while (!backlog.empty()) { Txn *p = backlog.take(); process_txn(leader, map, *p, seqno); + if (p->seqno() % chkpt == 0) { + cout << "processed txn " << p->seqno() << " off the backlog" << endl; + st_sleep(0); + } delete p; } cout << "caught up." << endl; @@ -554,8 +555,9 @@ */ void handle_sig(int sig) { int err = errno; - cerr << "got signal: " << sig << endl; - checkeqnneg(write(sig_pipe[1], &sig, sizeof sig), sizeof sig); + cerr << "got signal: " << strsignal(sig) << " (" << sig << ")" << endl; + checkeqnneg(write(sig_pipe[1], &sig, sizeof sig), + static_cast<ssize_t>(sizeof sig)); errno = err; } @@ -568,7 +570,7 @@ while (true) { int sig; checkeqnneg(st_read(fd, &sig, sizeof sig, ST_UTIME_NO_TIMEOUT), - sizeof sig); + static_cast<ssize_t>(sizeof sig)); if (sig == SIGINT) { if (!stop_hub) stop_hub.set(); else kill_hub.set(); @@ -613,7 +615,7 @@ } return 0; - } catch (const exception &ex) { + } catch (const std::exception &ex) { cerr << "thread " << st_thread_self() << ": " << ex.what() << endl; return 1; } Modified: ydb/trunk/src/ydb.proto =================================================================== --- ydb/trunk/src/ydb.proto 2008-12-02 05:34:16 UTC (rev 1082) +++ ydb/trunk/src/ydb.proto 2008-12-02 05:37:00 UTC (rev 1083) @@ -15,7 +15,7 @@ // Initialization message sent to a nodes when it 
joins. message Init { // The current seqno that the server is on. - required uint32 txnseqno = 1; + required int32 txnseqno = 1; // What the leader perceives to be the joining replica's IP address. required uint32 yourhost = 2; // The nodes that have joined (including the joining node); the ports here @@ -42,14 +42,14 @@ // A transaction. Currently just a simple sequence of Ops. message Txn { - optional uint32 seqno = 1; + optional int32 seqno = 1; repeated Op op = 2; } // Response to a transaction, containing a list of results. message Response { // The txn that this is a response for. - required uint32 seqno = 1; + required int32 seqno = 1; // The list of answers to read operations. repeated int32 result = 2; } @@ -62,7 +62,7 @@ } // The seqno that this recovery message will bring us up through (the last // txn seqno before the snapshot was generated). - required uint32 seqno = 1; + required int32 seqno = 1; // The data map. repeated Pair pair = 2; } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
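The recovery path in this revision has the joiner queue live txns on a backlog, install the snapshot it receives, and then drain the backlog in seqno order. The flow can be sketched roughly as below, assuming plain structs in place of the protobuf messages and a `std::queue` in place of `st_channel`; the `Replica` type and its member names are hypothetical.

```cpp
#include <cassert>
#include <map>
#include <queue>
#include <vector>

// Hypothetical stand-ins for the Op and Txn messages in ydb.proto.
enum class OpType { Read, Write, Del };
struct Op { OpType type; int key; int value; };
struct Txn { int seqno; std::vector<Op> ops; };

struct Replica {
  std::map<int, int> db;
  int seqno = -1;           // seqno of the last txn applied
  std::queue<Txn> backlog;  // txns received while waiting for the snapshot

  // Apply one txn; it must be the immediate successor of the current state.
  // Returns the results of the read operations.
  std::vector<int> apply(const Txn &txn) {
    assert(txn.seqno == seqno + 1);
    std::vector<int> results;
    for (const Op &op : txn.ops) {
      switch (op.type) {
        // Like the original, a read of a missing key inserts a default 0.
        case OpType::Read:  results.push_back(db[op.key]); break;
        case OpType::Write: db[op.key] = op.value; break;
        case OpType::Del:   db.erase(op.key); break;
      }
    }
    seqno = txn.seqno;
    return results;
  }

  // Replace local state with a snapshot that is current through snapshot_seqno.
  void install_snapshot(const std::map<int, int> &snapshot, int snapshot_seqno) {
    db = snapshot;
    seqno = snapshot_seqno;
  }

  // Catch up by applying everything queued since the snapshot point.
  void drain_backlog() {
    while (!backlog.empty()) {
      apply(backlog.front());
      backlog.pop();
    }
  }
};
```

The sketch leaves out the network I/O, the responses sent back to the leader, and the cooperative yields (`st_sleep(0)`) that the real code performs at each checkpoint.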
From: <yan...@us...> - 2008-12-02 05:34:29
Revision: 1082
http://assorted.svn.sourceforge.net/assorted/?rev=1082&view=rev
Author: yangzhang
Date: 2008-12-02 05:34:16 +0000 (Tue, 02 Dec 2008)

Log Message:
-----------
- migrated to boost 1.37
- fixed some check calls
- added printf format attributes to check functions
- fixed format specifiers/typing by using stringstream/lexical_cast
- updated and published the README

Modified Paths:
--------------
cpp-commons/trunk/README
cpp-commons/trunk/src/commons/boost/delegates.h
cpp-commons/trunk/src/commons/check.h
cpp-commons/trunk/src/commons/die.h
cpp-commons/trunk/src/commons/sockets.h
cpp-commons/trunk/src/commons/st/st.h

Modified: cpp-commons/trunk/README
===================================================================
--- cpp-commons/trunk/README 2008-11-30 23:46:31 UTC (rev 1081)
+++ cpp-commons/trunk/README 2008-12-02 05:34:16 UTC (rev 1082)
@@ -6,33 +6,43 @@
 --------
 
 The C++ Commons is a general-purpose utility library for the C++ programming
-language. The library is nascent, so it doesn't have enough to warrant a
-release. This is a library of header files, in the same spirit as [boost].
+language. This is a library of header files, in the same spirit as [boost].
 
 Here are some of the features present in the library:
 
-- check macros (assertions that aren't meant to be removed in releases)
+- C functions for string manipulation
 - RAII utilities, such as for closing file descriptors
-- low-level system information from cpuid
+- `array`: thin wrapper around arrays (`scoped_array` + size)
+- `pool`: fixed-size object pools
+- bit manipulation
+- bundles of header includes
+- C++ support for pthreads, but allowing the user to access the underlying
+  `pthread_t` (a major annoyance of using boost threads was its complete
+  encapsulation)
+- C++ support for [State Threads]
+- check macros (like assertions but never removed from compilation)
+- `deque`: simpler deque implementation that uses coarse-grained allocation
+- error handling, such as `die()`, which leverages `strerror`
 - file I/O utilities, such as reading complete files
+- function delegates (for use with C functions that take `(void*)(void*)`)
 - hash functions
+- low-level system information from `cpuid`
+- `nullptr`: from C++0x
 - pseudo-random number generators
-- error handling, such as `die()`
-- function delegates (for use with C functions that take `(void*)(void*)`)
-- region-based allocation
-- bundles of header includes
+- region-based memory management
 - socket utilities
-- C functions for string manipulation
-- bit manipulation
-- C++ support for pthreads, but allowing the user to access the underlying
-  `pthread_t` (a major annoyance of using boost threads was its complete
-  encapsulation)
 - portable re-implementations of pthread primitives such as barriers
-- C++ support for [State Threads]
+- time utilities, including timers and simpler interfaces to system clocks
+- utilities for streams
 - utilities for [tamer]
+- x86 architecture-specific tools
 
-[State Threads]: http://state-threads.sourceforge.net/
+Setup
+-----
 
+Like [boost], just include these header files in your next C++ project, and
+you're ready to go!
+ Related Work ------------ Modified: cpp-commons/trunk/src/commons/boost/delegates.h =================================================================== --- cpp-commons/trunk/src/commons/boost/delegates.h 2008-11-30 23:46:31 UTC (rev 1081) +++ cpp-commons/trunk/src/commons/boost/delegates.h 2008-12-02 05:34:16 UTC (rev 1082) @@ -14,7 +14,7 @@ void swallow(const function0<void> f) { try { f(); } - catch (exception &ex) { cerr << ex.what() << endl; } + catch (std::exception &ex) { cerr << ex.what() << endl; } } /** @@ -45,7 +45,7 @@ // run_function0_ex(void* p) // { // try { run_function0(p); } -// catch (exception &ex) { return exception(ex); } +// catch (std::exception &ex) { return std::exception(ex); } // } } Modified: cpp-commons/trunk/src/commons/check.h =================================================================== --- cpp-commons/trunk/src/commons/check.h 2008-11-30 23:46:31 UTC (rev 1081) +++ cpp-commons/trunk/src/commons/check.h 2008-12-02 05:34:16 UTC (rev 1082) @@ -5,6 +5,7 @@ #include <sstream> #include <string> #include <commons/die.h> +#include <boost/lexical_cast.hpp> // TODO: rename: // - chk(x): verifies and return x; if not, throw strerror @@ -31,9 +32,10 @@ // TODO: provide strerror in other functions too // TODO: better way to deal with errno? (rather than resetting it) + using namespace boost; using namespace std; - class check_exception : public exception + class check_exception : public std::exception { public: check_exception(const string & name) : name(name) {} @@ -43,7 +45,7 @@ const string name; }; - inline void + __attribute__((format(printf, 4, 0))) inline void _vcheck(bool cond, const char *file, int line, const char *fmt, va_list ap) { if (!cond) { @@ -58,7 +60,7 @@ } } - inline void + __attribute__((format(printf, 4, 5))) inline void _check(bool cond, const char *file, int line, const char *fmt, ...) 
{ va_list ap; @@ -104,7 +106,8 @@ if (!x) { int e = errno; errno = 0; - _check(x, file, line, "expecting !=0, got %d: %s", x, strerror(e)); + _check(x, file, line, "expecting !=0, got %s: %s", + lexical_cast<string>(x).c_str(), strerror(e)); } return x; } @@ -130,7 +133,7 @@ if (l != r) { stringstream ss; ss << "expecting " << l << " == " << r; - _check(false, file, line, ss.str().c_str()); + _check(false, file, line, "%s", ss.str().c_str()); } } @@ -140,8 +143,9 @@ if (l < 0) { int e = errno; errno = 0; - _check(l == r, file, line, - "expecting %d, got %d: %s", r, l, strerror(e)); + stringstream ss; + ss << "expecting " << r << ", got " << l << ": " << strerror(e); + _check(l == r, file, line, "%s", ss.str().c_str()); } else { _checkeq(l, r, file, line); } Modified: cpp-commons/trunk/src/commons/die.h =================================================================== --- cpp-commons/trunk/src/commons/die.h 2008-11-30 23:46:31 UTC (rev 1081) +++ cpp-commons/trunk/src/commons/die.h 2008-12-02 05:34:16 UTC (rev 1082) @@ -15,7 +15,7 @@ * * TODO: move into C Commons. */ - void + __attribute__((format(printf, 1, 2))) void die(const char *format, ...) { const char *errstr; Modified: cpp-commons/trunk/src/commons/sockets.h =================================================================== --- cpp-commons/trunk/src/commons/sockets.h 2008-11-30 23:46:31 UTC (rev 1081) +++ cpp-commons/trunk/src/commons/sockets.h 2008-12-02 05:34:16 UTC (rev 1082) @@ -46,7 +46,7 @@ sa.sin_addr.s_addr = htonl(INADDR_ANY); // Bind the socket to the local socket address. - check0x(bind(fd, (sockaddr*) &sa, sizeof sa)); + check0x(::bind(fd, (sockaddr*) &sa, sizeof sa)); return fd; } catch (...) 
{ Modified: cpp-commons/trunk/src/commons/st/st.h =================================================================== --- cpp-commons/trunk/src/commons/st/st.h 2008-11-30 23:46:31 UTC (rev 1081) +++ cpp-commons/trunk/src/commons/st/st.h 2008-12-02 05:34:16 UTC (rev 1082) @@ -100,7 +100,7 @@ sa.sin_addr = host; // Create the socket. - int sfd = checknneg(socket(PF_INET, SOCK_STREAM, 0)); + int sfd = checknnegerr(socket(PF_INET, SOCK_STREAM, 0)); st_netfd_t nfd = st_netfd_open_socket(sfd); // Connect. @@ -108,7 +108,7 @@ check0x(st_connect(nfd, (sockaddr*) &sa, sizeof sa, timeout)); return nfd; } catch (...) { - st_netfd_close(nfd); + check0x(st_netfd_close(nfd)); throw; } } @@ -313,17 +313,17 @@ st_intr_hub &hub_; }; - class st_group_join_exception : public exception + class st_group_join_exception : public std::exception { public: - st_group_join_exception(const map<st_thread_t, exception> &th2ex) : + st_group_join_exception(const map<st_thread_t, std::exception> &th2ex) : th2ex_(th2ex) {} virtual ~st_group_join_exception() throw() {} virtual const char *what() const throw() { if (!th2ex_.empty() && s == "") { bool first = true; stringstream ss; - typedef pair<st_thread_t, exception> p; + typedef pair<st_thread_t, std::exception> p; foreach (p p, th2ex_) { ss << (first ? "" : ", ") << p.first << " -> " << p.second.what(); first = false; @@ -333,7 +333,7 @@ return s.c_str(); } private: - map<st_thread_t, exception> th2ex_; + map<st_thread_t, std::exception> th2ex_; string s; }; @@ -357,10 +357,10 @@ { public: ~st_thread_group() { - map<st_thread_t, exception> th2ex; + map<st_thread_t, std::exception> th2ex; foreach (st_thread_t t, ts) { try { st_join(t); } - catch (exception &ex) { th2ex[t] = ex; } + catch (std::exception &ex) { th2ex[t] = ex; } } if (!th2ex.empty()) throw st_group_join_exception(th2ex); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
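Two of the changes above belong together: the check functions gain `__attribute__((format(printf, ...)))`, and pre-built messages are now passed as an argument to `"%s"` rather than as the format string itself. A minimal sketch of why that matters, assuming a GCC or Clang compiler; `format_msg` is a hypothetical helper for illustration, not a cpp-commons function.

```cpp
#include <cassert>
#include <cstdarg>
#include <cstdio>
#include <string>

// Hypothetical helper for illustration; not part of cpp-commons.
// The attribute (GCC/Clang) asks the compiler to check arguments against fmt:
// parameter 1 is the format string, variadic arguments start at parameter 2.
__attribute__((format(printf, 1, 2)))
std::string format_msg(const char *fmt, ...) {
  char buf[256];
  va_list ap;
  va_start(ap, fmt);
  int n = std::vsnprintf(buf, sizeof buf, fmt, ap);
  va_end(ap);
  assert(n >= 0);  // vsnprintf returns a negative value on an output error
  return std::string(buf);
}

// Risky: format_msg(text.c_str())       -- any '%' in text is interpreted.
// Safe:  format_msg("%s", text.c_str()) -- text is treated as plain data.
```

With the attribute in place, a mismatched call such as `format_msg("%d", "text")` should produce a `-Wformat` diagnostic at compile time, which is presumably what surfaced the specifier fixes listed in the log message.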
From: <yan...@us...> - 2008-11-30 23:46:33
Revision: 1081
http://assorted.svn.sourceforge.net/assorted/?rev=1081&view=rev
Author: yangzhang
Date: 2008-11-30 23:46:31 +0000 (Sun, 30 Nov 2008)

Log Message:
-----------
- added simplest state-sending recovery
- verifiably produce (dump) the same state on both machines
- general clean up
- filled out the README

Modified Paths:
--------------
ydb/trunk/src/Makefile
ydb/trunk/src/main.lzz
ydb/trunk/src/ydb.proto

Added Paths:
-----------
ydb/trunk/README
ydb/trunk/publish.bash

Removed Paths:
-------------
ydb/trunk/src/ydb.thrift

Added: ydb/trunk/README
===================================================================
--- ydb/trunk/README (rev 0)
+++ ydb/trunk/README 2008-11-30 23:46:31 UTC (rev 1081)
@@ -0,0 +1,120 @@
+Overview
+--------
+
+YDB (Yang's Database) is a simple replicated memory store, developed for the
+purpose of researching various approaches to recovery in such OLTP-optimized
+databases as [VOLTDB] (formerly H-Store/Horizontica).
+
+[VOLTDB]: http://db.cs.yale.edu/hstore/
+
+Currently, the only recovery implemented mechanism is to have one of the
+replicas serialize the entire database state and send that to the joining node.
+
+If you start a system of $n$ replicas, then the leader will wait for $n-1$ of
+them to join before it starts issuing transactions. (Think of $n-1$ as the
+minimum number of replicas the system requires before it is willing to process
+transactions.) Then when replica $n$ joins, it will need to catch up to the
+current state of the system, and it will do so by contacting one of the other
+replicas and requesting a complete dump of its DB state.
+
+The leader will report the current txn seqno to the joiner, and start streaming
+txns beyond that seqno to the joiner, which the joiner will push onto its
+backlog. It will also instruct the standing replicas to snapshot and send
+their DB state at this txn seqno. As a result, the standing replicas will
+pause once they get this message until they can send their state to the joiner.
+
+Setup
+-----
+
+Requirements:
+
+- [boost] 1.35
+- [C++ Commons] svn r1074
+- [GCC] 4.3.2
+- [Lazy C++] 2.8.0
+- [Protocol Buffers] 2.0.0
+- [State Threads] 1.8
+
+[boost]: http://www.boost.org/
+[C++ Commons]: http://assorted.sourceforge.net/cpp-commons/
+[GCC]: http://gcc.gnu.org/
+[Lazy C++]: http://www.lazycplusplus.com/
+[Protocol Buffers]: http://code.google.com/p/protobuf/
+[State Threads]: http://state-threads.sourceforge.net/
+
+Usage
+-----
+
+To start a leader to manage 2 replicas, run:
+
+    ./ydb 2
+
+This will listen on port 7654. Then to start a replica, run:
+
+    ./ydb localhost 7654 7655
+
+This means "connect to the leader at localhost:7654, and listen on port 7655."
+The replicas have to listen for connections from other replicas.
+
+Currently handles only 2 replicas.
+
+Pseudo-code
+-----------
+
+### Leader
+
+    foreach event
+      if event == departure
+        remove replica
+      if event == join
+        add replica
+        send init msg to new replica
+          who else is in the system
+          which txn we're on
+        start sending txns to new replica
+        start handling responses from new replica
+          read responses up till the current seqno
+
+### Replica
+
+    start listening for conns from new replicas
+      generate recovery msg from map
+      send recovery msg to new replica
+    send join msg to leader
+    recv init msg from leader
+    start recving txns from leader
+      if map is caught up
+        apply txn directly
+      else
+        push onto backlog
+    foreach replica
+      connect to replica
+      recv recovery msg from replica
+      apply the state
+    apply backlog
+
+Todo
+----
+
+- Add benchmarking information, e.g.:
+  - txns/second normally
+  - txns/second during recovery
+  - time to recover
+  - bytes used to recover
+
+- Run some benchmarks, esp. on multiple physical hosts.
+
+- Figure out why things are running so slowly with >2 replicas.
assorted.bash "$@" Property changes on: ydb/trunk/publish.bash ___________________________________________________________________ Added: svn:executable + * Added: svn:mergeinfo + Modified: ydb/trunk/src/Makefile =================================================================== --- ydb/trunk/src/Makefile 2008-11-30 23:45:26 UTC (rev 1080) +++ ydb/trunk/src/Makefile 2008-11-30 23:46:31 UTC (rev 1081) @@ -1,4 +1,5 @@ TARGET := ydb +WTF := wtf LZZS := $(wildcard *.lzz) LZZHDRS := $(foreach lzz,$(LZZS),$(patsubst %.lzz,%.hh,$(lzz))) @@ -31,10 +32,10 @@ $(CXX) -o $@ $^ $(LDFLAGS) %.o: %.cc $(PBHDRS) - wtf $(CXX) $(CXXFLAGS) -c -o $@ $< + $(WTF) $(CXX) $(CXXFLAGS) -c -o $@ $< %.o: %.pb.cc %.pb.h - wtf $(CXX) $(PBCXXFLAGS) -c -o $@ $< + $(WTF) $(CXX) $(PBCXXFLAGS) -c -o $@ $< %.cc: %.lzz lzz -hx hh -sx cc -hl -sl -hd -sd $< Modified: ydb/trunk/src/main.lzz =================================================================== --- ydb/trunk/src/main.lzz 2008-11-30 23:45:26 UTC (rev 1080) +++ ydb/trunk/src/main.lzz 2008-11-30 23:46:31 UTC (rev 1081) @@ -6,13 +6,20 @@ #hdr #include <boost/bind.hpp> #include <boost/foreach.hpp> +#include <boost/lambda/lambda.hpp> +#include <boost/scoped_array.hpp> #include <commons/nullptr.h> #include <commons/st/st.h> +#include <csignal> #include <cstdio> #include <cstdlib> #include <iostream> +#include <fstream> #include <map> +#include <set> #include <sstream> +#include <sys/types.h> +#include <unistd.h> #include <vector> #include "ydb.pb.h" #define foreach BOOST_FOREACH @@ -21,11 +28,123 @@ using namespace std; #end -extern int chkpt = 1000; +typedef pair<int, int> pii; + +// Why does just timeout require the `extern`? extern const st_utime_t timeout = 1000000; -extern const bool verbose = false; +const int chkpt = 1000; +const bool verbose = true; +const uint16_t base_port = 7654; +st_intr_bool stop_hub, kill_hub; /** + * The list of threads. 
+ */ +set<st_thread_t> threads; + +class thread_eraser +{ + public: + thread_eraser() { threads.insert(st_thread_self()); } + ~thread_eraser() { threads.erase(st_thread_self()); } +}; + +/** + * Delegate for running thread targets. + * \param[in] f The function to execute. + * \param[in] intr Whether to signal stop_hub on an exception. + */ +void +my_spawn_helper(const function0<void> f, bool intr) +{ + thread_eraser eraser; + try { + f(); + } catch (const exception &ex) { + cerr << "thread " << st_thread_self() << ": " << ex.what() << endl; + if (intr) stop_hub.set(); + } +} + +/** + * Spawn a thread using ST but wrap it in an exception handler that interrupts + * all other threads (hopefully causing them to unwind). + */ +st_thread_t +my_spawn(const function0<void> &f, bool intr = true) +{ + st_thread_t t = st_spawn(bind(my_spawn_helper, f, intr)); + threads.insert(t); + return t; +} + +/** + * Used by the leader to bookkeep information about replicas. + */ +class replica_info +{ + public: + replica_info(st_netfd_t fd, uint16_t port) : fd_(fd), port_(port) {} + st_netfd_t fd() const { return fd_; } + /** The port on which the replica is listening. */ + uint16_t port() const { return port_; } +#hdr +#define GETSA sockaddr_in sa; sockaddr(sa); return sa +#end + /** The port on which the replica connected to us. */ + uint16_t local_port() const { GETSA.sin_port; } + uint32_t host() const { GETSA.sin_addr.s_addr; } + sockaddr_in sockaddr() const { GETSA; } + void sockaddr(sockaddr_in &sa) const { + socklen_t salen = sizeof sa; + check0x(getpeername(st_netfd_fileno(fd_), + reinterpret_cast<struct sockaddr*>(&sa), + &salen)); + } + private: + st_netfd_t fd_; + uint16_t port_; +}; + +/** + * RAII to close all contained netfds. 
+ */ +class st_closing_all +{ + public: + st_closing_all(const vector<replica_info>& rs) : rs_(rs) {} + ~st_closing_all() { + foreach (replica_info r, rs_) + check0x(st_netfd_close(r.fd())); + } + private: + const vector<replica_info> &rs_; +}; + +/** + * RAII for dumping the final state of the DB to a file on disk. + */ +class dump_state +{ + public: + dump_state(const map<int, int> &map, const int &seqno) + : map_(map), seqno_(seqno) {} + ~dump_state() { + stringstream fname; + fname << "/tmp/ydb" << getpid(); + cout << "dumping DB state (" << seqno_ << ") to " << fname.str() << endl; + ofstream of(fname.str().c_str()); + of << "seqno: " << seqno_ << endl; + foreach (const pii &p, map_) { + of << p.first << ": " << p.second << endl; + } + } + private: + const map<int, int> &map_; + const int &seqno_; +}; + +/** * Send a message to some destinations (sequentially). */ template<typename T> @@ -44,7 +163,7 @@ foreach (st_netfd_t dst, dsts) { checkeqnneg(st_write(dst, static_cast<void*>(&len), sizeof len, timeout), static_cast<int>(sizeof len)); - checkeqnneg(st_write(dst, buf, s.size(), timeout), + checkeqnneg(st_write(dst, buf, s.size(), ST_UTIME_NO_TIMEOUT), static_cast<int>(s.size())); } } @@ -52,7 +171,7 @@ /** * Send a message to a single recipient. */ -template<typename T> +template<typename T> void sendmsg(st_netfd_t dst, const T &msg) { @@ -65,19 +184,28 @@ */ template <typename T> void -readmsg(st_netfd_t src, T & msg) +readmsg(st_netfd_t src, T & msg, st_utime_t timeout = ST_UTIME_NO_TIMEOUT) { // Read the message length. uint32_t len; checkeqnneg(st_read_fully(src, static_cast<void*>(&len), sizeof len, - ST_UTIME_NO_TIMEOUT), + timeout), static_cast<int>(sizeof len)); len = ntohl(len); +#define GETMSG(buf) \ + checkeqnneg(st_read_fully(src, buf, len, timeout), (int) len); \ + check(msg.ParseFromArray(buf, len)); + // Parse the message body. 
- char buf[len]; - checkeqnneg(st_read_fully(src, buf, len, timeout), (int) len); - check(msg.ParseFromArray(buf, len)); + if (len < 4096) { + char buf[len]; + GETMSG(buf); + } else { + cout << "receiving large msg; heap-allocating " << len << " bytes" << endl; + scoped_array<char> buf(new char[len]); + GETMSG(buf.get()); + } } inline int @@ -90,239 +218,403 @@ * Keep issuing transactions to the replicas. */ void -issue_txns(const vector<st_netfd_t> &replicas) +issue_txns(st_channel<replica_info> &newreps, int &seqno) { Op_OpType types[] = {Op::read, Op::write, Op::del}; - size_t lastsize = replicas.size(); - cout << "replicas = " << &replicas << endl; - int i = 0; - while (true) { - if (replicas.size() != lastsize) { - cout << "size changed from " << lastsize << " to " << replicas.size() - << endl; - lastsize = replicas.size(); + vector<st_netfd_t> fds; + + while (!stop_hub) { + // Did we get a new member? + while (!newreps.empty()) { + if (seqno > 0) { + Txn txn; + bcastmsg(fds, txn); + } + fds.push_back(newreps.take().fd()); } + // Generate a random transaction. Txn txn; + txn.set_seqno(seqno++); int count = rand32(5) + 1; for (int o = 0; o < count; o++) { Op *op = txn.add_op(); - int rtype = rand32(3), rkey = rand32(), rvalue = rand32(); + int rtype = rand32(3), rkey = rand32(), rvalue = rand32(); op->set_type(types[rtype]); op->set_key(rkey); op->set_value(rvalue); } - bcastmsg(replicas, txn); - if (++i % chkpt == 0) { - if (verbose) cout << "issued txn " << i << endl; + + // Broadcast. + bcastmsg(fds, txn); + + // Checkpoint. + if (txn.seqno() % chkpt == 0) { + if (verbose) cout << "issued txn " << txn.seqno() << endl; st_sleep(0); } } } /** - * Keep swallowing replica responses. + * Process a transaction: update DB state (incl. seqno) and send response to + * leader. 
*/ void -handle_responses(st_netfd_t replica) +process_txn(st_netfd_t leader, map<int, int> &map, const Txn &txn, int &seqno) { - int i = 0; - while (true) { - Response res; - readmsg(replica, res); - if (++i % chkpt == 0) { - if (verbose) - cout << "got response " << i << " from " << replica << " of size " - << res.result_size() << endl; - st_sleep(0); + checkeq(txn.seqno(), seqno + 1); + Response res; + res.set_seqno(txn.seqno()); + seqno = txn.seqno(); + for (int o = 0; o < txn.op_size(); o++) { + const Op &op = txn.op(o); + switch (op.type()) { + case Op::read: + res.add_result(map[op.key()]); + break; + case Op::write: + map[op.key()] = op.value(); + break; + case Op::del: + map.erase(op.key()); + break; } } + sendmsg(leader, res); } /** * Actually do the work of executing a transaction and sending back the reply. */ void -process_txns(st_netfd_t leader, map<int, int> &map) +process_txns(st_netfd_t leader, map<int, int> &map, int &seqno, + st_bool &send_state, st_bool &sent_state, + st_channel<Txn*> &backlog) { - int i = 0; while (true) { Txn txn; - readmsg(leader, txn); + //{ + //st_intr intr(stop_hub); + readmsg(leader, txn); + //} - Response res; - for (int o = 0; o < txn.op_size(); o++) { - const Op &op = txn.op(o); - switch (op.type()) { - case Op::read: - res.add_result(map[op.key()]); - break; - case Op::write: - map[op.key()] = op.value(); - break; - case Op::del: - map.erase(op.key()); - break; + if (txn.has_seqno()) { + if (txn.seqno() == seqno + 1) { + process_txn(leader, map, txn, seqno); + } else { + // Queue up for later processing once a snapshot has been received. + backlog.push(new Txn(txn)); } + } else { + // Wait for the snapshot to be generated. 
+ send_state.set(); + cout << "waiting for state to be sent" << endl; + sent_state.waitset(); + sent_state.reset(); } - sendmsg(leader, res); - if (++i % chkpt == 0) { - if (verbose) cout << "processed txn " << i << endl; + if (txn.seqno() % chkpt == 0) { + if (verbose) cout << "processed txn " << txn.seqno() << endl; st_sleep(0); } } } /** + * Keep swallowing replica responses. + */ +void +handle_responses(st_netfd_t replica, const int &seqno) +{ + while (true) { + Response res; + { + st_intr intr(kill_hub); + readmsg(replica, res); + } + if (res.seqno() % chkpt == 0) { + if (verbose) + cout << "got response " << res.seqno() << " from " << replica << endl; + st_sleep(0); + } + if (stop_hub && res.seqno() == seqno - 1) { + cout << "seqno = " << seqno - 1 << endl; + break; + } + } +} + +/** * Help the recovering node. */ void -recover_joiner(st_netfd_t listener, const map<int, int> &map) +recover_joiner(st_netfd_t listener, const map<int, int> &map, const int &seqno, + st_bool &send_state, st_bool &sent_state) { - st_netfd_t joiner = checkpass(st_accept(listener, nullptr, nullptr, - ST_UTIME_NO_TIMEOUT)); - cout << "got the joiner! " << joiner << endl; + st_netfd_t joiner; + { + st_intr intr(stop_hub); + joiner = checkerr(st_accept(listener, nullptr, nullptr, + ST_UTIME_NO_TIMEOUT)); + } + st_closing closing(joiner); + cout << "got recoverer's connection" << endl; + + // Wait for the right time to generate the snapshot. + send_state.waitset(); + send_state.reset(); + + cout << "snapshotting state for recovery" << endl; Recovery recovery; - typedef pair<int, int> pii; - foreach (pii p, map) { + foreach (const pii &p, map) { Recovery_Pair *pair = recovery.add_pair(); pair->set_key(p.first); pair->set_value(p.second); } + recovery.set_seqno(seqno); + + // Notify process_txns that it may continue processing. 
+ sent_state.set(); + + cout << "sending recovery" << endl; sendmsg(joiner, recovery); + cout << "sent" << endl; } -int -main(int argc, char **argv) +/** + * Run the leader. + */ +void +run_leader(int nreps) { - check0x(st_init()); - if (argc < 2) - die("leader: ydb <nreplicas>\n" - "replica: ydb <leaderhost> <leaderport> <listenport>\n" - "joiner: ydb <leaderhost> <leaderport>\n"); - bool is_leader = argc == 2; - bool is_joiner = argc == 3; - if (is_leader) { - cout << "starting as leader" << endl; + cout << "starting as leader" << endl; - // Wait until all replicas have joined. - st_netfd_t listener = st_tcp_listen(7654); - vector<st_netfd_t> replicas; - for (int i = 1; i < atoi(argv[1]); i++) { - replicas.push_back(checkpass( - st_accept(listener, nullptr, nullptr, ST_UTIME_NO_TIMEOUT))); + // Wait until all replicas have joined. + st_netfd_t listener = st_tcp_listen(base_port); + st_closing close_listener(listener); + // TODO rename these + int min_reps = nreps - 1; + vector<replica_info> replicas; + st_closing_all close_replicas(replicas); + for (int i = 0; i < min_reps; i++) { + st_netfd_t fd; + { + st_intr intr(stop_hub); + fd = checkerr(st_accept(listener, nullptr, nullptr, + ST_UTIME_NO_TIMEOUT)); } + Join join; + readmsg(fd, join); + replicas.push_back(replica_info(fd, static_cast<uint16_t>(join.port()))); + } - // Construct the initialization message. - Init init; - foreach (st_netfd_t r, replicas) { - // Get socket addresses. + // Construct the initialization message. + Init init; + init.set_txnseqno(0); + foreach (replica_info r, replicas) { + SockAddr *psa = init.add_node(); + psa->set_host(r.host()); + psa->set_port(r.port()); + } - sockaddr_in sa; - socklen_t salen = sizeof sa; - check0x(getpeername(st_netfd_fileno(r), reinterpret_cast<sockaddr*>(&sa), &salen)); + // Send init to each initial replica. 
+ foreach (replica_info r, replicas) { + init.set_yourhost(r.host()); + sendmsg(r.fd(), init); + } - SockAddr *psa = init.add_node(); - psa->set_host(sa.sin_addr.s_addr); - psa->set_port(sa.sin_port); - } + // Start dispatching queries. + int seqno = 0; + st_channel<replica_info> newreps; + const function0<void> f = bind(issue_txns, ref(newreps), ref(seqno)); + st_thread_t swallower = my_spawn(bind(swallow, f)); + foreach (const replica_info &r, replicas) newreps.push(r); + st_joining join_swallower(swallower); - bcastmsg(replicas, init); + // Start handling responses. + st_thread_group handlers; + foreach (replica_info r, replicas) { + handlers.insert(my_spawn(bind(handle_responses, r.fd(), ref(seqno)))); + } - // Start dispatching queries. - const function0<void> f = bind(issue_txns, ref(replicas)); - st_thread_t t = st_spawn(bind(swallow, f)); + // Accept the recovering node, and tell it about the online replicas. + st_netfd_t joiner; + { + st_intr intr(stop_hub); + joiner = checkerr(st_accept(listener, nullptr, nullptr, + ST_UTIME_NO_TIMEOUT)); + } + Join join; + readmsg(joiner, join); + cout << "setting seqno to " << seqno << endl; + init.set_txnseqno(seqno); + sendmsg(joiner, init); - // Start handling responses. - vector<st_thread_t> handlers(replicas.size()); - foreach (st_netfd_t r, replicas) { - handlers.push_back(st_spawn(bind(handle_responses, r))); - } + // Start streaming txns to joiner. + cout << "start streaming txns to joiner" << endl; + replicas.push_back(replica_info(joiner, static_cast<uint16_t>(join.port()))); + newreps.push(replicas.back()); + handlers.insert(my_spawn(bind(handle_responses, joiner, ref(seqno)))); +} - // Accept the recovering node, and tell it about the online replicas. - st_netfd_t joiner = checkpass(st_accept(listener, nullptr, nullptr, - ST_UTIME_NO_TIMEOUT)); - sendmsg(joiner, init); +/** + * Run a replica. 
+ */ +void +run_replica(char *leader_host, uint16_t leader_port, uint16_t listen_port) +{ + // Initialize database state. + map<int, int> map; + int seqno = -1; + dump_state ds(map, seqno); + st_bool send_state, sent_state; - // Bring the new guy "back" into action. - Ready ready; - readmsg(joiner, ready); - cout << "the prodigal son has returned" << endl; - cout << "replicas = " << &replicas << endl; - replicas.push_back(joiner); - handlers.push_back(st_spawn(bind(handle_responses, joiner))); + cout << "starting as replica" << endl; - // Wait on other threads. - check0x(st_thread_join(t, nullptr)); + // Listen for connections from other replicas. + st_netfd_t listener = + st_tcp_listen(listen_port); + st_thread_t rec = my_spawn(bind(recover_joiner, listener, ref(map), + ref(seqno), ref(send_state), + ref(sent_state))); - // Cleanly close all connections. - foreach (st_netfd_t r, replicas) { - check0x(st_netfd_close(r)); + // Connect to the leader and join the system. + st_netfd_t leader = st_tcp_connect(leader_host, leader_port, timeout); + Join join; + join.set_port(listen_port); + sendmsg(leader, join); + Init init; + readmsg(leader, init); + uint32_t listen_host = init.yourhost(); + + // Display the info. + cout << "got init msg with txn seqno " << init.txnseqno() + << " and hosts:" << endl; + vector<st_netfd_t> replicas; + for (uint16_t i = 0; i < init.node_size(); i++) { + const SockAddr &sa = init.node(i); + char buf[INET_ADDRSTRLEN]; + in_addr host = { sa.host() }; + bool is_self = sa.host() == listen_host && sa.port() == listen_port; + cout << "- " << checkerr(inet_ntop(AF_INET, &host, buf, + INET_ADDRSTRLEN)) + << ':' << sa.port() << (is_self ? " (self)" : "") << endl; + if (!is_self) { + replicas.push_back(st_tcp_connect(host, + static_cast<uint16_t>(sa.port()), + timeout)); } - check0x(st_netfd_close(listener)); - } else { - map<int, int> map; + } - // Connect to the leader. 
- char *host = argv[1]; - uint16_t port = static_cast<uint16_t>(atoi(argv[2])); + // Process txns. + st_channel<Txn*> backlog; + st_thread_t proc = my_spawn(bind(process_txns, leader, ref(map), ref(seqno), + ref(send_state), ref(sent_state), + ref(backlog))); - if (!is_joiner) { - // Listen for then talk to the joiner. - st_netfd_t listener = - st_tcp_listen(static_cast<uint16_t>(atoi(argv[3]))); - st_spawn(bind(recover_joiner, listener, ref(map))); + // If there's anything to recover. + if (init.txnseqno() > 0) { + cout << "waiting for recovery" << endl; + + // Read the recovery message. + Recovery recovery; + foreach (st_netfd_t r, replicas) { + readmsg(r, recovery); } + for (int i = 0; i < recovery.pair_size(); i++) { + const Recovery_Pair &p = recovery.pair(i); + map[p.key()] = p.value(); + } + assert(seqno == -1 && + static_cast<typeof(seqno)>(recovery.seqno()) > seqno); + seqno = recovery.seqno(); + cout << "recovered." << endl; - st_sleep(0); - cout << "here" << endl; - st_netfd_t leader = st_tcp_connect(host, port, timeout); - cout << "there" << endl; + while (!backlog.empty()) { + Txn *p = backlog.take(); + process_txn(leader, map, *p, seqno); + delete p; + } + cout << "caught up." << endl; + } - // Read the initialization message. - Init init; - readmsg(leader, init); + st_join(proc); + st_join(rec); + foreach (st_netfd_t r, replicas) { + check0x(st_netfd_close(r)); + } + check0x(st_netfd_close(leader)); +} - // Display the info. - cout << "hosts:" << endl; - vector<st_netfd_t> replicas; - for (uint16_t i = 0; i < init.node_size(); i++) { - const SockAddr &sa = init.node(i); - char buf[INET_ADDRSTRLEN]; - in_addr host; - host.s_addr = sa.host(); - cout << checkpass(inet_ntop(AF_INET, &host, buf, INET_ADDRSTRLEN)) - << ':' << sa.port() << endl; - if (is_joiner) - replicas.push_back(st_tcp_connect(host, static_cast<uint16_t>(7655+i), - timeout)); - } +int sig_pipe[2]; - if (is_joiner) { - // Read the recovery message. 
- Recovery recovery; - readmsg(replicas[0], recovery); - for (int i = 0; i < recovery.pair_size(); i++) { - const Recovery_Pair &p = recovery.pair(i); - map[p.key()] = p.value(); +/** + * Raw signal handler that triggers the (synchronous) handler. + */ +void handle_sig(int sig) { + int err = errno; + cerr << "got signal: " << sig << endl; + checkeqnneg(write(sig_pipe[1], &sig, sizeof sig), sizeof sig); + errno = err; +} + +/** + * Synchronous part of the signal handler; cleanly interrrupts any threads that + * have marked themselves as interruptible. + */ +void handle_sig_sync() { + stfd fd = checkerr(st_netfd_open(sig_pipe[0])); + while (true) { + int sig; + checkeqnneg(st_read(fd, &sig, sizeof sig, ST_UTIME_NO_TIMEOUT), + sizeof sig); + if (sig == SIGINT) { + if (!stop_hub) stop_hub.set(); + else kill_hub.set(); + } else if (sig == SIGTERM) { + foreach (st_thread_t t, threads) { + st_thread_interrupt(t); } - cout << "recovered." << endl; - - // Notify the leader. - Ready ready; - sendmsg(leader, ready); } + break; + } +} - // Process txns. 
- st_thread_t t = st_spawn(bind(process_txns, leader, ref(map))); - check0x(st_thread_join(t, nullptr)); +int +main(int argc, char **argv) +{ + try { + GOOGLE_PROTOBUF_VERIFY_VERSION; - foreach (st_netfd_t r, replicas) { - check0x(st_netfd_close(r)); + check0x(pipe(sig_pipe)); + struct sigaction sa; + sa.sa_handler = handle_sig; + check0x(sigemptyset(&sa.sa_mask)); + sa.sa_flags = 0; + check0x(sigaction(SIGINT, &sa, nullptr)); + + //check0x(st_set_eventsys(ST_EVENTSYS_ALT)); + check0x(st_init()); + thread_eraser eraser; + st_spawn(bind(handle_sig_sync)); + threads.insert(st_thread_self()); + if (argc != 2 && argc != 4) + die("leader: ydb <nreplicas>\n" + "replica: ydb <leaderhost> <leaderport> <listenport>\n"); + bool is_leader = argc == 2; + + if (is_leader) { + run_leader(atoi(argv[1])); + } else { + run_replica(argv[1], + static_cast<uint16_t>(atoi(argv[2])), + static_cast<uint16_t>(atoi(argv[3]))); } - check0x(st_netfd_close(leader)); + + return 0; + } catch (const exception &ex) { + cerr << "thread " << st_thread_self() << ": " << ex.what() << endl; + return 1; } - - return 0; } Modified: ydb/trunk/src/ydb.proto =================================================================== --- ydb/trunk/src/ydb.proto 2008-11-30 23:45:26 UTC (rev 1080) +++ ydb/trunk/src/ydb.proto 2008-11-30 23:46:31 UTC (rev 1081) @@ -1,10 +1,34 @@ +option optimize_for = SPEED; + +// A socket address (host:port). message SockAddr { - required int32 host = 1; - required int32 port = 2; + required uint32 host = 1; + required uint32 port = 2; } + +// Join request sent from nodes to leader. +message Join { + // The port on which the joining replica will listen for connections. + required uint32 port = 1; +} + +// Initialization message sent to a nodes when it joins. message Init { - repeated SockAddr node = 1; + // The current seqno that the server is on. + required uint32 txnseqno = 1; + // What the leader perceives to be the joining replica's IP address. 
+ required uint32 yourhost = 2; + // The nodes that have joined (including the joining node); the ports here + // are the ports on which the nodes are listening. + repeated SockAddr node = 3; } + +// Sent to already-joined nodes to inform them of a newly joining node. +message Joined { + required SockAddr node = 1; +} + +// A single operation in a transaction. message Op { enum OpType { read = 0; @@ -15,19 +39,35 @@ required int32 key = 2; optional int32 value = 3; } + +// A transaction. Currently just a simple sequence of Ops. message Txn { - repeated Op op = 1; + optional uint32 seqno = 1; + repeated Op op = 2; } + +// Response to a transaction, containing a list of results. message Response { - repeated int32 result = 1; + // The txn that this is a response for. + required uint32 seqno = 1; + // The list of answers to read operations. + repeated int32 result = 2; } -message Ready { - optional int32 ready = 1; -} + +// Message from a running node to a joining node to bring it up to speed. message Recovery { message Pair { required int32 key = 1; required int32 value = 2; } - repeated Pair pair = 1; + // The seqno that this recovery message will bring us up through (the last + // txn seqno before the snapshot was generated). + required uint32 seqno = 1; + // The data map. + repeated Pair pair = 2; } + +// Message from a joining node to the leader to inform it that it is fully back +// into action. 
+message Ready { +} Deleted: ydb/trunk/src/ydb.thrift =================================================================== --- ydb/trunk/src/ydb.thrift 2008-11-30 23:45:26 UTC (rev 1080) +++ ydb/trunk/src/ydb.thrift 2008-11-30 23:46:31 UTC (rev 1081) @@ -1,9 +0,0 @@ -enum op_type { read, write, rm } -struct sock_addr { i32 host, i16 port } -struct init { list<sock_addr> node } -struct op { op_type type, i32 key, optional i32 value } -struct txn { list<op> op } -struct response { list<i32> results } -struct ready {} -struct pair { i32 key, i32 value } -struct recovery { list<pair> pairs } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-11-30 23:45:29
Revision: 1080
          http://assorted.svn.sourceforge.net/assorted/?rev=1080&view=rev
Author:   yangzhang
Date:     2008-11-30 23:45:26 +0000 (Sun, 30 Nov 2008)

Log Message:
-----------
added smv, zcc, zpp

Modified Paths:
--------------
    shell-tools/trunk/src/bash-commons/bashrc.bash
    shell-tools/trunk/src/bash-commons/common.bash

Modified: shell-tools/trunk/src/bash-commons/bashrc.bash
===================================================================
--- shell-tools/trunk/src/bash-commons/bashrc.bash 2008-11-30 23:44:58 UTC (rev 1079)
+++ shell-tools/trunk/src/bash-commons/bashrc.bash 2008-11-30 23:45:26 UTC (rev 1080)
@@ -629,6 +629,18 @@
   . ~/.bashrc
 }
 
+zcc() {
+  local file="$1"
+  shift
+  gcc -Wall -g3 -o "${file%.*}" "$file" "$@"
+}
+
+zpp() {
+  local file="$1"
+  shift
+  wtf g++ -Wall -g3 -o "${file%.*}" "$file" "$@"
+}
+
 #function set_title() {
 #  if [ $# -eq 0 ] ; then
 #    eval set -- "$PWD"

Modified: shell-tools/trunk/src/bash-commons/common.bash
===================================================================
--- shell-tools/trunk/src/bash-commons/common.bash 2008-11-30 23:44:58 UTC (rev 1079)
+++ shell-tools/trunk/src/bash-commons/common.bash 2008-11-30 23:45:26 UTC (rev 1080)
@@ -162,6 +162,12 @@
   done
 }
 
+smv() {
+  local from="$1" to="$2"
+  scp "$@"
+  trace rm "$from"
+}
+
 # squeeze whitespace
 sq() {
   tr -s '[:space:]' ' '
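As committed in revision 1080, smv runs scp and then removes the source file whether or not the copy succeeded. The following is a minimal sketch of a more defensive variant, not the author's code: the name smv_checked and the SMV_COPY override are hypothetical, introduced here only so the behaviour can be tried locally with cp instead of scp. Unlike the original, it accepts exactly two arguments.

```shell
# Hypothetical variant of smv: remove the source only if the copy succeeded.
# SMV_COPY is an assumed override hook (defaults to scp); it is not part of
# the original bash-commons code.
smv_checked() {
  if [ $# -ne 2 ]; then
    echo "usage: smv_checked <from> <to>" >&2
    return 2
  fi
  local from="$1" to="$2"
  if "${SMV_COPY:-scp}" "$from" "$to"; then
    # Copy reported success, so it is safe to delete the original.
    rm -- "$from"
  else
    echo "smv_checked: copy failed, keeping $from" >&2
    return 1
  fi
}
```

For example, `SMV_COPY=cp smv_checked notes.txt /tmp/notes.txt` moves a local file, while a failing copy command leaves notes.txt in place and returns a non-zero status.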
From: <yan...@us...> - 2008-11-30 23:45:01
Revision: 1079
          http://assorted.svn.sourceforge.net/assorted/?rev=1079&view=rev
Author:   yangzhang
Date:     2008-11-30 23:44:58 +0000 (Sun, 30 Nov 2008)

Log Message:
-----------
mkdir mountpt first; fixed target path of webfiles that are directories

Modified Paths:
--------------
    shell-tools/trunk/src/bash-commons/assorted.bash

Modified: shell-tools/trunk/src/bash-commons/assorted.bash
===================================================================
--- shell-tools/trunk/src/bash-commons/assorted.bash 2008-11-30 23:44:06 UTC (rev 1078)
+++ shell-tools/trunk/src/bash-commons/assorted.bash 2008-11-30 23:44:58 UTC (rev 1079)
@@ -180,8 +180,11 @@
         if [[ "${inp%:*}" == "$inp" ]] ; then
           cp -r "$inp" "$stagedir/"
         else
-          mkdir -p "$stagedir/${inp%:*}"
-          cp -r $( eval ls -d "${inp#*:}" ) "$stagedir/${inp%:*}"
+          local dst="${inp%:*}" src="${inp#*:}"
+          if [[ "${dst%/}" != "$dst" ]]
+          then mkdir -p "$stagedir/${inp%:*}"
+          fi
+          cp -r "$src" "$stagedir/$dst"
         fi
       done
     fi
@@ -210,8 +213,9 @@
   stage
 
   # Mount sshfs if the not already mounted.
-  if ! mount | fgrep assorted: | fgrep "$mountpt" > /dev/null
-  then sshfs assorted: "$mountpt"
+  if ! mount | fgrep assorted: | fgrep "$mountpt" > /dev/null ; then
+    mkdir -p "$mountpt"
+    sshfs assorted: "$mountpt"
   fi
 
   # Wipe out/start from scratch?
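The staging change in revision 1079 relies on shell parameter expansion to split a "dst:src" spec and to detect a destination that ends in a slash. A small sketch of just that parsing step follows; the function name classify_spec and the sample specs are hypothetical and only illustrate the three cases, they are not taken from assorted.bash.

```shell
# Hypothetical helper that mirrors the parsing in the rev 1079 hunk:
#   ${inp%:*}  strips the shortest ":..." suffix  -> destination
#   ${inp#*:}  strips the shortest "...:" prefix  -> source
#   ${dst%/}   differs from $dst only when dst ends with a slash
classify_spec() {
  local inp="$1"
  if [ "${inp%:*}" = "$inp" ]; then
    # No colon at all: the spec is a plain path copied as-is.
    echo "plain $inp"
  else
    local dst="${inp%:*}" src="${inp#*:}"
    if [ "${dst%/}" != "$dst" ]; then
      # Trailing slash: the destination is a directory to create first.
      echo "dir $dst $src"
    else
      echo "file $dst $src"
    fi
  fi
}

classify_spec "docs/:build/html"           # prints: dir docs/ build/html
classify_spec "index.html:out/index.html"  # prints: file index.html out/index.html
classify_spec "README"                     # prints: plain README
```

One consequence of using the shortest-match forms is that a spec containing more than one colon is split at different colons for the destination and the source, so this sketch (like the hunk it mirrors) presumably expects at most one colon per spec.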
From: <yan...@us...> - 2008-11-30 23:44:08
Revision: 1078
          http://assorted.svn.sourceforge.net/assorted/?rev=1078&view=rev
Author:   yangzhang
Date:     2008-11-30 23:44:06 +0000 (Sun, 30 Nov 2008)

Log Message:
-----------
fixed doxygen bug

Modified Paths:
--------------
    cpp-commons/trunk/src/commons/pool.h

Modified: cpp-commons/trunk/src/commons/pool.h
===================================================================
--- cpp-commons/trunk/src/commons/pool.h 2008-11-30 21:34:08 UTC (rev 1077)
+++ cpp-commons/trunk/src/commons/pool.h 2008-11-30 23:44:06 UTC (rev 1078)
@@ -38,7 +38,7 @@
       /**
        * Release an item back into the pool. This doesn't check that the pointer
        * being released was one that was originally taken from this pool.
-       * \param[in] The pointer to release.
+       * \param[in] p The pointer to release.
        * \throw exception The pool is full.
        */
       void drop(T *p) {