Thread: [Assorted-commits] SF.net SVN: assorted:[1277] ydb/trunk (Page 2)
From: <yan...@us...> - 2009-03-09 23:00:13
Revision: 1277 http://assorted.svn.sourceforge.net/assorted/?rev=1277&view=rev Author: yangzhang Date: 2009-03-09 22:59:57 +0000 (Mon, 09 Mar 2009) Log Message: ----------- - fast memcpy recovery serialization with direct access to fast_map table - using recovery_t = array<entry> instead of Recovery - using c++0x unique_ptr/move instead of c++03/boost - using raw_reader/raw_writer - added line-counts to test.bash - adde/updated notes/todos Modified Paths: -------------- ydb/trunk/README ydb/trunk/src/main.lzz.clamp ydb/trunk/tools/test.bash Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-03-09 22:59:33 UTC (rev 1276) +++ ydb/trunk/README 2009-03-09 22:59:57 UTC (rev 1277) @@ -518,18 +518,16 @@ containers - built something really fast, faster than even google dense_hash_map -- TODO experiment with large pages +- DONE use rb instead of pb for recovery state -- TODO use rb instead of pb for recovery state - - TODO test out recovery mode more thoroughly, make sure progress is being made, see how fast it is -- TODO fix multi-recovery if necessary +- DONE fix multi-recovery if necessary -- TODO speed up map dump; don't use range partitioning, but hash partitioning +- DONE speed up map dump; don't use range partitioning, but hash partitioning -- TODO refactor st_reader, etc. to be generic opportunistic buffered readers +- DONE refactor st_reader, etc. to be generic opportunistic buffered readers - TODO see how streambuf read/write is actually implemented (whether it's too slow) - TODO try making a streambuf for st_write, then try it in conj with @@ -538,12 +536,12 @@ - TODO async (threaded) wal - TODO 0-node 0-copy (don't need to use threads, just process each batch immed) -- TODO batch up the responses until they make large-enough buffer in pb mode - - TODO show aries-write - TODO checkpointing + replaying log from replicas (not from disk) - TODO scale-up on multicore +- TODO experiment with libhugetlbfs + - TODO remove extraneous copies; use custom buffer-backed data structures designed for serialization/deserialization - TODO flushing Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-03-09 22:59:33 UTC (rev 1276) +++ ydb/trunk/src/main.lzz.clamp 2009-03-09 22:59:57 UTC (rev 1277) @@ -8,12 +8,13 @@ #include <boost/scoped_array.hpp> #include <boost/shared_ptr.hpp> #include <boost/tuple/tuple.hpp> -#include <boost/unique_ptr.hpp> #include <commons/fast_map.h> +#include <commons/memory.h> #include <commons/nullptr.h> #include <commons/rand.h> #include <commons/st/st.h> #include <commons/time.h> +#include <commons/unique_ptr.h> #include <csignal> // sigaction etc. #include <cstdio> #include <cstring> // strsignal @@ -57,10 +58,13 @@ //#define map_t dense_hash_map #define map_t fast_map typedef map_t<int, int> mii; -typedef pair<int, int> pii; +typedef mii::value_type entry; typedef tuple<sized_array<char>, char*, char*> chunk; +//typedef unique_ptr<Recovery> recovery_t; +typedef commons::array<char> recovery_t; + template<typename T> void init_map(T &map) {} template<> void init_map(dense_hash_map<int, int> &map) { map.set_empty_key(-1); @@ -637,7 +641,7 @@ // Generate some random transactions. 
start_txn(batch); - for (int t = 0; t < batch_size; ++t) { + for (int t = 0; t < batch_size && !stop_hub; ++t) { char *txn_start = w.cur(); Txn &txn = *batch.add_txn(); txn.set_seqno(seqno); @@ -856,7 +860,7 @@ template<typename Types, typename RTypes> void process_txns(st_netfd_t leader, mii &map, int &seqno, - st_channel<unique_ptr<Recovery> > &send_states, + st_channel<recovery_t> &send_states, /* XXX st_channel<shared_ptr<pb::Txn> > &backlog */ st_channel<chunk> &backlog, int init_seqno, int mypos, int nnodes) @@ -896,7 +900,7 @@ showtput("live-processed", now, __ref(time_caught_up), __ref(seqno), __ref(seqno_caught_up)); } - __ref(send_states).push(unique_ptr<Recovery>()); + __ref(send_states).push(recovery_t()); __ref(w).mark_and_flush(); st_sleep(1); }); @@ -971,9 +975,10 @@ if (reader.unread() + reader.rem() < prefix + sizeof(uint32_t) - headerlen) { // Move current partial message to new buffer. sized_array<char> tmp(new char[read_buf_size], read_buf_size); - *reinterpret_cast<uint32_t*>(tmp.get()) = prefix; - *reinterpret_cast<short*>(tmp.get() + sizeof(uint32_t)) = short(batch.txn_size()); - *reinterpret_cast<int*>(tmp.get() + sizeof(uint32_t) + sizeof(short)) = first_txn.seqno(); + raw_writer ser(tmp.get()); + ser.write(prefix); + ser.write(short(batch.txn_size())); + ser.write(first_txn.seqno()); memcpy(tmp.get() + headerlen, reader.start(), reader.unread()); // Swap the buffers. @@ -1028,9 +1033,7 @@ } } else if (multirecover || mypos == 0) { // Empty (default) TxnBatch means "generate a snapshot." - unique_ptr<Recovery> recovery(make_recovery(map, mypos, nnodes)); - recovery->set_seqno(seqno); - send_states.push(boost::move(recovery)); + send_states.push(make_recovery(map, mypos, nnodes, seqno)); } } } catch (break_exception &ex) { @@ -1038,8 +1041,11 @@ } +#if 0 template<typename mii> -unique_ptr<Recovery> make_recovery(const mii &map, int mypos, int nnodes) { +unique_ptr<Recovery> +make_recovery(const mii &map, int mypos, int nnodes, int &seqno) +{ // TODO make this faster cout << "generating recovery..." << endl; unique_ptr<Recovery> recovery(new Recovery); @@ -1062,9 +1068,55 @@ } cout << "generating recovery took " << current_time_millis() - start_snap << " ms" << endl; - return boost::move(recovery); + recovery->set_seqno(seqno); + return move(recovery); } +#endif +template<typename mii> +recovery_t +make_recovery(const mii &map, int mypos, int nnodes, int &seqno) +{ + return recovery_t(); +} + +struct recovery_header +{ + int seqno; + size_t count; + size_t total; + size_t size; +}; + +pair<size_t, size_t> +recovery_range(size_t size, int mypos, int nnodes) +{ + return make_pair(multirecover ? size * mypos / size_t(nnodes) : 0, + multirecover ? size * (mypos + 1) / size_t(nnodes) : size); +} + +template<> +recovery_t +make_recovery(const fast_map<int, int> &map, int mypos, int nnodes, int &seqno) +{ + const commons::array<entry> &src = map.get_table(); + pair<size_t, size_t> range = recovery_range(src.size(), mypos, nnodes); + size_t begin = range.first, end = range.second; + assert(end > begin); + recovery_header hdr = { seqno, end - begin, src.size(), map.size() }; + size_t bodylen = sizeof(entry) * hdr.count; + cout << "generating recovery of " << hdr.size << " records in " + << hdr.count << " slots (" + << bodylen << " bytes); range is [" + << begin << ".." 
<< end << "]; seqno is " << hdr.seqno << endl; + commons::array<char> recovery(sizeof(size_t) + sizeof hdr + bodylen); + raw_writer ser(recovery.begin()); + ser.write(recovery.size()); + ser.write(hdr); + memcpy(ser.ptr(), src.begin() + begin, bodylen); + return recovery; +} + class response_handler { public: @@ -1254,10 +1306,10 @@ */ void recover_joiner(st_netfd_t listener, - st_channel<unique_ptr<Recovery> > &send_states) + st_channel<recovery_t> &send_states) { st_netfd_t joiner; - unique_ptr<Recovery> recovery; + recovery_t recovery; { st_intr intr(stop_hub); // Wait for the snapshot. @@ -1272,9 +1324,16 @@ st_closing closing(joiner); cout << "got joiner's connection, sending recovery of " - << recovery->pair_size() << " records" << endl; + << recovery.size() << " bytes" << endl; + long long start_time = current_time_millis(); + checkeqnneg(st_write(joiner, recovery.get(), recovery.size(), + ST_UTIME_NO_TIMEOUT), + ssize_t(recovery.size())); + long long diff = current_time_millis() - start_time; +#if 0 sendmsg(joiner, *recovery); - cout << "sent recovery" << endl; +#endif + cout << "sent recovery in " << diff << " ms" << endl; } void @@ -1353,7 +1412,7 @@ cout << "- dumping to " << fname << endl; ofstream of(fname.c_str()); of << "seqno: " << __ref(seqno) << endl; - foreach (const pii &p, g_map) { + foreach (const entry &p, g_map) { of << p.first << ": " << p.second << endl; } } @@ -1432,12 +1491,12 @@ cout << "- dumping to " << fname << endl; ofstream of(fname.c_str()); of << "seqno: " << __ref(seqno) << endl; - foreach (const pii &p, __ref(map)) { + foreach (const entry &p, __ref(map)) { of << p.first << ": " << p.second << endl; } } }); - st_channel<unique_ptr<Recovery> > send_states; + st_channel<recovery_t> send_states; cout << "starting as replica on port " << listen_port << endl; @@ -1502,9 +1561,61 @@ vector<st_thread_t> recovery_builders; assert(seqno == -1); + bool first = true; for (int i = 0; i < (multirecover ? init.node_size() : 1); ++i) { recovery_builders.push_back(my_spawn(lambda() { - // Read the recovery message. + // Read the recovery message length and header. + size_t len; + recovery_header hdr; + char buf[sizeof len + sizeof hdr]; + //try { + checkeqnneg(st_read_fully(__ref(replicas[i]), + buf, sizeof len + sizeof hdr, + ST_UTIME_NO_TIMEOUT), + ssize_t(sizeof len + sizeof hdr)); + //} catch (...) { // TODO just catch "Connection reset by peer" + //return; + //} + raw_reader rdr(buf); + rdr.read(len); + rdr.read(hdr); + check(hdr.seqno >= 0); + + // Resize the table if necessary. + commons::array<entry> &table = __ref(map).get_table(); + if (!__ref(first)) { + checkeq(table.size(), hdr.total); + checkeq(__ref(map).size(), hdr.size); + } else { + __ref(map).set_size(hdr.size); + if (table.size() != hdr.total) { + table.reset(new entry[hdr.total], hdr.total); + } + } + + // Receive straight into the table. + pair<size_t, size_t> range = + recovery_range(table.size(), __ctx(i), __ref(init).node_size()); + // Check that we agree on the number of entries. + checkeq(range.second - range.first, hdr.count); + // Check that the count is a power of two. + checkeq(hdr.count & (hdr.count - 1), size_t(0)); + size_t rangelen = sizeof(entry) * hdr.count; + // Read an extra char to ensure that we're at the EOF. 
+ checkeqnneg(st_read_fully(__ref(replicas[i]), + table.begin() + range.first, rangelen + 1, + ST_UTIME_NO_TIMEOUT), + ssize_t(rangelen)); + + long long tm = current_time_millis(); + if (__ref(seqno) != -1) + checkeq(__ref(seqno), hdr.seqno); + __ref(seqno) = hdr.seqno; + cout << "got recovery message of " << len << " bytes (" + << hdr.size << " records in " << hdr.count << " slots) in " + << tm - __ref(before_recv) << " ms; now at seqno " + << hdr.seqno << endl; +#if 0 Recovery recovery; long long receive_start = 0, receive_end = 0; size_t len = 0; @@ -1533,6 +1644,7 @@ << " ms; built up map of " << recovery.pair_size() << " records in " << build_end - build_start << " ms; now at seqno " << seqno << endl; +#endif }, "recovery_builder" + lexical_cast<string>(i))); } foreach (st_thread_t t, recovery_builders) { Modified: ydb/trunk/tools/test.bash =================================================================== --- ydb/trunk/tools/test.bash 2009-03-09 22:59:33 UTC (rev 1276) +++ ydb/trunk/tools/test.bash 2009-03-09 22:59:57 UTC (rev 1277) @@ -527,6 +527,11 @@ hostargs p2-helper } +line-counts() { + wc -l "$(dirname "$0")/../src/"{main.lzz.clamp,ser.{h,cc},p2.cc,ydb.proto,Makefile,serperf.cc} \ + ~/ccom/src/{commons/{,st/}*.h,test/{*.*,Makefile}} +} + # # Main # This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
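The recovery fast-path in r1277 leans on commons' raw_reader/raw_writer, which are not part of this diff. A minimal sketch of the interface that make_recovery and the recovery_builder lambdas appear to assume -- illustrative only, the real commons headers may differ:

    #include <cstring>

    class raw_writer {
    public:
        explicit raw_writer(char *p) : p_(p) {}
        // Copy a POD value into the buffer and advance.
        template<typename T> void write(const T &v) {
            std::memcpy(p_, &v, sizeof v);
            p_ += sizeof v;
        }
        // Current write position (used for the trailing memcpy of table slots).
        char *ptr() const { return p_; }
    private:
        char *p_;
    };

    class raw_reader {
    public:
        explicit raw_reader(const char *p) : p_(p) {}
        // Copy a POD value out of the buffer and advance.
        template<typename T> void read(T &v) {
            std::memcpy(&v, p_, sizeof v);
            p_ += sizeof v;
        }
    private:
        const char *p_;
    };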
From: <yan...@us...> - 2009-03-10 23:32:32
Revision: 1279 http://assorted.svn.sourceforge.net/assorted/?rev=1279&view=rev Author: yangzhang Date: 2009-03-10 23:32:14 +0000 (Tue, 10 Mar 2009) Log Message: ----------- - fixed optimized build issues with unused variables - fixed bug: issue_txns not updating seqno correctly on final batch - renamed methods of and reinstated pwal - removed unnecessary sleep at end of process_txns - relabeled messages to make explicit their approximating nature - added clamp to setup-deps - updated notes/todos Modified Paths: -------------- ydb/trunk/README ydb/trunk/src/main.lzz.clamp ydb/trunk/tools/test.bash Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-03-09 23:02:12 UTC (rev 1278) +++ ydb/trunk/README 2009-03-10 23:32:14 UTC (rev 1279) @@ -536,6 +536,15 @@ - TODO async (threaded) wal - TODO 0-node 0-copy (don't need to use threads, just process each batch immed) +- DONE figure out why i'm not getting all the responses + - because issue_txns was not incrementing seqno correctly on the last batch + - fixed + - because start_seqno in handle_responses was not actually being initialized + to the correct starting seqno (there is some delay between the time the + issue_txns begins issuing and the time the response_handler thread starts) + - decided to leave this as-is; getting an accurate number is too much effort + - relabeled the messages to be "roughly" + - TODO show aries-write - TODO checkpointing + replaying log from replicas (not from disk) - TODO scale-up on multicore Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-03-09 23:02:12 UTC (rev 1278) +++ ydb/trunk/src/main.lzz.clamp 2009-03-10 23:32:14 UTC (rev 1279) @@ -40,6 +40,7 @@ #define ref boost::ref #define tuple boost::tuple #define make_tuple boost::make_tuple +#define unused __attribute__((unused)) using namespace boost; using namespace boost::archive; @@ -548,15 +549,15 @@ void logbuf(const void *buf, size_t len) { of.write(reinterpret_cast<const char*>(buf), len); } - void del(int key) { + void logdel(int key) { int op = op_del; // TODO: is this really necessary? 
out & op & key; } - void write(int key, int val) { + void logwrite(int key, int val) { int op = op_write; out & op & key & val; } - void commit() { + void logcommit() { int op = op_commit; out & op; } @@ -691,7 +692,6 @@ if (seqno == stop_on_seqno) { cout << "stopping on issue of seqno " << seqno << endl; stop_hub.set(); - break; } ++seqno; @@ -776,14 +776,14 @@ case Op::write: { int value = op.value(); - if (use_pwal) g_wal->write(key, value); + if (use_pwal) g_wal->logwrite(key, value); if (it == map.end()) map[key] = value; else it->second = value; break; } case Op::del: if (it != map.end()) { - if (use_pwal) g_wal->del(key); + if (use_pwal) g_wal->logdel(key); map.erase(it); } break; @@ -792,7 +792,7 @@ } if (res != nullptr) fin_result(*res); - if (use_pwal) g_wal->commit(); + if (use_pwal) g_wal->logcommit(); } void @@ -902,7 +902,6 @@ } __ref(send_states).push(recovery_t()); __ref(w).mark_and_flush(); - st_sleep(1); }); try { @@ -1253,7 +1252,7 @@ recovery_end_time = sub.take(); recovery_end_seqno = seqno; cout << rid << ": "; - showtput("during recovery, finished", recovery_end_time, + showtput("during recovery, finished roughly", recovery_end_time, recovery_start_time, recovery_end_seqno, recovery_start_seqno); } } @@ -1261,7 +1260,7 @@ void cleanup() { long long end_time = current_time_millis(); cout << rid << ": "; - showtput("handled", end_time, start_time, seqno, start_seqno); + showtput("handled roughly", end_time, start_time, seqno, start_seqno); if (recovery_end_time > -1) { cout << rid << ": "; showtput("after recovery, finished", end_time, recovery_end_time, @@ -1672,8 +1671,8 @@ swap(buf, reader.buf()); reader.reset_range(chunk.get<1>(), chunk.get<2>()); while (reader.start() < reader.end()) { - char *start = reader.start(); - uint32_t prefix = reader.read<uint32_t>(); + unused char *start = reader.start(); + unused uint32_t prefix = reader.read<uint32_t>(); assert(prefix < 10000); assert(start + sizeof(uint32_t) + prefix <= reader.end()); batch.Clear(); Modified: ydb/trunk/tools/test.bash =================================================================== --- ydb/trunk/tools/test.bash 2009-03-09 23:02:12 UTC (rev 1278) +++ ydb/trunk/tools/test.bash 2009-03-10 23:32:14 UTC (rev 1279) @@ -214,6 +214,7 @@ /tmp/st-1.8.tar.gz \ /tmp/protobuf-2.0.2.tar.bz2 \ /tmp/boost_1_37_0.tar.bz2 \ + /tmp/clamp_053_src.tar.gz \ clamp.patch \ ^:/tmp/ This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
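The unused macro introduced in r1279 matters only in optimized builds: with NDEBUG defined, assert() expands to nothing, so a variable consulted only by asserts becomes dead and trips an unused-variable warning (fatal under -Werror). A self-contained illustration; the function and buffer names here are made up:

    #include <cassert>
    #include <cstring>
    #include <stdint.h>

    #define unused __attribute__((unused))

    void check_frame(const char *start, const char *end) {
        unused uint32_t prefix;
        std::memcpy(&prefix, start, sizeof prefix);
        // Both asserts compile to nothing under NDEBUG; without the attribute,
        // `prefix` would then be flagged as set-but-unused.
        assert(prefix < 10000);
        assert(start + sizeof(uint32_t) + prefix <= end);
        (void)end;              // `end` is likewise used only by the asserts
    }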
From: <yan...@us...> - 2009-03-16 22:32:32
Revision: 1294 http://assorted.svn.sourceforge.net/assorted/?rev=1294&view=rev Author: yangzhang Date: 2009-03-16 22:32:07 +0000 (Mon, 16 Mar 2009) Log Message: ----------- started integrating tpcc txn processing into ydb; currently only handles new order txns and no recovery Modified Paths: -------------- ydb/trunk/README ydb/trunk/src/main.lzz.clamp ydb/trunk/src/tpcc/tpccclient.cc ydb/trunk/src/ydb.proto Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-03-13 19:26:56 UTC (rev 1293) +++ ydb/trunk/README 2009-03-16 22:32:07 UTC (rev 1294) @@ -684,8 +684,10 @@ - stock 100k 306 - item 100k 82 -- TODO try integrating TPC-C +- TODO try integrating TPC-C; just get txns working for now +- TODO implement TPC-C recovery + - TODO faster disk logging using separate threads - TODO show aries-write Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-03-13 19:26:56 UTC (rev 1293) +++ ydb/trunk/src/main.lzz.clamp 2009-03-16 22:32:07 UTC (rev 1294) @@ -93,13 +93,14 @@ bool yield_during_build_up, yield_during_catch_up, dump, show_updates, count_updates, stop_on_recovery, general_txns, profile_threads, debug_threads, multirecover, disk, debug_memory, use_pwal, use_twal, - use_pb, use_pb_res, g_caught_up, rec_pwal, + use_pb, use_pb_res, g_caught_up, rec_pwal, do_tpcc, suppress_txn_msgs, fake_bcast, force_ser, fake_exec; long long timelim, read_thresh, write_thresh; // Control. st_intr_bool stop_hub, kill_hub; st_bool do_pause; +bool stopped_issuing; // Statistics. int updates; @@ -326,6 +327,17 @@ //typedef string ser_t; typedef ser_array ser_t; +template<typename T> +void +ser(writer &w, const T &msg) +{ + uint32_t len = msg.ByteSize(); + w.mark(); + w.reserve(len); + check(msg.SerializeToArray(w.cur(), len)); + w.skip(len); +} + /** * Serialization. 
* @@ -564,6 +576,14 @@ check(msg.ParseFromArray(src.read(len), len)); } +template<typename T> +inline void +readmsg(anchored_stream_reader &src, T &msg) +{ + uint32_t len = ntohl(src.read<uint32_t>()); + check(msg.ParseFromArray(src.read(len), len)); +} + enum { op_del, op_write, op_commit }; /** @@ -885,6 +905,145 @@ } #end +unique_ptr<TPCCTables> g_tables; + +void +process_tpcc(const TpccReq &req, int &seqno, TpccRes *res) +{ + checkeq(req.seqno(), seqno + 1); + ++seqno; + if (req.has_new_order()) { + const NewOrderMsg &no = req.new_order(); + vector<NewOrderItem> items(no.item_size()); + for (int i = 0; i < no.item_size(); ++i) { + NewOrderItem &dst = items[i]; + const NewOrderItemMsg &src = no.item(i); + dst.i_id = src.i_id(); + dst.ol_supply_w_id = src.ol_supply_w_id(); + dst.ol_quantity = src.ol_quantity(); + } + NewOrderOutput output; + g_tables->newOrder(no.warehouse_id(), no.district_id(), no.customer_id(), + items, no.now().c_str(), &output); + res->Clear(); + res->set_seqno(seqno); + NewOrderOutputMsg &outmsg = *res->mutable_new_order(); + outmsg.set_w_tax(output.w_tax); + outmsg.set_d_tax(output.d_tax); + outmsg.set_o_id(output.o_id); + outmsg.set_c_discount(output.c_discount); + outmsg.set_total(output.total); + foreach (const NewOrderOutput::ItemInfo &src, output.items) { + ItemInfoMsg &dst = *outmsg.add_item(); + dst.set_s_quantity(src.s_quantity); + dst.set_i_price(src.i_price); + dst.set_ol_amount(src.ol_amount); + dst.set_brand_generic(src.brand_generic); + dst.set_i_name(src.i_name); + } + outmsg.set_c_last(output.c_last); + outmsg.set_c_credit(output.c_credit); + outmsg.set_status(output.status); + } else { + throw_not_implemented(); + } +} + +void +process_tpccs(st_netfd_t leader, int &seqno, + st_channel<recovery_t> &send_states, + st_channel<chunk> &backlog, int init_seqno, + int mypos, int nnodes) +{ + bool caught_up = init_seqno == 0; + long long start_time = current_time_millis(), + time_caught_up = caught_up ? start_time : -1; + int seqno_caught_up = caught_up ? seqno : -1; + // Used by joiner only to tell where we actually started (init_seqno is just + // the seqno reported by the leader in the Init message, but it may have + // issued more since the Init message). 
+ int first_seqno = -1; + + function<void(anchored_stream_reader& reader)> overflow_fn = lambda(anchored_stream_reader &reader) { + sized_array<char> buf(new char[reader.buf().size()], reader.buf().size()); + memcpy(buf.get(), reader.start(), reader.unread()); + swap(buf, reader.buf()); + __ref(backlog).push(make_tuple(buf, reader.anchor(), reader.start())); + reader.reset_range(reader.buf().get(), reader.buf().get() + reader.unread()); + }; + + sized_array<char> rbuf(new char[read_buf_size], read_buf_size); + commons::array<char> wbuf(buf_size); + anchored_stream_reader reader(st_read_fn(leader), st_read_fully_fn(leader), + overflow_fn, rbuf.get(), rbuf.size()); + writer w(lambda(const void *buf, size_t len) { + checkeqnneg(st_write(__ref(leader), buf, len, ST_UTIME_NO_TIMEOUT), + static_cast<ssize_t>(len)); + }, wbuf.get(), wbuf.size()); + + finally f(lambda () { + long long now = current_time_millis(); + stopped_issuing = true; + showtput("processed", now, __ref(start_time), __ref(seqno), + __ref(init_seqno)); + if (!__ref(caught_up)) { + cout << "live-processing: never entered this phase (never caught up)" << endl; + } else { + showtput("live-processed", now, __ref(time_caught_up), __ref(seqno), + __ref(seqno_caught_up)); + } + __ref(send_states).push(recovery_t()); + __ref(w).mark_and_flush(); + }); + + TpccReq req; + TpccRes res; + + while (true) + { + { + st_intr intr(stop_hub); + readmsg(reader, req); + } + + if (req.seqno() == -1) { + // End of stream. + break; + } else if (req.seqno() == -2) { + // Prepare recovery msg. + send_states.push(make_tpcc_recovery(mypos, nnodes, seqno)); + } else { + // Backlog (auto-implicit) or process. + if (check_interval(req.seqno(), process_display)) + cout << "processing req " << req.seqno() << endl; + if (req.seqno() == seqno + 1) { + if (!caught_up) { + time_caught_up = current_time_millis(); + seqno_caught_up = seqno; + showtput("process_txns caught up; backlogged", + time_caught_up, start_time, seqno_caught_up, + first_seqno == -1 ? init_seqno - 1 : first_seqno); + caught_up = true; + } + // Process. + process_tpcc(req, seqno, &res); + ser(w, res); + reader.set_anchor(); + } + } + } + +} + +// XXX +recovery_t +make_tpcc_recovery(int mypos, int nnodes, int seqno) +{ + mypos = nnodes = seqno; + throw_not_implemented(); + return recovery_t(); +} + /** * Actually do the work of executing a transaction and sending back the reply. * @@ -936,7 +1095,6 @@ sized_array<char> rbuf(new char[read_buf_size], read_buf_size); commons::array<char> wbuf(buf_size); st_reader reader(leader, rbuf.get(), rbuf.size()); - vector<st_netfd_t> leader_v(1, leader); writer w(lambda(const void *buf, size_t len) { checkeqnneg(st_write(__ref(leader), buf, len, ST_UTIME_NO_TIMEOUT), static_cast<ssize_t>(len)); @@ -998,7 +1156,7 @@ } } else { st_intr intr(stop_hub); - prefix = reader.read<uint32_t>(); + prefix = ntohl(reader.read<uint32_t>()); check(prefix < 10000); batch.Clear(); } @@ -1103,7 +1261,7 @@ #if 0 template<typename mii> unique_ptr<Recovery> -make_recovery(const mii &map, int mypos, int nnodes, int &seqno) +make_recovery(const mii &map, int mypos, int nnodes, int seqno) { // TODO make this faster cout << "generating recovery..." 
<< endl; @@ -1134,7 +1292,7 @@ template<typename mii> recovery_t -make_recovery(const mii &map, int mypos, int nnodes, int &seqno) +make_recovery(const mii &map, int mypos, int nnodes, int seqno) { return recovery_t(); } @@ -1156,7 +1314,7 @@ template<> recovery_t -make_recovery(const snap_map<int, int> &map, int mypos, int nnodes, int &seqno) +make_recovery(const snap_map<int, int> &map, int mypos, int nnodes, int seqno) { const commons::array<entry> &src = map.get_table(); pair<size_t, size_t> range = recovery_range(src.size(), mypos, nnodes); @@ -1233,7 +1391,7 @@ try { st_intr intr(stop_hub); if (Types::is_pb()) readmsg(reader, batch); - else { prefix = reader.read<uint32_t>(); batch.Clear(); } + else { prefix = ntohl(reader.read<uint32_t>()); batch.Clear(); } } catch (...) { // TODO: only catch interruptions // This check on seqnos is OK for termination since the seqno will // never grow again if stop_hub is set. @@ -1251,7 +1409,7 @@ // to get all the acks back). st_intr intr(kill_hub); if (Types::is_pb()) readmsg(reader, batch); - else { prefix = reader.read<uint32_t>(); batch.Clear(); } + else { prefix = ntohl(reader.read<uint32_t>()); batch.Clear(); } } for (int i = 0; i < batch.res_size(); ++i) { @@ -1353,6 +1511,142 @@ h.run<Types>(); } +class tpcc_response_handler +{ +public: + tpcc_response_handler(st_netfd_t replica, const int &seqno, int rid, + st_multichannel<long long> &recover_signals, bool caught_up) + : + replica(replica), + seqno(seqno), + rid(rid), + recover_signals(recover_signals), + caught_up(caught_up), + sub(recover_signals.subscribe()), + start_time(current_time_millis()), + recovery_start_time(caught_up ? -1 : start_time), + recovery_end_time(-1), + start_seqno(seqno), + recovery_start_seqno(caught_up ? -1 : seqno), + recovery_end_seqno(-1), + last_seqno(-1) + {} + + void run() { + finally f(bind(&tpcc_response_handler::cleanup, this)); + + commons::array<char> rbuf(read_buf_size), wbuf(buf_size); + st_reader reader(replica, rbuf.get(), rbuf.size()); + writer w(lambda(const void*, size_t) { + throw not_supported_exception("response handler should not be writing"); + }, wbuf.get(), wbuf.size()); + stream s(reader,w); + + long long last_display_time = current_time_millis(); + + function<void()> loop_cleanup = + bind(&tpcc_response_handler::loop_cleanup, this); + + TpccRes res; + + while (true) { + finally f(loop_cleanup); + + // Read the message, but correctly respond to interrupts so that we can + // cleanly exit (slightly tricky). + if (stopped_issuing) { + st_intr intr(stop_hub); + readmsg(reader, res); + } else { + st_intr intr(kill_hub); + readmsg(reader, res); + } + + if (res.seqno() < last_seqno) + throw msg_exception(string("response seqno decreased from ") + + lexical_cast<string>(last_seqno) + " to " + + lexical_cast<string>(res.seqno())); + + if (!caught_up) { + long long now = current_time_millis(), time_diff = now - start_time; + caught_up = true; + recover_signals.push(now); + cout << rid << ": " << "recovering node caught up; took " + << time_diff << " ms" << endl; + // This will cause the program to exit eventually, but cleanly, such that + // the recovery time will be set first, before the eventual exit (which + // may not even happen in the current iteration). 
+ if (stop_on_recovery) { + cout << "stopping on recovery" << endl; + stop_hub.set(); + } + } + + if (check_interval(res.seqno(), handle_responses_display)) { + cout << rid << ": " << "got response " << res.seqno() << " from " + << replica << "; "; + long long display_time = current_time_millis(); + showtput("handling", display_time, last_display_time, res.seqno(), + res.seqno() - handle_responses_display); + last_display_time = display_time; + } + if (check_interval(res.seqno(), yield_interval)) { + st_sleep(0); + } + last_seqno = res.seqno(); + } + } + +private: + void loop_cleanup() { + // The first timestamp that comes down the subscription pipeline is the + // recovery start time, issued by the main thread. The second one is the + // recovery end time, issued by the response handler associated with the + // joiner. + if (recovery_start_time == -1 && !sub.empty()) { + recovery_start_time = sub.take(); + recovery_start_seqno = last_seqno; + cout << rid << ": "; + showtput("before recovery, finished", recovery_start_time, start_time, + recovery_start_seqno, 0); + } else if (recovery_end_time == -1 && !sub.empty()) { + recovery_end_time = sub.take(); + recovery_end_seqno = last_seqno; + cout << rid << ": "; + showtput("during recovery, finished roughly", recovery_end_time, + recovery_start_time, recovery_end_seqno, recovery_start_seqno); + } + } + + void cleanup() { + long long end_time = current_time_millis(); + cout << rid << ": "; + showtput("handled roughly", end_time, start_time, seqno, start_seqno); + if (recovery_end_time > -1) { + cout << rid << ": "; + showtput("after recovery, finished", end_time, recovery_end_time, + seqno, recovery_end_seqno); + } + } + + st_netfd_t replica; + const int &seqno; + int rid; + st_multichannel<long long> &recover_signals; + bool caught_up; + st_channel<long long> ⊂ + long long start_time, recovery_start_time, recovery_end_time; + int start_seqno, recovery_start_seqno, recovery_end_seqno, last_seqno; +}; + +void +handle_tpcc_responses(st_netfd_t replica, const int &seqno, int rid, + st_multichannel<long long> &recover_signals, bool caught_up) +{ + tpcc_response_handler h(replica, seqno, rid, recover_signals, caught_up); + h.run(); +} + /** * Help the recovering node. * @@ -1458,11 +1752,11 @@ st_bool accept_joiner; int seqno = 0; st_channel<replica_info> newreps; - const function<void()> f = + foreach (const replica_info &r, replicas) newreps.push(r); + const function<void()> f = do_tpcc ? + bind(issue_tpcc, ref(newreps), ref(seqno), ref(accept_joiner)) : bind(issue_txns<Types>, ref(newreps), ref(seqno), ref(accept_joiner)); - st_thread_t issue_txns_thread = my_spawn(f, "issue_txns"); - foreach (const replica_info &r, replicas) newreps.push(r); - st_joining join_issue_txns(issue_txns_thread); + st_joining join_issue_txns(my_spawn(f, "issue_txns")); finally fin(lambda () { cout << "LEADER SUMMARY" << endl; @@ -1485,9 +1779,14 @@ st_thread_group handlers; int rid = 0; foreach (replica_info r, replicas) { - handlers.insert(my_spawn(bind(handle_responses<RTypes>, r.fd(), ref(seqno), rid++, - ref(recover_signals), true), - "handle_responses")); + function<void()> fn; + if (do_tpcc) + fn = bind(handle_tpcc_responses, r.fd(), ref(seqno), rid++, + ref(recover_signals), true); + else + fn = bind(handle_responses<RTypes>, r.fd(), ref(seqno), rid++, + ref(recover_signals), true); + handlers.insert(my_spawn(fn, "handle_responses")); } // Accept the recovering node, and tell it about the online replicas. 
@@ -1560,6 +1859,33 @@ }); st_channel<recovery_t> send_states; + if (do_tpcc) { + TPCCTables *tables = new TPCCTables(); + g_tables.reset(tables); + SystemClock* clock = new SystemClock(); + + // Create a generator for filling the database. + RealRandomGenerator* random = new RealRandomGenerator(); + NURandC cLoad = NURandC::makeRandom(random); + random->setC(cLoad); + + // Generate the data + cout << "loading " << nwarehouses << " warehouses" << endl; + char now[Clock::DATETIME_SIZE+1]; + clock->getDateTimestamp(now); + TPCCGenerator generator(random, now, Item::NUM_ITEMS, + District::NUM_PER_WAREHOUSE, + Customer::NUM_PER_DISTRICT, + NewOrder::INITIAL_NUM_PER_DISTRICT); + long long start_time = current_time_millis(); + generator.makeItemsTable(tables); + for (int i = 0; i < nwarehouses; ++i) { + generator.makeWarehouse(tables, i+1); + } + cout << "loaded " << nwarehouses << " warehouses in " + << current_time_millis() - start_time << " ms" << endl; + } + cout << "starting as replica on port " << listen_port << endl; // Listen for connections from other replicas. @@ -1608,8 +1934,12 @@ // Process txns. st_channel<chunk> backlog; - const function<void()> process_fn = - bind(process_txns<Types, RTypes>, leader, ref(map), ref(seqno), + function<void()> process_fn; + if (do_tpcc) + process_fn = bind(process_tpccs, leader, ref(seqno), ref(send_states), + ref(backlog), init.txnseqno(), mypos, init.node_size()); + else + process_fn = bind(process_txns<Types, RTypes>, leader, ref(map), ref(seqno), ref(send_states), ref(backlog), init.txnseqno(), mypos, init.node_size()); st_joining join_proc(my_spawn(process_fn, "process_txns")); @@ -1775,7 +2105,7 @@ reader.reset_range(chunk.get<1>(), chunk.get<2>()); while (reader.start() < reader.end()) { char *start = reader.start(); - uint32_t prefix = reader.read<uint32_t>(); + uint32_t prefix = ntohl(reader.read<uint32_t>()); ASSERT(prefix < 10000); ASSERT(start + sizeof(uint32_t) + prefix <= reader.end()); batch.Clear(); @@ -1883,9 +2213,200 @@ } } +class st_tpcc : public TPCCDB +{ + private: + commons::array<char> a_; + writer writer_; + TpccReq req_; + int seqno_; + + public: + st_tpcc(const vector<st_netfd_t> &fds) : + a_(buf_size), + writer_(lambda(const void *buf, size_t len) { + foreach (st_netfd_t dst, __ref(fds)) + st_timed_write(dst, buf, len); + }, a_, buf_size) {} + + void flush() { writer_.mark_and_flush(); } + void set_seqno(int seqno) { seqno_ = seqno; } + + void sendRec() { + req_.Clear(); + req_.set_seqno(-2); + ser(writer_, req_); + writer_.mark_and_flush(); + } + + void sendStop() { + req_.Clear(); + req_.set_seqno(-1); + ser(writer_, req_); + writer_.mark_and_flush(); + } + + // Executes the TPC-C "slev" transaction. From the last 20 orders, returns the number of rows in + // the STOCK table that have S_QUANTITY < threshold. See TPC-C 2.8 (page 43). + int stockLevel(int32_t warehouse_id, int32_t district_id, int32_t threshold) { + warehouse_id = district_id = threshold = 0; + throw_not_implemented(); + } + + // Executes the TPC-C order status transaction. Find the customer's last order and check the + // delivery date of each item on the order. See TPC-C 2.6 (page 36). + void orderStatus(int32_t warehouse_id, int32_t district_id, int32_t customer_id, + OrderStatusOutput* output) { + warehouse_id = district_id = customer_id = 0; output = 0; + throw_not_implemented(); + } + + // Executes the TPC-C order status transaction. Find the customer's last order and check the + // delivery date of each item on the order. See TPC-C 2.6 (page 36). 
+ void orderStatus(int32_t warehouse_id, int32_t district_id, const char* c_last, + OrderStatusOutput* output) { + warehouse_id = district_id = 0; c_last = 0; output = 0; + throw_not_implemented(); + } + + // Executes the TPC-C new order transaction. Enter the new order for customer_id into the + // database. See TPC-C 2.4 (page 27). Returns true if the transaction commits. + bool newOrder(int32_t warehouse_id, int32_t district_id, int32_t customer_id, + const vector<NewOrderItem>& items, const char* now, + NewOrderOutput* output) { + req_.Clear(); + req_.set_seqno(seqno_); + + NewOrderMsg &new_order = *req_.mutable_new_order(); + new_order.set_warehouse_id(warehouse_id); + new_order.set_district_id(district_id); + new_order.set_customer_id(customer_id); + foreach (const NewOrderItem &item, items) { + NewOrderItemMsg &msg = *new_order.add_item(); + msg.set_i_id(item.i_id); + msg.set_ol_supply_w_id(item.ol_supply_w_id); + msg.set_ol_quantity(item.ol_quantity); + } + new_order.set_now(now); + + ser(writer_, req_); + output = nullptr; + return true; + } + + // Executes the TPC-C payment transaction. Add h_amount to the customer's account. + // See TPC-C 2.5 (page 32). + void payment(int32_t warehouse_id, int32_t district_id, int32_t c_warehouse_id, + int32_t c_district_id, int32_t customer_id, float h_amount, const char* now, + PaymentOutput* output) { + warehouse_id = district_id = c_district_id = c_warehouse_id = customer_id = 0; + h_amount = 0; now = 0; output = 0; + throw_not_implemented(); + } + + // Executes the TPC-C payment transaction. Add h_amount to the customer's account. + // See TPC-C 2.5 (page 32). + void payment(int32_t warehouse_id, int32_t district_id, int32_t c_warehouse_id, + int32_t c_district_id, const char* c_last, float h_amount, const char* now, + PaymentOutput* output) { + warehouse_id = district_id = c_warehouse_id = c_district_id = 0; + h_amount = 0; c_last = now = 0; output = 0; + throw_not_implemented(); + } + + // Executes the TPC-C delivery transaction. Delivers the oldest undelivered transaction in each + // district in warehouse_id. See TPC-C 2.7 (page 39). + void delivery(int32_t warehouse_id, int32_t carrier_id, const char* now, + vector<DeliveryOrderInfo>* orders) { + warehouse_id = carrier_id = 0; now = 0; orders = 0; + throw_not_implemented(); + } +}; + void -run_tpcc() +issue_tpcc(st_channel<replica_info> &newreps, int &seqno, + st_bool &accept_joiner) { + vector<st_netfd_t> fds; + long long start_time = current_time_millis(); + + finally f(lambda () { + showtput("issued", current_time_millis(), __ref(start_time), __ref(seqno), + 0); + }); + + SystemClock* clock = new SystemClock(); + + // Change the constants for run + RealRandomGenerator* random = new RealRandomGenerator(); + NURandC cLoad = NURandC::makeRandom(random); + random->setC(NURandC::makeRandomForRun(random, cLoad)); + + // Client owns all the parameters + st_tpcc &tables = *new st_tpcc(fds); + TPCCClient client(clock, random, &tables, Item::NUM_ITEMS, nwarehouses, + District::NUM_PER_WAREHOUSE, Customer::NUM_PER_DISTRICT); + while (!stop_hub) { + // Did we get a new member? If so, notify an arbitrary member (the first + // one) to prepare to send recovery information (by sending an + // empty/default Txn). + // XXX rec_pwal + if (!newreps.empty() && seqno > 0 && !rec_pwal) { + tables.sendRec(); + } + + // Bring in any new members. 
+ // TODO more efficient: copy/extend/append + while (!newreps.empty()) { + fds.push_back(newreps.take().fd()); + } + + tables.set_seqno(seqno); + client.doOne(); + + // Checkpoint. + if (check_interval(seqno, yield_interval)) st_sleep(0); + if (check_interval(seqno, issue_display)) { + cout << "issued txn " << seqno << endl; + if (timelim > 0 && current_time_millis() - start_time > timelim) { + cout << "time's up; issued " << seqno << " txns in " << timelim + << " ms" << endl; + stop_hub.set(); + } + } + + // For debugging purposes. + if (issuing_interval > 0) { + st_sleep(issuing_interval); + tables.flush(); + } + + // Are we to accept a new joiner? + if (seqno == accept_joiner_seqno) { + accept_joiner.set(); + } + + // Set the stopping seqno. + if (seqno == stop_on_seqno) { + cout << "stopping on issue of seqno " << seqno << endl; + stop_hub.set(); + } + + ++seqno; + + // Pause? + if (do_pause) + do_pause.waitreset(); + } + + if (!fds.empty()) { + tables.sendStop(); + } +} + +void +run_tpcc_demo() +{ TPCCTables* tables = new TPCCTables(); SystemClock* clock = new SystemClock(); @@ -1996,6 +2517,8 @@ "exit after the joiner fully recovers (for leader)") ("batch-size,b", po::value<int>(&batch_size)->default_value(100), "number of txns to batch up in each msg (for leader)") + ("tpcc", po::bool_switch(&do_tpcc), + "run the TPCC workload") ("exit-on-seqno,X",po::value<int>(&stop_on_seqno)->default_value(-1), "exit after txn seqno is issued (for leader)") ("accept-joiner-size,s", Modified: ydb/trunk/src/tpcc/tpccclient.cc =================================================================== --- ydb/trunk/src/tpcc/tpccclient.cc 2009-03-13 19:26:56 UTC (rev 1293) +++ ydb/trunk/src/tpcc/tpccclient.cc 2009-03-16 22:32:07 UTC (rev 1294) @@ -144,6 +144,8 @@ // This is not strictly accurate: The requirement is for certain *minimum* percentages to be // maintained. This is close to the right thing, but not precisely correct. // See TPC-C 5.2.4 (page 68). + doNewOrder(); +#if 0 int x = generator_->number(1, 100); if (x <= 4) { // 4% doStockLevel(); @@ -157,6 +159,7 @@ ASSERT(x > 100-45); doNewOrder(); } +#endif } int32_t TPCCClient::generateWarehouse() { Modified: ydb/trunk/src/ydb.proto =================================================================== --- ydb/trunk/src/ydb.proto 2009-03-13 19:26:56 UTC (rev 1293) +++ ydb/trunk/src/ydb.proto 2009-03-16 22:32:07 UTC (rev 1294) @@ -85,3 +85,63 @@ message ResponseBatch { repeated Response res = 1; } + +// +// TPCC messages +// + +message NewOrderItemMsg { + required int32 i_id = 1; + required int32 ol_supply_w_id = 2; + required int32 ol_quantity = 3; +} + +message NewOrderMsg { + required int32 warehouse_id = 1; + required int32 district_id = 2; + required int32 customer_id = 3; + repeated NewOrderItemMsg item = 4; + required string now = 5; +} + +message ItemInfoMsg { + required int32 s_quantity = 1; + required float i_price = 2; + // TODO: Client can compute this from other values. + required float ol_amount = 3; + // char + required int32 brand_generic = 4; + required string i_name = 5; +} + +message NewOrderOutputMsg { + required float w_tax = 1; + required float d_tax = 2; + + // From district d_next_o_id + required int32 o_id = 3; + + required float c_discount = 4; + + // TODO: Client can compute this from other values. 
+ required float total = 5; + + repeated ItemInfoMsg item = 6; + required string c_last = 7; + required string c_credit = 8; + + //static const int MAX_STATUS = 25; + //static const char INVALID_ITEM_STATUS[]; + //char status[MAX_STATUS+1]; + required string status = 9; +} + +message TpccReq { + required int32 seqno = 1; + optional NewOrderMsg new_order = 2; +} + +message TpccRes { + required int32 seqno = 1; + optional NewOrderOutputMsg new_order = 2; +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
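For orientation, the TpccReq/TpccRes wire format that readmsg() in r1294 expects is a 4-byte network-order length prefix followed by the serialized protobuf. A standalone sketch of that framing using plain buffers instead of the ydb writer/reader classes -- illustrative of the format only, not the actual ser() implementation:

    #include <arpa/inet.h>
    #include <stdint.h>
    #include <cstddef>
    #include <cstring>
    #include <stdexcept>
    #include <vector>
    #include "ydb.pb.h"   // generated from ydb.proto above

    std::vector<char> frame(const TpccReq &req) {
        uint32_t len = req.ByteSize();
        std::vector<char> out(sizeof(uint32_t) + len);
        uint32_t netlen = htonl(len);
        std::memcpy(&out[0], &netlen, sizeof netlen);
        if (!req.SerializeToArray(&out[sizeof netlen], len))
            throw std::runtime_error("TpccReq serialization failed");
        return out;
    }

    void deframe(const char *buf, size_t avail, TpccRes &res) {
        uint32_t len;
        std::memcpy(&len, buf, sizeof len);
        len = ntohl(len);
        if (sizeof len + len > avail || !res.ParseFromArray(buf + sizeof len, len))
            throw std::runtime_error("TpccRes parse failed");
    }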
From: <yan...@us...> - 2009-03-17 02:35:07
Revision: 1295 http://assorted.svn.sourceforge.net/assorted/?rev=1295&view=rev Author: yangzhang Date: 2009-03-17 02:34:49 +0000 (Tue, 17 Mar 2009) Log Message: ----------- added size-tracking to btree Modified Paths: -------------- ydb/trunk/README ydb/trunk/src/main.lzz.clamp ydb/trunk/src/tpcc/btree.h ydb/trunk/src/tpcc/tpcctables.cc ydb/trunk/src/tpcc/tpcctables.h Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-03-16 22:32:07 UTC (rev 1294) +++ ydb/trunk/README 2009-03-17 02:34:49 UTC (rev 1295) @@ -684,6 +684,59 @@ - stock 100k 306 - item 100k 82 +- DONE add size-tracking to btree + - new order seems to be causing the store to grow nearly linearly + - initially: + + |warehouses| = 1 + |stock| = 100000 + |districts| = 10 + |customers| = 30000 + |orders| = 30000 + |ordersBC| = 30000 + |orderlines| = 299484 + |cbn| = 30000 + |neworders| = 9000 + |history| = 30000 + + |warehouses| = 1 + |stock| = 100000 + |districts| = 10 + |customers| = 30000 + |orders| = 30000 + |ordersBC| = 30000 + |orderlines| = 299462 + |cbn| = 30000 + |neworders| = 9000 + |history| = 30000 + + - with 1000 txns: + + |warehouses| = 1 + |stock| = 100000 + |districts| = 10 + |customers| = 30000 + |orders| = 3099 (almost 30000+1000)1 + |ordersBC| = 30991 + |orderlines| = 309477 (almost 300000+1000*10) + |cbn| = 30000 + |neworders| = 9991 + |history| = 30000 + + - with 100000 txns (100x greater than 1000): + + |warehouses| = 1 + |stock| = 100000 + |districts| = 10 + |customers| = 30000 + |orders| = 128973 (almost 30000+100000) + |ordersBC| = 128973 + |orderlines| = 1288365 (almost 300000+100000*10) + |cbn| = 30000 + |neworders| = 107973 + |history| = 30000 + +- TODO add iteration to btree - TODO try integrating TPC-C; just get txns working for now - TODO implement TPC-C recovery Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-03-16 22:32:07 UTC (rev 1294) +++ ydb/trunk/src/main.lzz.clamp 2009-03-17 02:34:49 UTC (rev 1295) @@ -1758,21 +1758,7 @@ bind(issue_txns<Types>, ref(newreps), ref(seqno), ref(accept_joiner)); st_joining join_issue_txns(my_spawn(f, "issue_txns")); - finally fin(lambda () { - cout << "LEADER SUMMARY" << endl; - cout << "- total updates = " << updates << endl; - cout << "- final DB state: seqno = " << __ref(seqno) << ", size = " - << g_map.size() << endl; - string fname = string("/tmp/ydb") + lexical_cast<string>(getpid()); - if (dump) { - cout << "- dumping to " << fname << endl; - ofstream of(fname.c_str()); - of << "seqno: " << __ref(seqno) << endl; - foreach (const entry &p, g_map) { - of << p.first << ": " << p.second << endl; - } - } - }); + finally fin(bind(summarize, "LEADER", ref(seqno))); try { // Start handling responses. 
@@ -1825,6 +1811,36 @@ } } +void +summarize(const char *role, int seqno) +{ + cout << role << " SUMMARY\n"; + if (do_tpcc) { + cout << "seqno: " << seqno << endl; + if (g_tables != nullptr) { + cout << "state:\n"; + g_tables->show(); + string fname = string("/tmp/ydb") + lexical_cast<string>(getpid()); + if (dump) { + // XXX iterate & dump + } + } + } else { + cout << "- total updates = " << updates << "\n" + << "- final DB state: seqno = " << seqno << ", size = " + << g_map.size() << endl; + string fname = string("/tmp/ydb") + lexical_cast<string>(getpid()); + if (dump) { + cout << "- dumping to " << fname << endl; + ofstream of(fname.c_str()); + of << "seqno: " << seqno << endl; + foreach (const entry &p, g_map) { + of << p.first << ": " << p.second << endl; + } + } + } +} + /** * Run a replica. */ @@ -1840,25 +1856,8 @@ } // Initialize database state. + int seqno = -1; mii &map = g_map; - int seqno = -1; - finally f(lambda () { - cout << "REPLICA SUMMARY" << endl; - cout << "- total updates = " << updates << endl; - cout << "- final DB state: seqno = " << __ref(seqno) << ", size = " - << __ref(map).size() << endl; - string fname = string("/tmp/ydb") + lexical_cast<string>(getpid()); - if (dump) { - cout << "- dumping to " << fname << endl; - ofstream of(fname.c_str()); - of << "seqno: " << __ref(seqno) << endl; - foreach (const entry &p, __ref(map)) { - of << p.first << ": " << p.second << endl; - } - } - }); - st_channel<recovery_t> send_states; - if (do_tpcc) { TPCCTables *tables = new TPCCTables(); g_tables.reset(tables); @@ -1884,8 +1883,12 @@ } cout << "loaded " << nwarehouses << " warehouses in " << current_time_millis() - start_time << " ms" << endl; + tables->show(); } + finally f(bind(summarize, "REPLICA", ref(seqno))); + st_channel<recovery_t> send_states; + cout << "starting as replica on port " << listen_port << endl; // Listen for connections from other replicas. Modified: ydb/trunk/src/tpcc/btree.h =================================================================== --- ydb/trunk/src/tpcc/btree.h 2009-03-16 22:32:07 UTC (rev 1294) +++ ydb/trunk/src/tpcc/btree.h 2009-03-17 02:34:49 UTC (rev 1295) @@ -47,7 +47,8 @@ // Builds a new empty tree. BPlusTree() : depth(0), - root(new_leaf_node()) + root(new_leaf_node()), + size_(0) { // DEBUG // cout << "sizeof(LeafNode)==" << sizeof(LeafNode) << endl; @@ -140,6 +141,7 @@ index= leaf_position_for(key, leaf->keys, leaf->num_keys); if( leaf->keys[index] == key ) { leaf->values[index] = 0; + --size_; return true; } else { return false; @@ -201,6 +203,8 @@ unsigned sizeof_leaf_node() const { return sizeof(LeafNode); } + + size_t size() const { return size_; } private: @@ -378,7 +382,7 @@ return was_split; } - static void leaf_insert_nonfull(LeafNode* node, KEY& key, VALUE& value, + void leaf_insert_nonfull(LeafNode* node, KEY& key, VALUE& value, unsigned index) { assert( node->type == NODE_LEAF ); assert( node->num_keys < M ); @@ -398,6 +402,7 @@ node->num_keys++; node->keys[index]= key; node->values[index]= value; + ++size_; } } @@ -506,6 +511,7 @@ // Pointer to the root node. It may be a leaf or an inner node, but // it is never null. 
void* root; + size_t size_; }; #endif // !defined BPLUSTREE_HPP_227824 Modified: ydb/trunk/src/tpcc/tpcctables.cc =================================================================== --- ydb/trunk/src/tpcc/tpcctables.cc 2009-03-16 22:32:07 UTC (rev 1294) +++ ydb/trunk/src/tpcc/tpcctables.cc 2009-03-17 02:34:49 UTC (rev 1295) @@ -5,6 +5,7 @@ #include <vector> #include <commons/assert.h> +#include <iostream> #include "stlutil.h" using std::vector; @@ -605,3 +606,17 @@ History* h = new History(history); history_.push_back(h); } + +void TPCCTables::show() const { + using namespace std; + cout << "|warehouses| = " << warehouses_.size() << "\n" + << "|stock| = " << stock_.size() << "\n" + << "|districts| = " << districts_.size() << "\n" + << "|customers| = " << customers_.size() << "\n" + << "|orders| = " << orders_.size() << "\n" + << "|ordersBC| = " << orders_by_customer_.size() << "\n" + << "|orderlines| = " << orderlines_.size() << "\n" + << "|cbn| = " << customers_by_name_.size() << "\n" + << "|neworders| = " << neworders_.size() << "\n" + << "|history| = " << history_.size() << endl; +} Modified: ydb/trunk/src/tpcc/tpcctables.h =================================================================== --- ydb/trunk/src/tpcc/tpcctables.h 2009-03-16 22:32:07 UTC (rev 1294) +++ ydb/trunk/src/tpcc/tpcctables.h 2009-03-17 02:34:49 UTC (rev 1295) @@ -67,6 +67,8 @@ const std::vector<const History*>& history() const { return history_; } void insertHistory(const History& history); + void show() const; + static const int KEYS_PER_INTERNAL = 8; static const int KEYS_PER_LEAF = 8; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
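A quick usage sketch of the size-tracking added in r1295, with the interface taken from btree.h and the btree_test.cc added in r1297: inserting a new key bumps size(), while del() only zeroes the stored value in place but still decrements size().

    #include <iostream>
    #include "btree.h"

    int main() {
        BPlusTree<int, int, 8, 8> tree;
        for (int i = 1; i < 4; ++i)
            tree.insert(2 * i, 3 * i);             // three new keys
        std::cout << tree.size() << std::endl;     // expected: 3
        tree.del(4);                               // zeroes the value in place
        std::cout << tree.size() << std::endl;     // expected: 2
        return 0;
    }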
From: <yan...@us...> - 2009-03-17 06:11:50
Revision: 1297 http://assorted.svn.sourceforge.net/assorted/?rev=1297&view=rev Author: yangzhang Date: 2009-03-17 06:11:47 +0000 (Tue, 17 Mar 2009) Log Message: ----------- added iterator (+tests) for btree Modified Paths: -------------- ydb/trunk/README ydb/trunk/src/tpcc/Makefile ydb/trunk/src/tpcc/btree.h Added Paths: ----------- ydb/trunk/src/tpcc/btree_test.cc Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-03-17 02:39:31 UTC (rev 1296) +++ ydb/trunk/README 2009-03-17 06:11:47 UTC (rev 1297) @@ -736,7 +736,8 @@ |neworders| = 107973 |history| = 30000 -- TODO add iteration to btree +- DONE add iteration to btree + - added btree_test for this - TODO try integrating TPC-C; just get txns working for now - TODO implement TPC-C recovery Modified: ydb/trunk/src/tpcc/Makefile =================================================================== --- ydb/trunk/src/tpcc/Makefile 2009-03-17 02:39:31 UTC (rev 1296) +++ ydb/trunk/src/tpcc/Makefile 2009-03-17 06:11:47 UTC (rev 1297) @@ -6,12 +6,15 @@ #CXXFLAGS = -g -O3 -DNDEBUG -MD $(WARNINGS) # Link withthe C++ standard library -LDFLAGS=-lstdc++ +#LDFLAGS=-lstdc++ +LDLIBS = -lgtest -BINARIES = tpccclient.o tpccgenerator.o tpcctables.o tpccdb.o clock.o randomgenerator.o +BINARIES = btree_test.o tpccclient.o tpccgenerator.o tpcctables.o tpccdb.o clock.o randomgenerator.o all: $(BINARIES) +btree_test: btree_test.o + clean : rm -f *.o *.d $(BINARIES) Modified: ydb/trunk/src/tpcc/btree.h =================================================================== --- ydb/trunk/src/tpcc/btree.h 2009-03-17 02:39:31 UTC (rev 1296) +++ ydb/trunk/src/tpcc/btree.h 2009-03-17 06:11:47 UTC (rev 1297) @@ -9,14 +9,25 @@ #include <assert.h> #include <stdlib.h> #include <string.h> +#include <deque> +#include <utility> +#include <iostream> // XXX #include <boost/static_assert.hpp> #include <boost/pool/object_pool.hpp> +#include <commons/nullptr.h> +#ifdef DEBUG +#include <iostream> +using namespace std; // XXX +#endif + // DEBUG -#include <iostream> using std::cout; using std::endl; +using namespace commons; +using std::deque; +using std::pair; #ifdef __linux__ #define HAVE_POSIX_MEMALIGN @@ -206,6 +217,62 @@ size_t size() const { return size_; } + class iterator + { + public: + typedef typename BPlusTree::InnerNode inner_type; + typedef typename BPlusTree::LeafNode leaf_type; + iterator(BPlusTree &tree) : tree_(tree) { + if (tree.size_ > 0) { + stack_.push_back(make_pair(tree.root, 0)); + // If depth = 0, that means we have only a root node. + // 1 isn't <= 0 so we don't do anything, which is good. + // If depth = 1, that means we have a root and children. + // 1 is <= 1 so we do add the first child, which is good. + down(); + } + } + iterator &operator++() { +#ifdef DEBUG + for (int i = 0; i < stack_.size(); ++i) + cout << (i > 0 ? " > " : "") << stack_[i].first << ' ' << stack_[i].second + << '/' << (i == tree_.depth ? as_inner(stack_[i].first).num_keys : as_leaf(stack_[i].first).num_keys); + cout << endl; +#endif + up(); if (!stack_.empty()) down(); +#ifdef DEBUG + for (int i = 0; i < stack_.size(); ++i) + cout << (i > 0 ? " > " : "") << stack_[i].first << ' ' << stack_[i].second + << '/' << (i == tree_.depth ? 
as_inner(stack_[i].first).num_keys : as_leaf(stack_[i].first).num_keys); + cout << endl; +#endif + return *this; + } + pair<KEY, VALUE> operator*() { + return make_pair(leaf().keys[pos()], leaf().values[pos()]); + } + bool end() { return stack_.empty(); } + private: + inner_type &as_inner(void *p) { return *reinterpret_cast<inner_type*>(p); } + leaf_type &as_leaf(void *p) { return *reinterpret_cast<leaf_type*>(p); } + inner_type &inner() { return as_inner(stack_.back().first); } + leaf_type &leaf() { return as_leaf(stack_.back().first); } + size_t &pos() { return stack_.back().second; } + void down() { + while (stack_.size() <= tree_.depth) + stack_.push_back(make_pair(inner().children[pos()], 0)); + } + void up() { + while (!stack_.empty() && + ++pos() > (stack_.size() == tree_.depth + 1 ? + leaf().num_keys - 1 : inner().num_keys)) + stack_.pop_back(); // back up + } + BPlusTree &tree_; + // stack of (node, pos) + deque< pair<void*, size_t> > stack_; + }; + private: // Used when debugging enum NodeType {NODE_INNER=0xDEADBEEF, NODE_LEAF=0xC0FFEE}; Added: ydb/trunk/src/tpcc/btree_test.cc =================================================================== --- ydb/trunk/src/tpcc/btree_test.cc (rev 0) +++ ydb/trunk/src/tpcc/btree_test.cc 2009-03-17 06:11:47 UTC (rev 1297) @@ -0,0 +1,61 @@ +#include "btree.h" +#include <gtest/gtest.h> +using namespace std; +using namespace testing; + +typedef BPlusTree<int, int, 8, 8> tree_t; + +TEST(btree, empty) { + tree_t tree; + tree_t::iterator it(tree); + ASSERT_TRUE(it.end()); +} + +TEST(btree, shallow) { + tree_t tree; + for (int i = 1; i < 4; ++i) + tree.insert(2*i, 3*i); // [(2,3),(4,6),(6,9)] + { + tree_t::iterator it(tree); + for (int i = 1; i < 4; ++i) { + EXPECT_EQ(2*i, (*it).first); + EXPECT_EQ(3*i, (*it).second); + ++it; + } + EXPECT_TRUE(it.end()); + } + + tree.del(4); // [(2,3),(4,0),(6,9)] + + { + tree_t::iterator it(tree); + EXPECT_EQ(2, (*it).first); + EXPECT_EQ(3, (*it).second); + ++it; + EXPECT_EQ(4, (*it).first); + EXPECT_EQ(0, (*it).second); + ++it; + EXPECT_EQ(6, (*it).first); + EXPECT_EQ(9, (*it).second); + ++it; + EXPECT_TRUE(it.end()); + } +} + +TEST(btree, deep) { + tree_t tree; + for (int i = 1; i < 100; ++i) + tree.insert(2*i, 3*i); + tree_t::iterator it(tree); + for (int i = 1; i < 100; ++i) { + EXPECT_EQ(2*i, (*it).first); + EXPECT_EQ(3*i, (*it).second); + ++it; + } + EXPECT_TRUE(it.end()); +} + +int main(int argc, char **argv) { + InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
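One place the r1297 iterator is headed is the "XXX iterate & dump" left in summarize() in r1295. A minimal sketch of such a dump routine using only the iterator interface shown above; the dump format mirrors the existing int-map dump and is otherwise made up:

    #include <ostream>
    #include <utility>
    #include "btree.h"

    typedef BPlusTree<int, int, 8, 8> tree_t;

    // Walks every (key, value) slot in key order; note that deleted keys are
    // still visited with a zeroed value, as the btree.shallow test shows.
    void dump_tree(tree_t &tree, std::ostream &of) {
        for (tree_t::iterator it(tree); !it.end(); ++it) {
            std::pair<int, int> kv = *it;
            of << kv.first << ": " << kv.second << "\n";
        }
    }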
From: <yan...@us...> - 2009-03-18 09:17:15
Revision: 1301 http://assorted.svn.sourceforge.net/assorted/?rev=1301&view=rev Author: yangzhang Date: 2009-03-18 09:17:01 +0000 (Wed, 18 Mar 2009) Log Message: ----------- - added recovery, backlogging, catchup to ydb for tpcc - added ser(), deser() to TPCCTables - added separate (and simple) recovery build-up/catch-up logic to the main thread - added marker, first_seqno_in_chunk, proper overflow_fn (can still be simplified/refactored in conj with "anchored_stream_reader" design) - cleaned up memory management so that we don't double-free things that are backed by a recovery message buffer - added tpcc_recovery_header - added process_buf for catching up (replaying) tpccs from a buffer - added yielding to process_tpccs and reduced yield interval so that other things get a chance to run besides the process_tpccs thread - build system - added cog; used extensively for ser/deser - using gnu++0x for tpcc as well - added SECONDARY targets - added null-read-checking in readmsg - ignore null res in process_tpcc - added some typedefs to bplustree - updated notes/todos Modified Paths: -------------- ydb/trunk/README ydb/trunk/src/Makefile ydb/trunk/src/main.lzz.clamp ydb/trunk/src/tpcc/Makefile ydb/trunk/src/tpcc/btree.h ydb/trunk/src/tpcc/tpcctables.h Added Paths: ----------- ydb/trunk/src/tpcc/tpcctables.cc.cog Removed Paths: ------------- ydb/trunk/src/tpcc/tpcctables.cc Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-03-17 17:24:21 UTC (rev 1300) +++ ydb/trunk/README 2009-03-18 09:17:01 UTC (rev 1301) @@ -716,7 +716,7 @@ |stock| = 100000 |districts| = 10 |customers| = 30000 - |orders| = 3099 (almost 30000+1000)1 + |orders| = 30991 (almost 30000+1000) |ordersBC| = 30991 |orderlines| = 309477 (almost 300000+1000*10) |cbn| = 30000 @@ -736,12 +736,48 @@ |neworders| = 107973 |history| = 30000 +- DONE try integrating TPC-C; just get txns working for now + - DONE add iteration to btree - added btree_test for this -- TODO try integrating TPC-C; just get txns working for now +- DONE make process_tpccs switch into shifting mode (instead of buffer-swapping + mode) after catch-up +- DONE make process_tpccs rewind & process the accumulated buffer once seqno + has caught up to the first seqno in the buffer +- DONE implement TPC-C network recovery -- TODO implement TPC-C recovery +- TODO full TPC-C txn workload +- TODO prelim measurements on cluster +- TODO PAPER!!! + +- TODO related work + + [HST+91] Svein-Olaf Hvasshovd, Tore Saeter, Oystein Torboørnsen, Petter + Moe, and Oddvar Risnes. A continously available and highly scalable + transaction server: Design experience from the HypRa project. In + Proceedings of the 4th International Workshop on High Performance + Transaction Systems, September 1991. + + Ricardo Jimenez-Peris , M. Patino-Martinez , Gustavo Alonso , Bettina + Kemme, Are quorums an alternative for data replication?, ACM Transactions + on Database Systems (TODS), v.28 n.3, p.257-294, September 2003 + + ~\cite{peris-srds202}R. Jimenez-Peris , M. Patino-Martinez , G. Alonso, + An algorithm for non-intrusive, parallel recovery of replicated data and + its correctness, Proceedings of the 21st IEEE Symposium on Reliable + Distributed Systems (SRDS'02), p.150, October 13-16, 2002 + + ~\cite{bartoli-dsn01}B. Kemme, A. Bartoli, and O. Babaoglu. Online + Reconfiguration in Replicated Databases Based on Group Communication. In + Proc. of the Int. Conf. on Dependable Systems and Networks (DSN 2001), + Goteborg, Sweden, June 2001. 
+ + ~\cite{amir-thesis} Y. Amir. Replication Using Group Communication Over a + Dynamic Network. PhD thesis, Institute of Computer Science, The Hebrew + University of Jerusalem, Israel. Also available at + http://www.dsn.jhu.edu/~yairamir/Yair_phd.pdf, 1995. + - TODO faster disk logging using separate threads - TODO show aries-write Modified: ydb/trunk/src/Makefile =================================================================== --- ydb/trunk/src/Makefile 2009-03-17 17:24:21 UTC (rev 1300) +++ ydb/trunk/src/Makefile 2009-03-18 09:17:01 UTC (rev 1301) @@ -104,6 +104,11 @@ tpcc/%.o: tpcc/%.cc make -C tpcc/ +tpcc/%.cc: tpcc/%.cc.cog + make -C tpcc/ + +.SECONDARY: tpcc/tpcctables.cc tpcc/tpcctables.o + %.o: %.cc $(COMPILE.cc) $(OUTPUT_OPTION) $< Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-03-17 17:24:21 UTC (rev 1300) +++ ydb/trunk/src/main.lzz.clamp 2009-03-18 09:17:01 UTC (rev 1301) @@ -581,7 +581,7 @@ readmsg(anchored_stream_reader &src, T &msg) { uint32_t len = ntohl(src.read<uint32_t>()); - check(msg.ParseFromArray(src.read(len), len)); + check(msg.ParseFromArray(checkpass(src.read(len)), len)); } enum { op_del, op_write, op_commit }; @@ -923,27 +923,30 @@ dst.ol_quantity = src.ol_quantity(); } NewOrderOutput output; - g_tables->newOrder(no.warehouse_id(), no.district_id(), no.customer_id(), - items, no.now().c_str(), &output); - res->Clear(); - res->set_seqno(seqno); - NewOrderOutputMsg &outmsg = *res->mutable_new_order(); - outmsg.set_w_tax(output.w_tax); - outmsg.set_d_tax(output.d_tax); - outmsg.set_o_id(output.o_id); - outmsg.set_c_discount(output.c_discount); - outmsg.set_total(output.total); - foreach (const NewOrderOutput::ItemInfo &src, output.items) { - ItemInfoMsg &dst = *outmsg.add_item(); - dst.set_s_quantity(src.s_quantity); - dst.set_i_price(src.i_price); - dst.set_ol_amount(src.ol_amount); - dst.set_brand_generic(src.brand_generic); - dst.set_i_name(src.i_name); + g_tables->newOrder(no.warehouse_id(), no.district_id(), + no.customer_id(), items, no.now().c_str(), + &output); + if (res != nullptr) { + res->Clear(); + res->set_seqno(seqno); + NewOrderOutputMsg &outmsg = *res->mutable_new_order(); + outmsg.set_w_tax(output.w_tax); + outmsg.set_d_tax(output.d_tax); + outmsg.set_o_id(output.o_id); + outmsg.set_c_discount(output.c_discount); + outmsg.set_total(output.total); + foreach (const NewOrderOutput::ItemInfo &src, output.items) { + ItemInfoMsg &dst = *outmsg.add_item(); + dst.set_s_quantity(src.s_quantity); + dst.set_i_price(src.i_price); + dst.set_ol_amount(src.ol_amount); + dst.set_brand_generic(src.brand_generic); + dst.set_i_name(src.i_name); + } + outmsg.set_c_last(output.c_last); + outmsg.set_c_credit(output.c_credit); + outmsg.set_status(output.status); } - outmsg.set_c_last(output.c_last); - outmsg.set_c_credit(output.c_credit); - outmsg.set_status(output.status); } else { throw_not_implemented(); } @@ -963,14 +966,38 @@ // the seqno reported by the leader in the Init message, but it may have // issued more since the Init message). 
int first_seqno = -1; + char *marker = nullptr; + int first_seqno_in_chunk = -1; + TpccReq req; + TpccRes res; - function<void(anchored_stream_reader& reader)> overflow_fn = lambda(anchored_stream_reader &reader) { - sized_array<char> buf(new char[reader.buf().size()], reader.buf().size()); - memcpy(buf.get(), reader.start(), reader.unread()); - swap(buf, reader.buf()); - __ref(backlog).push(make_tuple(buf, reader.anchor(), reader.start())); - reader.reset_range(reader.buf().get(), reader.buf().get() + reader.unread()); - }; + function<void(anchored_stream_reader& reader)> overflow_fn = + lambda(anchored_stream_reader &reader) { + if (__ref(caught_up)) { + // Anchor should already be correctly set, so just shift down. + shift_reader(reader); + } else if (__ref(first_seqno_in_chunk) == __ref(seqno) + 1) { + // Has the replayer just caught up to the start of the chunk? + ASSERT(reader.buf().get() == reader.anchor()); + // Replay all messages up to but not included the current unprocessed + // message (which we may be in the middle of receiving, triggering this + // overflow). + process_buf(reader.anchor(), __ref(marker), __ref(req), __ref(seqno)); + // Update the anchor to point to the unprocessed message, so that we + // shift the unprocessed message down. + reader.anchor() = __ref(marker); + shift_reader(reader); + } else { + // Push onto backlog and put in new buffer. + ASSERT(reader.buf().get() == reader.anchor()); + __ref(backlog).push(make_tuple(reader.buf(), reader.anchor(), __ref(marker))); + reader.anchor() = __ref(marker); + replace_reader(reader); + cout << "added to backlog, now has " << __ref(backlog).queue().size() + << " chunks" << endl; + } + __ref(marker) = reader.buf().get(); + }; sized_array<char> rbuf(new char[read_buf_size], read_buf_size); commons::array<char> wbuf(buf_size); @@ -996,11 +1023,16 @@ __ref(w).mark_and_flush(); }); - TpccReq req; - TpccRes res; - while (true) { + // Has replayer just caught up to the start of the chunk? + if (first_seqno_in_chunk == seqno + 1) { + process_buf(reader.anchor(), reader.start(), req, seqno); + reader.set_anchor(); + } + + marker = reader.start(); + { st_intr intr(stop_hub); readmsg(reader, req); @@ -1013,35 +1045,67 @@ // Prepare recovery msg. send_states.push(make_tpcc_recovery(mypos, nnodes, seqno)); } else { - // Backlog (auto-implicit) or process. - if (check_interval(req.seqno(), process_display)) - cout << "processing req " << req.seqno() << endl; - if (req.seqno() == seqno + 1) { - if (!caught_up) { + // Backlog (auto/implicit) or process. + if (!caught_up) { + // If we were at the start of a new buffer (our chunk was recently reset). + if (reader.buf().get() == marker) + first_seqno_in_chunk = req.seqno(); + // If we fully caught up. + if (req.seqno() == seqno + 1) { time_caught_up = current_time_millis(); seqno_caught_up = seqno; - showtput("process_txns caught up; backlogged", + showtput("process_tpccs caught up; backlogged", time_caught_up, start_time, seqno_caught_up, first_seqno == -1 ? init_seqno - 1 : first_seqno); caught_up = true; } + } + if (caught_up) { // Process. process_tpcc(req, seqno, &res); ser(w, res); reader.set_anchor(); } + + // Display/yield. + if (check_interval(req.seqno(), process_display)) + cout << (caught_up ? 
"processed req " : "backlogged req ") + << req.seqno() << endl; + if (check_interval(req.seqno(), yield_interval)) st_sleep(0); } } } -// XXX +void +process_buf(char *begin, char *end, TpccReq &req, int &seqno) +{ + ASSERT(begin < end); + raw_reader reader(begin); + while (reader.ptr() < end) { + uint32_t len = ntohl(reader.read<uint32_t>()); + ASSERT(len < 10000); + ASSERT(reinterpret_cast<char*>(reader.ptr()) + len <= end); + check(req.ParseFromArray(reader.readptr(len), len)); + process_tpcc(req, seqno, nullptr); + if (check_interval(req.seqno(), yield_interval)) st_sleep(0); + if (check_interval(req.seqno(), process_display)) { + cout << "caught up to req " << req.seqno() << endl; + } + } +} + recovery_t make_tpcc_recovery(int mypos, int nnodes, int seqno) { - mypos = nnodes = seqno; - throw_not_implemented(); - return recovery_t(); + long long start_time = current_time_millis(); + cout << "serializing recovery, db state is now at seqno " + << seqno << ":" << endl; + g_tables->show(); + recovery_t recovery = g_tables->ser(mypos, nnodes, seqno); + showdatarate("serialized recovery", recovery.size(), + current_time_millis() - start_time); + return recovery; } /** @@ -1799,9 +1863,17 @@ // Start streaming txns to joiner. cout << "start streaming txns to joiner" << endl; + function<void()> handle_responses_joiner_fn; + if (do_tpcc) + handle_responses_joiner_fn = + bind(handle_tpcc_responses, joiner, ref(seqno), rid++, + ref(recover_signals), false); + else + handle_responses_joiner_fn = + bind(handle_responses<RTypes>, joiner, ref(seqno), rid++, + ref(recover_signals), false); newreps.push(replicas.back()); - handlers.insert(my_spawn(bind(handle_responses<RTypes>, joiner, ref(seqno), rid++, - ref(recover_signals), false), + handlers.insert(my_spawn(handle_responses_joiner_fn, "handle_responses_joiner")); } catch (break_exception &ex) { } catch (std::exception &ex) { @@ -1858,6 +1930,7 @@ // Initialize database state. int seqno = -1; mii &map = g_map; + commons::array<char> recarr(0); if (do_tpcc) { TPCCTables *tables = new TPCCTables(); g_tables.reset(tables); @@ -1953,203 +2026,334 @@ try { // If there's anything to recover. if (init.txnseqno() > 0) { - if (rec_pwal) { - // Recover from physical log. - cout << "recovering from pwal" << endl; - long long start_time = current_time_millis(); - ifstream inf("pwal"); - binary_iarchive in(inf); - int rseqno = -1; - while (inf.peek() != ifstream::traits_type::eof()) { - int op; - in & op; - switch (op) { - case op_del: - { - int key; - in & key; - mii::iterator it = map.find(key); - map.erase(it); - break; + if (do_tpcc) { + + // + // TPCC txns + // + + g_tables.reset(new TPCCTables); + + if (rec_pwal) { + } else { + + // + // Build-up + // + + cout << "waiting for recovery message" << (multirecover ? "s" : "") + << endl; + long long before_recv = current_time_millis(); + + vector<st_thread_t> recovery_builders; + ASSERT(seqno == -1); + bool first = true; + for (int i = 0; i < (multirecover ? init.node_size() : 1); ++i) { + recovery_builders.push_back(my_spawn(lambda() { + // Read the recovery message length and header. 
+ tpcc_recovery_header hdr; + checkeqnneg(st_read_fully(__ref(replicas[i]), + &hdr, sizeof hdr, + ST_UTIME_NO_TIMEOUT), + ssize_t(sizeof hdr)); + check(hdr.seqno >= 0); + + cout << "receiving recovery of " << hdr.len << " bytes" << endl; + + long long start_time = current_time_millis(); + __ref(recarr).reset(new char[hdr.len], hdr.len); + checkeqnneg(st_read_fully(__ref(replicas[i]), + __ref(recarr).get(), hdr.len, + ST_UTIME_NO_TIMEOUT), + ssize_t(hdr.len)); + + long long before_deser = current_time_millis(); + showdatarate("received recovery message", size_t(hdr.len), before_deser - start_time); + + if (__ref(seqno) == -1) + __ref(seqno) = hdr.seqno; + else + checkeq(__ref(seqno), hdr.seqno); + + g_tables->deser(__ctx(i), __ref(init).node_size(), hdr, __ref(recarr)); + + long long end_time = current_time_millis(); + showdatarate("deserialized recovery message", size_t(hdr.len), end_time - before_deser); + cout << "receive & deserialize took " << end_time - __ref(before_recv) + << " ms total; now at seqno " << hdr.seqno << endl; + cout << "after deserialize, db state is now at seqno " + << hdr.seqno << ":" << endl; + g_tables->show(); + +#if 0 + // Resize the table if necessary. + + commons::array<entry> &table = __ref(map).get_table(); + if (!__ref(first)) { + checkeq(table.size(), hdr.total); + checkeq(__ref(map).size(), hdr.size); + } else { + __ref(first) = false; + if (table.size() != hdr.total) { + table.reset(new entry[hdr.total], hdr.total); + } } - case op_write: - { - int key, val; - in & key & val; - map[key] = val; - break; - } - case op_commit: - ++rseqno; - break; + + // Receive straight into the table. + pair<size_t, size_t> range = + recovery_range(table.size(), __ctx(i), __ref(init).node_size()); + // Check that we agree on the number of entries. + checkeq(range.second - range.first, hdr.count); + // Check that the count is a power of two. + checkeq(hdr.count & (hdr.count - 1), size_t(0)); + size_t rangelen = sizeof(entry) * hdr.count; + // Read an extra char to ensure that we're at the EOF. + checkeqnneg(st_read_fully(__ref(replicas[i]), + table.begin() + range.first, rangelen + 1, + ST_UTIME_NO_TIMEOUT), + ssize_t(rangelen)); +#endif + }, "recovery_builder" + lexical_cast<string>(i))); } - if (check_interval(rseqno, yield_interval)) st_sleep(0); + foreach (st_thread_t t, recovery_builders) { + st_join(t); + } } - seqno = init.txnseqno() - 1; - showdatarate("recovered from pwal", inf.tellg(), current_time_millis() - start_time); - cout << "now at seqno " << rseqno << " (really: " << seqno << ")" << endl; + + // + // Catch-up + // + + long long mid_time = current_time_millis(); + int mid_seqno = seqno; + TpccReq req; + while (!backlog.empty()) { + chunk chunk = backlog.take(); + cout << "took from backlog, now has " << backlog.queue().size() + << " chunks" << endl; + sized_array<char> &buf = chunk.get<0>(); + char *begin = chunk.get<1>(), *end = chunk.get<2>(); + ASSERT(buf.get() <= begin && begin < buf.end()); + ASSERT(buf.get() < end && end < buf.end()); + process_buf(begin, end, req, seqno); + } + showtput("replayer caught up; from backlog replayed", + current_time_millis(), mid_time, seqno, mid_seqno); + } else { - cout << "waiting for recovery message" << (multirecover ? "s" : "") - << endl; - long long before_recv = current_time_millis(); - vector<st_thread_t> recovery_builders; - ASSERT(seqno == -1); - bool first = true; - for (int i = 0; i < (multirecover ? 
init.node_size() : 1); ++i) { - recovery_builders.push_back(my_spawn(lambda() { - // Read the recovery message length and header. - size_t len; - recovery_header hdr; - char buf[sizeof len + sizeof hdr]; - //try { - checkeqnneg(st_read_fully(__ref(replicas[i]), - buf, sizeof len + sizeof hdr, - ST_UTIME_NO_TIMEOUT), - ssize_t(sizeof len + sizeof hdr)); - //} catch (...) { // TODO just catch "Connection reset by peer" - //return; - //} - raw_reader rdr(buf); - rdr.read(len); - rdr.read(hdr); - check(hdr.seqno >= 0); + // + // Simple txns + // - // Resize the table if necessary. - commons::array<entry> &table = __ref(map).get_table(); - if (!__ref(first)) { - checkeq(table.size(), hdr.total); - checkeq(__ref(map).size(), hdr.size); - } else { - __ref(map).set_size(hdr.size); - if (table.size() != hdr.total) { - table.reset(new entry[hdr.total], hdr.total); - } + if (rec_pwal) { + // Recover from physical log. + cout << "recovering from pwal" << endl; + long long start_time = current_time_millis(); + ifstream inf("pwal"); + binary_iarchive in(inf); + int rseqno = -1; + while (inf.peek() != ifstream::traits_type::eof()) { + int op; + in & op; + switch (op) { + case op_del: + { + int key; + in & key; + mii::iterator it = map.find(key); + map.erase(it); + break; + } + case op_write: + { + int key, val; + in & key & val; + map[key] = val; + break; + } + case op_commit: + ++rseqno; + break; } + if (check_interval(rseqno, yield_interval)) st_sleep(0); + } + seqno = init.txnseqno() - 1; + showdatarate("recovered from pwal", inf.tellg(), current_time_millis() - start_time); + cout << "now at seqno " << rseqno << " (really: " << seqno << ")" << endl; + } else { - // Receive straight into the table. - pair<size_t, size_t> range = - recovery_range(table.size(), __ctx(i), __ref(init).node_size()); - // Check that we agree on the number of entries. - checkeq(range.second - range.first, hdr.count); - // Check that the count is a power of two. - checkeq(hdr.count & (hdr.count - 1), size_t(0)); - size_t rangelen = sizeof(entry) * hdr.count; - // Read an extra char to ensure that we're at the EOF. - long long start_time = current_time_millis(); - checkeqnneg(st_read_fully(__ref(replicas[i]), - table.begin() + range.first, rangelen + 1, - ST_UTIME_NO_TIMEOUT), - ssize_t(rangelen)); - long long end_time = current_time_millis(); + // + // Build-up + // - if (__ref(seqno) != -1) - checkeq(__ref(seqno), hdr.seqno); - __ref(seqno) = hdr.seqno; - showdatarate("got recovery message", len, end_time - start_time); - cout << "receive took " << end_time - __ref(before_recv) - << " ms total; now at seqno " << hdr.seqno << endl; + cout << "waiting for recovery message" << (multirecover ? "s" : "") + << endl; + long long before_recv = current_time_millis(); + + vector<st_thread_t> recovery_builders; + ASSERT(seqno == -1); + bool first = true; + for (int i = 0; i < (multirecover ? init.node_size() : 1); ++i) { + recovery_builders.push_back(my_spawn(lambda() { + // Read the recovery message length and header. + size_t len; + recovery_header hdr; + char buf[sizeof len + sizeof hdr]; + //try { + checkeqnneg(st_read_fully(__ref(replicas[i]), + buf, sizeof len + sizeof hdr, + ST_UTIME_NO_TIMEOUT), + ssize_t(sizeof len + sizeof hdr)); + //} catch (...) { // TODO just catch "Connection reset by peer" + //return; + //} + raw_reader rdr(buf); + rdr.read(len); + rdr.read(hdr); + check(hdr.seqno >= 0); + + // Resize the table if necessary. 
+ commons::array<entry> &table = __ref(map).get_table(); + if (!__ref(first)) { + checkeq(table.size(), hdr.total); + checkeq(__ref(map).size(), hdr.size); + } else { + __ref(first) = false; + __ref(map).set_size(hdr.size); + if (table.size() != hdr.total) { + table.reset(new entry[hdr.total], hdr.total); + } + } + + // Receive straight into the table. + pair<size_t, size_t> range = + recovery_range(table.size(), __ctx(i), __ref(init).node_size()); + // Check that we agree on the number of entries. + checkeq(range.second - range.first, hdr.count); + // Check that the count is a power of two. + checkeq(hdr.count & (hdr.count - 1), size_t(0)); + size_t rangelen = sizeof(entry) * hdr.count; + // Read an extra char to ensure that we're at the EOF. + long long start_time = current_time_millis(); + checkeqnneg(st_read_fully(__ref(replicas[i]), + table.begin() + range.first, rangelen + 1, + ST_UTIME_NO_TIMEOUT), + ssize_t(rangelen)); + long long end_time = current_time_millis(); + + if (__ref(seqno) != -1) + checkeq(__ref(seqno), hdr.seqno); + __ref(seqno) = hdr.seqno; + showdatarate("got recovery message", len, end_time - start_time); + cout << "receive took " << end_time - __ref(before_recv) + << " ms total; now at seqno " << hdr.seqno << endl; #if 0 - Recovery recovery; - long long receive_start = 0, receive_end = 0; - size_t len = 0; - { - st_intr intr(stop_hub); - len = readmsg(__ref(replicas)[__ctx(i)], recovery, &receive_start, - &receive_end); - } - long long build_start = current_time_millis(); - cout << "got recovery message of " << len << " bytes in " - << build_start - __ref(before_recv) << " ms: xfer took " - << receive_end - receive_start << " ms, deserialization took " - << build_start - receive_end << " ms" << endl; - for (int i = 0; i < recovery.pair_size(); ++i) { - const Recovery_Pair &p = recovery.pair(i); - __ref(map)[p.key()] = p.value(); - if (i % yield_interval == 0) { - if (yield_during_build_up) st_sleep(0); + Recovery recovery; + long long receive_start = 0, receive_end = 0; + size_t len = 0; + { + st_intr intr(stop_hub); + len = readmsg(__ref(replicas)[__ctx(i)], recovery, &receive_start, + &receive_end); } - } - check(recovery.seqno() >= 0); - int seqno = __ref(seqno) = recovery.seqno(); - long long build_end = current_time_millis(); - cout << "receive and build-up took " - << build_end - __ref(before_recv) - << " ms; built up map of " << recovery.pair_size() - << " records in " << build_end - build_start - << " ms; now at seqno " << seqno << endl; + long long build_start = current_time_millis(); + cout << "got recovery message of " << len << " bytes in " + << build_start - __ref(before_recv) << " ms: xfer took " + << receive_end - receive_start << " ms, deserialization took " + << build_start - receive_end << " ms" << endl; + for (int i = 0; i < recovery.pair_size(); ++i) { + const Recovery_Pair &p = recovery.pair(i); + __ref(map)[p.key()] = p.value(); + if (i % yield_interval == 0) { + if (yield_during_build_up) st_sleep(0); + } + } + check(recovery.seqno() >= 0); + int seqno = __ref(seqno) = recovery.seqno(); + long long build_end = current_time_millis(); + cout << "receive and build-up took " + << build_end - __ref(before_recv) + << " ms; built up map of " << recovery.pair_size() + << " records in " << build_end - build_start + << " ms; now at seqno " << seqno << endl; #endif - }, "recovery_builder" + lexical_cast<string>(i))); + }, "recovery_builder" + lexical_cast<string>(i))); + } + foreach (st_thread_t t, recovery_builders) { + st_join(t); + } } - foreach 
(st_thread_t t, recovery_builders) { - st_join(t); - } - } - long long mid_time = current_time_millis(); - int mid_seqno = seqno; - // XXX - using msg::TxnBatch; - using msg::Txn; - commons::array<char> rbuf(0), wbuf(buf_size); - reader reader(nullptr, rbuf.get(), rbuf.size()); - writer writer(lambda(const void*, size_t) { - throw not_supported_exception("should not be writing responses during catch-up phase"); - }, wbuf.get(), wbuf.size()); - stream s(reader, writer); - TxnBatch batch(s); - while (!backlog.empty()) { - chunk chunk = backlog.take(); - sized_array<char> &buf = chunk.get<0>(); - ASSERT(buf.get() <= chunk.get<1>() && chunk.get<1>() < buf.end()); - ASSERT(buf.get() < chunk.get<2>() && chunk.get<2>() < buf.end()); - ASSERT(chunk.get<1>() < chunk.get<2>()); - swap(buf, reader.buf()); - reader.reset_range(chunk.get<1>(), chunk.get<2>()); - while (reader.start() < reader.end()) { - char *start = reader.start(); - uint32_t prefix = ntohl(reader.read<uint32_t>()); - ASSERT(prefix < 10000); - ASSERT(start + sizeof(uint32_t) + prefix <= reader.end()); - batch.Clear(); - for (int t = 0; t < batch.txn_size(); ++t) { - const Txn &txn = batch.txn(t); - if (rec_pwal) seqno = txn.seqno() - 1; - process_txn<rb_traits, rb_traits>(map, txn, seqno, nullptr); - if (fake_exec && !Types::is_pb()) { - reader.skip(txn.op_size() * Op_Size); - } + // + // Catch-up + // - if (check_interval(txn.seqno(), yield_interval)) st_sleep(0); - if (check_interval(txn.seqno(), process_display)) { - cout << "caught up txn " << txn.seqno() - << "; db size = " << map.size() - << "; seqno = " << seqno - << "; backlog.size = " << backlog.queue().size() << endl; + long long mid_time = current_time_millis(); + int mid_seqno = seqno; + // XXX + using msg::TxnBatch; + using msg::Txn; + commons::array<char> rbuf(0), wbuf(buf_size); + reader reader(nullptr, rbuf.get(), rbuf.size()); + writer writer(lambda(const void*, size_t) { + throw not_supported_exception("should not be writing responses during catch-up phase"); + }, wbuf.get(), wbuf.size()); + stream s(reader, writer); + TxnBatch batch(s); + while (!backlog.empty()) { + chunk chunk = backlog.take(); + sized_array<char> &buf = chunk.get<0>(); + ASSERT(buf.get() <= chunk.get<1>() && chunk.get<1>() < buf.end()); + ASSERT(buf.get() < chunk.get<2>() && chunk.get<2>() < buf.end()); + ASSERT(chunk.get<1>() < chunk.get<2>()); + swap(buf, reader.buf()); + reader.reset_range(chunk.get<1>(), chunk.get<2>()); + while (reader.start() < reader.end()) { + char *start = reader.start(); + uint32_t prefix = ntohl(reader.read<uint32_t>()); + ASSERT(prefix < 10000); + ASSERT(start + sizeof(uint32_t) + prefix <= reader.end()); + batch.Clear(); + for (int t = 0; t < batch.txn_size(); ++t) { + const Txn &txn = batch.txn(t); + if (rec_pwal) seqno = txn.seqno() - 1; + process_txn<rb_traits, rb_traits>(map, txn, seqno, nullptr); + if (fake_exec && !Types::is_pb()) { + reader.skip(txn.op_size() * Op_Size); + } + + if (check_interval(txn.seqno(), yield_interval)) st_sleep(0); + if (check_interval(txn.seqno(), process_display)) { + cout << "caught up txn " << txn.seqno() + << "; db size = " << map.size() + << "; seqno = " << seqno + << "; backlog.size = " << backlog.queue().size() << endl; + } } + ASSERT(start + sizeof(uint32_t) + prefix == reader.start()); } - ASSERT(start + sizeof(uint32_t) + prefix == reader.start()); } - } - g_caught_up = true; + g_caught_up = true; #if 0 - while (!backlog.empty()) { - using pb::Txn; - shared_ptr<Txn> p = backlog.take(); - process_txn<pb_traits, 
pb_traits>(map, *p, seqno, nullptr); - if (check_interval(p->seqno(), catch_up_display)) { - cout << "processed txn " << p->seqno() << " off the backlog; " - << "backlog.size = " << backlog.queue().size() << endl; + while (!backlog.empty()) { + using pb::Txn; + shared_ptr<Txn> p = backlog.take(); + process_txn<pb_traits, pb_traits>(map, *p, seqno, nullptr); + if (check_interval(p->seqno(), catch_up_display)) { + cout << "processed txn " << p->seqno() << " off the backlog; " + << "backlog.size = " << backlog.queue().size() << endl; + } + if (check_interval(p->seqno(), yield_interval)) { + // Explicitly yield. (Note that yielding does still effectively + // happen anyway because process_txn is a yield point.) + st_sleep(0); + } } - if (check_interval(p->seqno(), yield_interval)) { - // Explicitly yield. (Note that yielding does still effectively - // happen anyway because process_txn is a yield point.) - st_sleep(0); - } +#endif + showtput("replayer caught up; from backlog replayed", + current_time_millis(), mid_time, seqno, mid_seqno); } -#endif - showtput("replayer caught up; from backlog replayed", - current_time_millis(), mid_time, seqno, mid_seqno); } } catch (std::exception &ex) { cerr_thread_ex(ex) << endl; @@ -2566,7 +2770,7 @@ "size of the incoming (read) buffer in bytes") ("write-buf", po::value<size_t>(&buf_size)->default_value(1e5), "size of the outgoing (write) buffer in bytes") - ("yield_interval,y", po::value<int>(&yield_interval)->default_value(10000), + ("yield-interval,y", po::value<int>(&yield_interval)->default_value(1000), "number of txns before yielding") ("timelim,T", po::value<long long>(&timelim)->default_value(0), "general network IO time limit in milliseconds, or 0 for none") Modified: ydb/trunk/src/tpcc/Makefile =================================================================== --- ydb/trunk/src/tpcc/Makefile 2009-03-17 17:24:21 UTC (rev 1300) +++ ydb/trunk/src/tpcc/Makefile 2009-03-18 09:17:01 UTC (rev 1301) @@ -1,9 +1,9 @@ WARNINGS = -Werror -Wall -Wextra -Wconversion -Wpointer-arith -Wcast-qual -Wcast-align -Wwrite-strings -Woverloaded-virtual -Wno-sign-compare -Wno-unused-parameter # Debug flags -CXXFLAGS = -g -MD $(WARNINGS) -I.. +CXXFLAGS = -g -MD $(WARNINGS) -I.. -std=gnu++0x # Optimization flags -#CXXFLAGS = -g -O3 -DNDEBUG -MD $(WARNINGS) +#CXXFLAGS = -g -O3 -DNDEBUG -MD $(WARNINGS) -std=gnu++0x # Link withthe C++ standard library #LDFLAGS=-lstdc++ @@ -15,7 +15,12 @@ btree_test: btree_test.o +%.cc: %.cc.cog + cog.py $< > $@ + clean : rm -f *.o *.d $(BINARIES) -include *.d + +.SECONDARY: tpcctables.cc Modified: ydb/trunk/src/tpcc/btree.h =================================================================== --- ydb/trunk/src/tpcc/btree.h 2009-03-17 17:24:21 UTC (rev 1300) +++ ydb/trunk/src/tpcc/btree.h 2009-03-18 09:17:01 UTC (rev 1301) @@ -46,9 +46,13 @@ template <typename KEY, typename VALUE, unsigned N, unsigned M, unsigned INNER_NODE_PADDING= 0, unsigned LEAF_NODE_PADDING= 0, unsigned NODE_ALIGNMENT= 64> - class BPlusTree +class BPlusTree { public: + typedef pair<KEY, VALUE> entry_type; + typedef KEY key_type; + typedef VALUE data_type; + // N must be greater than two to make the split of // two inner nodes sensible. 
BOOST_STATIC_ASSERT(N>2); @@ -222,7 +226,8 @@ public: typedef typename BPlusTree::InnerNode inner_type; typedef typename BPlusTree::LeafNode leaf_type; - iterator(BPlusTree &tree) : tree_(tree) { + typedef typename BPlusTree::entry_type entry_type; + iterator(const BPlusTree &tree) : tree_(tree) { if (tree.size_ > 0) { stack_.push_back(make_pair(tree.root, 0)); // If depth = 0, that means we have only a root node. @@ -248,7 +253,7 @@ #endif return *this; } - pair<KEY, VALUE> operator*() { + entry_type operator*() { return make_pair(leaf().keys[pos()], leaf().values[pos()]); } bool end() { return stack_.empty(); } @@ -268,7 +273,7 @@ leaf().num_keys - 1 : inner().num_keys)) stack_.pop_back(); // back up } - BPlusTree &tree_; + const BPlusTree &tree_; // stack of (node, pos) deque< pair<void*, size_t> > stack_; }; Deleted: ydb/trunk/src/tpcc/tpcctables.cc =================================================================== --- ydb/trunk/src/tpcc/tpcctables.cc 2009-03-17 17:24:21 UTC (rev 1300) +++ ydb/trunk/src/tpcc/tpcctables.cc 2009-03-18 09:17:01 UTC (rev 1301) @@ -1,622 +0,0 @@ -#include "tpcctables.h" - -#include <algorithm> -#include <limits> -#include <vector> - -#include <commons/assert.h> -#include <iostream> -#include "stlutil.h" - -using std::vector; - -bool CustomerByNameOrdering::operator()(const Customer* a, const Customer* b) { - if (a->c_w_id < b->c_w_id) return true; - if (a->c_w_id > b->c_w_id) return false; - assert(a->c_w_id == b->c_w_id); - - if (a->c_d_id < b->c_d_id) return true; - if (a->c_d_id > b->c_d_id) return false; - assert(a->c_d_id == b->c_d_id); - - int diff = strcmp(a->c_last, b->c_last); - if (diff < 0) return true; - if (diff > 0) return false; - assert(diff == 0); - - // Finally delegate to c_first - return strcmp(a->c_first, b->c_first) < 0; -} - -template <typename KeyType, typename ValueType> -static void deleteBTreeValues(BPlusTree<KeyType, ValueType*, TPCCTables::KEYS_PER_INTERNAL, TPCCTables::KEYS_PER_LEAF>* btree) { - KeyType key = std::numeric_limits<KeyType>::max(); - ValueType* value = NULL; - while (btree->findLastLessThan(key, &value, &key)) { - assert(value != NULL); - delete value; - } -} - -TPCCTables::~TPCCTables() { - // Clean up the b-trees with this gross hack - deleteBTreeValues(&warehouses_); - deleteBTreeValues(&stock_); - deleteBTreeValues(&districts_); - deleteBTreeValues(&orders_); - deleteBTreeValues(&orderlines_); - - STLDeleteValues(&neworders_); - STLDeleteElements(&customers_by_name_); - STLDeleteElements(&history_); -} - -int TPCCTables::stockLevel(int32_t warehouse_id, int32_t district_id, int32_t threshold) { - /* EXEC SQL SELECT d_next_o_id INTO :o_id FROM district - WHERE d_w_id=:w_id AND d_id=:d_id; */ - //~ printf("stock level %d %d %d\n", warehouse_id, district_id, threshold); - District* d = findDistrict(warehouse_id, district_id); - int32_t o_id = d->d_next_o_id; - - /* EXEC SQL SELECT COUNT(DISTINCT (s_i_id)) INTO :stock_count FROM order_line, stock - WHERE ol_w_id=:w_id AND ol_d_id=:d_id AND ol_o_id<:o_id AND ol_o_id>=:o_id-20 - AND s_w_id=:w_id AND s_i_id=ol_i_id AND s_quantity < :threshold;*/ - - - // retrieve up to 300 tuples from order line, using ( [o_id-20, o_id), d_id, w_id, [1, 15]) - // and for each retrieved tuple, read the corresponding stock tuple using (ol_i_id, w_id) - // NOTE: This is a cheat because it hard codes the maximum number of orders. - // We really should use the ordered b-tree index to find (0, o_id-20, d_id, w_id) then iterate - // until the end. 
This will also do less work (wasted finds). Since this is only 4%, it probably - // doesn't matter much - - // TODO: Test the performance more carefully. I tried: std::set, std::hash_set, std::vector - // with linear search, and std::vector with binary search using std::lower_bound. The best - // seemed to be to simply save all the s_i_ids, then sort and eliminate duplicates at the end. - std::vector<int32_t> s_i_ids; - // Average size is more like ~30. - s_i_ids.reserve(300); - - // Iterate over [o_id-20, o_id) - for (int order_id = o_id - STOCK_LEVEL_ORDERS; order_id < o_id; ++order_id) { - // HACK: We shouldn't rely on MAX_OL_CNT. See comment above. - for (int line_number = 1; line_number <= Order::MAX_OL_CNT; ++line_number) { - OrderLine* line = findOrderLine(warehouse_id, district_id, order_id, line_number); - if (line == NULL) { - // We can break since we have reached the end of the lines for this order. - // TODO: A btree iterate in (w_id, d_id, o_id) order would be a clean way to do this -#ifndef NDEBUG - for (int test_line_number = line_number + 1; line_number < Order::MAX_OL_CNT; ++line_number) { - assert(findOrderLine(warehouse_id, district_id, order_id, test_line_number) == NULL); - } -#endif - break; - } - - // Check if s_quantity < threshold - Stock* stock = findStock(warehouse_id, line->ol_i_id); - if (stock->s_quantity < threshold) { - s_i_ids.push_back(line->ol_i_id); - } - } - } - - // Filter out duplicate s_i_id: multiple order lines can have the same item - std::sort(s_i_ids.begin(), s_i_ids.end()); - int num_distinct = 0; - int32_t last = -1; // NOTE: This relies on -1 being an invalid s_i_id - for (size_t i = 0; i < s_i_ids.size(); ++i) { - if (s_i_ids[i] != last) { - last = s_i_ids[i]; - num_distinct += 1; - } - } - - return num_distinct; -} - -void TPCCTables::orderStatus(int32_t warehouse_id, int32_t district_id, int32_t customer_id, OrderStatusOutput* output) { - //~ printf("order status %d %d %d\n", warehouse_id, district_id, customer_id); - internalOrderStatus(findCustomer(warehouse_id, district_id, customer_id), output); -} - -void TPCCTables::orderStatus(int32_t warehouse_id, int32_t district_id, const char* c_last, OrderStatusOutput* output) { - //~ printf("order status %d %d %s\n", warehouse_id, district_id, c_last); - Customer* customer = findCustomerByName(warehouse_id, district_id, c_last); - internalOrderStatus(customer, output); -} - -void TPCCTables::internalOrderStatus(Customer* customer, OrderStatusOutput* output) { - output->c_id = customer->c_id; - // retrieve from customer: balance, first, middle, last - output->c_balance = customer->c_balance; - strcpy(output->c_first, customer->c_first); - strcpy(output->c_middle, customer->c_middle); - strcpy(output->c_last, customer->c_last); - - // Find the row in the order table with largest o_id - Order* order = findLastOrderByCustomer(customer->c_w_id, customer->c_d_id, customer->c_id); - output->o_id = order->o_id; - output->o_carrier_id = order->o_carrier_id; - strcpy(output->o_entry_d, order->o_entry_d); - - output->lines.resize(order->o_ol_cnt); - for (int32_t line_number = 1; line_number <= order->o_ol_cnt; ++line_number) { - OrderLine* line = findOrderLine(customer->c_w_id, customer->c_d_id, order->o_id, line_number); - output->lines[line_number-1].ol_i_id = line->ol_i_id; - output->lines[line_number-1].ol_supply_w_id = line->ol_supply_w_id; - output->lines[line_number-1].ol_quantity = line->ol_quantity; - output->lines[line_number-1].ol_amount = line->ol_amount; - 
strcpy(output->lines[line_number-1].ol_delivery_d, line->ol_delivery_d); - } -#ifndef NDEBUG - // Verify that none of the other OrderLines exist. - for (int32_t line_number = order->o_ol_cnt+1; line_number <= Order::MAX_OL_CNT; ++line_number) { - assert(findOrderLine(customer->c_w_id, customer->c_d_id, order->o_id, line_number) == NULL); - } -#endif -} - -bool TPCCTables::newOrder(int32_t warehouse_id, int32_t district_id, int32_t customer_id, - const std::vector<NewOrderItem>& items, const char* now, NewOrderOutput* output) { - //~ printf("new order %d %d %d %d %s\n", warehouse_id, district_id, customer_id, items.size(), now); - // 2.4.3.4. requires that we display c_last, c_credit, and o_id for rolled back transactions: - // read those values first - District* d = findDistrict(warehouse_id, district_id); - output->d_tax = d->d_tax; - output->o_id = d->d_next_o_id; - assert(findOrder(warehouse_id, district_id, output->o_id) == NULL); - - Customer* c = findCustomer(warehouse_id, district_id, customer_id); - assert(sizeof(output->c_last) == sizeof(c->c_last)); - memcpy(output->c_last, c->c_last, sizeof(output->c_last)); - memcpy(output->c_credit, c->c_credit, sizeof(output->c_credit)); - output->c_discount = c->c_discount; - - // CHEAT: Validate all items to see if we will need to abort - vector<Item*> item_tuples(items.size()); - bool all_local = true; - for (int i = 0; i < items.size(); ++i) { - item_tuples[i] = findItem(items[i].i_id); - if (item_tuples[i] == NULL) { - strcpy(output->status, NewOrderOutput::INVALID_ITEM_STATUS); - return false; - } - all_local = all_local && items[i].ol_supply_w_id == warehouse_id; - } - - // We will not abort: update the status and the database state - output->status[0] = '\0'; - - // Modify the order id to assign it - d->d_next_o_id += 1; - - Warehouse* w = findWarehouse(warehouse_id); - output->w_tax = w->w_tax; - - Order order; - order.o_w_id = warehouse_id; - order.o_d_id = district_id; - order.o_id = output->o_id; - order.o_c_id = customer_id; - order.o_carrier_id = Order::NULL_CARRIER_ID; - order.o_ol_cnt = static_cast<int32_t>(items.size()); - order.o_all_local = all_local ? 
1 : 0; - strcpy(order.o_entry_d, now); - assert(strlen(order.o_entry_d) == DATETIME_SIZE); - insertOrder(order); - insertNewOrder(warehouse_id, district_id, output->o_id); - - OrderLine line; - line.ol_o_id = output->o_id; - line.ol_d_id = district_id; - line.ol_w_id = warehouse_id; - memset(line.ol_delivery_d, 0, DATETIME_SIZE+1); - - output->items.resize(items.size()); - output->total = 0; - for (int i = 0; i < items.size(); ++i) { - line.ol_number = i+1; - line.ol_i_id = items[i].i_id; - line.ol_supply_w_id = items[i].ol_supply_w_id; - line.ol_quantity = items[i].ol_quantity; - - // Read and update stock - Stock* stock = findStock(items[i].ol_supply_w_id, items[i].i_id); - if (stock->s_quantity >= items[i].ol_quantity + 10) { - stock->s_quantity -= items[i].ol_quantity; - } else { - stock->s_quantity = stock->s_quantity - items[i].ol_quantity + 91; - } - output->items[i].s_quantity = stock->s_quantity; - assert(sizeof(line.ol_dist_info) == sizeof(stock->s_dist[district_id])); - memcpy(line.ol_dist_info, stock->s_dist[district_id], sizeof(line.ol_dist_info)); - stock->s_ytd += items[i].ol_quantity; - stock->s_order_cnt += 1; - if (items[i].ol_supply_w_id != warehouse_id) { - // remote order - stock->s_remote_cnt += 1; - } - bool stock_is_original = (strstr(stock->s_data, "ORIGINAL") != NULL); - - assert(sizeof(output->items[i].i_name) == sizeof(item_tuples[i]->i_name)); - memcpy(output->items[i].i_name, item_tuples[i]->i_name, sizeof(output->items[i].i_name)); - output->items[i].i_price = item_tuples[i]->i_price; - output->items[i].ol_amount = - static_cast<float>(items[i].ol_quantity) * item_tuples[i]->i_price; - line.ol_amount = output->items[i].ol_amount; - output->total += output->items[i].ol_amount; - if (stock_is_original && strstr(item_tuples[i]->i_data, "ORIGINAL") != NULL) { - output->items[i].brand_generic = NewOrderOutput::ItemInfo::BRAND; - } else { - output->items[i].brand_generic = NewOrderOutput::ItemInfo::GENERIC; - } - insertOrderLine(line); - } - - return true; -} - -void TPCCTables::payment(int32_t warehouse_id, int32_t district_id, int32_t c_warehouse_id, - int32_t c_district_id, int32_t customer_id, float h_amount, const char* now, - PaymentOutput* output) { - //~ printf("payment %d %d %d %d %d %f %s\n", warehouse_id, district_id, c_warehouse_id, c_district_id, customer_id, h_amount, now); - Customer* customer = findCustomer(c_warehouse_id, c_district_id, customer_id); - internalPayment(warehouse_id, district_id, customer, h_amount, now, output); -} - - -void TPCCTables::payment(int32_t warehouse_id, int32_t district_id, int32_t c_warehouse_id, - int32_t c_district_id, const char* c_last, float h_amount, const char* now, - PaymentOutput* output) { - //~ printf("payment %d %d %d %d %s %f %s\n", warehouse_id, district_id, c_warehouse_id, c_district_id, c_last, h_amount, now); - Customer* customer = findCustomerByName(c_warehouse_id, c_district_id, c_last); - internalPayment(warehouse_id, district_id, customer, h_amount, now, output); -} - -void TPCCTables::internalPayment(int32_t warehouse_id, int32_t district_id, Customer* c, - float h_amount, const char* now, PaymentOutput* output) { - Warehouse* w = findWarehouse(warehouse_id); - w->w_ytd += h_amount; - output->warehouse = *w; - - District* d = findDistrict(warehouse_id, district_id); - d->d_ytd += h_amount; - output->district = *d; - - c->c_balance -= h_amount; - c->c_ytd_payment += h_amount; - c->c_payment_cnt += 1; - if (strcmp(c->c_credit, Customer::BAD_CREDIT) == 0) { - // Bad credit: insert history into c_data 
- static const int HISTORY_SIZE = Customer::MAX_DATA+1; - char history[HISTORY_SIZE]; - int characters = snprintf(history, HISTORY_SIZE, "(%d, %d, %d, %d, %d, %.2f)\n", - c->c_id, c->c_d_id, c->c_w_id, district_id, warehouse_id, h_amount); - assert(characters < HISTORY_SIZE); - - // Perform the insert with a move and copy - int current_keep = static_cast<int>(strlen(c->c_data)); - if (current_keep + characters > Customer::MAX_DATA) { - current_keep = Customer::MAX_DATA - characters; - } - assert(current_keep + characters <= Customer::MAX_DATA); - memmove(c->c_data+characters, c->c_data, current_keep); - memcpy(c->c_data, history, characters); - c->c_data[characters + current_keep] = '\0'; - assert(strlen(c->c_data) == characters + current_keep); - } - output->customer = *c; - - // Insert the line into the history table - History h; - h.h_w_id = warehouse_id; - h.h_d_id = district_id; - h.h_c_w_id = c->c_w_id; - h.h_c_d_id = c->c_d_id; - h.h_c_id = c->c_id; - h.h_amount = h_amount; - strcpy(h.h_date, now); - strcpy(h.h_data, w->w_name); - strcat(h.h_data, " "); - strcat(h.h_data, d->d_name); - insertHistory(h); -} - -// forward declaration for delivery -static int64_t makeNewOrderKey(int32_t w_id, int32_t d_id, int32_t o_id); - -void TPCCTables::delivery(int32_t warehouse_id, int32_t carrier_id, const char* now, - std::vector<DeliveryOrderInfo>* orders) { - //~ printf("delivery %d %d %s\n", warehouse_id, carrier_id, now); - orders->clear(); - for (int32_t d_id = 1; d_id <= District::NUM_PER_WAREHOUSE; ++d_id) { - // Find and remove the lowest numbered order for the district - int64_t key = makeNewOrderKey(warehouse_id, d_id, 1); - NewOrderMap::iterator iterator = neworders_.lower_bound(key); - NewOrder* neworder = NULL; - if (iterator != neworders_.end()) { - neworder = iterator->second; - assert(neworder != NULL); - } - if (neworder == NULL || neworder->no_d_id != d_id || neworder->no_w_id != warehouse_id) { - // No orders for this district - // TODO: 2.7.4.2: If this occurs in max(1%, 1) of transactions, report it (???) - continue; - } - assert(neworder->no_d_id == d_id && neworder->no_w_id == warehouse_id); - int32_t o_id = neworder->no_o_id; - neworders_.erase(iterator); - delete neworder; - - DeliveryOrderInfo order; - order.d_id = d_id; - order.o_id = o_id; - orders->push_back(order); - - Order* o = findOrder(warehouse_id, d_id, o_id); - assert(o->o_carrier_id == Order::NULL_CARRIER_ID); - o->o_carrier_id = carrier_id; - - float total = 0; - // TODO: Select based on (w_id, d_id, o_id) rather than using ol_number? 
- for (int32_t i = 1; i <= o->o_ol_cnt; ++i) { - OrderLine* line = findOrderLine(warehouse_id, d_id, o_id, i); - assert(0 == strlen(line->ol_delivery_d)); - strcpy(line->ol_delivery_d, now); - assert(strlen(line->ol_delivery_d) == DATETIME_SIZE); - total += line->ol_amount; - } - - Customer* c = findCustomer(warehouse_id, d_id, o->o_c_id); - c->c_balance += total; - c->c_delivery_cnt += 1; - } -} - -template <typename T> -static T* insert(BPlusTree<int32_t, T*, TPCCTables::KEYS_PER_INTERNAL, TPCCTables::KEYS_PER_LEAF>* tree, int32_t key, const T& item) { - assert(!tree->find(key)); - T* copy = new T(item); - tree->insert(key, copy); - return copy; -} - -template <typename T> -static T* find(const BPlusTree<int32_t, T*, TPCCTables::KEYS_PER_INTERNAL, TPCCTables::KEYS_PER_LEAF>& tree, int32_t key) { - T* output = NULL; - if (tree.find(key, &output)) { - return output; - } - return NULL; -} - -void TPCCTables::insertItem(const Item& item) { - assert(item.i_id == items_.size() + 1); - items_.push_back(item); -} -Item* TPCCTables::findItem(int32_t id) { - assert(1 <= id); - id -= 1; - if (id >= items_.size()) return NULL; - return &items_[id]; -} - -void TPCCTables::insertWarehouse(const Warehouse& w) { - insert(&warehouses_, w.w_id, w); -} -Warehouse* TPCCTables::findWarehouse(int32_t id) { - return find(warehouses_, id); -} - -static int32_t makeStockKey(int32_t w_id, int32_t s_id) { - assert(1 <= w_id && w_id <= Warehouse::MAX_WAREHOUSE_ID); - assert(1 <= s_id && s_id <= Stock::NUM_STOCK_PER_WAREHOUSE); - int32_t id = s_id + (w_id * Stock::NUM_STOCK_PER_WAREHOUSE); - assert(id >= 0); - return id; -} - -void TPCCTables::insertStock(const Stock& stock) { - insert(&stock_, makeStockKey(stock.s_w_id, stock.s_i_id), stock); -} -Stock* TPCCTables::findStock(int32_t w_id, int32_t s_id) { - return find(stock_, makeStockKey(w_id, s_id)); -} - -static int32_t makeDistrictKey(int32_t w_id, int32_t d_id) { - assert(1 <= w_id && w_id <= Warehouse::MAX_WAREHOUSE_ID); - assert(1 <= d_id && d_id <= District::NUM_PER_WAREHOUSE); - int32_t id = d_id + (w_id * District::NUM_PER_WAREHOUSE); - assert(id >= 0); - return id; -} - -void TPCCTables::insertDistrict(const District& district) { - insert(&districts_, makeDistrictKey(district.d_w_id, district.d_id), district); -} -District* TPCCTables::findDistrict(int32_t w_id, int32_t d_id) { - return find(districts_, makeDistrictKey(w_id, d_id)); -} - -static int32_t makeCustomerKey(int32_t w_id, int32_t d_id, int32_t c_id) { - assert(1 <= w_id && w_id <= Warehouse::MAX_WAREHOUSE_ID); - assert(1 <= d_id && d_id <= District::NUM_PER_WAREHOUSE); - assert(1 <= c_id && c_id <= Customer::NUM_PER_DISTRICT); - int32_t id = (w_id * District::NUM_PER_WAREHOUSE + d_id) - * Customer::NUM_PER_DISTRICT + c_id; - assert(id >= 0); - return id; -} - -void TPCCTables::insertCustomer(const Customer& customer) { - Customer* c = insert(&customers_, makeCustomerKey(customer.c_w_id, customer.c_d_id, customer.c_id), customer); - assert(customers_by_name_.find(c) == customers_by_name_.end()); - customers_by_name_.insert(c); -} -Customer* TPCCTables::findCustomer(int32_t w_id, int32_t d_id, int32_t c_id) { - return find(customers_, makeCustomerKey(w_id, d_id, c_id)); -} - -Customer* TPCCTables::findCustomerByName(int32_t w_id, int32_t d_id, const char* c_last) { - // select (w_id, d_id, *, c_last) order by c_first - Customer c; - c.c_w_id = w_id; - c.c_d_id = d_id; - strcpy(c.c_last, c_last); - c.c_first[0] = '\0'; - CustomerByNameSet::const_iterator it = customers_by_name_.lower_bound(&c); - 
assert(it != customers_by_name_.end()); - assert((*it)->c_w_id == w_id && (*it)->c_d_id == d_id && strcmp((*it)->c_last, c_last) == 0); - - // go to the "next" c_last - // TODO: This is a GROSS hack. Can we do better? - int length = static_cast<int>(strlen(c_last)); - if (length == Customer::MAX_LAST) { - c.c_last[length-1] = static_cast<char>(c.c_last[length-1] + 1); - } else { - c.c_last[length] = 'A'; - c.c_last[length+1] = '\0'; - } - CustomerByNameSet::const_iterator stop = customers_by_name_.lower_bound(&c); - - Customer* customer = NULL; - // Choose position n/2 rounded up (1 based addressing) = floor((n-1)/2) - if (it != stop) { - CustomerByNameSet::const_iterator middle = it; - ++it; - int i = 0; - while (it != stop) { - // Increment the middle iterator on every second iteration - if (i % 2 == 1) { - ++middle; - } - assert(strcmp((*it)->c_last, c_last) == 0); - ++it; - ++i; - } - // There were i+1 matching last names - customer = *middle; - } - - assert(customer->c_w_id == w_id && customer->c_d_id == d_id && - strcmp(customer->c_last, c_last) == 0); - return customer; -} - -static int32_t makeOrderKey(int32_t w_id, int32_t d_id, int32_t o_id) { - assert(1 <= w_id && w_id <= Warehouse::MAX_WAREHOUSE_ID); - assert(1 <= d_id && d_id <= District::NUM_PER_WAREHOUSE); - assert(1 <= o_id && o_id <= Order::MAX_ORDER_ID); - // TODO: This is bad for locality since o_id is in the most significant position. Larger keys? - int32_t id = (o_id * District::NUM_PER_WAREHOUSE + d_id) - * Warehouse::MAX_WAREHOUSE_ID + w_id; - assert(id >= 0); - return id; -} - -static int64_t makeOrderByCustomerKey(int32_t w_id, int32_t d_id, int32_t c_id, int32_t o_id) { - assert(1 <= w_id && w_id <= Warehouse::MAX_WAREHOUSE_ID); - assert(1 <= d_id && d_id <= District::NUM_PER_WAREHOUSE); - assert(1 <= c_id && c_id <= Customer::NUM_PER_DISTRICT); - assert(1 <= o_id && o_id <= Order::MAX_ORDER_ID); - int32_t top_id = (w_id * District::NUM_PER_WAREHOUSE + d_id) * Customer::NUM_PER_DISTRICT - + c_id; - assert(top_id >= 0); - int64_t id = (((int64_t) top_id) << 32) | o_id; - assert(id > 0); - return id; -} - -void TPCCTabl... [truncated message content] |
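Note on the deletion above: tpcctables.cc is no longer checked in by hand; it is generated from the newly added tpcctables.cc.cog via the Makefile rule shown earlier (cog.py $< > $@), with cog driving the ser()/deser() boilerplate. For readers unfamiliar with cog, a generator block in a .cc.cog file looks roughly like the sketch below; the field list, the w/src names, and the per-field ser() call are illustrative assumptions, not the actual contents of tpcctables.cc.cog:

    /*[[[cog
    fields = ['w_id', 'w_tax', 'w_ytd']          # hypothetical field list
    for f in fields:
        cog.outl('ser(w, src.%s);' % f)          # the cog object is available inside the block
    ]]]*/
    ser(w, src.w_id);
    ser(w, src.w_tax);
    ser(w, src.w_ytd);
    /*[[[end]]]*/

Running cog.py over the file re-executes the Python between [[[cog and ]]] and emits the whole file, with the region between ]]] and [[[end]]] replaced by the freshly generated output, to stdout; the Makefile redirects that into the .cc that gets compiled.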
From: <yan...@us...> - 2009-03-18 11:41:05
Revision: 1305 http://assorted.svn.sourceforge.net/assorted/?rev=1305&view=rev Author: yangzhang Date: 2009-03-18 11:40:56 +0000 (Wed, 18 Mar 2009) Log Message: ----------- - finished adding all tpcc txns Modified Paths: -------------- ydb/trunk/README ydb/trunk/src/main.lzz.clamp ydb/trunk/src/tpcc/tpccclient.cc ydb/trunk/src/ydb.proto Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-03-18 09:59:25 UTC (rev 1304) +++ ydb/trunk/README 2009-03-18 11:40:56 UTC (rev 1305) @@ -746,7 +746,10 @@ has caught up to the first seqno in the buffer - DONE implement TPC-C network recovery -- TODO full TPC-C txn workload +- DONE full TPC-C txn workload + - readonly: order_status, stock_level + - overwhelming majority are payment/neworder +- TODO skip processing readonly txns on catch-up - TODO prelim measurements on cluster - TODO PAPER!!! Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-03-18 09:59:25 UTC (rev 1304) +++ ydb/trunk/src/main.lzz.clamp 2009-03-18 11:40:56 UTC (rev 1305) @@ -908,11 +908,107 @@ unique_ptr<TPCCTables> g_tables; void +mkres(TpccRes *res, const OrderStatusOutput &output) +{ + OrderStatusOutputMsg &msg = *res->mutable_order_status(); + msg.set_c_id(output.c_id); + msg.set_c_balance(output.c_balance); + msg.set_o_id(output.o_id); + msg.set_o_carrier_id(output.o_carrier_id); + foreach (const OrderStatusOutput::OrderLineSubset &src, output.lines) { + OrderLineSubsetMsg &dst = *msg.add_line(); + dst.set_ol_i_id(src.ol_i_id); + dst.set_ol_supply_w_id(src.ol_supply_w_id); + dst.set_ol_quantity(src.ol_quantity); + dst.set_ol_amount(src.ol_amount); + dst.set_ol_delivery_d(src.ol_delivery_d); + } + msg.set_c_first(output.c_first); + msg.set_c_middle(output.c_middle); + msg.set_c_last(output.c_last); + msg.set_o_entry_d(output.o_entry_d); +} + +void +mkres(TpccRes *res, const PaymentOutput &output) +{ + PaymentOutputMsg &msg = *res->mutable_payment(); + + WarehouseMsg &w = *msg.mutable_warehouse(); + w.set_w_id(output.warehouse.w_id); + w.set_w_tax(output.warehouse.w_tax); + w.set_w_ytd(output.warehouse.w_ytd); + w.set_w_name(output.warehouse.w_name); + w.set_w_street_1(output.warehouse.w_street_1); + w.set_w_street_2(output.warehouse.w_street_2); + w.set_w_city(output.warehouse.w_city); + w.set_w_state(output.warehouse.w_state); + w.set_w_zip(output.warehouse.w_zip); + + DistrictMsg &d = *msg.mutable_district(); + d.set_d_id(output.district.d_id); + d.set_d_w_id(output.district.d_w_id); + d.set_d_tax(output.district.d_tax); + d.set_d_ytd(output.district.d_ytd); + d.set_d_next_o_id(output.district.d_next_o_id); + d.set_d_name(output.district.d_name); + d.set_d_street_1(output.district.d_street_1); + d.set_d_street_2(output.district.d_street_2); + d.set_d_city(output.district.d_city); + d.set_d_state(output.district.d_state); + d.set_d_zip(output.district.d_zip); + + CustomerMsg &c = *msg.mutable_customer(); + c.set_c_id(output.customer.c_id); + c.set_c_d_id(output.customer.c_d_id); + c.set_c_w_id(output.customer.c_w_id); + c.set_c_credit_lim(output.customer.c_credit_lim); + c.set_c_discount(output.customer.c_discount); + c.set_c_balance(output.customer.c_balance); + c.set_c_ytd_payment(output.customer.c_ytd_payment); + c.set_c_payment_cnt(output.customer.c_payment_cnt); + c.set_c_delivery_cnt(output.customer.c_delivery_cnt); + c.set_c_first(output.customer.c_first); + c.set_c_middle(output.customer.c_middle); + 
c.set_c_last(output.customer.c_last); + c.set_c_street_1(output.customer.c_street_1); + c.set_c_street_2(output.customer.c_street_2); + c.set_c_city(output.customer.c_city); + c.set_c_state(output.customer.c_state); + c.set_c_zip(output.customer.c_zip); + c.set_c_phone(output.customer.c_phone); + c.set_c_since(output.customer.c_since); + c.set_c_credit(output.customer.c_credit); + c.set_c_data(output.customer.c_data); +} + +void process_tpcc(const TpccReq &req, int &seqno, TpccRes *res) { checkeq(req.seqno(), seqno + 1); ++seqno; - if (req.has_new_order()) { + if (res != nullptr) { + res->Clear(); + res->set_seqno(seqno); + } + if (req.has_stock_level()) { + const StockLevelMsg &sl = req.stock_level(); + int result = g_tables->stockLevel(sl.warehouse_id(), sl.district_id(), sl.threshold()); + if (res != nullptr) { + StockLevelOutputMsg &msg = *res->mutable_stock_level(); + msg.set_result(result); + } + } else if (req.has_order_status_1()) { + const OrderStatusMsg1 &os = req.order_status_1(); + OrderStatusOutput output; + g_tables->orderStatus(os.warehouse_id(), os.district_id(), os.customer_id(), &output); + if (res != nullptr) mkres(res, output); + } else if (req.has_order_status_2()) { + const OrderStatusMsg2 &os = req.order_status_2(); + OrderStatusOutput output; + g_tables->orderStatus(os.warehouse_id(), os.district_id(), os.c_last().c_str(), &output); + if (res != nullptr) mkres(res, output); + } else if (req.has_new_order()) { const NewOrderMsg &no = req.new_order(); vector<NewOrderItem> items(no.item_size()); for (int i = 0; i < no.item_size(); ++i) { @@ -927,28 +1023,52 @@ no.customer_id(), items, no.now().c_str(), &output); if (res != nullptr) { - res->Clear(); - res->set_seqno(seqno); - NewOrderOutputMsg &outmsg = *res->mutable_new_order(); - outmsg.set_w_tax(output.w_tax); - outmsg.set_d_tax(output.d_tax); - outmsg.set_o_id(output.o_id); - outmsg.set_c_discount(output.c_discount); - outmsg.set_total(output.total); + NewOrderOutputMsg &msg = *res->mutable_new_order(); + msg.set_w_tax(output.w_tax); + msg.set_d_tax(output.d_tax); + msg.set_o_id(output.o_id); + msg.set_c_discount(output.c_discount); + msg.set_total(output.total); foreach (const NewOrderOutput::ItemInfo &src, output.items) { - ItemInfoMsg &dst = *outmsg.add_item(); + ItemInfoMsg &dst = *msg.add_item(); dst.set_s_quantity(src.s_quantity); dst.set_i_price(src.i_price); dst.set_ol_amount(src.ol_amount); dst.set_brand_generic(src.brand_generic); dst.set_i_name(src.i_name); } - outmsg.set_c_last(output.c_last); - outmsg.set_c_credit(output.c_credit); - outmsg.set_status(output.status); + msg.set_c_last(output.c_last); + msg.set_c_credit(output.c_credit); + msg.set_status(output.status); } + } else if (req.has_payment_1()) { + const PaymentMsg1 &p = req.payment_1(); + PaymentOutput output; + g_tables->payment(p.warehouse_id(), p.district_id(), p.c_warehouse_id(), + p.c_district_id(), p.customer_id(), p.h_amount(), + p.now().c_str(), &output); + if (res != nullptr) mkres(res, output); + } else if (req.has_payment_2()) { + const PaymentMsg2 &p = req.payment_2(); + PaymentOutput output; + g_tables->payment(p.warehouse_id(), p.district_id(), p.c_warehouse_id(), + p.c_district_id(), p.c_last().c_str(), p.h_amount(), + p.now().c_str(), &output); + if (res != nullptr) mkres(res, output); + } else if (req.has_delivery()) { + const DeliveryMsg &d = req.delivery(); + vector<DeliveryOrderInfo> orders; + g_tables->delivery(d.warehouse_id(), d.carrier_id(), d.now().c_str(), &orders); + if (res != nullptr) { + DeliveryOutputMsg &msg = 
*res->mutable_delivery(); + foreach (const DeliveryOrderInfo &src, orders) { + DeliveryOrderInfoMsg &dst = *msg.add_order(); + dst.set_d_id(src.d_id); + dst.set_o_id(src.o_id); + } + } } else { - throw_not_implemented(); + ASSERT(false); } } @@ -2456,24 +2576,48 @@ // Executes the TPC-C "slev" transaction. From the last 20 orders, returns the number of rows in // the STOCK table that have S_QUANTITY < threshold. See TPC-C 2.8 (page 43). int stockLevel(int32_t warehouse_id, int32_t district_id, int32_t threshold) { - warehouse_id = district_id = threshold = 0; - throw_not_implemented(); + req_.Clear(); + req_.set_seqno(seqno_); + + StockLevelMsg &sl = *req_.mutable_stock_level(); + sl.set_warehouse_id(warehouse_id); + sl.set_district_id(district_id); + sl.set_threshold(threshold); + + ser(writer_, req_); + return 0; } // Executes the TPC-C order status transaction. Find the customer's last order and check the // delivery date of each item on the order. See TPC-C 2.6 (page 36). void orderStatus(int32_t warehouse_id, int32_t district_id, int32_t customer_id, OrderStatusOutput* output) { - warehouse_id = district_id = customer_id = 0; output = 0; - throw_not_implemented(); + req_.Clear(); + req_.set_seqno(seqno_); + + OrderStatusMsg1 &os = *req_.mutable_order_status_1(); + os.set_warehouse_id(warehouse_id); + os.set_district_id(district_id); + os.set_customer_id(customer_id); + + ser(writer_, req_); + output = nullptr; } // Executes the TPC-C order status transaction. Find the customer's last order and check the // delivery date of each item on the order. See TPC-C 2.6 (page 36). void orderStatus(int32_t warehouse_id, int32_t district_id, const char* c_last, OrderStatusOutput* output) { - warehouse_id = district_id = 0; c_last = 0; output = 0; - throw_not_implemented(); + req_.Clear(); + req_.set_seqno(seqno_); + + OrderStatusMsg2 &os = *req_.mutable_order_status_2(); + os.set_warehouse_id(warehouse_id); + os.set_district_id(district_id); + os.set_c_last(c_last); + + ser(writer_, req_); + output = nullptr; } // Executes the TPC-C new order transaction. Enter the new order for customer_id into the @@ -2506,9 +2650,20 @@ void payment(int32_t warehouse_id, int32_t district_id, int32_t c_warehouse_id, int32_t c_district_id, int32_t customer_id, float h_amount, const char* now, PaymentOutput* output) { - warehouse_id = district_id = c_district_id = c_warehouse_id = customer_id = 0; - h_amount = 0; now = 0; output = 0; - throw_not_implemented(); + req_.Clear(); + req_.set_seqno(seqno_); + + PaymentMsg1 &p = *req_.mutable_payment_1(); + p.set_warehouse_id(warehouse_id); + p.set_district_id(district_id); + p.set_c_warehouse_id(c_warehouse_id); + p.set_c_district_id(c_district_id); + p.set_customer_id(customer_id); + p.set_h_amount(h_amount); + p.set_now(now); + + ser(writer_, req_); + output = nullptr; } // Executes the TPC-C payment transaction. Add h_amount to the customer's account. 
@@ -2516,17 +2671,36 @@ void payment(int32_t warehouse_id, int32_t district_id, int32_t c_warehouse_id, int32_t c_district_id, const char* c_last, float h_amount, const char* now, PaymentOutput* output) { - warehouse_id = district_id = c_warehouse_id = c_district_id = 0; - h_amount = 0; c_last = now = 0; output = 0; - throw_not_implemented(); + req_.Clear(); + req_.set_seqno(seqno_); + + PaymentMsg2 &p = *req_.mutable_payment_2(); + p.set_warehouse_id(warehouse_id); + p.set_district_id(district_id); + p.set_c_warehouse_id(c_warehouse_id); + p.set_c_district_id(c_district_id); + p.set_c_last(c_last); + p.set_h_amount(h_amount); + p.set_now(now); + + ser(writer_, req_); + output = nullptr; } // Executes the TPC-C delivery transaction. Delivers the oldest undelivered transaction in each // district in warehouse_id. See TPC-C 2.7 (page 39). void delivery(int32_t warehouse_id, int32_t carrier_id, const char* now, vector<DeliveryOrderInfo>* orders) { - warehouse_id = carrier_id = 0; now = 0; orders = 0; - throw_not_implemented(); + req_.Clear(); + req_.set_seqno(seqno_); + + DeliveryMsg &d = *req_.mutable_delivery(); + d.set_warehouse_id(warehouse_id); + d.set_carrier_id(carrier_id); + d.set_now(now); + + ser(writer_, req_); + orders = nullptr; } }; Modified: ydb/trunk/src/tpcc/tpccclient.cc =================================================================== --- ydb/trunk/src/tpcc/tpccclient.cc 2009-03-18 09:59:25 UTC (rev 1304) +++ ydb/trunk/src/tpcc/tpccclient.cc 2009-03-18 11:40:56 UTC (rev 1305) @@ -68,9 +68,11 @@ vector<DeliveryOrderInfo> orders; db_->delivery(generateWarehouse(), carrier, now, &orders); +#ifdef NDEBUG if (orders.size() != District::NUM_PER_WAREHOUSE) { printf("Only delivered from %zd districts\n", orders.size()); } +#endif } void TPCCClient::doPayment() { @@ -144,22 +146,24 @@ // This is not strictly accurate: The requirement is for certain *minimum* percentages to be // maintained. This is close to the right thing, but not precisely correct. // See TPC-C 5.2.4 (page 68). 
- doNewOrder(); -#if 0 int x = generator_->number(1, 100); if (x <= 4) { // 4% + //printf("stocklevel\n"); doStockLevel(); } else if (x <= 8) { // 4% + //printf("delivery\n"); doDelivery(); } else if (x <= 12) { // 4% + //printf("orderstatus\n"); doOrderStatus(); } else if (x <= 12+43) { // 43% + //printf("payment\n"); doPayment(); } else { // 45% + //printf("neworder\n"); ASSERT(x > 100-45); doNewOrder(); } -#endif } int32_t TPCCClient::generateWarehouse() { Modified: ydb/trunk/src/ydb.proto =================================================================== --- ydb/trunk/src/ydb.proto 2009-03-18 09:59:25 UTC (rev 1304) +++ ydb/trunk/src/ydb.proto 2009-03-18 11:40:56 UTC (rev 1305) @@ -87,9 +87,27 @@ } // -// TPCC messages +// TPCC request messages // +message StockLevelMsg { + required int32 warehouse_id = 1; + required int32 district_id = 2; + required int32 threshold = 3; +} + +message OrderStatusMsg1 { + required int32 warehouse_id = 1; + required int32 district_id = 2; + required int32 customer_id = 3; +} + +message OrderStatusMsg2 { + required int32 warehouse_id = 1; + required int32 district_id = 2; + required string c_last = 3; +} + message NewOrderItemMsg { required int32 i_id = 1; required int32 ol_supply_w_id = 2; @@ -104,6 +122,71 @@ required string now = 5; } +message PaymentMsg1 { + required int32 warehouse_id = 1; + required int32 district_id = 2; + required int32 c_warehouse_id = 3; + required int32 c_district_id = 4; + required int32 customer_id = 5; + required float h_amount = 6; + required string now = 7; +} + +message PaymentMsg2 { + required int32 warehouse_id = 1; + required int32 district_id = 2; + required int32 c_warehouse_id = 3; + required int32 c_district_id = 4; + required string c_last = 5; + required float h_amount = 6; + required string now = 7; +} + +message DeliveryMsg { + required int32 warehouse_id = 1; + required int32 carrier_id = 2; + required string now = 3; +} + +message TpccReq { + required int32 seqno = 1; + optional StockLevelMsg stock_level = 2; + optional OrderStatusMsg1 order_status_1 = 3; + optional OrderStatusMsg2 order_status_2 = 4; + optional NewOrderMsg new_order = 5; + optional PaymentMsg1 payment_1 = 6; + optional PaymentMsg2 payment_2 = 7; + optional DeliveryMsg delivery = 8; +} + +// +// TPCC response messages. 
+// + +message StockLevelOutputMsg { + required int32 result = 1; +} + +message OrderLineSubsetMsg { + required int32 ol_i_id = 1; + required int32 ol_supply_w_id = 2; + required int32 ol_quantity = 3; + required float ol_amount = 4; + required string ol_delivery_d = 5; +} + +message OrderStatusOutputMsg { + required int32 c_id = 1; + required float c_balance = 2; + required int32 o_id = 3; + required int32 o_carrier_id = 4; + repeated OrderLineSubsetMsg line = 5; + required string c_first = 6; + required string c_middle = 7; + required string c_last = 8; + required string o_entry_d = 9; +} + message ItemInfoMsg { required int32 s_quantity = 1; required float i_price = 2; @@ -136,12 +219,76 @@ required string status = 9; } -message TpccReq { - required int32 seqno = 1; - optional NewOrderMsg new_order = 2; +message WarehouseMsg { + required int32 w_id = 1; + required float w_tax = 2; + required float w_ytd = 3; + required string w_name = 4; + required string w_street_1 = 5; + required string w_street_2 = 6; + required string w_city = 7; + required string w_state = 8; + required string w_zip = 9; } +message DistrictMsg { + required int32 d_id = 1; + required int32 d_w_id = 2; + required float d_tax = 3; + required float d_ytd = 4; + required int32 d_next_o_id = 5; + required string d_name = 6; + required string d_street_1 = 7; + required string d_street_2 = 8; + required string d_city = 9; + required string d_state = 10; + required string d_zip = 11; +} + +message CustomerMsg { + required int32 c_id = 1; + required int32 c_d_id = 2; + required int32 c_w_id = 3; + required float c_credit_lim = 4; + required float c_discount = 5; + required float c_balance = 6; + required float c_ytd_payment = 7; + required int32 c_payment_cnt = 8; + required int32 c_delivery_cnt = 9; + required string c_first = 10; + required string c_middle = 11; + required string c_last = 12; + required string c_street_1 = 13; + required string c_street_2 = 14; + required string c_city = 15; + required string c_state = 16; + required string c_zip = 17; + required string c_phone = 18; + required string c_since = 19; + required string c_credit = 20; + required string c_data = 21; +} + +message PaymentOutputMsg { + required WarehouseMsg warehouse = 1; + required DistrictMsg district = 2; + required CustomerMsg customer = 3; +} + +message DeliveryOrderInfoMsg { + required int32 d_id = 1; + required int32 o_id = 2; +} + +message DeliveryOutputMsg { + repeated DeliveryOrderInfoMsg order = 1; +} + message TpccRes { required int32 seqno = 1; - optional NewOrderOutputMsg new_order = 2; + optional StockLevelOutputMsg stock_level = 2; + optional OrderStatusOutputMsg order_status = 3; + optional NewOrderOutputMsg new_order = 4; + optional PaymentOutputMsg payment = 5; + optional DeliveryOutputMsg delivery = 6; } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
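The transaction mix re-enabled in r1305's tpccclient.cc above follows the usual TPC-C weighting (4% stock-level, 4% delivery, 4% order-status, 43% payment, 45% new-order). A minimal standalone sketch of that dispatch, assuming a plain std::mt19937 and enum stand-ins for the do*() calls (both hypothetical, not the project's classes):

    // Sketch of the TPC-C mix selection; percentages taken from the diff above.
    #include <cstdio>
    #include <random>

    enum Txn { STOCK_LEVEL, DELIVERY, ORDER_STATUS, PAYMENT, NEW_ORDER };

    Txn pick_txn(std::mt19937 &rng) {
        int x = std::uniform_int_distribution<int>(1, 100)(rng);
        if (x <= 4)            return STOCK_LEVEL;   // 4%
        else if (x <= 8)       return DELIVERY;      // 4%
        else if (x <= 12)      return ORDER_STATUS;  // 4%
        else if (x <= 12 + 43) return PAYMENT;       // 43%
        else                   return NEW_ORDER;     // 45%
    }

    int main() {
        std::mt19937 rng(42);
        int counts[5] = {0};
        for (int i = 0; i < 100000; ++i) ++counts[pick_txn(rng)];
        std::printf("slev=%d dlv=%d ostat=%d pay=%d no=%d\n",
                    counts[STOCK_LEVEL], counts[DELIVERY], counts[ORDER_STATUS],
                    counts[PAYMENT], counts[NEW_ORDER]);
        return 0;
    }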
From: <yan...@us...> - 2009-03-18 19:25:57
Revision: 1306 http://assorted.svn.sourceforge.net/assorted/?rev=1306&view=rev Author: yangzhang Date: 2009-03-18 19:25:42 +0000 (Wed, 18 Mar 2009) Log Message: ----------- - tpccdb: fixed buf overflow, h_data should have size MAX_DATA + 1 - general: - fixed the tpcc_response_handler to have proper termination condition - fixed optimized build issues - tested and verified that scaling, rec fully work - test.bash: - added cog to setups - adjusted rec, scaling to use tpcc (and lowered some constants) - added locking - tpcctables: safely delete neworders, avoiding deletion if in serbuf - btree: added conditional compilation of NODE_INNER, NODE_LEAF just to be safe - tpccclient: fixed inverted NDEBUG - tpcc/Makefile: handle OPT - Makefile: use g++ for linking Modified Paths: -------------- ydb/trunk/src/Makefile ydb/trunk/src/main.lzz.clamp ydb/trunk/src/tpcc/Makefile ydb/trunk/src/tpcc/btree.h ydb/trunk/src/tpcc/tpccclient.cc ydb/trunk/src/tpcc/tpccdb.h ydb/trunk/src/tpcc/tpcctables.cc.cog ydb/trunk/tools/test.bash Modified: ydb/trunk/src/Makefile =================================================================== --- ydb/trunk/src/Makefile 2009-03-18 11:40:56 UTC (rev 1305) +++ ydb/trunk/src/Makefile 2009-03-18 19:25:42 UTC (rev 1306) @@ -32,12 +32,12 @@ PPROF := -lprofiler endif ifneq ($(OPT),) - OPT := -O3 -Wdisabled-optimization -DNDEBUG + OPT := -g3 -O3 -Wdisabled-optimization -DNDEBUG else OPT := -g3 endif # CXX := $(WTF) ag++ -k --Xcompiler # $(CXX) -CXX := $(WTF) $(CXX) -pipe +CXX := $(WTF) ccache $(CXX) -pipe LDFLAGS := -pthread $(GPROF) LDLIBS := -lstx -lst -lresolv -lprotobuf -lgtest \ -lboost_program_options-gcc43-mt -lboost_thread-gcc43-mt \ @@ -94,7 +94,7 @@ all: $(TARGET) $(TARGET): $(OBJS) - $(LINK.o) $^ $(LOADLIBES) $(LDLIBS) -o $@ + $(LINK.cc) $^ $(LOADLIBES) $(LDLIBS) -o $@ %.pb.o: %.pb.cc %.pb.h $(CXX) -c $(PBCXXFLAGS) $(OUTPUT_OPTION) $< @@ -137,6 +137,7 @@ clean: rm -f $(GENSRCS) $(GENHDRS) $(OBJS) $(TARGET) main.lzz *.clamp_h + make -C tpcc/ clean distclean: clean rm -f all.h all.h.gch Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-03-18 11:40:56 UTC (rev 1305) +++ ydb/trunk/src/main.lzz.clamp 2009-03-18 19:25:42 UTC (rev 1306) @@ -1130,7 +1130,6 @@ finally f(lambda () { long long now = current_time_millis(); - stopped_issuing = true; showtput("processed", now, __ref(start_time), __ref(seqno), __ref(init_seqno)); if (!__ref(caught_up)) { @@ -1738,9 +1737,8 @@ // Read the message, but correctly respond to interrupts so that we can // cleanly exit (slightly tricky). - if (stopped_issuing) { - st_intr intr(stop_hub); - readmsg(reader, res); + if (stopped_issuing && last_seqno + 1 == seqno) { + break; } else { st_intr intr(kill_hub); readmsg(reader, res); @@ -2712,6 +2710,7 @@ long long start_time = current_time_millis(); finally f(lambda () { + stopped_issuing = true; showtput("issued", current_time_millis(), __ref(start_time), __ref(seqno), 0); }); Modified: ydb/trunk/src/tpcc/Makefile =================================================================== --- ydb/trunk/src/tpcc/Makefile 2009-03-18 11:40:56 UTC (rev 1305) +++ ydb/trunk/src/tpcc/Makefile 2009-03-18 19:25:42 UTC (rev 1306) @@ -1,12 +1,15 @@ WARNINGS = -Werror -Wall -Wextra -Wconversion -Wpointer-arith -Wcast-qual -Wcast-align -Wwrite-strings -Woverloaded-virtual -Wno-sign-compare -Wno-unused-parameter +CXX := ccache $(CXX) + # Debug flags -CXXFLAGS = -g -MD $(WARNINGS) -I.. 
-std=gnu++0x -# Optimization flags -#CXXFLAGS = -g -O3 -DNDEBUG -MD $(WARNINGS) -std=gnu++0x +ifeq ($(OPT),) + CXXFLAGS = -g3 -MD $(WARNINGS) -std=gnu++0x +else + CXXFLAGS = -g3 -O3 -DNDEBUG -MD $(WARNINGS) -std=gnu++0x +endif # Link withthe C++ standard library -#LDFLAGS=-lstdc++ LDLIBS = -lgtest BINARIES = btree_test.o tpccclient.o tpccgenerator.o tpcctables.o tpccdb.o clock.o randomgenerator.o @@ -18,7 +21,7 @@ %.cc: %.cc.cog cog.py $< > $@ -clean : +clean: rm -f *.o *.d $(BINARIES) -include *.d Modified: ydb/trunk/src/tpcc/btree.h =================================================================== --- ydb/trunk/src/tpcc/btree.h 2009-03-18 11:40:56 UTC (rev 1305) +++ ydb/trunk/src/tpcc/btree.h 2009-03-18 19:25:42 UTC (rev 1306) @@ -280,7 +280,9 @@ private: // Used when debugging +#ifndef NDEBUG enum NodeType {NODE_INNER=0xDEADBEEF, NODE_LEAF=0xC0FFEE}; +#endif // Leaf nodes store pairs of keys and values. struct LeafNode { Modified: ydb/trunk/src/tpcc/tpccclient.cc =================================================================== --- ydb/trunk/src/tpcc/tpccclient.cc 2009-03-18 11:40:56 UTC (rev 1305) +++ ydb/trunk/src/tpcc/tpccclient.cc 2009-03-18 19:25:42 UTC (rev 1306) @@ -68,7 +68,7 @@ vector<DeliveryOrderInfo> orders; db_->delivery(generateWarehouse(), carrier, now, &orders); -#ifdef NDEBUG +#ifndef NDEBUG if (orders.size() != District::NUM_PER_WAREHOUSE) { printf("Only delivered from %zd districts\n", orders.size()); } Modified: ydb/trunk/src/tpcc/tpccdb.h =================================================================== --- ydb/trunk/src/tpcc/tpccdb.h 2009-03-18 11:40:56 UTC (rev 1305) +++ ydb/trunk/src/tpcc/tpccdb.h 2009-03-18 19:25:42 UTC (rev 1306) @@ -211,7 +211,7 @@ int32_t h_w_id; float h_amount; char h_date[DATETIME_SIZE+1]; - char h_data[MAX_DATA]; + char h_data[MAX_DATA+1]; }; // Data returned by the "order status" transaction. Modified: ydb/trunk/src/tpcc/tpcctables.cc.cog =================================================================== --- ydb/trunk/src/tpcc/tpcctables.cc.cog 2009-03-18 11:40:56 UTC (rev 1305) +++ ydb/trunk/src/tpcc/tpcctables.cc.cog 2009-03-18 19:25:42 UTC (rev 1306) @@ -351,7 +351,7 @@ assert(neworder->no_d_id == d_id && neworder->no_w_id == warehouse_id); int32_t o_id = neworder->no_o_id; neworders_.erase(iterator); - delete neworder; + if (!within(serbuf_, neworder)) delete neworder; DeliveryOrderInfo order; order.d_id = d_id; Modified: ydb/trunk/tools/test.bash =================================================================== --- ydb/trunk/tools/test.bash 2009-03-18 11:40:56 UTC (rev 1305) +++ ydb/trunk/tools/test.bash 2009-03-18 19:25:42 UTC (rev 1306) @@ -92,6 +92,37 @@ } # +# Access control +# + +node-lock() { + check-remote + if ! 
node-islocked ; then + sudo su - -c ' + echo AllowUsers root yang >> /etc/ssh/sshd_config && + /etc/init.d/ssh restart + ' + fi +} + +node-unlock() { + check-remote + sudo su - -c ' + sed -i "/^AllowUsers root yang$/ d" /etc/ssh/sshd_config && + /etc/init.d/ssh restart + ' +} + +node-islocked() { + check-remote + sudo grep '^AllowUsers root yang$' /etc/ssh/sshd_config +} + +lock() { parremote node-lock ; } +unlock() { parremote node-unlock ; } +islocked() { parremote node-islocked ; } + +# # Setup # @@ -191,6 +222,11 @@ PPROF=1 OPT=1 make WTF= ydb } +node-setup-cog() { + check-remote + toast --quiet arm 'http://nedbatchelder.com/code/cog/cog-2.1.tar.gz' +} + init-setup() { parremote node-init-setup } @@ -227,6 +263,7 @@ parremote node-setup-clamp parremote node-setup-gtest parremote node-setup-ghash + parremote node-setup-cog } setup-ydb() { @@ -289,17 +326,17 @@ rec-helper() { local leader=$1 shift - : ${seqno:=1000000} ${extraargs:=} - tagssh $leader "ydb/src/ydb -l --exit-on-recovery --accept-joiner-seqno $seqno -n $(( $# - 1 )) $extraargs" & + : ${seqno:=100000} ${extraargs:=} + tagssh $leader "set -x; ydb/src/ydb --tpcc -l --exit-on-recovery --accept-joiner-seqno $seqno -n $(( $# - 1 )) $extraargs" & sleep .1 # Run initial replicas. while (( $# > 1 )) ; do - tagssh $1 "ydb/src/ydb -H $leader" & + tagssh $1 "set -x; ydb/src/ydb --tpcc -H $leader" & shift done sleep .1 # Run joiner. - tagssh $1 "ydb/src/ydb -H $leader --yield-build-up --yield-catch-up $extraargs" & + tagssh $1 "set -x; ydb/src/ydb --tpcc -H $leader --yield-build-up --yield-catch-up $extraargs" & if false ; then if [[ ${wait2:-} ]] then sleep $wait2 @@ -389,10 +426,10 @@ scaling-helper() { local leader=$1 shift - tagssh $leader "CPUPROFILE=ydb.prof ydb/src/ydb -q -l -n $# -X 1000000 ${extraargs:-}" & + tagssh $leader "CPUPROFILE=ydb.prof ydb/src/ydb -q -l -n $# -X 100000 --tpcc ${extraargs:-}" & sleep .1 for rep in "$@" - do tagssh $rep "CPUPROFILE=ydb.prof ydb/src/ydb -q -n $# -H $leader ${extraargs:-}" & + do tagssh $rep "CPUPROFILE=ydb.prof --args ydb/src/ydb -q -n $# -H $leader --tpcc ${extraargs:-}" & done wait } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
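The h_data fix in r1306's tpccdb.h above is the classic one-byte-short buffer: a field that stores a NUL-terminated string of up to MAX_DATA characters needs MAX_DATA + 1 bytes. A small illustration, assuming a simplified stand-in struct and an illustrative MAX_DATA value (the real definitions live in tpccdb.h):

    // Why h_data needs MAX_DATA + 1: a full-length string still carries a '\0'.
    #include <cassert>
    #include <cstdio>
    #include <cstring>

    static const int MAX_DATA = 500;   // illustrative value only

    struct HistoryRow {
        // char h_data[MAX_DATA];      // one byte short when the field is full
        char h_data[MAX_DATA + 1];     // room for the terminating '\0'
    };

    int main() {
        char src[MAX_DATA + 1];
        std::memset(src, 'x', MAX_DATA);   // maximum-length payload
        src[MAX_DATA] = '\0';

        HistoryRow h;
        std::snprintf(h.h_data, sizeof h.h_data, "%s", src);
        assert(std::strlen(h.h_data) == MAX_DATA);
        std::puts("full-length h_data copied without truncation or overflow");
        return 0;
    }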
From: <yan...@us...> - 2009-03-18 20:08:22
Revision: 1307 http://assorted.svn.sourceforge.net/assorted/?rev=1307&view=rev Author: yangzhang Date: 2009-03-18 20:08:05 +0000 (Wed, 18 Mar 2009) Log Message: ----------- - main: fixed starting of extraneous recover_joiner's - test.bash: added whos, updated line-counts - removed wc.bash Modified Paths: -------------- ydb/trunk/README ydb/trunk/src/main.lzz.clamp ydb/trunk/tools/test.bash Removed Paths: ------------- ydb/trunk/tools/wc.bash Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-03-18 19:25:42 UTC (rev 1306) +++ ydb/trunk/README 2009-03-18 20:08:05 UTC (rev 1307) @@ -749,8 +749,8 @@ - DONE full TPC-C txn workload - readonly: order_status, stock_level - overwhelming majority are payment/neworder +- DONE prelim measurements on cluster - TODO skip processing readonly txns on catch-up -- TODO prelim measurements on cluster - TODO PAPER!!! Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-03-18 19:25:42 UTC (rev 1306) +++ ydb/trunk/src/main.lzz.clamp 2009-03-18 20:08:05 UTC (rev 1307) @@ -2137,9 +2137,10 @@ ref(send_states), ref(backlog), init.txnseqno(), mypos, init.node_size()); st_joining join_proc(my_spawn(process_fn, "process_txns")); - st_joining join_rec(my_spawn(bind(recover_joiner, listener, - ref(send_states)), - "recover_joiner")); + if (init.txnseqno() == 0 && multirecover || mypos == 0) + st_joining join_rec(my_spawn(bind(recover_joiner, listener, + ref(send_states)), + "recover_joiner")); try { // If there's anything to recover. Modified: ydb/trunk/tools/test.bash =================================================================== --- ydb/trunk/tools/test.bash 2009-03-18 19:25:42 UTC (rev 1306) +++ ydb/trunk/tools/test.bash 2009-03-18 20:08:05 UTC (rev 1307) @@ -315,6 +315,15 @@ " } +whos() { + xargs= parssh " + echo + hostname + echo ===== + w + " +} + times() { parssh date +%s.%N } @@ -350,7 +359,7 @@ # Recovery experient. 
exp-rec() { - for seqno in 500000 400000 300000 200000 100000 ; do # configurations + for seqno in 300000 200000 100000 ; do # configurations stop for i in {1..3} ; do # trials echo === seqno=$seqno i=$i === @@ -429,7 +438,7 @@ tagssh $leader "CPUPROFILE=ydb.prof ydb/src/ydb -q -l -n $# -X 100000 --tpcc ${extraargs:-}" & sleep .1 for rep in "$@" - do tagssh $rep "CPUPROFILE=ydb.prof --args ydb/src/ydb -q -n $# -H $leader --tpcc ${extraargs:-}" & + do tagssh $rep "CPUPROFILE=ydb.prof ydb/src/ydb -q -n $# -H $leader --tpcc ${extraargs:-}" & done wait } @@ -566,7 +575,7 @@ } line-counts() { - wc -l "$(dirname "$0")/../src/"{main.lzz.clamp,ser.{h,cc},p2.cc,ydb.proto,Makefile,serperf.cc} \ + wc -l "$(dirname "$0")/../src/"{main.lzz.clamp,ser.{h,cc},p2.cc,ydb.proto,Makefile,serperf.cc,tpcc/{Makefile,*.{h,cc,cog}}} \ ~/ccom/src/{commons/{,st/}*.h,test/{*.*,Makefile}} } Deleted: ydb/trunk/tools/wc.bash =================================================================== --- ydb/trunk/tools/wc.bash 2009-03-18 19:25:42 UTC (rev 1306) +++ ydb/trunk/tools/wc.bash 2009-03-18 20:08:05 UTC (rev 1307) @@ -1,11 +0,0 @@ -#!/usr/bin/env bash - -cd "$(dirname "$0")/../src/" -wc -l main.lzz.clamp -{ - wc -l main.lzz.clamp - - cat main.lzz.clamp | - perl -n -e 'if (m /#include <commons/s) { s/.*<(.+)>.*/$1/; print }' | - xargs -I_ wc -l /home/yang/ccom/src/_ -} | cut -f1 -d' ' | numsum | xargs -I_ echo _ total This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
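r1307 above spawns recover_joiner only when init.txnseqno() == 0 && multirecover || mypos == 0, with the st_joining guard declared in the body of the if; r1308 below reworks this into a guard in the enclosing scope initialized from a ternary. The difference is purely a C++ object-lifetime one, sketched here with a hypothetical Guard type standing in for st_joining:

    // A scoped guard declared inside an if lives only to the end of that
    // statement; a conditionally initialized guard in the enclosing scope
    // lives until the end of the function.
    #include <cstdio>

    struct Guard {
        const char *name;
        explicit Guard(const char *n) : name(n) {
            std::printf("acquire %s\n", name ? name : "(none)");
        }
        ~Guard() { std::printf("release %s\n", name ? name : "(none)"); }
    };

    void narrow_scope(bool cond) {
        if (cond)
            Guard g("joiner");   // constructed and destroyed right here
        std::puts("work runs after the guard is already gone");
    }

    void enclosing_scope(bool cond) {
        Guard g(cond ? "joiner" : nullptr);   // alive for the whole function
        std::puts("work runs while the guard is still alive");
    }

    int main() {
        narrow_scope(true);
        enclosing_scope(true);
        return 0;
    }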
From: <yan...@us...> - 2009-03-19 10:16:38
Revision: 1308 http://assorted.svn.sourceforge.net/assorted/?rev=1308&view=rev Author: yangzhang Date: 2009-03-19 10:16:33 +0000 (Thu, 19 Mar 2009) Log Message: ----------- - main: - skip read-only txns when process_tpcc with no response - added twal to process_tpccs - added twal recovery with (broken) network recovery - analysis: - tolerate missing aries-log in scaling() - fixed and simplified rec - updates notes Modified Paths: -------------- ydb/trunk/README ydb/trunk/src/main.lzz.clamp ydb/trunk/src/tpcc/tpccclient.cc ydb/trunk/src/tpcc/tpcctables.cc.cog ydb/trunk/src/tpcc/tpcctables.h ydb/trunk/tools/analysis.py Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-03-18 20:08:05 UTC (rev 1307) +++ ydb/trunk/README 2009-03-19 10:16:33 UTC (rev 1308) @@ -752,6 +752,17 @@ - DONE prelim measurements on cluster - TODO skip processing readonly txns on catch-up +- some high level issues in this project + - meeting each week with random new todo's + - many of the todo's - like improving the baseline performance - are pointless + - no clear objective/end goal + - no clear idea of what we're doing + - what's the high-level model of usage? e.g. are we considering disk-based + no-network/no-replica methods? + - is there really a vldb-worthy project here? + - if not, why did we set down this road a year ago? + - cut our losses? + - TODO PAPER!!! - TODO related work Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-03-18 20:08:05 UTC (rev 1307) +++ ydb/trunk/src/main.lzz.clamp 2009-03-19 10:16:33 UTC (rev 1308) @@ -22,6 +22,7 @@ #include <cstring> // strsignal #include <fstream> // ofstream #include <google/dense_hash_map> +#include <google/protobuf/io/zero_copy_stream_impl.h> #include <gtest/gtest.h> #include <inttypes.h> // PRId64 #include <iostream> @@ -32,7 +33,7 @@ #include <sys/socket.h> // getpeername #include <sys/types.h> // ssize_t #include <tr1/unordered_map> -#include <unistd.h> // pipe, write +#include <unistd.h> // pipe, write, sync #include <vector> #include "ser.h" #include "tpcc/clock.h" @@ -53,6 +54,7 @@ using namespace boost::archive; using namespace commons; using namespace google; +using namespace google::protobuf::io; using namespace std; using namespace std::tr1; using namespace testing; @@ -86,20 +88,24 @@ // Configuration. st_utime_t timeout; int yield_interval, accept_joiner_seqno, issuing_interval, min_ops, max_ops, - stop_on_seqno, batch_size, handle_responses_display, + stop_on_seqno, batch_size, handle_responses_display, fail_seqno, catch_up_display, issue_display, nwarehouses, process_display; size_t accept_joiner_size, buf_size, read_buf_size; bool yield_during_build_up, yield_during_catch_up, dump, show_updates, count_updates, stop_on_recovery, general_txns, profile_threads, debug_threads, multirecover, disk, debug_memory, use_pwal, use_twal, - use_pb, use_pb_res, g_caught_up, rec_pwal, do_tpcc, - suppress_txn_msgs, fake_bcast, force_ser, fake_exec; + use_pb, use_pb_res, g_caught_up, rec_pwal, rec_twal, do_tpcc, + suppress_txn_msgs, fake_bcast, force_ser, fake_exec, ship_log; long long timelim, read_thresh, write_thresh; // Control. st_intr_bool stop_hub, kill_hub; st_bool do_pause; +// On leader, signifies that a node is in fail mode. On replica, signifies that a node is in fail mode/recovering from the twal. +st_bool failed; +// The seqno on which we should resume. 
+st_channel<int> resume; bool stopped_issuing; // Statistics. @@ -584,6 +590,34 @@ check(msg.ParseFromArray(checkpass(src.read(len)), len)); } +template<typename T> +inline void +readmsg(istream &src, T &msg) +{ + uint32_t len; + src.read(reinterpret_cast<char*>(&len), sizeof len); + len = ntohl(len); +#if 0 + IstreamInputStream iis(&src); + LimitingInputStream lis(&iis, len); + check(msg.ParseFromZeroCopyStream(&lis)); +#else + char buf[len]; + src.read(buf, len); + check(msg.ParseFromArray(buf, len)); +#endif +} + +inline uint32_t +readlen(istream &src) +{ + uint32_t len; + src.read(reinterpret_cast<char*>(&len), sizeof len); + len = ntohl(len); + ASSERT(len < 10000); + return len; +} + enum { op_del, op_write, op_commit }; /** @@ -611,14 +645,30 @@ int op = op_commit; out & op; } + void flush() { of.flush(); } private: ofstream of; binary_oarchive out; }; +// TODO? +class txn_wal { +public: + txn_wal(const string &fname) : of(fname.c_str()) {} + void logbuf(const ser_t &s) { logbuf(s.data(), s.size()); } + void logbuf(const void *buf, size_t len) { + of.write(reinterpret_cast<const char*>(buf), len); + } + void flush() { of.flush(); } +private: + ofstream of; +}; + // Globals mii g_map; wal *g_wal; +txn_wal *g_twal; +//tpcc_wal *g_tpcc_wal; /** * Keep issuing transactions to the replicas. @@ -645,7 +695,7 @@ reader r(nullptr, rbuf.get(), rbuf.size()); function<void(const void*, size_t)> fn; if (use_twal) - fn = bind(&wal::logbuf, g_wal, _1, _2); + fn = bind(&txn_wal::logbuf, g_twal, _1, _2); else fn = lambda(const void *buf, size_t len) { foreach (st_netfd_t dst, __ref(fds)) @@ -754,7 +804,7 @@ serbuf.clear(); ser(serbuf, batch); if (do_bcast) bcastbuf(fds, serbuf); - if (use_twal) g_wal->logbuf(serbuf); + if (use_twal) g_twal->logbuf(serbuf); } } else { // Reset if we have nobody to send to (incl. disk) or if we actually have @@ -858,14 +908,14 @@ showdatarate(const char *action, streamoff len, long long time) { cout << action << " of " << len << " bytes in " << time << " ms (" - << len / time / 1000 << " MB/s)" << endl; + << double(len) / double(time) / 1000 << " MB/s)" << endl; } void showdatarate(const char *action, size_t len, long long time) { cout << action << " of " << len << " bytes in " << time << " ms (" - << len / time / 1000 << " MB/s)" << endl; + << double(len) / double(time) / 1000 << " MB/s)" << endl; } void @@ -991,23 +1041,29 @@ res->Clear(); res->set_seqno(seqno); } + // First three are read-only txns, so doesn't make sense to exec if no res to + // put results. They constitute only 8% of the workload. 
if (req.has_stock_level()) { - const StockLevelMsg &sl = req.stock_level(); - int result = g_tables->stockLevel(sl.warehouse_id(), sl.district_id(), sl.threshold()); if (res != nullptr) { + const StockLevelMsg &sl = req.stock_level(); + int result = g_tables->stockLevel(sl.warehouse_id(), sl.district_id(), sl.threshold()); StockLevelOutputMsg &msg = *res->mutable_stock_level(); msg.set_result(result); } } else if (req.has_order_status_1()) { - const OrderStatusMsg1 &os = req.order_status_1(); - OrderStatusOutput output; - g_tables->orderStatus(os.warehouse_id(), os.district_id(), os.customer_id(), &output); - if (res != nullptr) mkres(res, output); + if (res != nullptr) { + const OrderStatusMsg1 &os = req.order_status_1(); + OrderStatusOutput output; + g_tables->orderStatus(os.warehouse_id(), os.district_id(), os.customer_id(), &output); + mkres(res, output); + } } else if (req.has_order_status_2()) { - const OrderStatusMsg2 &os = req.order_status_2(); - OrderStatusOutput output; - g_tables->orderStatus(os.warehouse_id(), os.district_id(), os.c_last().c_str(), &output); - if (res != nullptr) mkres(res, output); + if (res != nullptr) { + const OrderStatusMsg2 &os = req.order_status_2(); + OrderStatusOutput output; + g_tables->orderStatus(os.warehouse_id(), os.district_id(), os.c_last().c_str(), &output); + mkres(res, output); + } } else if (req.has_new_order()) { const NewOrderMsg &no = req.new_order(); vector<NewOrderItem> items(no.item_size()); @@ -1079,7 +1135,11 @@ int mypos, int nnodes) { bool caught_up = init_seqno == 0; + // Means we're currently ignoring the incoming txns until we see a fail-ack + // from the leader. + bool depleting = false; long long start_time = current_time_millis(), + time_failed = -1, time_caught_up = caught_up ? start_time : -1; int seqno_caught_up = caught_up ? seqno : -1; // Used by joiner only to tell where we actually started (init_seqno is just @@ -1090,6 +1150,7 @@ int first_seqno_in_chunk = -1; TpccReq req; TpccRes res; + txn_wal &wal = *g_twal; function<void(anchored_stream_reader& reader)> overflow_fn = lambda(anchored_stream_reader &reader) { @@ -1124,8 +1185,7 @@ anchored_stream_reader reader(st_read_fn(leader), st_read_fully_fn(leader), overflow_fn, rbuf.get(), rbuf.size()); writer w(lambda(const void *buf, size_t len) { - checkeqnneg(st_write(__ref(leader), buf, len, ST_UTIME_NO_TIMEOUT), - static_cast<ssize_t>(len)); + st_write(__ref(leader), buf, len); }, wbuf.get(), wbuf.size()); finally f(lambda () { @@ -1142,14 +1202,17 @@ __ref(w).mark_and_flush(); }); + function<void()> send_failure_msg = lambda() { + TpccRes &res = __ref(res); + writer &w = __ref(w); + res.Clear(); + res.set_seqno(-1); + ser(w, res); + w.mark_and_flush(); + }; + while (true) { - // Has replayer just caught up to the start of the chunk? - if (first_seqno_in_chunk == seqno + 1) { - process_buf(reader.anchor(), reader.start(), req, seqno); - reader.set_anchor(); - } - marker = reader.start(); { @@ -1164,33 +1227,80 @@ // Prepare recovery msg. send_states.push(make_tpcc_recovery(mypos, nnodes, seqno)); } else { - // Backlog (auto/implicit) or process. - if (!caught_up) { - // If we were at the start of a new buffer (our chunk was recently reset). - if (reader.buf().get() == marker) - first_seqno_in_chunk = req.seqno(); - // If we fully caught up. - if (req.seqno() == seqno + 1) { - time_caught_up = current_time_millis(); - seqno_caught_up = seqno; - showtput("process_tpccs caught up; backlogged", - time_caught_up, start_time, seqno_caught_up, - first_seqno == -1 ? 
init_seqno - 1 : first_seqno); - caught_up = true; + + if (depleting) { + if (req.seqno() == -3) { + // Fail-ack. Should not be receiving anything until we resume. + failed.waitreset(); + send_failure_msg(); + // Note that we don't reset depleting; we want the next iteration to + // fall through to the next case in this if-else chain.... + + // Adjust reader so that the next xact (the first one after failure) + // will go to the start of the buffer; this is necessary for + // backlogging. + reader.set_anchor(); + shift_reader(reader); + } else if (!failed) { + // This is the first txn after resuming. Tell the recoverer task + // that this is the seqno to build up to (from another replica's + // log). + resume.push(req.seqno()); + depleting = false; } + // Ignore all other messages. } - if (caught_up) { - // Process. - process_tpcc(req, seqno, &res); - ser(w, res); - reader.set_anchor(); + + if (!depleting) { + if (req.seqno() == -3) { + // Ignore the fail-ack. + } else { + if (use_twal) wal.logbuf(marker, reader.start() - marker); + + // Backlog (auto/implicit) or process. + if (!caught_up) { + // If we were at the start of a new buffer (our chunk was recently reset). + if (reader.buf().get() == marker) + first_seqno_in_chunk = req.seqno(); + // If we fully caught up. + if (req.seqno() == seqno + 1) { + time_caught_up = current_time_millis(); + seqno_caught_up = seqno; + showtput("process_tpccs caught up; backlogged", + time_caught_up, start_time, seqno_caught_up, + first_seqno == -1 ? init_seqno - 1 : first_seqno); + caught_up = true; + } + } + if (caught_up) { + // Process. + process_tpcc(req, seqno, &res); + ser(w, res); + reader.set_anchor(); + } + + // Display/yield. + if (check_interval(req.seqno(), process_display)) + cout << (caught_up ? "processed req " : "backlogged req ") + << req.seqno() << endl; + if (check_interval(req.seqno(), yield_interval)) st_sleep(0); + + // Die. + if (fail_seqno > 0 && req.seqno() == fail_seqno) { + cout << "process_tpccs failing on seqno " << fail_seqno; + time_failed = current_time_millis(); + showtput("; live-processed ", time_failed, start_time, seqno, 0); + ASSERT(init_seqno == 0); + caught_up = false; + depleting = true; + seqno = -1; + + failed.set(); + send_failure_msg(); + } + } } - // Display/yield. - if (check_interval(req.seqno(), process_display)) - cout << (caught_up ? 
"processed req " : "backlogged req ") - << req.seqno() << endl; - if (check_interval(req.seqno(), yield_interval)) st_sleep(0); } } @@ -1698,12 +1808,14 @@ { public: tpcc_response_handler(st_netfd_t replica, const int &seqno, int rid, - st_multichannel<long long> &recover_signals, bool caught_up) + st_multichannel<long long> &recover_signals, + st_channel<st_netfd_t> &delreps, bool caught_up) : replica(replica), seqno(seqno), rid(rid), recover_signals(recover_signals), + delreps(delreps), caught_up(caught_up), sub(recover_signals.subscribe()), start_time(current_time_millis()), @@ -1744,38 +1856,48 @@ readmsg(reader, res); } - if (res.seqno() < last_seqno) - throw msg_exception(string("response seqno decreased from ") + - lexical_cast<string>(last_seqno) + " to " + - lexical_cast<string>(res.seqno())); + if (res.seqno() == -1) { + st_intr intr(stop_hub); + cout << "got a failed node" << endl; + delreps.push(replica); + readmsg(reader, res); + last_seqno = seqno - 1; + } else { - if (!caught_up) { - long long now = current_time_millis(), time_diff = now - start_time; - caught_up = true; - recover_signals.push(now); - cout << rid << ": " << "recovering node caught up; took " - << time_diff << " ms" << endl; - // This will cause the program to exit eventually, but cleanly, such that - // the recovery time will be set first, before the eventual exit (which - // may not even happen in the current iteration). - if (stop_on_recovery) { - cout << "stopping on recovery" << endl; - stop_hub.set(); + if (res.seqno() < last_seqno) + throw msg_exception(string("response seqno decreased from ") + + lexical_cast<string>(last_seqno) + " to " + + lexical_cast<string>(res.seqno())); + + if (!caught_up) { + long long now = current_time_millis(), time_diff = now - start_time; + caught_up = true; + recover_signals.push(now); + cout << rid << ": " << "recovering node caught up; took " + << time_diff << " ms" << endl; + // This will cause the program to exit eventually, but cleanly, such that + // the recovery time will be set first, before the eventual exit (which + // may not even happen in the current iteration). 
+ if (stop_on_recovery) { + cout << "stopping on recovery" << endl; + stop_hub.set(); + } } - } - if (check_interval(res.seqno(), handle_responses_display)) { - cout << rid << ": " << "got response " << res.seqno() << " from " - << replica << "; "; - long long display_time = current_time_millis(); - showtput("handling", display_time, last_display_time, res.seqno(), - res.seqno() - handle_responses_display); - last_display_time = display_time; + if (check_interval(res.seqno(), handle_responses_display)) { + cout << rid << ": " << "got response " << res.seqno() << " from " + << replica << "; "; + long long display_time = current_time_millis(); + showtput("handling", display_time, last_display_time, res.seqno(), + res.seqno() - handle_responses_display); + last_display_time = display_time; + } + if (check_interval(res.seqno(), yield_interval)) { + st_sleep(0); + } + last_seqno = res.seqno(); + } - if (check_interval(res.seqno(), yield_interval)) { - st_sleep(0); - } - last_seqno = res.seqno(); } } @@ -1815,6 +1937,7 @@ const int &seqno; int rid; st_multichannel<long long> &recover_signals; + st_channel<st_netfd_t> &delreps; bool caught_up; st_channel<long long> ⊂ long long start_time, recovery_start_time, recovery_end_time; @@ -1823,12 +1946,18 @@ void handle_tpcc_responses(st_netfd_t replica, const int &seqno, int rid, - st_multichannel<long long> &recover_signals, bool caught_up) + st_multichannel<long long> &recover_signals, + st_channel<st_netfd_t> &delreps, bool caught_up) { - tpcc_response_handler h(replica, seqno, rid, recover_signals, caught_up); + tpcc_response_handler h(replica, seqno, rid, recover_signals, delreps, + caught_up); h.run(); } +struct recreq { + int start_seqno, end_seqno; +}; + /** * Help the recovering node. * @@ -1846,32 +1975,69 @@ recover_joiner(st_netfd_t listener, st_channel<recovery_t> &send_states) { + cout << "waiting for joiner" << endl; + recovery_t recovery; st_netfd_t joiner; - recovery_t recovery; - { - st_intr intr(stop_hub); - // Wait for the snapshot. - recovery = send_states.take(); - if (recovery == nullptr) { - return; + if (ship_log) { + { + st_intr intr(stop_hub); + joiner = checkerr(st_accept(listener, nullptr, nullptr, + ST_UTIME_NO_TIMEOUT)); } - // Wait for the new joiner. 
- joiner = checkerr(st_accept(listener, nullptr, nullptr, - ST_UTIME_NO_TIMEOUT)); + st_closing closing(joiner); + recreq r; + st_read(joiner, r); + commons::array<char> wbuf(buf_size); + writer writer(lambda(const void *buf, size_t len) { + st_write(__ref(joiner), buf, len); + }, wbuf.get(), wbuf.size()); + cout << "got joiner's connection, sending log from seqnos " + << r.start_seqno << " to " << r.end_seqno << endl; + + g_twal->flush(); + sync(); + ifstream inf("twal"); + long long start_time = current_time_millis(); + for (int seqno = 0; seqno < r.start_seqno; ++seqno) { + ASSERT(inf.good()); + inf.seekg(readlen(inf), ios::cur); + } + long long mid_time = current_time_millis(); + streamoff mid_off = inf.tellg(); + showdatarate("scanned log", mid_off, mid_time - start_time); + for (int seqno = r.start_seqno; seqno < r.end_seqno; ++seqno) { + ASSERT(inf.good()); + uint32_t len = readlen(inf); + inf.read(writer.reserve(len), len); + writer.mark(); + cout << seqno << ' ' << len << endl; + if (check_interval(seqno, yield_interval)) st_sleep(0); + } + writer.mark_and_flush(); + long long end_time = current_time_millis(); + streamoff end_off = inf.tellg(); + showdatarate("shipped log", end_off - mid_off, end_time - mid_time); + } else { + { + st_intr intr(stop_hub); + // Wait for the snapshot. + recovery = send_states.take(); + if (recovery == nullptr) { + return; + } + // Wait for the new joiner. + joiner = checkerr(st_accept(listener, nullptr, nullptr, + ST_UTIME_NO_TIMEOUT)); + } + + st_closing closing(joiner); + cout << "got joiner's connection, sending recovery of " + << recovery.size() << " bytes" << endl; + long long start_time = current_time_millis(); + st_write(joiner, recovery.get(), recovery.size()); + long long diff = current_time_millis() - start_time; + showdatarate("sent recovery", recovery.size(), diff); } - - st_closing closing(joiner); - cout << "got joiner's connection, sending recovery of " - << recovery.size() << " bytes" << endl; - long long start_time = current_time_millis(); - checkeqnneg(st_write(joiner, recovery.get(), recovery.size(), - ST_UTIME_NO_TIMEOUT), - ssize_t(recovery.size())); - long long diff = current_time_millis() - start_time; -#if 0 - sendmsg(joiner, *recovery); -#endif - showdatarate("sent recovery", recovery.size(), diff); } void @@ -1893,8 +2059,10 @@ cout << "starting as leader" << endl; st_multichannel<long long> recover_signals; - scoped_ptr<wal> twal(new wal(use_twal ? "twal" : "/dev/null")); - g_wal = twal.get(); + scoped_ptr<txn_wal> twal(new txn_wal(use_twal ? "twal" : "/dev/null")); + g_twal = twal.get(); + scoped_ptr<wal> pwal(new wal(use_pwal ? "pwal" : "/dev/null")); + g_wal = pwal.get(); // Wait until all replicas have joined. st_netfd_t listener = st_tcp_listen(leader_port); @@ -1934,10 +2102,13 @@ st_bool accept_joiner; int seqno = 0; st_channel<replica_info> newreps; + st_channel<st_netfd_t> delreps; foreach (const replica_info &r, replicas) newreps.push(r); - const function<void()> f = do_tpcc ? 
- bind(issue_tpcc, ref(newreps), ref(seqno), ref(accept_joiner)) : - bind(issue_txns<Types>, ref(newreps), ref(seqno), ref(accept_joiner)); + function<void()> f; + if (do_tpcc) + f = bind(issue_tpcc, ref(newreps), ref(delreps), ref(seqno), ref(accept_joiner)); + else + f = bind(issue_txns<Types>, ref(newreps), ref(seqno), ref(accept_joiner)); st_joining join_issue_txns(my_spawn(f, "issue_txns")); finally fin(bind(summarize, "LEADER", ref(seqno))); @@ -1950,7 +2121,7 @@ function<void()> fn; if (do_tpcc) fn = bind(handle_tpcc_responses, r.fd(), ref(seqno), rid++, - ref(recover_signals), true); + ref(recover_signals), ref(delreps), true); else fn = bind(handle_responses<RTypes>, r.fd(), ref(seqno), rid++, ref(recover_signals), true); @@ -1985,7 +2156,7 @@ if (do_tpcc) handle_responses_joiner_fn = bind(handle_tpcc_responses, joiner, ref(seqno), rid++, - ref(recover_signals), false); + ref(recover_signals), ref(delreps), false); else handle_responses_joiner_fn = bind(handle_responses<RTypes>, joiner, ref(seqno), rid++, @@ -2076,6 +2247,7 @@ << current_time_millis() - start_time << " ms" << endl; tables->show(); } + recovery_t orig = rec_twal ? g_tables->ser(0, 0, seqno) : recovery_t(); finally f(bind(summarize, "REPLICA", ref(seqno))); st_channel<recovery_t> send_states; @@ -2115,14 +2287,16 @@ INET_ADDRSTRLEN)) << ':' << sa.port() << (is_self ? " (self)" : "") << endl; if (is_self) mypos = i; - if (!is_self && init.txnseqno() > 0) { + if (!is_self && (init.txnseqno() > 0 || rec_twal)) { replicas.push_back(st_tcp_connect(host, static_cast<uint16_t>(sa.port()), timeout)); } } - // Initialize physical log. + // Initialize physical or txn log. + scoped_ptr<txn_wal> twal(new txn_wal(use_twal ? "twal" : "/dev/null")); + g_twal = twal.get(); scoped_ptr<wal> pwal(new wal(use_pwal ? "pwal" : "/dev/null")); g_wal = pwal.get(); @@ -2137,29 +2311,99 @@ ref(send_states), ref(backlog), init.txnseqno(), mypos, init.node_size()); st_joining join_proc(my_spawn(process_fn, "process_txns")); - if (init.txnseqno() == 0 && multirecover || mypos == 0) - st_joining join_rec(my_spawn(bind(recover_joiner, listener, - ref(send_states)), - "recover_joiner")); + st_joining join_rec(init.txnseqno() == 0 && (multirecover || mypos == 0) ? + my_spawn(bind(recover_joiner, listener, ref(send_states)), + "recover_joiner") : + nullptr); try { // If there's anything to recover. 
- if (init.txnseqno() > 0) { + if (init.txnseqno() > 0 || fail_seqno > 0) { if (do_tpcc) { // // TPCC txns // - g_tables.reset(new TPCCTables); + function<void()> rec_twal_fn = lambda() { + int &seqno = __ref(seqno); + cout << "recovering from twal" << endl; + long long start_time = current_time_millis(); + g_twal->flush(); + sync(); + ifstream inf("twal"); + TpccReq req; + while (inf.peek() != ifstream::traits_type::eof()) { + ASSERT(inf.good()); + readmsg(inf, req); + process_tpcc(req, seqno, nullptr); + if (check_interval(seqno, yield_interval)) st_sleep(0); + } + showdatarate("recovered from twal", inf.tellg(), + current_time_millis() - start_time); + cout << "now at seqno " << seqno << endl; + }; + function<void()> recv_log_fn = lambda() { + st_netfd_t src = __ref(replicas[0]); + int &seqno = __ref(seqno); + ASSERT(fail_seqno == seqno); + recreq r = { fail_seqno + 1, resume.take() }; + st_write(src, r); + sized_array<char> rbuf(new char[read_buf_size], read_buf_size); + function<void(anchored_stream_reader &reader)> overflow_fn = + lambda(anchored_stream_reader &reader) { + shift_reader(reader); + }; + anchored_stream_reader reader(st_read_fn(src), + st_read_fully_fn(src), + overflow_fn, rbuf.get(), rbuf.size()); + TpccReq req; + while (seqno < r.end_seqno) { + { st_intr intr(stop_hub); readmsg(reader, req); } + process_tpcc(req, seqno, nullptr); + reader.set_anchor(); + if (check_interval(seqno, yield_interval)) st_sleep(0); + } + }; + + if (rec_twal) { + failed.waitset(); + g_tables.reset(new TPCCTables); + tpcc_recovery_header &hdr = *reinterpret_cast<tpcc_recovery_header*>(orig.begin()); + commons::array<char> body(orig.begin() + sizeof(tpcc_recovery_header), + orig.size() - sizeof(tpcc_recovery_header)); + g_tables->deser(mypos, init.node_size(), hdr, body); + body.release(); + rec_twal_fn(); + failed.reset(); + recv_log_fn(); + } + +#if 0 + st_thread_t rec_twal_thread = my_spawn(rec_twal_fn, "rec_twal"); + st_thread_t recv_log_thread = my_spawn(recv_log_fn, "recv_log"); + + st_join(rec_twal_thread); + st_join(recv_log_thread); +#endif + if (rec_pwal) { + // Recover from phy log. + } else if (rec_twal) { + // Recover from txn log. } else { + g_tables.reset(new TPCCTables); + // // Build-up // + if (ship_log) { + } else { + // XXX indent + cout << "waiting for recovery message" << (multirecover ? "s" : "") << endl; long long before_recv = current_time_millis(); @@ -2237,6 +2481,8 @@ foreach (st_thread_t t, recovery_builders) { st_join(t); } + + } } // @@ -2558,6 +2804,13 @@ void flush() { writer_.mark_and_flush(); } void set_seqno(int seqno) { seqno_ = seqno; } + void sendStop() { + req_.Clear(); + req_.set_seqno(-1); + ser(writer_, req_); + writer_.mark_and_flush(); + } + void sendRec() { req_.Clear(); req_.set_seqno(-2); @@ -2565,9 +2818,9 @@ writer_.mark_and_flush(); } - void sendStop() { + void sendFailAck() { req_.Clear(); - req_.set_seqno(-1); + req_.set_seqno(-3); ser(writer_, req_); writer_.mark_and_flush(); } @@ -2704,7 +2957,9 @@ }; void -issue_tpcc(st_channel<replica_info> &newreps, int &seqno, +issue_tpcc(st_channel<replica_info> &newreps, + st_channel<st_netfd_t> &delreps, + int &seqno, st_bool &accept_joiner) { vector<st_netfd_t> fds; @@ -2732,7 +2987,7 @@ // one) to prepare to send recovery information (by sending an // empty/default Txn). 
// XXX rec_pwal - if (!newreps.empty() && seqno > 0 && !rec_pwal) { + if (!newreps.empty() && seqno > 0 && !rec_pwal && !rec_twal) { tables.sendRec(); } @@ -2741,6 +2996,10 @@ while (!newreps.empty()) { fds.push_back(newreps.take().fd()); } + while (!delreps.empty()) { + tables.sendFailAck(); + fds.erase( find(fds.begin(), fds.end(), delreps.take()) ); + } tables.set_seqno(seqno); client.doOne(); @@ -2863,10 +3122,14 @@ "yield periodically during catch-up phase of recovery (for recoverer)") ("multirecover,m", po::bool_switch(&multirecover), "recover from multiple hosts, instead of just one (specified via leader)") + ("rec-twal", po::bool_switch(&rec_twal), + "recover from twal") ("rec-pwal", po::bool_switch(&rec_pwal), "recover from pwal") ("disk,k", po::bool_switch(&disk), "use disk-based recovery") + ("ship-log", po::bool_switch(&ship_log), + "ship the log instead of the complete database state") ("dump,D", po::bool_switch(&dump), "replicas should finally dump their state to a tmp file for " "inspection/diffing") @@ -2930,6 +3193,9 @@ ("warehouses,w", po::value<int>(&nwarehouses)->default_value(1), "number of warehouses to model") + ("fail-seqno", + po::value<int>(&fail_seqno)->default_value(0), + "fail after processing this seqno (for replica only)") ("accept-joiner-seqno,j", po::value<int>(&accept_joiner_seqno)->default_value(0), "accept recovering joiner (start recovery) after this seqno (for leader " Modified: ydb/trunk/src/tpcc/tpccclient.cc =================================================================== --- ydb/trunk/src/tpcc/tpccclient.cc 2009-03-18 20:08:05 UTC (rev 1307) +++ ydb/trunk/src/tpcc/tpccclient.cc 2009-03-19 10:16:33 UTC (rev 1308) @@ -68,11 +68,13 @@ vector<DeliveryOrderInfo> orders; db_->delivery(generateWarehouse(), carrier, now, &orders); +#if 0 #ifndef NDEBUG if (orders.size() != District::NUM_PER_WAREHOUSE) { printf("Only delivered from %zd districts\n", orders.size()); } #endif +#endif } void TPCCClient::doPayment() { Modified: ydb/trunk/src/tpcc/tpcctables.cc.cog =================================================================== --- ydb/trunk/src/tpcc/tpcctables.cc.cog 2009-03-18 20:08:05 UTC (rev 1307) +++ ydb/trunk/src/tpcc/tpcctables.cc.cog 2009-03-19 10:16:33 UTC (rev 1308) @@ -194,6 +194,8 @@ // Modify the order id to assign it d->d_next_o_id += 1; + //XXX pwal.write(d); + //XXX pwal.write(d->d_next_o_id); Warehouse* w = findWarehouse(warehouse_id); output->w_tax = w->w_tax; Modified: ydb/trunk/src/tpcc/tpcctables.h =================================================================== --- ydb/trunk/src/tpcc/tpcctables.h 2009-03-18 20:08:05 UTC (rev 1307) +++ ydb/trunk/src/tpcc/tpcctables.h 2009-03-19 10:16:33 UTC (rev 1308) @@ -152,6 +152,8 @@ std::vector<const History*> history_; commons::sized_array<char> serbuf_; + + //XXX raw_writer pwal; }; #endif Modified: ydb/trunk/tools/analysis.py =================================================================== --- ydb/trunk/tools/analysis.py 2009-03-18 20:08:05 UTC (rev 1307) +++ ydb/trunk/tools/analysis.py 2009-03-19 10:16:33 UTC (rev 1308) @@ -117,27 +117,44 @@ r'=== n=(?P<n>-?\d+) ', r'issued .*\((?P<tps>[.\d]+) tps\)' ]) - print 'file:', getname(ariespath) - res2 = logextract(ariespath, 'n', [ - r'=== n=(?P<n>-?\d+) ', - r'issued .*\((?P<tps>[.\d]+) tps\)' ]) + if path(ariespath).exists(): + print 'file:', getname(ariespath) + res2 = logextract(ariespath, 'n', [ + r'=== n=(?P<n>-?\d+) ', + r'issued .*\((?P<tps>[.\d]+) tps\)' ]) - print hstack([res2['n'], res0['n'][:1], res['n']]) - print 
hstack([res2['tps mean'], res0['tps mean'][:1], res['tps mean']]) - print hstack([res2['tps sd'], res0['tps sd'][:1], res['tps sd']]) + print hstack([res2['n'], res0['n'][:1], res['n']]) + print hstack([res2['tps mean'], res0['tps mean'][:1], res['tps mean']]) + print hstack([res2['tps sd'], res0['tps sd'][:1], res['tps sd']]) - errorbar( - hstack([res2['n'], res0['n'][:1], res['n']]), - hstack([res2['tps mean'], res0['tps mean'][:1], res['tps mean']]), - hstack([res2['tps sd'], res0['tps sd'][:1], res['tps sd']])) - title('Scaling of baseline throughput with number of nodes') - xlabel('Node count') - ylabel('Mean TPS (stdev error bars)') - xlim(hstack([res2['n'], res['n']]).min() - .5, - hstack([res2['n'], res['n']]).max() + .5) - ylim(ymin = 0) - savefig('scaling.png') + errorbar( + hstack([res2['n'], res0['n'][:1], res['n']]), + hstack([res2['tps mean'], res0['tps mean'][:1], res['tps mean']]), + hstack([res2['tps sd'], res0['tps sd'][:1], res['tps sd']])) + title('Scaling of baseline throughput with number of nodes') + xlabel('Node count') + ylabel('Mean TPS (stdev error bars)') + xlim(hstack([res2['n'], res['n']]).min() - .5, + hstack([res2['n'], res['n']]).max() + .5) + ylim(ymin = 0) + savefig('scaling.png') + else: + print hstack([res0['n'][:1], res['n']]) + print hstack([res0['tps mean'][:1], res['tps mean']]) + print hstack([res0['tps sd'][:1], res['tps sd']]) + errorbar( + hstack([res0['n'][:1], res['n']]), + hstack([res0['tps mean'][:1], res['tps mean']]), + hstack([res0['tps sd'][:1], res['tps sd']])) + title('Scaling of baseline throughput with number of nodes') + xlabel('Node count') + ylabel('Mean TPS (stdev error bars)') + xlim(hstack([res['n']]).min() - .5, + hstack([res['n']]).max() + .5) + ylim(ymin = 0) + savefig('scaling.png') + def run(singlepath, multipath): singlepath, multipath = map(path, [singlepath, multipath]) ress = [] This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
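The istream readmsg()/readlen() helpers added in r1308 above read frames of the form "4-byte network-order length, then that many bytes of serialized message", which is also what the twal replay and log shipping walk over. A minimal self-contained sketch of that framing, using plain strings as payloads (the real code parses a protobuf TpccReq out of each frame):

    // Length-prefixed framing: htonl(length) followed by the payload bytes.
    #include <arpa/inet.h>   // htonl, ntohl
    #include <cassert>
    #include <cstdint>
    #include <iostream>
    #include <sstream>
    #include <string>

    void write_frame(std::ostream &out, const std::string &payload) {
        uint32_t len = htonl(static_cast<uint32_t>(payload.size()));
        out.write(reinterpret_cast<const char *>(&len), sizeof len);
        out.write(payload.data(), static_cast<std::streamsize>(payload.size()));
    }

    std::string read_frame(std::istream &in) {
        uint32_t len;
        in.read(reinterpret_cast<char *>(&len), sizeof len);
        len = ntohl(len);
        std::string payload(len, '\0');
        in.read(&payload[0], len);
        return payload;
    }

    int main() {
        std::stringstream log;          // stands in for the on-disk "twal"
        write_frame(log, "txn 1");
        write_frame(log, "txn 2");
        assert(read_frame(log) == "txn 1");
        assert(read_frame(log) == "txn 2");
        return 0;
    }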