[Assorted-commits] SF.net SVN: assorted:[1074] ydb
Brought to you by:
yangzhang
From: <yan...@us...> - 2008-11-21 01:39:37
|
Revision: 1074
          http://assorted.svn.sourceforge.net/assorted/?rev=1074&view=rev
Author:   yangzhang
Date:     2008-11-21 01:37:44 +0000 (Fri, 21 Nov 2008)

Log Message:
-----------
initial commit of ydb!

Added Paths:
-----------
    ydb/
    ydb/trunk/
    ydb/trunk/src/
    ydb/trunk/src/Makefile
    ydb/trunk/src/main.lzz
    ydb/trunk/src/ydb.proto
    ydb/trunk/src/ydb.thrift

Added: ydb/trunk/src/Makefile
===================================================================
--- ydb/trunk/src/Makefile	(rev 0)
+++ ydb/trunk/src/Makefile	2008-11-21 01:37:44 UTC (rev 1074)
@@ -0,0 +1,59 @@
+TARGET := ydb
+
+LZZS := $(wildcard *.lzz)
+LZZHDRS := $(foreach lzz,$(LZZS),$(patsubst %.lzz,%.hh,$(lzz)))
+LZZSRCS := $(foreach lzz,$(LZZS),$(patsubst %.lzz,%.cc,$(lzz)))
+LZZOBJS := $(foreach lzz,$(LZZS),$(patsubst %.lzz,%.o,$(lzz)))
+
+PBS := $(wildcard *.proto)
+PBHDRS := $(foreach pb,$(PBS),$(patsubst %.proto,%.pb.h,$(pb)))
+PBSRCS := $(foreach pb,$(PBS),$(patsubst %.proto,%.pb.cc,$(pb)))
+PBOBJS := $(foreach pb,$(PBS),$(patsubst %.proto,%.o,$(pb)))
+
+GENHDRS := $(LZZHDRS) $(PBHDRS)
+GENSRCS := $(LZZSRCS) $(PBSRCS)
+GENOBJS := $(LZZOBJS) $(PBOBJS)
+
+HDRS := $(GENHDRS)
+SRCS := $(GENSRCS)
+OBJS := $(GENOBJS)
+
+LDFLAGS := -lstx -lst -lresolv -lpthread -lprotobuf
+CXXFLAGS := -g3 -Wall -Werror -Wextra -Woverloaded-virtual -Wconversion \
+	-Wpointer-arith -Wcast-qual -Wcast-align -Wwrite-strings \
+	-Winit-self -Wno-sign-compare -Wno-unused-parameter -Wc++0x-compat \
+	-Wparentheses
+PBCXXFLAGS := -g3 -Wall -Werror
+
+all: $(TARGET)
+
+$(TARGET): $(OBJS)
+	$(CXX) -o $@ $^ $(LDFLAGS)
+
+%.o: %.cc $(PBHDRS)
+	wtf $(CXX) $(CXXFLAGS) -c -o $@ $<
+
+%.o: %.pb.cc %.pb.h
+	wtf $(CXX) $(PBCXXFLAGS) -c -o $@ $<
+
+%.cc: %.lzz
+	lzz -hx hh -sx cc -hl -sl -hd -sd $<
+
+%.hh: %.lzz
+	lzz -hx hh -sx cc -hl -sl -hd -sd $<
+
+%.pb.cc: %.proto
+	protoc --cpp_out=. $<
+
+%.pb.h: %.proto
+	protoc --cpp_out=. $<
+
+clean:
+	rm -f $(GENSRCS) $(GENHDRS) $(OBJS) $(TARGET)
+
+doc: $(SRCS) $(HDRS)
+	doxygen
+
+.PHONY: all clean
+
+.SECONDARY: $(SRCS) $(HDRS) $(OBJS)

Added: ydb/trunk/src/main.lzz
===================================================================
--- ydb/trunk/src/main.lzz	(rev 0)
+++ ydb/trunk/src/main.lzz	2008-11-21 01:37:44 UTC (rev 1074)
@@ -0,0 +1,342 @@
+// TODO
+// - how does replication affect overhead?
+// - implement other recovery schemes (disk-based)
+// - verify correctness of the simple recovery scheme
+
+#hdr
+#include <boost/bind.hpp>
+#include <boost/foreach.hpp>
+#include <commons/nullptr.h>
+#include <commons/st/st.h>
+#include <cstdio>
+#include <cstdlib>
+#include <iostream>
+#include <map>
+#include <sstream>
+#include <vector>
+#include "ydb.pb.h"
+#define foreach BOOST_FOREACH
+using namespace boost;
+using namespace commons;
+using namespace std;
+#end
+
+extern int chkpt = 1000;                    // every chkpt-th iteration logs (if verbose) and calls st_sleep(0)
+extern const st_utime_t timeout = 1000000;  // passed to st_write/st_read_fully/st_tcp_connect; presumably microseconds
+extern const bool verbose = false;          // enables the progress messages
+
+/**
+ * Send a message to some destinations (sequentially).
+ *
+ * The message is serialized once; each destination then receives a four-byte
+ * network-order length followed by the serialized bytes.
+ */
+template<typename T>
+void
+bcastmsg(const vector<st_netfd_t> &dsts, const T & msg)
+{
+  // Serialize message to a buffer.
+  string s;
+  check(msg.SerializeToString(&s));
+  const char *buf = s.c_str();
+
+  // Prefix the message with a four-byte length.
+  uint32_t len = htonl(static_cast<uint32_t>(s.size()));
+
+  // Broadcast the length-prefixed message to replicas.
+  foreach (st_netfd_t dst, dsts) {
+    checkeqnneg(st_write(dst, static_cast<void*>(&len), sizeof len, timeout),
+                static_cast<int>(sizeof len));
+    checkeqnneg(st_write(dst, buf, s.size(), timeout),
+                static_cast<int>(s.size()));
+  }
+}
+
+/**
+ * Send a message to a single recipient.
+ */
+template<typename T>
+void
+sendmsg(st_netfd_t dst, const T &msg)
+{
+  vector<st_netfd_t> dsts(1, dst);
+  bcastmsg(dsts, msg);
+}
+
+/**
+ * Read a message: a four-byte network-order length prefix, then that many
+ * bytes, which are parsed into msg.
+ */
+template <typename T>
+void
+readmsg(st_netfd_t src, T & msg)
+{
+  // Read the message length.
+  uint32_t len;
+  checkeqnneg(st_read_fully(src, static_cast<void*>(&len), sizeof len,
+                            ST_UTIME_NO_TIMEOUT),
+              static_cast<int>(sizeof len));
+  len = ntohl(len);
+
+  // Parse the message body.
+  char buf[len];
+  checkeqnneg(st_read_fully(src, buf, len, timeout), static_cast<int>(len));
+  check(msg.ParseFromArray(buf, len));
+}
+
+/**
+ * Return a pseudo-random integer in [0, max); max must be positive.
+ *
+ * Reduces with % rather than dividing by the truncated quotient
+ * RAND_MAX / max: random() may return values up to 2^31 - 1, for which that
+ * division can yield max itself, one past the intended range.
+ */
+inline int
+rand32(int max = RAND_MAX)
+{
+  return static_cast<int>( random() % max );
+}
+
+/**
+ * Keep issuing transactions to the replicas.
+ *
+ * Each transaction carries one to five random operations and is broadcast to
+ * every replica.
+ */
+void
+issue_txns(const vector<st_netfd_t> &replicas)
+{
+  Op_OpType types[] = {Op::read, Op::write, Op::del};
+  size_t lastsize = replicas.size();
+  cout << "replicas = " << &replicas << endl;
+  int i = 0;
+  while (true) {
+    if (replicas.size() != lastsize) {
+      cout << "size changed from " << lastsize << " to " << replicas.size()
+        << endl;
+      lastsize = replicas.size();
+    }
+    // Generate a random transaction.
+    Txn txn;
+    int count = rand32(5) + 1;
+    for (int o = 0; o < count; o++) {
+      Op *op = txn.add_op();
+      int rtype = rand32(3), rkey = rand32(), rvalue = rand32();
+      op->set_type(types[rtype]);
+      op->set_key(rkey);
+      op->set_value(rvalue);
+    }
+    bcastmsg(replicas, txn);
+    if (++i % chkpt == 0) {
+      if (verbose) cout << "issued txn " << i << endl;
+      st_sleep(0);
+    }
+  }
+}
+
+/**
+ * Keep swallowing replica responses.
+ */
+void
+handle_responses(st_netfd_t replica)
+{
+  int i = 0;
+  while (true) {
+    Response res;
+    readmsg(replica, res);
+    if (++i % chkpt == 0) {
+      if (verbose)
+        cout << "got response " << i << " from " << replica << " of size "
+          << res.result_size() << endl;
+      st_sleep(0);
+    }
+  }
+}
+
+/**
+ * Actually do the work of executing a transaction and sending back the reply.
+ */
+void
+process_txns(st_netfd_t leader, map<int, int> &map)
+{
+  int i = 0;
+  while (true) {
+    Txn txn;
+    readmsg(leader, txn);
+
+    Response res;
+    for (int o = 0; o < txn.op_size(); o++) {
+      const Op &op = txn.op(o);
+      switch (op.type()) {
+        case Op::read:
+          res.add_result(map[op.key()]);  // operator[] default-inserts 0 for a missing key
+          break;
+        case Op::write:
+          map[op.key()] = op.value();
+          break;
+        case Op::del:
+          map.erase(op.key());
+          break;
+      }
+    }
+    sendmsg(leader, res);
+
+    if (++i % chkpt == 0) {
+      if (verbose) cout << "processed txn " << i << endl;
+      st_sleep(0);
+    }
+  }
+}
+
+/**
+ * Help the recovering node: accept it on listener and send it every key/value pair in map.
+ */
+void
+recover_joiner(st_netfd_t listener, const map<int, int> &map)
+{
+  st_netfd_t joiner = checkpass(st_accept(listener, nullptr, nullptr,
+        ST_UTIME_NO_TIMEOUT));
+  cout << "got the joiner! " << joiner << endl;
+  Recovery recovery;
+  typedef pair<int, int> pii;
+  foreach (pii p, map) {
+    Recovery_Pair *pair = recovery.add_pair();
+    pair->set_key(p.first);
+    pair->set_value(p.second);
+  }
+  sendmsg(joiner, recovery);
+}
+
+int
+main(int argc, char **argv)  // the role (leader / replica / joiner) is chosen by argument count; see the usage text
+{
+  check0x(st_init());
+  if (argc < 2)
+    die("leader: ydb <nreplicas>\n"
+        "replica: ydb <leaderhost> <leaderport> <listenport>\n"
+        "joiner: ydb <leaderhost> <leaderport>\n");
+  bool is_leader = argc == 2;
+  bool is_joiner = argc == 3;
+  if (is_leader) {
+    cout << "starting as leader" << endl;
+
+    // Wait until the initial replicas have joined (this accepts nreplicas - 1 connections).
+    st_netfd_t listener = st_tcp_listen(7654);
+    vector<st_netfd_t> replicas;
+    for (int i = 1; i < atoi(argv[1]); i++) {
+      replicas.push_back(checkpass(
+            st_accept(listener, nullptr, nullptr, ST_UTIME_NO_TIMEOUT)));
+    }
+
+    // Construct the initialization message.
+    Init init;
+    foreach (st_netfd_t r, replicas) {
+      // Get socket addresses.
+
+      sockaddr_in sa;
+      socklen_t salen = sizeof sa;
+      check0x(getpeername(st_netfd_fileno(r), reinterpret_cast<sockaddr*>(&sa), &salen));
+
+      SockAddr *psa = init.add_node();
+      psa->set_host(sa.sin_addr.s_addr);
+      psa->set_port(sa.sin_port);
+    }
+
+    bcastmsg(replicas, init);
+
+    // Start dispatching queries.
+    const function0<void> f = bind(issue_txns, ref(replicas));
+    st_thread_t t = st_spawn(bind(swallow, f));
+
+    // Start handling responses.
+    vector<st_thread_t> handlers;  // starts empty: each push_back below adds one real thread handle
+    foreach (st_netfd_t r, replicas) {
+      handlers.push_back(st_spawn(bind(handle_responses, r)));
+    }
+
+    // Accept the recovering node, and tell it about the online replicas.
+    st_netfd_t joiner = checkpass(st_accept(listener, nullptr, nullptr,
+          ST_UTIME_NO_TIMEOUT));
+    sendmsg(joiner, init);
+
+    // Bring the new guy "back" into action.
+    Ready ready;
+    readmsg(joiner, ready);
+    cout << "the prodigal son has returned" << endl;
+    cout << "replicas = " << &replicas << endl;
+    replicas.push_back(joiner);  // NOTE(review): issue_txns iterates this vector by reference; growth here may invalidate its iterators - confirm
+    handlers.push_back(st_spawn(bind(handle_responses, joiner)));
+
+    // Wait on other threads.
+    check0x(st_thread_join(t, nullptr));
+
+    // Cleanly close all connections.
+    foreach (st_netfd_t r, replicas) {
+      check0x(st_netfd_close(r));
+    }
+    check0x(st_netfd_close(listener));
+  } else {
+    map<int, int> map;
+
+    // Connect to the leader.
+    char *host = argv[1];
+    uint16_t port = static_cast<uint16_t>(atoi(argv[2]));
+
+    if (!is_joiner) {
+      // Listen for then talk to the joiner.
+      st_netfd_t listener =
+        st_tcp_listen(static_cast<uint16_t>(atoi(argv[3])));
+      st_spawn(bind(recover_joiner, listener, ref(map)));
+    }
+
+    st_sleep(0);
+    cout << "here" << endl;
+    st_netfd_t leader = st_tcp_connect(host, port, timeout);
+    cout << "there" << endl;
+
+    // Read the initialization message.
+    Init init;
+    readmsg(leader, init);
+
+    // Display the info.
+    cout << "hosts:" << endl;
+    vector<st_netfd_t> replicas;
+    for (uint16_t i = 0; i < init.node_size(); i++) {
+      const SockAddr &sa = init.node(i);
+      char buf[INET_ADDRSTRLEN];
+      in_addr host;
+      host.s_addr = sa.host();
+      cout << checkpass(inet_ntop(AF_INET, &host, buf, INET_ADDRSTRLEN))
+        << ':' << sa.port() << endl;
+      if (is_joiner)  // NOTE(review): assumes replica i listens on port 7655+i, not on a port taken from init - confirm
+        replicas.push_back(st_tcp_connect(host, static_cast<uint16_t>(7655+i),
+              timeout));
+    }
+
+    if (is_joiner) {
+      // Read the recovery message.
+      Recovery recovery;
+      readmsg(replicas[0], recovery);
+      for (int i = 0; i < recovery.pair_size(); i++) {
+        const Recovery_Pair &p = recovery.pair(i);
+        map[p.key()] = p.value();
+      }
+      cout << "recovered." << endl;
+
+      // Notify the leader.
+      Ready ready;
+      sendmsg(leader, ready);
+    }
+
+    // Process txns.
+    st_thread_t t = st_spawn(bind(process_txns, leader, ref(map)));
+    check0x(st_thread_join(t, nullptr));
+
+    foreach (st_netfd_t r, replicas) {
+      check0x(st_netfd_close(r));
+    }
+    check0x(st_netfd_close(leader));
+  }
+
+  return 0;
+}

Added: ydb/trunk/src/ydb.proto
===================================================================
--- ydb/trunk/src/ydb.proto	(rev 0)
+++ ydb/trunk/src/ydb.proto	2008-11-21 01:37:44 UTC (rev 1074)
@@ -0,0 +1,33 @@
+message SockAddr {
+  required int32 host = 1;  // the leader stores sockaddr_in.sin_addr.s_addr here
+  required int32 port = 2;  // the leader stores sockaddr_in.sin_port here, unconverted
+}
+message Init {
+  repeated SockAddr node = 1;
+}
+message Op {
+  enum OpType {
+    read = 0;
+    write = 1;
+    del = 2;
+  }
+  required OpType type = 1;
+  required int32 key = 2;
+  optional int32 value = 3;
+}
+message Txn {
+  repeated Op op = 1;
+}
+message Response {
+  repeated int32 result = 1;  // one entry per read op, in op order
+}
+message Ready {
+  optional int32 ready = 1;
+}
+message Recovery {
+  message Pair {
+    required int32 key = 1;
+    required int32 value = 2;
+  }
+  repeated Pair pair = 1;
+}

Added: ydb/trunk/src/ydb.thrift
===================================================================
--- ydb/trunk/src/ydb.thrift	(rev 0)
+++ ydb/trunk/src/ydb.thrift	2008-11-21 01:37:44 UTC (rev 1074)
@@ -0,0 +1,9 @@
+enum op_type { read; write; rm; }  // the three operation kinds
+struct sock_addr { i32 host; i16 port; }  // one node address
+struct init { list<sock_addr> node; }  // list of node addresses
+struct op { op_type type; i32 key; optional i32 value; }  // a single operation; value may be absent
+struct txn { list<op> op; }  // an ordered list of operations
+struct response { list<i32> results; }  // a list of result values
+struct ready {}  // carries no fields
+struct pair { i32 key; i32 value; }  // one key/value entry
+struct recovery { list<pair> pairs; }  // a list of key/value entries

This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |