assorted-commits Mailing List for Assorted projects (Page 25)
Brought to you by: yangzhang
Messages by month:

| Year | Jan | Feb | Mar | Apr | May | Jun | Jul | Aug | Sep | Oct | Nov | Dec |
|------|-----|-----|-----|-----|-----|-----|-----|-----|-----|-----|-----|-----|
| 2007 |     |     |     |     |     |     |     |     |     |     | 9   | 12  |
| 2008 | 86  | 265 | 96  | 47  | 136 | 28  | 57  | 42  | 20  | 67  | 37  | 34  |
| 2009 | 39  | 85  | 96  | 24  | 82  | 13  | 10  | 8   | 2   | 20  | 31  | 17  |
| 2010 | 16  | 11  | 17  | 53  | 31  | 13  | 3   | 6   | 11  | 4   | 17  | 17  |
| 2011 | 3   | 19  | 5   | 17  | 3   | 4   | 14  | 3   | 2   | 1   | 3   | 2   |
| 2012 | 3   | 7   | 1   |     | 1   |     | 4   | 5   | 2   | 3   |     |     |
| 2013 |     |     | 9   | 5   |     | 2   | 1   | 10  | 1   | 2   |     |     |
| 2014 | 1   | 3   | 3   | 1   | 4   |     |     |     | 2   |     |     |     |
| 2015 |     |     |     |     |     |     |     |     |     | 1   |     |     |
| 2016 | 1   |     | 2   |     |     |     |     |     | 1   |     |     |     |
| 2017 |     |     | 1   |     | 5   | 1   |     |     |     |     |     | 2   |
| 2018 |     |     |     |     |     | 1   |     |     |     |     |     |     |
From: <yan...@us...> - 2009-03-18 09:37:13

Revision: 1302
          http://assorted.svn.sourceforge.net/assorted/?rev=1302&view=rev
Author:   yangzhang
Date:     2009-03-18 09:36:57 +0000 (Wed, 18 Mar 2009)

Log Message:
-----------
added demo of inheritance

Added Paths:
-----------
    sandbox/trunk/src/cc/inheritance/
    sandbox/trunk/src/cc/inheritance/Makefile
    sandbox/trunk/src/cc/inheritance/README
    sandbox/trunk/src/cc/inheritance/main.cc
    sandbox/trunk/src/cc/inheritance/sub.cc
    sandbox/trunk/src/cc/inheritance/sub.h
    sandbox/trunk/src/cc/inheritance/super.h

Added: sandbox/trunk/src/cc/inheritance/Makefile
===================================================================
--- sandbox/trunk/src/cc/inheritance/Makefile	(rev 0)
+++ sandbox/trunk/src/cc/inheritance/Makefile	2009-03-18 09:36:57 UTC (rev 1302)
@@ -0,0 +1,4 @@
+all: main
+main: main.o sub.o
+%: %.o
+	$(LINK.cc) $^ $(LOADLIBES) $(LDLIBS) -o $@

Added: sandbox/trunk/src/cc/inheritance/README
===================================================================
--- sandbox/trunk/src/cc/inheritance/README	(rev 0)
+++ sandbox/trunk/src/cc/inheritance/README	2009-03-18 09:36:57 UTC (rev 1302)
@@ -0,0 +1,2 @@
+Demo that you need to declare the overridden methods in a subclass in the
+subclass definition, even if you define the implementation elsewhere.

Added: sandbox/trunk/src/cc/inheritance/main.cc
===================================================================
--- sandbox/trunk/src/cc/inheritance/main.cc	(rev 0)
+++ sandbox/trunk/src/cc/inheritance/main.cc	2009-03-18 09:36:57 UTC (rev 1302)
@@ -0,0 +1,2 @@
+#include "sub.h"
+int main() { sub s; return 0; }

Added: sandbox/trunk/src/cc/inheritance/sub.cc
===================================================================
--- sandbox/trunk/src/cc/inheritance/sub.cc	(rev 0)
+++ sandbox/trunk/src/cc/inheritance/sub.cc	2009-03-18 09:36:57 UTC (rev 1302)
@@ -0,0 +1,2 @@
+#include "sub.h"
+void sub::foo() {}

Added: sandbox/trunk/src/cc/inheritance/sub.h
===================================================================
--- sandbox/trunk/src/cc/inheritance/sub.h	(rev 0)
+++ sandbox/trunk/src/cc/inheritance/sub.h	2009-03-18 09:36:57 UTC (rev 1302)
@@ -0,0 +1,5 @@
+#include "super.h"
+class sub : public super {
+  // Must include this!
+  void foo();
+};

Added: sandbox/trunk/src/cc/inheritance/super.h
===================================================================
--- sandbox/trunk/src/cc/inheritance/super.h	(rev 0)
+++ sandbox/trunk/src/cc/inheritance/super.h	2009-03-18 09:36:57 UTC (rev 1302)
@@ -0,0 +1,4 @@
+class super {
+ public:
+  virtual void foo() = 0;
+};
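The point of the demo is easiest to see from its failure mode in a single file. The sketch below is not part of the commit; it is a hypothetical, self-contained restatement, and the comments note what breaks if the in-class declaration is dropped.

    // Hypothetical single-file variant of the committed demo (illustrative only).
    #include <iostream>

    struct super {
      virtual void foo() = 0;      // pure virtual: subclasses must override it
      virtual ~super() {}
    };

    struct sub : super {
      void foo();                  // required here, even though the body lives elsewhere
    };

    // Out-of-class definition; legal only because of the declaration above.
    // Removing that declaration yields "no member function declared in class 'sub'"
    // here, and leaves sub abstract, so `sub s;` below would also fail to compile.
    void sub::foo() { std::cout << "sub::foo\n"; }

    int main() {
      sub s;
      s.foo();
      return 0;
    }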
From: <yan...@us...> - 2009-03-18 09:17:15
Revision: 1301 http://assorted.svn.sourceforge.net/assorted/?rev=1301&view=rev Author: yangzhang Date: 2009-03-18 09:17:01 +0000 (Wed, 18 Mar 2009) Log Message: ----------- - added recovery, backlogging, catchup to ydb for tpcc - added ser(), deser() to TPCCTables - added separate (and simple) recovery build-up/catch-up logic to the main thread - added marker, first_seqno_in_chunk, proper overflow_fn (can still be simplified/refactored in conj with "anchored_stream_reader" design) - cleaned up memory management so that we don't double-free things that are backed by a recovery message buffer - added tpcc_recovery_header - added process_buf for catching up (replaying) tpccs from a buffer - added yielding to process_tpccs and reduced yield interval so that other things get a chance to run besides the process_tpccs thread - build system - added cog; used extensively for ser/deser - using gnu++0x for tpcc as well - added SECONDARY targets - added null-read-checking in readmsg - ignore null res in process_tpcc - added some typedefs to bplustree - updated notes/todos Modified Paths: -------------- ydb/trunk/README ydb/trunk/src/Makefile ydb/trunk/src/main.lzz.clamp ydb/trunk/src/tpcc/Makefile ydb/trunk/src/tpcc/btree.h ydb/trunk/src/tpcc/tpcctables.h Added Paths: ----------- ydb/trunk/src/tpcc/tpcctables.cc.cog Removed Paths: ------------- ydb/trunk/src/tpcc/tpcctables.cc Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-03-17 17:24:21 UTC (rev 1300) +++ ydb/trunk/README 2009-03-18 09:17:01 UTC (rev 1301) @@ -716,7 +716,7 @@ |stock| = 100000 |districts| = 10 |customers| = 30000 - |orders| = 3099 (almost 30000+1000)1 + |orders| = 30991 (almost 30000+1000) |ordersBC| = 30991 |orderlines| = 309477 (almost 300000+1000*10) |cbn| = 30000 @@ -736,12 +736,48 @@ |neworders| = 107973 |history| = 30000 +- DONE try integrating TPC-C; just get txns working for now + - DONE add iteration to btree - added btree_test for this -- TODO try integrating TPC-C; just get txns working for now +- DONE make process_tpccs switch into shifting mode (instead of buffer-swapping + mode) after catch-up +- DONE make process_tpccs rewind & process the accumulated buffer once seqno + has caught up to the first seqno in the buffer +- DONE implement TPC-C network recovery -- TODO implement TPC-C recovery +- TODO full TPC-C txn workload +- TODO prelim measurements on cluster +- TODO PAPER!!! + +- TODO related work + + [HST+91] Svein-Olaf Hvasshovd, Tore Saeter, Oystein Torboørnsen, Petter + Moe, and Oddvar Risnes. A continously available and highly scalable + transaction server: Design experience from the HypRa project. In + Proceedings of the 4th International Workshop on High Performance + Transaction Systems, September 1991. + + Ricardo Jimenez-Peris , M. Patino-Martinez , Gustavo Alonso , Bettina + Kemme, Are quorums an alternative for data replication?, ACM Transactions + on Database Systems (TODS), v.28 n.3, p.257-294, September 2003 + + ~\cite{peris-srds202}R. Jimenez-Peris , M. Patino-Martinez , G. Alonso, + An algorithm for non-intrusive, parallel recovery of replicated data and + its correctness, Proceedings of the 21st IEEE Symposium on Reliable + Distributed Systems (SRDS'02), p.150, October 13-16, 2002 + + ~\cite{bartoli-dsn01}B. Kemme, A. Bartoli, and O. Babaoglu. Online + Reconfiguration in Replicated Databases Based on Group Communication. In + Proc. of the Int. Conf. on Dependable Systems and Networks (DSN 2001), + Goteborg, Sweden, June 2001. 
+ + ~\cite{amir-thesis} Y. Amir. Replication Using Group Communication Over a + Dynamic Network. PhD thesis, Institute of Computer Science, The Hebrew + University of Jerusalem, Israel. Also available at + http://www.dsn.jhu.edu/~yairamir/Yair_phd.pdf, 1995. + - TODO faster disk logging using separate threads - TODO show aries-write Modified: ydb/trunk/src/Makefile =================================================================== --- ydb/trunk/src/Makefile 2009-03-17 17:24:21 UTC (rev 1300) +++ ydb/trunk/src/Makefile 2009-03-18 09:17:01 UTC (rev 1301) @@ -104,6 +104,11 @@ tpcc/%.o: tpcc/%.cc make -C tpcc/ +tpcc/%.cc: tpcc/%.cc.cog + make -C tpcc/ + +.SECONDARY: tpcc/tpcctables.cc tpcc/tpcctables.o + %.o: %.cc $(COMPILE.cc) $(OUTPUT_OPTION) $< Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-03-17 17:24:21 UTC (rev 1300) +++ ydb/trunk/src/main.lzz.clamp 2009-03-18 09:17:01 UTC (rev 1301) @@ -581,7 +581,7 @@ readmsg(anchored_stream_reader &src, T &msg) { uint32_t len = ntohl(src.read<uint32_t>()); - check(msg.ParseFromArray(src.read(len), len)); + check(msg.ParseFromArray(checkpass(src.read(len)), len)); } enum { op_del, op_write, op_commit }; @@ -923,27 +923,30 @@ dst.ol_quantity = src.ol_quantity(); } NewOrderOutput output; - g_tables->newOrder(no.warehouse_id(), no.district_id(), no.customer_id(), - items, no.now().c_str(), &output); - res->Clear(); - res->set_seqno(seqno); - NewOrderOutputMsg &outmsg = *res->mutable_new_order(); - outmsg.set_w_tax(output.w_tax); - outmsg.set_d_tax(output.d_tax); - outmsg.set_o_id(output.o_id); - outmsg.set_c_discount(output.c_discount); - outmsg.set_total(output.total); - foreach (const NewOrderOutput::ItemInfo &src, output.items) { - ItemInfoMsg &dst = *outmsg.add_item(); - dst.set_s_quantity(src.s_quantity); - dst.set_i_price(src.i_price); - dst.set_ol_amount(src.ol_amount); - dst.set_brand_generic(src.brand_generic); - dst.set_i_name(src.i_name); + g_tables->newOrder(no.warehouse_id(), no.district_id(), + no.customer_id(), items, no.now().c_str(), + &output); + if (res != nullptr) { + res->Clear(); + res->set_seqno(seqno); + NewOrderOutputMsg &outmsg = *res->mutable_new_order(); + outmsg.set_w_tax(output.w_tax); + outmsg.set_d_tax(output.d_tax); + outmsg.set_o_id(output.o_id); + outmsg.set_c_discount(output.c_discount); + outmsg.set_total(output.total); + foreach (const NewOrderOutput::ItemInfo &src, output.items) { + ItemInfoMsg &dst = *outmsg.add_item(); + dst.set_s_quantity(src.s_quantity); + dst.set_i_price(src.i_price); + dst.set_ol_amount(src.ol_amount); + dst.set_brand_generic(src.brand_generic); + dst.set_i_name(src.i_name); + } + outmsg.set_c_last(output.c_last); + outmsg.set_c_credit(output.c_credit); + outmsg.set_status(output.status); } - outmsg.set_c_last(output.c_last); - outmsg.set_c_credit(output.c_credit); - outmsg.set_status(output.status); } else { throw_not_implemented(); } @@ -963,14 +966,38 @@ // the seqno reported by the leader in the Init message, but it may have // issued more since the Init message). 
int first_seqno = -1; + char *marker = nullptr; + int first_seqno_in_chunk = -1; + TpccReq req; + TpccRes res; - function<void(anchored_stream_reader& reader)> overflow_fn = lambda(anchored_stream_reader &reader) { - sized_array<char> buf(new char[reader.buf().size()], reader.buf().size()); - memcpy(buf.get(), reader.start(), reader.unread()); - swap(buf, reader.buf()); - __ref(backlog).push(make_tuple(buf, reader.anchor(), reader.start())); - reader.reset_range(reader.buf().get(), reader.buf().get() + reader.unread()); - }; + function<void(anchored_stream_reader& reader)> overflow_fn = + lambda(anchored_stream_reader &reader) { + if (__ref(caught_up)) { + // Anchor should already be correctly set, so just shift down. + shift_reader(reader); + } else if (__ref(first_seqno_in_chunk) == __ref(seqno) + 1) { + // Has the replayer just caught up to the start of the chunk? + ASSERT(reader.buf().get() == reader.anchor()); + // Replay all messages up to but not included the current unprocessed + // message (which we may be in the middle of receiving, triggering this + // overflow). + process_buf(reader.anchor(), __ref(marker), __ref(req), __ref(seqno)); + // Update the anchor to point to the unprocessed message, so that we + // shift the unprocessed message down. + reader.anchor() = __ref(marker); + shift_reader(reader); + } else { + // Push onto backlog and put in new buffer. + ASSERT(reader.buf().get() == reader.anchor()); + __ref(backlog).push(make_tuple(reader.buf(), reader.anchor(), __ref(marker))); + reader.anchor() = __ref(marker); + replace_reader(reader); + cout << "added to backlog, now has " << __ref(backlog).queue().size() + << " chunks" << endl; + } + __ref(marker) = reader.buf().get(); + }; sized_array<char> rbuf(new char[read_buf_size], read_buf_size); commons::array<char> wbuf(buf_size); @@ -996,11 +1023,16 @@ __ref(w).mark_and_flush(); }); - TpccReq req; - TpccRes res; - while (true) { + // Has replayer just caught up to the start of the chunk? + if (first_seqno_in_chunk == seqno + 1) { + process_buf(reader.anchor(), reader.start(), req, seqno); + reader.set_anchor(); + } + + marker = reader.start(); + { st_intr intr(stop_hub); readmsg(reader, req); @@ -1013,35 +1045,67 @@ // Prepare recovery msg. send_states.push(make_tpcc_recovery(mypos, nnodes, seqno)); } else { - // Backlog (auto-implicit) or process. - if (check_interval(req.seqno(), process_display)) - cout << "processing req " << req.seqno() << endl; - if (req.seqno() == seqno + 1) { - if (!caught_up) { + // Backlog (auto/implicit) or process. + if (!caught_up) { + // If we were at the start of a new buffer (our chunk was recently reset). + if (reader.buf().get() == marker) + first_seqno_in_chunk = req.seqno(); + // If we fully caught up. + if (req.seqno() == seqno + 1) { time_caught_up = current_time_millis(); seqno_caught_up = seqno; - showtput("process_txns caught up; backlogged", + showtput("process_tpccs caught up; backlogged", time_caught_up, start_time, seqno_caught_up, first_seqno == -1 ? init_seqno - 1 : first_seqno); caught_up = true; } + } + if (caught_up) { // Process. process_tpcc(req, seqno, &res); ser(w, res); reader.set_anchor(); } + + // Display/yield. + if (check_interval(req.seqno(), process_display)) + cout << (caught_up ? 
"processed req " : "backlogged req ") + << req.seqno() << endl; + if (check_interval(req.seqno(), yield_interval)) st_sleep(0); } } } -// XXX +void +process_buf(char *begin, char *end, TpccReq &req, int &seqno) +{ + ASSERT(begin < end); + raw_reader reader(begin); + while (reader.ptr() < end) { + uint32_t len = ntohl(reader.read<uint32_t>()); + ASSERT(len < 10000); + ASSERT(reinterpret_cast<char*>(reader.ptr()) + len <= end); + check(req.ParseFromArray(reader.readptr(len), len)); + process_tpcc(req, seqno, nullptr); + if (check_interval(req.seqno(), yield_interval)) st_sleep(0); + if (check_interval(req.seqno(), process_display)) { + cout << "caught up to req " << req.seqno() << endl; + } + } +} + recovery_t make_tpcc_recovery(int mypos, int nnodes, int seqno) { - mypos = nnodes = seqno; - throw_not_implemented(); - return recovery_t(); + long long start_time = current_time_millis(); + cout << "serializing recovery, db state is now at seqno " + << seqno << ":" << endl; + g_tables->show(); + recovery_t recovery = g_tables->ser(mypos, nnodes, seqno); + showdatarate("serialized recovery", recovery.size(), + current_time_millis() - start_time); + return recovery; } /** @@ -1799,9 +1863,17 @@ // Start streaming txns to joiner. cout << "start streaming txns to joiner" << endl; + function<void()> handle_responses_joiner_fn; + if (do_tpcc) + handle_responses_joiner_fn = + bind(handle_tpcc_responses, joiner, ref(seqno), rid++, + ref(recover_signals), false); + else + handle_responses_joiner_fn = + bind(handle_responses<RTypes>, joiner, ref(seqno), rid++, + ref(recover_signals), false); newreps.push(replicas.back()); - handlers.insert(my_spawn(bind(handle_responses<RTypes>, joiner, ref(seqno), rid++, - ref(recover_signals), false), + handlers.insert(my_spawn(handle_responses_joiner_fn, "handle_responses_joiner")); } catch (break_exception &ex) { } catch (std::exception &ex) { @@ -1858,6 +1930,7 @@ // Initialize database state. int seqno = -1; mii &map = g_map; + commons::array<char> recarr(0); if (do_tpcc) { TPCCTables *tables = new TPCCTables(); g_tables.reset(tables); @@ -1953,203 +2026,334 @@ try { // If there's anything to recover. if (init.txnseqno() > 0) { - if (rec_pwal) { - // Recover from physical log. - cout << "recovering from pwal" << endl; - long long start_time = current_time_millis(); - ifstream inf("pwal"); - binary_iarchive in(inf); - int rseqno = -1; - while (inf.peek() != ifstream::traits_type::eof()) { - int op; - in & op; - switch (op) { - case op_del: - { - int key; - in & key; - mii::iterator it = map.find(key); - map.erase(it); - break; + if (do_tpcc) { + + // + // TPCC txns + // + + g_tables.reset(new TPCCTables); + + if (rec_pwal) { + } else { + + // + // Build-up + // + + cout << "waiting for recovery message" << (multirecover ? "s" : "") + << endl; + long long before_recv = current_time_millis(); + + vector<st_thread_t> recovery_builders; + ASSERT(seqno == -1); + bool first = true; + for (int i = 0; i < (multirecover ? init.node_size() : 1); ++i) { + recovery_builders.push_back(my_spawn(lambda() { + // Read the recovery message length and header. 
+ tpcc_recovery_header hdr; + checkeqnneg(st_read_fully(__ref(replicas[i]), + &hdr, sizeof hdr, + ST_UTIME_NO_TIMEOUT), + ssize_t(sizeof hdr)); + check(hdr.seqno >= 0); + + cout << "receiving recovery of " << hdr.len << " bytes" << endl; + + long long start_time = current_time_millis(); + __ref(recarr).reset(new char[hdr.len], hdr.len); + checkeqnneg(st_read_fully(__ref(replicas[i]), + __ref(recarr).get(), hdr.len, + ST_UTIME_NO_TIMEOUT), + ssize_t(hdr.len)); + + long long before_deser = current_time_millis(); + showdatarate("received recovery message", size_t(hdr.len), before_deser - start_time); + + if (__ref(seqno) == -1) + __ref(seqno) = hdr.seqno; + else + checkeq(__ref(seqno), hdr.seqno); + + g_tables->deser(__ctx(i), __ref(init).node_size(), hdr, __ref(recarr)); + + long long end_time = current_time_millis(); + showdatarate("deserialized recovery message", size_t(hdr.len), end_time - before_deser); + cout << "receive & deserialize took " << end_time - __ref(before_recv) + << " ms total; now at seqno " << hdr.seqno << endl; + cout << "after deserialize, db state is now at seqno " + << hdr.seqno << ":" << endl; + g_tables->show(); + +#if 0 + // Resize the table if necessary. + + commons::array<entry> &table = __ref(map).get_table(); + if (!__ref(first)) { + checkeq(table.size(), hdr.total); + checkeq(__ref(map).size(), hdr.size); + } else { + __ref(first) = false; + if (table.size() != hdr.total) { + table.reset(new entry[hdr.total], hdr.total); + } } - case op_write: - { - int key, val; - in & key & val; - map[key] = val; - break; - } - case op_commit: - ++rseqno; - break; + + // Receive straight into the table. + pair<size_t, size_t> range = + recovery_range(table.size(), __ctx(i), __ref(init).node_size()); + // Check that we agree on the number of entries. + checkeq(range.second - range.first, hdr.count); + // Check that the count is a power of two. + checkeq(hdr.count & (hdr.count - 1), size_t(0)); + size_t rangelen = sizeof(entry) * hdr.count; + // Read an extra char to ensure that we're at the EOF. + checkeqnneg(st_read_fully(__ref(replicas[i]), + table.begin() + range.first, rangelen + 1, + ST_UTIME_NO_TIMEOUT), + ssize_t(rangelen)); +#endif + }, "recovery_builder" + lexical_cast<string>(i))); } - if (check_interval(rseqno, yield_interval)) st_sleep(0); + foreach (st_thread_t t, recovery_builders) { + st_join(t); + } } - seqno = init.txnseqno() - 1; - showdatarate("recovered from pwal", inf.tellg(), current_time_millis() - start_time); - cout << "now at seqno " << rseqno << " (really: " << seqno << ")" << endl; + + // + // Catch-up + // + + long long mid_time = current_time_millis(); + int mid_seqno = seqno; + TpccReq req; + while (!backlog.empty()) { + chunk chunk = backlog.take(); + cout << "took from backlog, now has " << backlog.queue().size() + << " chunks" << endl; + sized_array<char> &buf = chunk.get<0>(); + char *begin = chunk.get<1>(), *end = chunk.get<2>(); + ASSERT(buf.get() <= begin && begin < buf.end()); + ASSERT(buf.get() < end && end < buf.end()); + process_buf(begin, end, req, seqno); + } + showtput("replayer caught up; from backlog replayed", + current_time_millis(), mid_time, seqno, mid_seqno); + } else { - cout << "waiting for recovery message" << (multirecover ? "s" : "") - << endl; - long long before_recv = current_time_millis(); - vector<st_thread_t> recovery_builders; - ASSERT(seqno == -1); - bool first = true; - for (int i = 0; i < (multirecover ? 
init.node_size() : 1); ++i) { - recovery_builders.push_back(my_spawn(lambda() { - // Read the recovery message length and header. - size_t len; - recovery_header hdr; - char buf[sizeof len + sizeof hdr]; - //try { - checkeqnneg(st_read_fully(__ref(replicas[i]), - buf, sizeof len + sizeof hdr, - ST_UTIME_NO_TIMEOUT), - ssize_t(sizeof len + sizeof hdr)); - //} catch (...) { // TODO just catch "Connection reset by peer" - //return; - //} - raw_reader rdr(buf); - rdr.read(len); - rdr.read(hdr); - check(hdr.seqno >= 0); + // + // Simple txns + // - // Resize the table if necessary. - commons::array<entry> &table = __ref(map).get_table(); - if (!__ref(first)) { - checkeq(table.size(), hdr.total); - checkeq(__ref(map).size(), hdr.size); - } else { - __ref(map).set_size(hdr.size); - if (table.size() != hdr.total) { - table.reset(new entry[hdr.total], hdr.total); - } + if (rec_pwal) { + // Recover from physical log. + cout << "recovering from pwal" << endl; + long long start_time = current_time_millis(); + ifstream inf("pwal"); + binary_iarchive in(inf); + int rseqno = -1; + while (inf.peek() != ifstream::traits_type::eof()) { + int op; + in & op; + switch (op) { + case op_del: + { + int key; + in & key; + mii::iterator it = map.find(key); + map.erase(it); + break; + } + case op_write: + { + int key, val; + in & key & val; + map[key] = val; + break; + } + case op_commit: + ++rseqno; + break; } + if (check_interval(rseqno, yield_interval)) st_sleep(0); + } + seqno = init.txnseqno() - 1; + showdatarate("recovered from pwal", inf.tellg(), current_time_millis() - start_time); + cout << "now at seqno " << rseqno << " (really: " << seqno << ")" << endl; + } else { - // Receive straight into the table. - pair<size_t, size_t> range = - recovery_range(table.size(), __ctx(i), __ref(init).node_size()); - // Check that we agree on the number of entries. - checkeq(range.second - range.first, hdr.count); - // Check that the count is a power of two. - checkeq(hdr.count & (hdr.count - 1), size_t(0)); - size_t rangelen = sizeof(entry) * hdr.count; - // Read an extra char to ensure that we're at the EOF. - long long start_time = current_time_millis(); - checkeqnneg(st_read_fully(__ref(replicas[i]), - table.begin() + range.first, rangelen + 1, - ST_UTIME_NO_TIMEOUT), - ssize_t(rangelen)); - long long end_time = current_time_millis(); + // + // Build-up + // - if (__ref(seqno) != -1) - checkeq(__ref(seqno), hdr.seqno); - __ref(seqno) = hdr.seqno; - showdatarate("got recovery message", len, end_time - start_time); - cout << "receive took " << end_time - __ref(before_recv) - << " ms total; now at seqno " << hdr.seqno << endl; + cout << "waiting for recovery message" << (multirecover ? "s" : "") + << endl; + long long before_recv = current_time_millis(); + + vector<st_thread_t> recovery_builders; + ASSERT(seqno == -1); + bool first = true; + for (int i = 0; i < (multirecover ? init.node_size() : 1); ++i) { + recovery_builders.push_back(my_spawn(lambda() { + // Read the recovery message length and header. + size_t len; + recovery_header hdr; + char buf[sizeof len + sizeof hdr]; + //try { + checkeqnneg(st_read_fully(__ref(replicas[i]), + buf, sizeof len + sizeof hdr, + ST_UTIME_NO_TIMEOUT), + ssize_t(sizeof len + sizeof hdr)); + //} catch (...) { // TODO just catch "Connection reset by peer" + //return; + //} + raw_reader rdr(buf); + rdr.read(len); + rdr.read(hdr); + check(hdr.seqno >= 0); + + // Resize the table if necessary. 
+ commons::array<entry> &table = __ref(map).get_table(); + if (!__ref(first)) { + checkeq(table.size(), hdr.total); + checkeq(__ref(map).size(), hdr.size); + } else { + __ref(first) = false; + __ref(map).set_size(hdr.size); + if (table.size() != hdr.total) { + table.reset(new entry[hdr.total], hdr.total); + } + } + + // Receive straight into the table. + pair<size_t, size_t> range = + recovery_range(table.size(), __ctx(i), __ref(init).node_size()); + // Check that we agree on the number of entries. + checkeq(range.second - range.first, hdr.count); + // Check that the count is a power of two. + checkeq(hdr.count & (hdr.count - 1), size_t(0)); + size_t rangelen = sizeof(entry) * hdr.count; + // Read an extra char to ensure that we're at the EOF. + long long start_time = current_time_millis(); + checkeqnneg(st_read_fully(__ref(replicas[i]), + table.begin() + range.first, rangelen + 1, + ST_UTIME_NO_TIMEOUT), + ssize_t(rangelen)); + long long end_time = current_time_millis(); + + if (__ref(seqno) != -1) + checkeq(__ref(seqno), hdr.seqno); + __ref(seqno) = hdr.seqno; + showdatarate("got recovery message", len, end_time - start_time); + cout << "receive took " << end_time - __ref(before_recv) + << " ms total; now at seqno " << hdr.seqno << endl; #if 0 - Recovery recovery; - long long receive_start = 0, receive_end = 0; - size_t len = 0; - { - st_intr intr(stop_hub); - len = readmsg(__ref(replicas)[__ctx(i)], recovery, &receive_start, - &receive_end); - } - long long build_start = current_time_millis(); - cout << "got recovery message of " << len << " bytes in " - << build_start - __ref(before_recv) << " ms: xfer took " - << receive_end - receive_start << " ms, deserialization took " - << build_start - receive_end << " ms" << endl; - for (int i = 0; i < recovery.pair_size(); ++i) { - const Recovery_Pair &p = recovery.pair(i); - __ref(map)[p.key()] = p.value(); - if (i % yield_interval == 0) { - if (yield_during_build_up) st_sleep(0); + Recovery recovery; + long long receive_start = 0, receive_end = 0; + size_t len = 0; + { + st_intr intr(stop_hub); + len = readmsg(__ref(replicas)[__ctx(i)], recovery, &receive_start, + &receive_end); } - } - check(recovery.seqno() >= 0); - int seqno = __ref(seqno) = recovery.seqno(); - long long build_end = current_time_millis(); - cout << "receive and build-up took " - << build_end - __ref(before_recv) - << " ms; built up map of " << recovery.pair_size() - << " records in " << build_end - build_start - << " ms; now at seqno " << seqno << endl; + long long build_start = current_time_millis(); + cout << "got recovery message of " << len << " bytes in " + << build_start - __ref(before_recv) << " ms: xfer took " + << receive_end - receive_start << " ms, deserialization took " + << build_start - receive_end << " ms" << endl; + for (int i = 0; i < recovery.pair_size(); ++i) { + const Recovery_Pair &p = recovery.pair(i); + __ref(map)[p.key()] = p.value(); + if (i % yield_interval == 0) { + if (yield_during_build_up) st_sleep(0); + } + } + check(recovery.seqno() >= 0); + int seqno = __ref(seqno) = recovery.seqno(); + long long build_end = current_time_millis(); + cout << "receive and build-up took " + << build_end - __ref(before_recv) + << " ms; built up map of " << recovery.pair_size() + << " records in " << build_end - build_start + << " ms; now at seqno " << seqno << endl; #endif - }, "recovery_builder" + lexical_cast<string>(i))); + }, "recovery_builder" + lexical_cast<string>(i))); + } + foreach (st_thread_t t, recovery_builders) { + st_join(t); + } } - foreach 
(st_thread_t t, recovery_builders) { - st_join(t); - } - } - long long mid_time = current_time_millis(); - int mid_seqno = seqno; - // XXX - using msg::TxnBatch; - using msg::Txn; - commons::array<char> rbuf(0), wbuf(buf_size); - reader reader(nullptr, rbuf.get(), rbuf.size()); - writer writer(lambda(const void*, size_t) { - throw not_supported_exception("should not be writing responses during catch-up phase"); - }, wbuf.get(), wbuf.size()); - stream s(reader, writer); - TxnBatch batch(s); - while (!backlog.empty()) { - chunk chunk = backlog.take(); - sized_array<char> &buf = chunk.get<0>(); - ASSERT(buf.get() <= chunk.get<1>() && chunk.get<1>() < buf.end()); - ASSERT(buf.get() < chunk.get<2>() && chunk.get<2>() < buf.end()); - ASSERT(chunk.get<1>() < chunk.get<2>()); - swap(buf, reader.buf()); - reader.reset_range(chunk.get<1>(), chunk.get<2>()); - while (reader.start() < reader.end()) { - char *start = reader.start(); - uint32_t prefix = ntohl(reader.read<uint32_t>()); - ASSERT(prefix < 10000); - ASSERT(start + sizeof(uint32_t) + prefix <= reader.end()); - batch.Clear(); - for (int t = 0; t < batch.txn_size(); ++t) { - const Txn &txn = batch.txn(t); - if (rec_pwal) seqno = txn.seqno() - 1; - process_txn<rb_traits, rb_traits>(map, txn, seqno, nullptr); - if (fake_exec && !Types::is_pb()) { - reader.skip(txn.op_size() * Op_Size); - } + // + // Catch-up + // - if (check_interval(txn.seqno(), yield_interval)) st_sleep(0); - if (check_interval(txn.seqno(), process_display)) { - cout << "caught up txn " << txn.seqno() - << "; db size = " << map.size() - << "; seqno = " << seqno - << "; backlog.size = " << backlog.queue().size() << endl; + long long mid_time = current_time_millis(); + int mid_seqno = seqno; + // XXX + using msg::TxnBatch; + using msg::Txn; + commons::array<char> rbuf(0), wbuf(buf_size); + reader reader(nullptr, rbuf.get(), rbuf.size()); + writer writer(lambda(const void*, size_t) { + throw not_supported_exception("should not be writing responses during catch-up phase"); + }, wbuf.get(), wbuf.size()); + stream s(reader, writer); + TxnBatch batch(s); + while (!backlog.empty()) { + chunk chunk = backlog.take(); + sized_array<char> &buf = chunk.get<0>(); + ASSERT(buf.get() <= chunk.get<1>() && chunk.get<1>() < buf.end()); + ASSERT(buf.get() < chunk.get<2>() && chunk.get<2>() < buf.end()); + ASSERT(chunk.get<1>() < chunk.get<2>()); + swap(buf, reader.buf()); + reader.reset_range(chunk.get<1>(), chunk.get<2>()); + while (reader.start() < reader.end()) { + char *start = reader.start(); + uint32_t prefix = ntohl(reader.read<uint32_t>()); + ASSERT(prefix < 10000); + ASSERT(start + sizeof(uint32_t) + prefix <= reader.end()); + batch.Clear(); + for (int t = 0; t < batch.txn_size(); ++t) { + const Txn &txn = batch.txn(t); + if (rec_pwal) seqno = txn.seqno() - 1; + process_txn<rb_traits, rb_traits>(map, txn, seqno, nullptr); + if (fake_exec && !Types::is_pb()) { + reader.skip(txn.op_size() * Op_Size); + } + + if (check_interval(txn.seqno(), yield_interval)) st_sleep(0); + if (check_interval(txn.seqno(), process_display)) { + cout << "caught up txn " << txn.seqno() + << "; db size = " << map.size() + << "; seqno = " << seqno + << "; backlog.size = " << backlog.queue().size() << endl; + } } + ASSERT(start + sizeof(uint32_t) + prefix == reader.start()); } - ASSERT(start + sizeof(uint32_t) + prefix == reader.start()); } - } - g_caught_up = true; + g_caught_up = true; #if 0 - while (!backlog.empty()) { - using pb::Txn; - shared_ptr<Txn> p = backlog.take(); - process_txn<pb_traits, 
pb_traits>(map, *p, seqno, nullptr); - if (check_interval(p->seqno(), catch_up_display)) { - cout << "processed txn " << p->seqno() << " off the backlog; " - << "backlog.size = " << backlog.queue().size() << endl; + while (!backlog.empty()) { + using pb::Txn; + shared_ptr<Txn> p = backlog.take(); + process_txn<pb_traits, pb_traits>(map, *p, seqno, nullptr); + if (check_interval(p->seqno(), catch_up_display)) { + cout << "processed txn " << p->seqno() << " off the backlog; " + << "backlog.size = " << backlog.queue().size() << endl; + } + if (check_interval(p->seqno(), yield_interval)) { + // Explicitly yield. (Note that yielding does still effectively + // happen anyway because process_txn is a yield point.) + st_sleep(0); + } } - if (check_interval(p->seqno(), yield_interval)) { - // Explicitly yield. (Note that yielding does still effectively - // happen anyway because process_txn is a yield point.) - st_sleep(0); - } +#endif + showtput("replayer caught up; from backlog replayed", + current_time_millis(), mid_time, seqno, mid_seqno); } -#endif - showtput("replayer caught up; from backlog replayed", - current_time_millis(), mid_time, seqno, mid_seqno); } } catch (std::exception &ex) { cerr_thread_ex(ex) << endl; @@ -2566,7 +2770,7 @@ "size of the incoming (read) buffer in bytes") ("write-buf", po::value<size_t>(&buf_size)->default_value(1e5), "size of the outgoing (write) buffer in bytes") - ("yield_interval,y", po::value<int>(&yield_interval)->default_value(10000), + ("yield-interval,y", po::value<int>(&yield_interval)->default_value(1000), "number of txns before yielding") ("timelim,T", po::value<long long>(&timelim)->default_value(0), "general network IO time limit in milliseconds, or 0 for none") Modified: ydb/trunk/src/tpcc/Makefile =================================================================== --- ydb/trunk/src/tpcc/Makefile 2009-03-17 17:24:21 UTC (rev 1300) +++ ydb/trunk/src/tpcc/Makefile 2009-03-18 09:17:01 UTC (rev 1301) @@ -1,9 +1,9 @@ WARNINGS = -Werror -Wall -Wextra -Wconversion -Wpointer-arith -Wcast-qual -Wcast-align -Wwrite-strings -Woverloaded-virtual -Wno-sign-compare -Wno-unused-parameter # Debug flags -CXXFLAGS = -g -MD $(WARNINGS) -I.. +CXXFLAGS = -g -MD $(WARNINGS) -I.. -std=gnu++0x # Optimization flags -#CXXFLAGS = -g -O3 -DNDEBUG -MD $(WARNINGS) +#CXXFLAGS = -g -O3 -DNDEBUG -MD $(WARNINGS) -std=gnu++0x # Link withthe C++ standard library #LDFLAGS=-lstdc++ @@ -15,7 +15,12 @@ btree_test: btree_test.o +%.cc: %.cc.cog + cog.py $< > $@ + clean : rm -f *.o *.d $(BINARIES) -include *.d + +.SECONDARY: tpcctables.cc Modified: ydb/trunk/src/tpcc/btree.h =================================================================== --- ydb/trunk/src/tpcc/btree.h 2009-03-17 17:24:21 UTC (rev 1300) +++ ydb/trunk/src/tpcc/btree.h 2009-03-18 09:17:01 UTC (rev 1301) @@ -46,9 +46,13 @@ template <typename KEY, typename VALUE, unsigned N, unsigned M, unsigned INNER_NODE_PADDING= 0, unsigned LEAF_NODE_PADDING= 0, unsigned NODE_ALIGNMENT= 64> - class BPlusTree +class BPlusTree { public: + typedef pair<KEY, VALUE> entry_type; + typedef KEY key_type; + typedef VALUE data_type; + // N must be greater than two to make the split of // two inner nodes sensible. 
BOOST_STATIC_ASSERT(N>2); @@ -222,7 +226,8 @@ public: typedef typename BPlusTree::InnerNode inner_type; typedef typename BPlusTree::LeafNode leaf_type; - iterator(BPlusTree &tree) : tree_(tree) { + typedef typename BPlusTree::entry_type entry_type; + iterator(const BPlusTree &tree) : tree_(tree) { if (tree.size_ > 0) { stack_.push_back(make_pair(tree.root, 0)); // If depth = 0, that means we have only a root node. @@ -248,7 +253,7 @@ #endif return *this; } - pair<KEY, VALUE> operator*() { + entry_type operator*() { return make_pair(leaf().keys[pos()], leaf().values[pos()]); } bool end() { return stack_.empty(); } @@ -268,7 +273,7 @@ leaf().num_keys - 1 : inner().num_keys)) stack_.pop_back(); // back up } - BPlusTree &tree_; + const BPlusTree &tree_; // stack of (node, pos) deque< pair<void*, size_t> > stack_; }; Deleted: ydb/trunk/src/tpcc/tpcctables.cc =================================================================== --- ydb/trunk/src/tpcc/tpcctables.cc 2009-03-17 17:24:21 UTC (rev 1300) +++ ydb/trunk/src/tpcc/tpcctables.cc 2009-03-18 09:17:01 UTC (rev 1301) @@ -1,622 +0,0 @@ -#include "tpcctables.h" - -#include <algorithm> -#include <limits> -#include <vector> - -#include <commons/assert.h> -#include <iostream> -#include "stlutil.h" - -using std::vector; - -bool CustomerByNameOrdering::operator()(const Customer* a, const Customer* b) { - if (a->c_w_id < b->c_w_id) return true; - if (a->c_w_id > b->c_w_id) return false; - assert(a->c_w_id == b->c_w_id); - - if (a->c_d_id < b->c_d_id) return true; - if (a->c_d_id > b->c_d_id) return false; - assert(a->c_d_id == b->c_d_id); - - int diff = strcmp(a->c_last, b->c_last); - if (diff < 0) return true; - if (diff > 0) return false; - assert(diff == 0); - - // Finally delegate to c_first - return strcmp(a->c_first, b->c_first) < 0; -} - -template <typename KeyType, typename ValueType> -static void deleteBTreeValues(BPlusTree<KeyType, ValueType*, TPCCTables::KEYS_PER_INTERNAL, TPCCTables::KEYS_PER_LEAF>* btree) { - KeyType key = std::numeric_limits<KeyType>::max(); - ValueType* value = NULL; - while (btree->findLastLessThan(key, &value, &key)) { - assert(value != NULL); - delete value; - } -} - -TPCCTables::~TPCCTables() { - // Clean up the b-trees with this gross hack - deleteBTreeValues(&warehouses_); - deleteBTreeValues(&stock_); - deleteBTreeValues(&districts_); - deleteBTreeValues(&orders_); - deleteBTreeValues(&orderlines_); - - STLDeleteValues(&neworders_); - STLDeleteElements(&customers_by_name_); - STLDeleteElements(&history_); -} - -int TPCCTables::stockLevel(int32_t warehouse_id, int32_t district_id, int32_t threshold) { - /* EXEC SQL SELECT d_next_o_id INTO :o_id FROM district - WHERE d_w_id=:w_id AND d_id=:d_id; */ - //~ printf("stock level %d %d %d\n", warehouse_id, district_id, threshold); - District* d = findDistrict(warehouse_id, district_id); - int32_t o_id = d->d_next_o_id; - - /* EXEC SQL SELECT COUNT(DISTINCT (s_i_id)) INTO :stock_count FROM order_line, stock - WHERE ol_w_id=:w_id AND ol_d_id=:d_id AND ol_o_id<:o_id AND ol_o_id>=:o_id-20 - AND s_w_id=:w_id AND s_i_id=ol_i_id AND s_quantity < :threshold;*/ - - - // retrieve up to 300 tuples from order line, using ( [o_id-20, o_id), d_id, w_id, [1, 15]) - // and for each retrieved tuple, read the corresponding stock tuple using (ol_i_id, w_id) - // NOTE: This is a cheat because it hard codes the maximum number of orders. - // We really should use the ordered b-tree index to find (0, o_id-20, d_id, w_id) then iterate - // until the end. 
This will also do less work (wasted finds). Since this is only 4%, it probably - // doesn't matter much - - // TODO: Test the performance more carefully. I tried: std::set, std::hash_set, std::vector - // with linear search, and std::vector with binary search using std::lower_bound. The best - // seemed to be to simply save all the s_i_ids, then sort and eliminate duplicates at the end. - std::vector<int32_t> s_i_ids; - // Average size is more like ~30. - s_i_ids.reserve(300); - - // Iterate over [o_id-20, o_id) - for (int order_id = o_id - STOCK_LEVEL_ORDERS; order_id < o_id; ++order_id) { - // HACK: We shouldn't rely on MAX_OL_CNT. See comment above. - for (int line_number = 1; line_number <= Order::MAX_OL_CNT; ++line_number) { - OrderLine* line = findOrderLine(warehouse_id, district_id, order_id, line_number); - if (line == NULL) { - // We can break since we have reached the end of the lines for this order. - // TODO: A btree iterate in (w_id, d_id, o_id) order would be a clean way to do this -#ifndef NDEBUG - for (int test_line_number = line_number + 1; line_number < Order::MAX_OL_CNT; ++line_number) { - assert(findOrderLine(warehouse_id, district_id, order_id, test_line_number) == NULL); - } -#endif - break; - } - - // Check if s_quantity < threshold - Stock* stock = findStock(warehouse_id, line->ol_i_id); - if (stock->s_quantity < threshold) { - s_i_ids.push_back(line->ol_i_id); - } - } - } - - // Filter out duplicate s_i_id: multiple order lines can have the same item - std::sort(s_i_ids.begin(), s_i_ids.end()); - int num_distinct = 0; - int32_t last = -1; // NOTE: This relies on -1 being an invalid s_i_id - for (size_t i = 0; i < s_i_ids.size(); ++i) { - if (s_i_ids[i] != last) { - last = s_i_ids[i]; - num_distinct += 1; - } - } - - return num_distinct; -} - -void TPCCTables::orderStatus(int32_t warehouse_id, int32_t district_id, int32_t customer_id, OrderStatusOutput* output) { - //~ printf("order status %d %d %d\n", warehouse_id, district_id, customer_id); - internalOrderStatus(findCustomer(warehouse_id, district_id, customer_id), output); -} - -void TPCCTables::orderStatus(int32_t warehouse_id, int32_t district_id, const char* c_last, OrderStatusOutput* output) { - //~ printf("order status %d %d %s\n", warehouse_id, district_id, c_last); - Customer* customer = findCustomerByName(warehouse_id, district_id, c_last); - internalOrderStatus(customer, output); -} - -void TPCCTables::internalOrderStatus(Customer* customer, OrderStatusOutput* output) { - output->c_id = customer->c_id; - // retrieve from customer: balance, first, middle, last - output->c_balance = customer->c_balance; - strcpy(output->c_first, customer->c_first); - strcpy(output->c_middle, customer->c_middle); - strcpy(output->c_last, customer->c_last); - - // Find the row in the order table with largest o_id - Order* order = findLastOrderByCustomer(customer->c_w_id, customer->c_d_id, customer->c_id); - output->o_id = order->o_id; - output->o_carrier_id = order->o_carrier_id; - strcpy(output->o_entry_d, order->o_entry_d); - - output->lines.resize(order->o_ol_cnt); - for (int32_t line_number = 1; line_number <= order->o_ol_cnt; ++line_number) { - OrderLine* line = findOrderLine(customer->c_w_id, customer->c_d_id, order->o_id, line_number); - output->lines[line_number-1].ol_i_id = line->ol_i_id; - output->lines[line_number-1].ol_supply_w_id = line->ol_supply_w_id; - output->lines[line_number-1].ol_quantity = line->ol_quantity; - output->lines[line_number-1].ol_amount = line->ol_amount; - 
strcpy(output->lines[line_number-1].ol_delivery_d, line->ol_delivery_d); - } -#ifndef NDEBUG - // Verify that none of the other OrderLines exist. - for (int32_t line_number = order->o_ol_cnt+1; line_number <= Order::MAX_OL_CNT; ++line_number) { - assert(findOrderLine(customer->c_w_id, customer->c_d_id, order->o_id, line_number) == NULL); - } -#endif -} - -bool TPCCTables::newOrder(int32_t warehouse_id, int32_t district_id, int32_t customer_id, - const std::vector<NewOrderItem>& items, const char* now, NewOrderOutput* output) { - //~ printf("new order %d %d %d %d %s\n", warehouse_id, district_id, customer_id, items.size(), now); - // 2.4.3.4. requires that we display c_last, c_credit, and o_id for rolled back transactions: - // read those values first - District* d = findDistrict(warehouse_id, district_id); - output->d_tax = d->d_tax; - output->o_id = d->d_next_o_id; - assert(findOrder(warehouse_id, district_id, output->o_id) == NULL); - - Customer* c = findCustomer(warehouse_id, district_id, customer_id); - assert(sizeof(output->c_last) == sizeof(c->c_last)); - memcpy(output->c_last, c->c_last, sizeof(output->c_last)); - memcpy(output->c_credit, c->c_credit, sizeof(output->c_credit)); - output->c_discount = c->c_discount; - - // CHEAT: Validate all items to see if we will need to abort - vector<Item*> item_tuples(items.size()); - bool all_local = true; - for (int i = 0; i < items.size(); ++i) { - item_tuples[i] = findItem(items[i].i_id); - if (item_tuples[i] == NULL) { - strcpy(output->status, NewOrderOutput::INVALID_ITEM_STATUS); - return false; - } - all_local = all_local && items[i].ol_supply_w_id == warehouse_id; - } - - // We will not abort: update the status and the database state - output->status[0] = '\0'; - - // Modify the order id to assign it - d->d_next_o_id += 1; - - Warehouse* w = findWarehouse(warehouse_id); - output->w_tax = w->w_tax; - - Order order; - order.o_w_id = warehouse_id; - order.o_d_id = district_id; - order.o_id = output->o_id; - order.o_c_id = customer_id; - order.o_carrier_id = Order::NULL_CARRIER_ID; - order.o_ol_cnt = static_cast<int32_t>(items.size()); - order.o_all_local = all_local ? 
1 : 0; - strcpy(order.o_entry_d, now); - assert(strlen(order.o_entry_d) == DATETIME_SIZE); - insertOrder(order); - insertNewOrder(warehouse_id, district_id, output->o_id); - - OrderLine line; - line.ol_o_id = output->o_id; - line.ol_d_id = district_id; - line.ol_w_id = warehouse_id; - memset(line.ol_delivery_d, 0, DATETIME_SIZE+1); - - output->items.resize(items.size()); - output->total = 0; - for (int i = 0; i < items.size(); ++i) { - line.ol_number = i+1; - line.ol_i_id = items[i].i_id; - line.ol_supply_w_id = items[i].ol_supply_w_id; - line.ol_quantity = items[i].ol_quantity; - - // Read and update stock - Stock* stock = findStock(items[i].ol_supply_w_id, items[i].i_id); - if (stock->s_quantity >= items[i].ol_quantity + 10) { - stock->s_quantity -= items[i].ol_quantity; - } else { - stock->s_quantity = stock->s_quantity - items[i].ol_quantity + 91; - } - output->items[i].s_quantity = stock->s_quantity; - assert(sizeof(line.ol_dist_info) == sizeof(stock->s_dist[district_id])); - memcpy(line.ol_dist_info, stock->s_dist[district_id], sizeof(line.ol_dist_info)); - stock->s_ytd += items[i].ol_quantity; - stock->s_order_cnt += 1; - if (items[i].ol_supply_w_id != warehouse_id) { - // remote order - stock->s_remote_cnt += 1; - } - bool stock_is_original = (strstr(stock->s_data, "ORIGINAL") != NULL); - - assert(sizeof(output->items[i].i_name) == sizeof(item_tuples[i]->i_name)); - memcpy(output->items[i].i_name, item_tuples[i]->i_name, sizeof(output->items[i].i_name)); - output->items[i].i_price = item_tuples[i]->i_price; - output->items[i].ol_amount = - static_cast<float>(items[i].ol_quantity) * item_tuples[i]->i_price; - line.ol_amount = output->items[i].ol_amount; - output->total += output->items[i].ol_amount; - if (stock_is_original && strstr(item_tuples[i]->i_data, "ORIGINAL") != NULL) { - output->items[i].brand_generic = NewOrderOutput::ItemInfo::BRAND; - } else { - output->items[i].brand_generic = NewOrderOutput::ItemInfo::GENERIC; - } - insertOrderLine(line); - } - - return true; -} - -void TPCCTables::payment(int32_t warehouse_id, int32_t district_id, int32_t c_warehouse_id, - int32_t c_district_id, int32_t customer_id, float h_amount, const char* now, - PaymentOutput* output) { - //~ printf("payment %d %d %d %d %d %f %s\n", warehouse_id, district_id, c_warehouse_id, c_district_id, customer_id, h_amount, now); - Customer* customer = findCustomer(c_warehouse_id, c_district_id, customer_id); - internalPayment(warehouse_id, district_id, customer, h_amount, now, output); -} - - -void TPCCTables::payment(int32_t warehouse_id, int32_t district_id, int32_t c_warehouse_id, - int32_t c_district_id, const char* c_last, float h_amount, const char* now, - PaymentOutput* output) { - //~ printf("payment %d %d %d %d %s %f %s\n", warehouse_id, district_id, c_warehouse_id, c_district_id, c_last, h_amount, now); - Customer* customer = findCustomerByName(c_warehouse_id, c_district_id, c_last); - internalPayment(warehouse_id, district_id, customer, h_amount, now, output); -} - -void TPCCTables::internalPayment(int32_t warehouse_id, int32_t district_id, Customer* c, - float h_amount, const char* now, PaymentOutput* output) { - Warehouse* w = findWarehouse(warehouse_id); - w->w_ytd += h_amount; - output->warehouse = *w; - - District* d = findDistrict(warehouse_id, district_id); - d->d_ytd += h_amount; - output->district = *d; - - c->c_balance -= h_amount; - c->c_ytd_payment += h_amount; - c->c_payment_cnt += 1; - if (strcmp(c->c_credit, Customer::BAD_CREDIT) == 0) { - // Bad credit: insert history into c_data 
- static const int HISTORY_SIZE = Customer::MAX_DATA+1; - char history[HISTORY_SIZE]; - int characters = snprintf(history, HISTORY_SIZE, "(%d, %d, %d, %d, %d, %.2f)\n", - c->c_id, c->c_d_id, c->c_w_id, district_id, warehouse_id, h_amount); - assert(characters < HISTORY_SIZE); - - // Perform the insert with a move and copy - int current_keep = static_cast<int>(strlen(c->c_data)); - if (current_keep + characters > Customer::MAX_DATA) { - current_keep = Customer::MAX_DATA - characters; - } - assert(current_keep + characters <= Customer::MAX_DATA); - memmove(c->c_data+characters, c->c_data, current_keep); - memcpy(c->c_data, history, characters); - c->c_data[characters + current_keep] = '\0'; - assert(strlen(c->c_data) == characters + current_keep); - } - output->customer = *c; - - // Insert the line into the history table - History h; - h.h_w_id = warehouse_id; - h.h_d_id = district_id; - h.h_c_w_id = c->c_w_id; - h.h_c_d_id = c->c_d_id; - h.h_c_id = c->c_id; - h.h_amount = h_amount; - strcpy(h.h_date, now); - strcpy(h.h_data, w->w_name); - strcat(h.h_data, " "); - strcat(h.h_data, d->d_name); - insertHistory(h); -} - -// forward declaration for delivery -static int64_t makeNewOrderKey(int32_t w_id, int32_t d_id, int32_t o_id); - -void TPCCTables::delivery(int32_t warehouse_id, int32_t carrier_id, const char* now, - std::vector<DeliveryOrderInfo>* orders) { - //~ printf("delivery %d %d %s\n", warehouse_id, carrier_id, now); - orders->clear(); - for (int32_t d_id = 1; d_id <= District::NUM_PER_WAREHOUSE; ++d_id) { - // Find and remove the lowest numbered order for the district - int64_t key = makeNewOrderKey(warehouse_id, d_id, 1); - NewOrderMap::iterator iterator = neworders_.lower_bound(key); - NewOrder* neworder = NULL; - if (iterator != neworders_.end()) { - neworder = iterator->second; - assert(neworder != NULL); - } - if (neworder == NULL || neworder->no_d_id != d_id || neworder->no_w_id != warehouse_id) { - // No orders for this district - // TODO: 2.7.4.2: If this occurs in max(1%, 1) of transactions, report it (???) - continue; - } - assert(neworder->no_d_id == d_id && neworder->no_w_id == warehouse_id); - int32_t o_id = neworder->no_o_id; - neworders_.erase(iterator); - delete neworder; - - DeliveryOrderInfo order; - order.d_id = d_id; - order.o_id = o_id; - orders->push_back(order); - - Order* o = findOrder(warehouse_id, d_id, o_id); - assert(o->o_carrier_id == Order::NULL_CARRIER_ID); - o->o_carrier_id = carrier_id; - - float total = 0; - // TODO: Select based on (w_id, d_id, o_id) rather than using ol_number? 
- for (int32_t i = 1; i <= o->o_ol_cnt; ++i) { - OrderLine* line = findOrderLine(warehouse_id, d_id, o_id, i); - assert(0 == strlen(line->ol_delivery_d)); - strcpy(line->ol_delivery_d, now); - assert(strlen(line->ol_delivery_d) == DATETIME_SIZE); - total += line->ol_amount; - } - - Customer* c = findCustomer(warehouse_id, d_id, o->o_c_id); - c->c_balance += total; - c->c_delivery_cnt += 1; - } -} - -template <typename T> -static T* insert(BPlusTree<int32_t, T*, TPCCTables::KEYS_PER_INTERNAL, TPCCTables::KEYS_PER_LEAF>* tree, int32_t key, const T& item) { - assert(!tree->find(key)); - T* copy = new T(item); - tree->insert(key, copy); - return copy; -} - -template <typename T> -static T* find(const BPlusTree<int32_t, T*, TPCCTables::KEYS_PER_INTERNAL, TPCCTables::KEYS_PER_LEAF>& tree, int32_t key) { - T* output = NULL; - if (tree.find(key, &output)) { - return output; - } - return NULL; -} - -void TPCCTables::insertItem(const Item& item) { - assert(item.i_id == items_.size() + 1); - items_.push_back(item); -} -Item* TPCCTables::findItem(int32_t id) { - assert(1 <= id); - id -= 1; - if (id >= items_.size()) return NULL; - return &items_[id]; -} - -void TPCCTables::insertWarehouse(const Warehouse& w) { - insert(&warehouses_, w.w_id, w); -} -Warehouse* TPCCTables::findWarehouse(int32_t id) { - return find(warehouses_, id); -} - -static int32_t makeStockKey(int32_t w_id, int32_t s_id) { - assert(1 <= w_id && w_id <= Warehouse::MAX_WAREHOUSE_ID); - assert(1 <= s_id && s_id <= Stock::NUM_STOCK_PER_WAREHOUSE); - int32_t id = s_id + (w_id * Stock::NUM_STOCK_PER_WAREHOUSE); - assert(id >= 0); - return id; -} - -void TPCCTables::insertStock(const Stock& stock) { - insert(&stock_, makeStockKey(stock.s_w_id, stock.s_i_id), stock); -} -Stock* TPCCTables::findStock(int32_t w_id, int32_t s_id) { - return find(stock_, makeStockKey(w_id, s_id)); -} - -static int32_t makeDistrictKey(int32_t w_id, int32_t d_id) { - assert(1 <= w_id && w_id <= Warehouse::MAX_WAREHOUSE_ID); - assert(1 <= d_id && d_id <= District::NUM_PER_WAREHOUSE); - int32_t id = d_id + (w_id * District::NUM_PER_WAREHOUSE); - assert(id >= 0); - return id; -} - -void TPCCTables::insertDistrict(const District& district) { - insert(&districts_, makeDistrictKey(district.d_w_id, district.d_id), district); -} -District* TPCCTables::findDistrict(int32_t w_id, int32_t d_id) { - return find(districts_, makeDistrictKey(w_id, d_id)); -} - -static int32_t makeCustomerKey(int32_t w_id, int32_t d_id, int32_t c_id) { - assert(1 <= w_id && w_id <= Warehouse::MAX_WAREHOUSE_ID); - assert(1 <= d_id && d_id <= District::NUM_PER_WAREHOUSE); - assert(1 <= c_id && c_id <= Customer::NUM_PER_DISTRICT); - int32_t id = (w_id * District::NUM_PER_WAREHOUSE + d_id) - * Customer::NUM_PER_DISTRICT + c_id; - assert(id >= 0); - return id; -} - -void TPCCTables::insertCustomer(const Customer& customer) { - Customer* c = insert(&customers_, makeCustomerKey(customer.c_w_id, customer.c_d_id, customer.c_id), customer); - assert(customers_by_name_.find(c) == customers_by_name_.end()); - customers_by_name_.insert(c); -} -Customer* TPCCTables::findCustomer(int32_t w_id, int32_t d_id, int32_t c_id) { - return find(customers_, makeCustomerKey(w_id, d_id, c_id)); -} - -Customer* TPCCTables::findCustomerByName(int32_t w_id, int32_t d_id, const char* c_last) { - // select (w_id, d_id, *, c_last) order by c_first - Customer c; - c.c_w_id = w_id; - c.c_d_id = d_id; - strcpy(c.c_last, c_last); - c.c_first[0] = '\0'; - CustomerByNameSet::const_iterator it = customers_by_name_.lower_bound(&c); - 
assert(it != customers_by_name_.end()); - assert((*it)->c_w_id == w_id && (*it)->c_d_id == d_id && strcmp((*it)->c_last, c_last) == 0); - - // go to the "next" c_last - // TODO: This is a GROSS hack. Can we do better? - int length = static_cast<int>(strlen(c_last)); - if (length == Customer::MAX_LAST) { - c.c_last[length-1] = static_cast<char>(c.c_last[length-1] + 1); - } else { - c.c_last[length] = 'A'; - c.c_last[length+1] = '\0'; - } - CustomerByNameSet::const_iterator stop = customers_by_name_.lower_bound(&c); - - Customer* customer = NULL; - // Choose position n/2 rounded up (1 based addressing) = floor((n-1)/2) - if (it != stop) { - CustomerByNameSet::const_iterator middle = it; - ++it; - int i = 0; - while (it != stop) { - // Increment the middle iterator on every second iteration - if (i % 2 == 1) { - ++middle; - } - assert(strcmp((*it)->c_last, c_last) == 0); - ++it; - ++i; - } - // There were i+1 matching last names - customer = *middle; - } - - assert(customer->c_w_id == w_id && customer->c_d_id == d_id && - strcmp(customer->c_last, c_last) == 0); - return customer; -} - -static int32_t makeOrderKey(int32_t w_id, int32_t d_id, int32_t o_id) { - assert(1 <= w_id && w_id <= Warehouse::MAX_WAREHOUSE_ID); - assert(1 <= d_id && d_id <= District::NUM_PER_WAREHOUSE); - assert(1 <= o_id && o_id <= Order::MAX_ORDER_ID); - // TODO: This is bad for locality since o_id is in the most significant position. Larger keys? - int32_t id = (o_id * District::NUM_PER_WAREHOUSE + d_id) - * Warehouse::MAX_WAREHOUSE_ID + w_id; - assert(id >= 0); - return id; -} - -static int64_t makeOrderByCustomerKey(int32_t w_id, int32_t d_id, int32_t c_id, int32_t o_id) { - assert(1 <= w_id && w_id <= Warehouse::MAX_WAREHOUSE_ID); - assert(1 <= d_id && d_id <= District::NUM_PER_WAREHOUSE); - assert(1 <= c_id && c_id <= Customer::NUM_PER_DISTRICT); - assert(1 <= o_id && o_id <= Order::MAX_ORDER_ID); - int32_t top_id = (w_id * District::NUM_PER_WAREHOUSE + d_id) * Customer::NUM_PER_DISTRICT - + c_id; - assert(top_id >= 0); - int64_t id = (((int64_t) top_id) << 32) | o_id; - assert(id > 0); - return id; -} - -void TPCCTabl... [truncated message content] |
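The catch-up path described in this commit's log message comes down to replaying a backlog of length-prefixed messages straight out of a memory buffer, which is what process_buf does. The sketch below shows only that framing loop, with the project-specific pieces (raw_reader, TpccReq, process_tpcc) replaced by a generic callback; the names and signature here are illustrative assumptions, not the actual API.

    // Minimal sketch of a length-prefixed replay loop, in the spirit of process_buf.
    // Assumes each record is a 4-byte big-endian length followed by that many bytes.
    #include <arpa/inet.h>   // ntohl
    #include <cassert>
    #include <cstdint>
    #include <cstring>
    #include <functional>

    void replay_buffer(const char *begin, const char *end,
                       const std::function<void(const char *, uint32_t)> &handle) {
      const char *p = begin;
      while (p < end) {
        assert(p + sizeof(uint32_t) <= end);   // room for the length prefix
        uint32_t len;
        std::memcpy(&len, p, sizeof len);      // memcpy avoids unaligned reads
        len = ntohl(len);                      // prefix is big-endian on the wire
        p += sizeof len;
        assert(p + len <= end);                // whole message must lie in the chunk
        handle(p, len);                        // e.g. parse and apply one transaction
        p += len;
      }
    }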
From: <yan...@us...> - 2009-03-17 17:24:41

Revision: 1300
          http://assorted.svn.sourceforge.net/assorted/?rev=1300&view=rev
Author:   yangzhang
Date:     2009-03-17 17:24:21 +0000 (Tue, 17 Mar 2009)

Log Message:
-----------
added demo of sizeof static members

Added Paths:
-----------
    sandbox/trunk/src/cc/sizeofstatics.cc

Added: sandbox/trunk/src/cc/sizeofstatics.cc
===================================================================
--- sandbox/trunk/src/cc/sizeofstatics.cc	(rev 0)
+++ sandbox/trunk/src/cc/sizeofstatics.cc	2009-03-17 17:24:21 UTC (rev 1300)
@@ -0,0 +1,5 @@
+// Demo that static members don't count toward the per-struct sizeof.
+#include <iostream>
+using namespace std;
+struct s { static int x; int y; };
+int main() { cout << sizeof(s) << endl; return 0; }
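A hypothetical companion to the demo above (not part of the commit): comparing against the same struct without the static member makes the point explicit. On common implementations both lines print the same value, typically sizeof(int), because the static member is stored once per program rather than inside each object.

    // Companion sketch: the static member adds nothing to the object size.
    #include <iostream>
    using namespace std;
    struct with_static    { static int x; int y; };
    struct without_static { int y; };
    int main() {
      cout << sizeof(with_static) << endl;     // e.g. 4 on typical platforms
      cout << sizeof(without_static) << endl;  // same value
      return 0;
    }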
From: <yan...@us...> - 2009-03-17 17:18:53

Revision: 1299
          http://assorted.svn.sourceforge.net/assorted/?rev=1299&view=rev
Author:   yangzhang
Date:     2009-03-17 17:18:39 +0000 (Tue, 17 Mar 2009)

Log Message:
-----------
demo of struct initialization

Added Paths:
-----------
    sandbox/trunk/src/cc/init.cc

Added: sandbox/trunk/src/cc/init.cc
===================================================================
--- sandbox/trunk/src/cc/init.cc	(rev 0)
+++ sandbox/trunk/src/cc/init.cc	2009-03-17 17:18:39 UTC (rev 1299)
@@ -0,0 +1,23 @@
+// Demo of struct initialization.
+#include <iostream>
+using namespace std;
+struct s { int a, b, c, d, e, f, g, h[10]; };
+void show(s &x) {
+  cout << x.a << ' '
+       << x.b << ' '
+       << x.c << ' '
+       << x.d << ' '
+       << x.e << ' '
+       << x.f << ' '
+       << x.g << ' ';
+  for (int i = 0; i < 10; ++i)
+    cout << x.h[i] << ' ';
+  cout << endl;
+}
+int main() {
+  // Garbage initialization.
+  { s x; show(x); }
+  // Initializes everything to be zero.
+  { s x = {0}; show(x); }
+  return 0;
+}
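A small, hypothetical extension of the demo above (not in the commit): with a C++11 compiler, an empty brace initializer value-initializes every member to zero as well, which avoids the slightly misleading `{0}` spelling.

    // Sketch: empty braces zero the whole aggregate too (assumes C++11 or later).
    #include <iostream>
    using namespace std;
    struct s { int a, b, c, d, e, f, g, h[10]; };
    int main() {
      s x = {};                                            // every member zero
      s y{};                                               // same, brace-init form
      cout << x.a << ' ' << x.h[9] << ' ' << y.g << endl;  // prints "0 0 0"
      return 0;
    }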
From: <yan...@us...> - 2009-03-17 07:09:00
|
Revision: 1298 http://assorted.svn.sourceforge.net/assorted/?rev=1298&view=rev Author: yangzhang Date: 2009-03-17 07:08:58 +0000 (Tue, 17 Mar 2009) Log Message: ----------- simple linking demo Added Paths: ----------- sandbox/trunk/src/c/linking/ sandbox/trunk/src/c/linking/Makefile sandbox/trunk/src/c/linking/a.c sandbox/trunk/src/c/linking/a.h sandbox/trunk/src/c/linking/b.c Added: sandbox/trunk/src/c/linking/Makefile =================================================================== --- sandbox/trunk/src/c/linking/Makefile (rev 0) +++ sandbox/trunk/src/c/linking/Makefile 2009-03-17 07:08:58 UTC (rev 1298) @@ -0,0 +1,4 @@ +all: a +a: a.o b.o +clean: + rm -f *.o a Added: sandbox/trunk/src/c/linking/a.c =================================================================== --- sandbox/trunk/src/c/linking/a.c (rev 0) +++ sandbox/trunk/src/c/linking/a.c 2009-03-17 07:08:58 UTC (rev 1298) @@ -0,0 +1,2 @@ +#include "a.h" +int main() { foo(); bar(); return 0; } Added: sandbox/trunk/src/c/linking/a.h =================================================================== --- sandbox/trunk/src/c/linking/a.h (rev 0) +++ sandbox/trunk/src/c/linking/a.h 2009-03-17 07:08:58 UTC (rev 1298) @@ -0,0 +1,14 @@ +// Must either make static or leave extern but only the decl (no def), or else you'll get: +// +// $ make +// cc -c -o a.o a.c +// cc -c -o b.o b.c +// cc a.o b.o -o a +// b.o: In function `foo': +// b.c:(.text+0x0): multiple definition of `foo' +// a.o:a.c:(.text+0x0): first defined here +// collect2: ld returned 1 exit status +// make: *** [a] Error 1 + +static int foo() { return 0; } +int bar(); Added: sandbox/trunk/src/c/linking/b.c =================================================================== --- sandbox/trunk/src/c/linking/b.c (rev 0) +++ sandbox/trunk/src/c/linking/b.c 2009-03-17 07:08:58 UTC (rev 1298) @@ -0,0 +1,3 @@ +#include "a.h" +int bar() { return 0; } +int b() { foo(); bar(); return 0; } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
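The comment in a.h names two fixes for the multiple-definition error, and the demo takes the static route. A sketch of the other route, a plain extern declaration in the header with the one definition in a single source file, using hypothetical file names (foo.h, foo.c, user.c) that are not part of the commit:

/* foo.h: declaration only, safe to include from any number of files. */
int foo(void);

/* foo.c: the single definition. */
#include "foo.h"
int foo(void) { return 0; }

/* user.c: another translation unit that just calls foo(). */
#include "foo.h"
int main(void) { return foo(); }

/* Expected to link cleanly, e.g. cc -c foo.c user.c && cc foo.o user.o -o user,
   because exactly one object file defines foo. */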
From: <yan...@us...> - 2009-03-17 06:11:50
|
Revision: 1297 http://assorted.svn.sourceforge.net/assorted/?rev=1297&view=rev Author: yangzhang Date: 2009-03-17 06:11:47 +0000 (Tue, 17 Mar 2009) Log Message: ----------- added iterator (+tests) for btree Modified Paths: -------------- ydb/trunk/README ydb/trunk/src/tpcc/Makefile ydb/trunk/src/tpcc/btree.h Added Paths: ----------- ydb/trunk/src/tpcc/btree_test.cc Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-03-17 02:39:31 UTC (rev 1296) +++ ydb/trunk/README 2009-03-17 06:11:47 UTC (rev 1297) @@ -736,7 +736,8 @@ |neworders| = 107973 |history| = 30000 -- TODO add iteration to btree +- DONE add iteration to btree + - added btree_test for this - TODO try integrating TPC-C; just get txns working for now - TODO implement TPC-C recovery Modified: ydb/trunk/src/tpcc/Makefile =================================================================== --- ydb/trunk/src/tpcc/Makefile 2009-03-17 02:39:31 UTC (rev 1296) +++ ydb/trunk/src/tpcc/Makefile 2009-03-17 06:11:47 UTC (rev 1297) @@ -6,12 +6,15 @@ #CXXFLAGS = -g -O3 -DNDEBUG -MD $(WARNINGS) # Link withthe C++ standard library -LDFLAGS=-lstdc++ +#LDFLAGS=-lstdc++ +LDLIBS = -lgtest -BINARIES = tpccclient.o tpccgenerator.o tpcctables.o tpccdb.o clock.o randomgenerator.o +BINARIES = btree_test.o tpccclient.o tpccgenerator.o tpcctables.o tpccdb.o clock.o randomgenerator.o all: $(BINARIES) +btree_test: btree_test.o + clean : rm -f *.o *.d $(BINARIES) Modified: ydb/trunk/src/tpcc/btree.h =================================================================== --- ydb/trunk/src/tpcc/btree.h 2009-03-17 02:39:31 UTC (rev 1296) +++ ydb/trunk/src/tpcc/btree.h 2009-03-17 06:11:47 UTC (rev 1297) @@ -9,14 +9,25 @@ #include <assert.h> #include <stdlib.h> #include <string.h> +#include <deque> +#include <utility> +#include <iostream> // XXX #include <boost/static_assert.hpp> #include <boost/pool/object_pool.hpp> +#include <commons/nullptr.h> +#ifdef DEBUG +#include <iostream> +using namespace std; // XXX +#endif + // DEBUG -#include <iostream> using std::cout; using std::endl; +using namespace commons; +using std::deque; +using std::pair; #ifdef __linux__ #define HAVE_POSIX_MEMALIGN @@ -206,6 +217,62 @@ size_t size() const { return size_; } + class iterator + { + public: + typedef typename BPlusTree::InnerNode inner_type; + typedef typename BPlusTree::LeafNode leaf_type; + iterator(BPlusTree &tree) : tree_(tree) { + if (tree.size_ > 0) { + stack_.push_back(make_pair(tree.root, 0)); + // If depth = 0, that means we have only a root node. + // 1 isn't <= 0 so we don't do anything, which is good. + // If depth = 1, that means we have a root and children. + // 1 is <= 1 so we do add the first child, which is good. + down(); + } + } + iterator &operator++() { +#ifdef DEBUG + for (int i = 0; i < stack_.size(); ++i) + cout << (i > 0 ? " > " : "") << stack_[i].first << ' ' << stack_[i].second + << '/' << (i == tree_.depth ? as_inner(stack_[i].first).num_keys : as_leaf(stack_[i].first).num_keys); + cout << endl; +#endif + up(); if (!stack_.empty()) down(); +#ifdef DEBUG + for (int i = 0; i < stack_.size(); ++i) + cout << (i > 0 ? " > " : "") << stack_[i].first << ' ' << stack_[i].second + << '/' << (i == tree_.depth ? 
as_inner(stack_[i].first).num_keys : as_leaf(stack_[i].first).num_keys); + cout << endl; +#endif + return *this; + } + pair<KEY, VALUE> operator*() { + return make_pair(leaf().keys[pos()], leaf().values[pos()]); + } + bool end() { return stack_.empty(); } + private: + inner_type &as_inner(void *p) { return *reinterpret_cast<inner_type*>(p); } + leaf_type &as_leaf(void *p) { return *reinterpret_cast<leaf_type*>(p); } + inner_type &inner() { return as_inner(stack_.back().first); } + leaf_type &leaf() { return as_leaf(stack_.back().first); } + size_t &pos() { return stack_.back().second; } + void down() { + while (stack_.size() <= tree_.depth) + stack_.push_back(make_pair(inner().children[pos()], 0)); + } + void up() { + while (!stack_.empty() && + ++pos() > (stack_.size() == tree_.depth + 1 ? + leaf().num_keys - 1 : inner().num_keys)) + stack_.pop_back(); // back up + } + BPlusTree &tree_; + // stack of (node, pos) + deque< pair<void*, size_t> > stack_; + }; + private: // Used when debugging enum NodeType {NODE_INNER=0xDEADBEEF, NODE_LEAF=0xC0FFEE}; Added: ydb/trunk/src/tpcc/btree_test.cc =================================================================== --- ydb/trunk/src/tpcc/btree_test.cc (rev 0) +++ ydb/trunk/src/tpcc/btree_test.cc 2009-03-17 06:11:47 UTC (rev 1297) @@ -0,0 +1,61 @@ +#include "btree.h" +#include <gtest/gtest.h> +using namespace std; +using namespace testing; + +typedef BPlusTree<int, int, 8, 8> tree_t; + +TEST(btree, empty) { + tree_t tree; + tree_t::iterator it(tree); + ASSERT_TRUE(it.end()); +} + +TEST(btree, shallow) { + tree_t tree; + for (int i = 1; i < 4; ++i) + tree.insert(2*i, 3*i); // [(2,3),(4,6),(6,9)] + { + tree_t::iterator it(tree); + for (int i = 1; i < 4; ++i) { + EXPECT_EQ(2*i, (*it).first); + EXPECT_EQ(3*i, (*it).second); + ++it; + } + EXPECT_TRUE(it.end()); + } + + tree.del(4); // [(2,3),(4,0),(6,9)] + + { + tree_t::iterator it(tree); + EXPECT_EQ(2, (*it).first); + EXPECT_EQ(3, (*it).second); + ++it; + EXPECT_EQ(4, (*it).first); + EXPECT_EQ(0, (*it).second); + ++it; + EXPECT_EQ(6, (*it).first); + EXPECT_EQ(9, (*it).second); + ++it; + EXPECT_TRUE(it.end()); + } +} + +TEST(btree, deep) { + tree_t tree; + for (int i = 1; i < 100; ++i) + tree.insert(2*i, 3*i); + tree_t::iterator it(tree); + for (int i = 1; i < 100; ++i) { + EXPECT_EQ(2*i, (*it).first); + EXPECT_EQ(3*i, (*it).second); + ++it; + } + EXPECT_TRUE(it.end()); +} + +int main(int argc, char **argv) { + InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
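The new iterator is deliberately not STL-shaped: there is no begin()/end() pair on the tree. You construct it from the tree, test it with end(), advance with ++, and dereference to a (key, value) pair, exactly as btree_test.cc does. A minimal usage sketch, assuming btree.h and its dependencies (Boost.Pool and the project's commons headers) are on the include path:

// Sketch: in-order traversal with the custom iterator added in this revision.
#include "btree.h"
#include <iostream>

typedef BPlusTree<int, int, 8, 8> tree_t;

int main() {
    tree_t tree;
    for (int i = 1; i <= 5; ++i)
        tree.insert(i, i * i);
    // Visits keys in ascending order: 1 -> 1, 2 -> 4, ..., 5 -> 25.
    for (tree_t::iterator it(tree); !it.end(); ++it)
        std::cout << (*it).first << " -> " << (*it).second << "\n";
    return 0;
}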
From: <yan...@us...> - 2009-03-17 02:39:49
|
Revision: 1296 http://assorted.svn.sourceforge.net/assorted/?rev=1296&view=rev Author: yangzhang Date: 2009-03-17 02:39:31 +0000 (Tue, 17 Mar 2009) Log Message: ----------- reformat Modified Paths: -------------- ydb/trunk/src/tpcc/btree.h Modified: ydb/trunk/src/tpcc/btree.h =================================================================== --- ydb/trunk/src/tpcc/btree.h 2009-03-17 02:34:49 UTC (rev 1295) +++ ydb/trunk/src/tpcc/btree.h 2009-03-17 02:39:31 UTC (rev 1296) @@ -27,491 +27,490 @@ #else // TODO: This is not aligned! It doesn't matter for this implementation, but it could static inline int posix_memalign(void** result, int, size_t bytes) { - *result = malloc(bytes); - return *result == NULL; + *result = malloc(bytes); + return *result == NULL; } #endif template <typename KEY, typename VALUE, unsigned N, unsigned M, - unsigned INNER_NODE_PADDING= 0, unsigned LEAF_NODE_PADDING= 0, - unsigned NODE_ALIGNMENT= 64> -class BPlusTree + unsigned INNER_NODE_PADDING= 0, unsigned LEAF_NODE_PADDING= 0, + unsigned NODE_ALIGNMENT= 64> + class BPlusTree { public: - // N must be greater than two to make the split of - // two inner nodes sensible. - BOOST_STATIC_ASSERT(N>2); - // Leaf nodes must be able to hold at least one element - BOOST_STATIC_ASSERT(M>0); + // N must be greater than two to make the split of + // two inner nodes sensible. + BOOST_STATIC_ASSERT(N>2); + // Leaf nodes must be able to hold at least one element + BOOST_STATIC_ASSERT(M>0); - // Builds a new empty tree. - BPlusTree() - : depth(0), - root(new_leaf_node()), - size_(0) - { - // DEBUG - // cout << "sizeof(LeafNode)==" << sizeof(LeafNode) << endl; - // cout << "sizeof(InnerNode)==" << sizeof(InnerNode) << endl; - } + // Builds a new empty tree. + BPlusTree() + : depth(0), + root(new_leaf_node()), + size_(0) +{ + // DEBUG + // cout << "sizeof(LeafNode)==" << sizeof(LeafNode) << endl; + // cout << "sizeof(InnerNode)==" << sizeof(InnerNode) << endl; +} - ~BPlusTree() { - // Empty. Memory deallocation is done automatically - // when innerPool and leafPool are destroyed. - } + ~BPlusTree() { + // Empty. Memory deallocation is done automatically + // when innerPool and leafPool are destroyed. + } - // Inserts a pair (key, value). If there is a previous pair with - // the same key, the old value is overwritten with the new one. - void insert(KEY key, VALUE value) { - // GCC warns that this may be used uninitialized, even though that is untrue. - InsertionResult result = { KEY(), 0, 0 }; - bool was_split; - if( depth == 0 ) { - // The root is a leaf node - assert( *reinterpret_cast<NodeType*>(root) == - NODE_LEAF); - was_split= leaf_insert(reinterpret_cast<LeafNode*> - (root), key, value, &result); - } else { - // The root is an inner node - assert( *reinterpret_cast<NodeType*> - (root) == NODE_INNER ); - was_split= inner_insert(reinterpret_cast<InnerNode*> - (root), depth, key, value, &result); - } - if( was_split ) { - // The old root was splitted in two parts. - // We have to create a new root pointing to them - depth++; - root= new_inner_node(); - InnerNode* rootProxy= - reinterpret_cast<InnerNode*>(root); - rootProxy->num_keys= 1; - rootProxy->keys[0]= result.key; - rootProxy->children[0]= result.left; - rootProxy->children[1]= result.right; - } - } - -// Looks for the given key. If it is not found, it returns false, -// if it is found, it returns true and copies the associated value -// unless the pointer is null. 
-bool find(const KEY& key, VALUE* value= 0) const { - const InnerNode* inner; - register const void* node= root; - register unsigned d= depth, index; - while( d-- != 0 ) { - inner= reinterpret_cast<const InnerNode*>(node); - assert( inner->type == NODE_INNER ); - index= inner_position_for(key, inner->keys, inner->num_keys); - node= inner->children[index]; + // Inserts a pair (key, value). If there is a previous pair with + // the same key, the old value is overwritten with the new one. + void insert(KEY key, VALUE value) { + // GCC warns that this may be used uninitialized, even though that is untrue. + InsertionResult result = { KEY(), 0, 0 }; + bool was_split; + if( depth == 0 ) { + // The root is a leaf node + assert( *reinterpret_cast<NodeType*>(root) == + NODE_LEAF); + was_split= leaf_insert(reinterpret_cast<LeafNode*> + (root), key, value, &result); + } else { + // The root is an inner node + assert( *reinterpret_cast<NodeType*> + (root) == NODE_INNER ); + was_split= inner_insert(reinterpret_cast<InnerNode*> + (root), depth, key, value, &result); + } + if( was_split ) { + // The old root was splitted in two parts. + // We have to create a new root pointing to them + depth++; + root= new_inner_node(); + InnerNode* rootProxy= + reinterpret_cast<InnerNode*>(root); + rootProxy->num_keys= 1; + rootProxy->keys[0]= result.key; + rootProxy->children[0]= result.left; + rootProxy->children[1]= result.right; + } } - const LeafNode* leaf= reinterpret_cast<const LeafNode*>(node); - assert( leaf->type == NODE_LEAF ); - index= leaf_position_for(key, leaf->keys, leaf->num_keys); - if( leaf->keys[index] == key ) { - if( value != 0 ) { - *value= leaf->values[index]; + + // Looks for the given key. If it is not found, it returns false, + // if it is found, it returns true and copies the associated value + // unless the pointer is null. + bool find(const KEY& key, VALUE* value= 0) const { + const InnerNode* inner; + register const void* node= root; + register unsigned d= depth, index; + while( d-- != 0 ) { + inner= reinterpret_cast<const InnerNode*>(node); + assert( inner->type == NODE_INNER ); + index= inner_position_for(key, inner->keys, inner->num_keys); + node= inner->children[index]; } - if (leaf->values[index]) - return true; - else return false; - } else { - return false; + const LeafNode* leaf= reinterpret_cast<const LeafNode*>(node); + assert( leaf->type == NODE_LEAF ); + index= leaf_position_for(key, leaf->keys, leaf->num_keys); + if( leaf->keys[index] == key ) { + if( value != 0 ) { + *value= leaf->values[index]; + } + if (leaf->values[index]) + return true; + else return false; + } else { + return false; + } } -} -// Looks for the given key. If it is not found, it returns false, -// if it is found, it returns true and sets -// the associated value to NULL -// Note: del currently leaks memory. Fix later. -bool del(const KEY& key) { - InnerNode* inner; - register void* node= root; - register unsigned d= depth, index; - while( d-- != 0 ) { - inner= reinterpret_cast<InnerNode*>(node); - assert( inner->type == NODE_INNER ); - index= inner_position_for(key, inner->keys, inner->num_keys); - node= inner->children[index]; + // Looks for the given key. If it is not found, it returns false, + // if it is found, it returns true and sets + // the associated value to NULL + // Note: del currently leaks memory. Fix later. 
+ bool del(const KEY& key) { + InnerNode* inner; + register void* node= root; + register unsigned d= depth, index; + while( d-- != 0 ) { + inner= reinterpret_cast<InnerNode*>(node); + assert( inner->type == NODE_INNER ); + index= inner_position_for(key, inner->keys, inner->num_keys); + node= inner->children[index]; + } + LeafNode* leaf= reinterpret_cast<LeafNode*>(node); + assert( leaf->type == NODE_LEAF ); + index= leaf_position_for(key, leaf->keys, leaf->num_keys); + if( leaf->keys[index] == key ) { + leaf->values[index] = 0; + --size_; + return true; + } else { + return false; + } } - LeafNode* leaf= reinterpret_cast<LeafNode*>(node); - assert( leaf->type == NODE_LEAF ); - index= leaf_position_for(key, leaf->keys, leaf->num_keys); - if( leaf->keys[index] == key ) { - leaf->values[index] = 0; - --size_; - return true; - } else { - return false; - } -} -// Finds the LAST item that is < key. That is, the next item in the tree is not < key, but this -// item is. If we were to insert key into the tree, it would go after this item. This is weird, -// but is easier than implementing iterators. In STL terms, this would be "lower_bound(key)--" -// WARNING: This does *not* work when values are deleted. Thankfully, TPC-C does not use deletes. -bool findLastLessThan(const KEY& key, VALUE* value = 0, KEY* out_key = 0) const { + // Finds the LAST item that is < key. That is, the next item in the tree is not < key, but this + // item is. If we were to insert key into the tree, it would go after this item. This is weird, + // but is easier than implementing iterators. In STL terms, this would be "lower_bound(key)--" + // WARNING: This does *not* work when values are deleted. Thankfully, TPC-C does not use deletes. + bool findLastLessThan(const KEY& key, VALUE* value = 0, KEY* out_key = 0) const { const void* node = root; unsigned int d = depth; while( d-- != 0 ) { - const InnerNode* inner = reinterpret_cast<const InnerNode*>(node); - assert( inner->type == NODE_INNER ); - unsigned int pos = inner_position_for(key, inner->keys, inner->num_keys); - // We need to rewind in the case where they are equal - if (pos > 0 && key == inner->keys[pos-1]) { - pos -= 1; - } - assert(pos == 0 || inner->keys[pos-1] < key); - node = inner->children[pos]; + const InnerNode* inner = reinterpret_cast<const InnerNode*>(node); + assert( inner->type == NODE_INNER ); + unsigned int pos = inner_position_for(key, inner->keys, inner->num_keys); + // We need to rewind in the case where they are equal + if (pos > 0 && key == inner->keys[pos-1]) { + pos -= 1; + } + assert(pos == 0 || inner->keys[pos-1] < key); + node = inner->children[pos]; } const LeafNode* leaf= reinterpret_cast<const LeafNode*>(node); assert( leaf->type == NODE_LEAF ); unsigned int pos = leaf_position_for(key, leaf->keys, leaf->num_keys); if (pos <= leaf->num_keys) { + pos -= 1; + if (pos < leaf->num_keys && key == leaf->keys[pos]) { pos -= 1; - if (pos < leaf->num_keys && key == leaf->keys[pos]) { - pos -= 1; - } + } - if (pos < leaf->num_keys) { - assert(leaf->keys[pos] < key); - if (leaf->values[pos]) { - if (value != NULL) { - *value = leaf->values[pos]; - } - if (out_key != NULL) { - *out_key = leaf->keys[pos]; - } - return true; - } + if (pos < leaf->num_keys) { + assert(leaf->keys[pos] < key); + if (leaf->values[pos]) { + if (value != NULL) { + *value = leaf->values[pos]; + } + if (out_key != NULL) { + *out_key = leaf->keys[pos]; + } + return true; } + } } return false; -} + } - // Returns the size of an inner node - // It is useful when optimizing 
performance with cache alignment. - unsigned sizeof_inner_node() const { - return sizeof(InnerNode); - } + // Returns the size of an inner node + // It is useful when optimizing performance with cache alignment. + unsigned sizeof_inner_node() const { + return sizeof(InnerNode); + } - // Returns the size of a leaf node. - // It is useful when optimizing performance with cache alignment. - unsigned sizeof_leaf_node() const { - return sizeof(LeafNode); - } + // Returns the size of a leaf node. + // It is useful when optimizing performance with cache alignment. + unsigned sizeof_leaf_node() const { + return sizeof(LeafNode); + } - size_t size() const { return size_; } - + size_t size() const { return size_; } private: - // Used when debugging - enum NodeType {NODE_INNER=0xDEADBEEF, NODE_LEAF=0xC0FFEE}; + // Used when debugging + enum NodeType {NODE_INNER=0xDEADBEEF, NODE_LEAF=0xC0FFEE}; - // Leaf nodes store pairs of keys and values. - struct LeafNode { + // Leaf nodes store pairs of keys and values. + struct LeafNode { #ifndef NDEBUG - LeafNode() : type(NODE_LEAF), num_keys(0) {memset(keys,0,sizeof(KEY)*M);} - const NodeType type; + LeafNode() : type(NODE_LEAF), num_keys(0) {memset(keys,0,sizeof(KEY)*M);} + const NodeType type; #else - LeafNode() : num_keys(0) {memset(keys,0,sizeof(KEY)*M);} + LeafNode() : num_keys(0) {memset(keys,0,sizeof(KEY)*M);} #endif - unsigned num_keys; - KEY keys[M]; - VALUE values[M]; - // unsigned char _pad[LEAF_NODE_PADDING]; - }; + unsigned num_keys; + KEY keys[M]; + VALUE values[M]; + // unsigned char _pad[LEAF_NODE_PADDING]; + }; - // Inner nodes store pointers to other nodes interleaved with keys. - struct InnerNode { + // Inner nodes store pointers to other nodes interleaved with keys. + struct InnerNode { #ifndef NDEBUG - InnerNode() : type(NODE_INNER), num_keys(0) {memset(keys,0,sizeof(KEY)*M);} - const NodeType type; + InnerNode() : type(NODE_INNER), num_keys(0) {memset(keys,0,sizeof(KEY)*M);} + const NodeType type; #else - InnerNode() : num_keys(0) {memset(keys,0,sizeof(KEY)*M);} + InnerNode() : num_keys(0) {memset(keys,0,sizeof(KEY)*M);} #endif - unsigned num_keys; - KEY keys[N]; - void* children[N+1]; - // unsigned char _pad[INNER_NODE_PADDING]; - }; + unsigned num_keys; + KEY keys[N]; + void* children[N+1]; + // unsigned char _pad[INNER_NODE_PADDING]; + }; - // Custom allocator that returns aligned blocks of memory - template <unsigned ALIGNMENT> - struct AlignedMemoryAllocator { - typedef std::size_t size_type; - typedef std::ptrdiff_t difference_type; - - static char* malloc(const size_type bytes) - { - void* result; - if( posix_memalign(&result, ALIGNMENT, bytes) != 0 ) { - result= 0; - } - // Alternative: result= std::malloc(bytes); - return reinterpret_cast<char*>(result); - } - static void free(char* const block) - { std::free(block); } - }; + // Custom allocator that returns aligned blocks of memory + template <unsigned ALIGNMENT> + struct AlignedMemoryAllocator { + typedef std::size_t size_type; + typedef std::ptrdiff_t difference_type; - // Returns a pointer to a fresh leaf node. 
- LeafNode* new_leaf_node() { - LeafNode* result; - //result= new LeafNode(); - result= leafPool.construct(); - //cout << "New LeafNode at " << result << endl; - return result; + static char* malloc(const size_type bytes) + { + void* result; + if( posix_memalign(&result, ALIGNMENT, bytes) != 0 ) { + result= 0; } + // Alternative: result= std::malloc(bytes); + return reinterpret_cast<char*>(result); + } + static void free(char* const block) + { std::free(block); } + }; - // Frees a leaf node previously allocated with new_leaf_node() - void delete_leaf_node(LeafNode* node) { - assert( node->type == NODE_LEAF ); - //cout << "Deleting LeafNode at " << node << endl; - // Alternatively: delete node; - leafPool.destroy(node); - } + // Returns a pointer to a fresh leaf node. + LeafNode* new_leaf_node() { + LeafNode* result; + //result= new LeafNode(); + result= leafPool.construct(); + //cout << "New LeafNode at " << result << endl; + return result; + } - // Returns a pointer to a fresh inner node. - InnerNode* new_inner_node() { - InnerNode* result; - // Alternatively: result= new InnerNode(); - result= innerPool.construct(); - //cout << "New InnerNode at " << result << endl; - return result; - } + // Frees a leaf node previously allocated with new_leaf_node() + void delete_leaf_node(LeafNode* node) { + assert( node->type == NODE_LEAF ); + //cout << "Deleting LeafNode at " << node << endl; + // Alternatively: delete node; + leafPool.destroy(node); + } - // Frees an inner node previously allocated with new_inner_node() - void delete_inner_node(InnerNode* node) { - assert( node->type == NODE_INNER ); - //cout << "Deleting InnerNode at " << node << endl; - // Alternatively: delete node; - innerPool.destroy(node); - } - - // Data type returned by the private insertion methods. - struct InsertionResult { - KEY key; - void* left; - void* right; - }; + // Returns a pointer to a fresh inner node. + InnerNode* new_inner_node() { + InnerNode* result; + // Alternatively: result= new InnerNode(); + result= innerPool.construct(); + //cout << "New InnerNode at " << result << endl; + return result; + } - // Returns the position where 'key' should be inserted in a leaf node - // that has the given keys. - static unsigned leaf_position_for(const KEY& key, const KEY* keys, - unsigned num_keys) { - // Simple linear search. Faster for small values of N or M - unsigned k= 0; - while((k < num_keys) && (keys[k]<key)) { - ++k; - } - return k; - /* - // Binary search. It is faster when N or M is > 100, - // but the cost of the division renders it useless - // for smaller values of N or M. - XXX--- needs to be re-checked because the linear search - has changed since the last update to the following ---XXX - int left= -1, right= num_keys, middle; - while( right -left > 1 ) { - middle= (left+right)/2; - if( keys[middle] < key) { - left= middle; - } else { - right= middle; - } - } - //assert( right == k ); - return unsigned(right); - */ - } + // Frees an inner node previously allocated with new_inner_node() + void delete_inner_node(InnerNode* node) { + assert( node->type == NODE_INNER ); + //cout << "Deleting InnerNode at " << node << endl; + // Alternatively: delete node; + innerPool.destroy(node); + } - // Returns the position where 'key' should be inserted in an inner node - // that has the given keys. - static inline unsigned inner_position_for(const KEY& key, const KEY* keys, - unsigned num_keys) { - // Simple linear search. 
Faster for small values of N or M - unsigned k= 0; - while((k < num_keys) && ((keys[k]<key) || (keys[k]==key))) { - ++k; - } - return k; - // Binary search is faster when N or M is > 100, - // but the cost of the division renders it useless - // for smaller values of N or M. - } + // Data type returned by the private insertion methods. + struct InsertionResult { + KEY key; + void* left; + void* right; + }; - bool leaf_insert(LeafNode* node, KEY& key, - VALUE& value, InsertionResult* result) { - assert( node->type == NODE_LEAF ); - assert( node->num_keys <= M ); - bool was_split= false; - // Simple linear search - unsigned i= leaf_position_for(key, node->keys, node->num_keys); - if( node->num_keys == M ) { - // The node was full. We must split it - unsigned treshold= (M+1)/2; - LeafNode* new_sibling= new_leaf_node(); - new_sibling->num_keys= node->num_keys -treshold; - for(unsigned j=0; j < new_sibling->num_keys; ++j) { - new_sibling->keys[j]= node->keys[treshold+j]; - new_sibling->values[j]= - node->values[treshold+j]; - } - node->num_keys= treshold; - if( i < treshold ) { - // Inserted element goes to left sibling - leaf_insert_nonfull(node, key, value, i); - } else { - // Inserted element goes to right sibling - leaf_insert_nonfull(new_sibling, key, value, - i-treshold); - } - // Notify the parent about the split - was_split= true; - result->key= new_sibling->keys[0]; - result->left= node; - result->right= new_sibling; - } else { - // The node was not full - leaf_insert_nonfull(node, key, value, i); - } - return was_split; - } + // Returns the position where 'key' should be inserted in a leaf node + // that has the given keys. + static unsigned leaf_position_for(const KEY& key, const KEY* keys, + unsigned num_keys) { + // Simple linear search. Faster for small values of N or M + unsigned k= 0; + while((k < num_keys) && (keys[k]<key)) { + ++k; + } + return k; + /* + // Binary search. It is faster when N or M is > 100, + // but the cost of the division renders it useless + // for smaller values of N or M. + XXX--- needs to be re-checked because the linear search + has changed since the last update to the following ---XXX + int left= -1, right= num_keys, middle; + while( right -left > 1 ) { + middle= (left+right)/2; + if( keys[middle] < key) { + left= middle; + } else { + right= middle; + } + } + //assert( right == k ); + return unsigned(right); + */ + } - void leaf_insert_nonfull(LeafNode* node, KEY& key, VALUE& value, - unsigned index) { - assert( node->type == NODE_LEAF ); - assert( node->num_keys < M ); - assert( index <= M ); - assert( index <= node->num_keys ); - if( (index < M) && - (node->keys[index] == key) ) { - // We are inserting a duplicate value. - // Simply overwrite the old one - node->values[index]= value; - } else { - // The key we are inserting is unique - for(unsigned i=node->num_keys; i > index; --i) { - node->keys[i]= node->keys[i-1]; - node->values[i]= node->values[i-1]; - } - node->num_keys++; - node->keys[index]= key; - node->values[index]= value; - ++size_; - } - } + // Returns the position where 'key' should be inserted in an inner node + // that has the given keys. + static inline unsigned inner_position_for(const KEY& key, const KEY* keys, + unsigned num_keys) { + // Simple linear search. Faster for small values of N or M + unsigned k= 0; + while((k < num_keys) && ((keys[k]<key) || (keys[k]==key))) { + ++k; + } + return k; + // Binary search is faster when N or M is > 100, + // but the cost of the division renders it useless + // for smaller values of N or M. 
+ } - bool inner_insert(InnerNode* node, unsigned current_depth, KEY& key, - VALUE& value, InsertionResult* result) { - assert( node->type == NODE_INNER ); - assert( current_depth != 0 ); - // Early split if node is full. - // This is not the canonical algorithm for B+ trees, - // but it is simpler and does not break the definition. - bool was_split= false; - if( node->num_keys == N ) { - // Split - unsigned treshold= (N+1)/2; - InnerNode* new_sibling= new_inner_node(); - new_sibling->num_keys= node->num_keys -treshold; - for(unsigned i=0; i < new_sibling->num_keys; ++i) { - new_sibling->keys[i]= node->keys[treshold+i]; - new_sibling->children[i]= - node->children[treshold+i]; - } - new_sibling->children[new_sibling->num_keys]= - node->children[node->num_keys]; - node->num_keys= treshold-1; - // Set up the return variable - was_split= true; - result->key= node->keys[treshold-1]; - result->left= node; - result->right= new_sibling; - // Now insert in the appropriate sibling - if( key < result->key ) { - inner_insert_nonfull(node, current_depth, key, value); - } else { - inner_insert_nonfull(new_sibling, current_depth, key, - value); - } - } else { - // No split - inner_insert_nonfull(node, current_depth, key, value); - } - return was_split; - } + bool leaf_insert(LeafNode* node, KEY& key, + VALUE& value, InsertionResult* result) { + assert( node->type == NODE_LEAF ); + assert( node->num_keys <= M ); + bool was_split= false; + // Simple linear search + unsigned i= leaf_position_for(key, node->keys, node->num_keys); + if( node->num_keys == M ) { + // The node was full. We must split it + unsigned treshold= (M+1)/2; + LeafNode* new_sibling= new_leaf_node(); + new_sibling->num_keys= node->num_keys -treshold; + for(unsigned j=0; j < new_sibling->num_keys; ++j) { + new_sibling->keys[j]= node->keys[treshold+j]; + new_sibling->values[j]= + node->values[treshold+j]; + } + node->num_keys= treshold; + if( i < treshold ) { + // Inserted element goes to left sibling + leaf_insert_nonfull(node, key, value, i); + } else { + // Inserted element goes to right sibling + leaf_insert_nonfull(new_sibling, key, value, + i-treshold); + } + // Notify the parent about the split + was_split= true; + result->key= new_sibling->keys[0]; + result->left= node; + result->right= new_sibling; + } else { + // The node was not full + leaf_insert_nonfull(node, key, value, i); + } + return was_split; + } - void inner_insert_nonfull(InnerNode* node, unsigned current_depth, KEY& key, - VALUE& value) { - assert( node->type == NODE_INNER ); - assert( node->num_keys < N ); - assert( current_depth != 0 ); - // Simple linear search - unsigned index= inner_position_for(key, node->keys, - node->num_keys); - // GCC warns that this may be used uninitialized, even though that is untrue. 
- InsertionResult result = { KEY(), 0, 0 }; - bool was_split; - if( current_depth-1 == 0 ) { - // The children are leaf nodes - for(unsigned kk=0; kk < node->num_keys+1; ++kk) { - assert( *reinterpret_cast<NodeType*> - (node->children[kk]) == NODE_LEAF ); - } - was_split= leaf_insert(reinterpret_cast<LeafNode*> - (node->children[index]), key, value, &result); - } else { - // The children are inner nodes - for(unsigned kk=0; kk < node->num_keys+1; ++kk) { - assert( *reinterpret_cast<NodeType*> - (node->children[kk]) == NODE_INNER ); - } - InnerNode* child= reinterpret_cast<InnerNode*> - (node->children[index]); - was_split= inner_insert( child, current_depth-1, key, value, - &result); - } - if( was_split ) { - if( index == node->num_keys ) { - // Insertion at the rightmost key - node->keys[index]= result.key; - node->children[index]= result.left; - node->children[index+1]= result.right; - node->num_keys++; - } else { - // Insertion not at the rightmost key - node->children[node->num_keys+1]= - node->children[node->num_keys]; - for(unsigned i=node->num_keys; i!=index; --i) { - node->children[i]= node->children[i-1]; - node->keys[i]= node->keys[i-1]; - } - node->children[index]= result.left; - node->children[index+1]= result.right; - node->keys[index]= result.key; - node->num_keys++; - } - } // else the current node is not affected + void leaf_insert_nonfull(LeafNode* node, KEY& key, VALUE& value, + unsigned index) { + assert( node->type == NODE_LEAF ); + assert( node->num_keys < M ); + assert( index <= M ); + assert( index <= node->num_keys ); + if( (index < M) && + (node->keys[index] == key) ) { + // We are inserting a duplicate value. + // Simply overwrite the old one + node->values[index]= value; + } else { + // The key we are inserting is unique + for(unsigned i=node->num_keys; i > index; --i) { + node->keys[i]= node->keys[i-1]; + node->values[i]= node->values[i-1]; + } + node->num_keys++; + node->keys[index]= key; + node->values[index]= value; + ++size_; + } + } + + bool inner_insert(InnerNode* node, unsigned current_depth, KEY& key, + VALUE& value, InsertionResult* result) { + assert( node->type == NODE_INNER ); + assert( current_depth != 0 ); + // Early split if node is full. + // This is not the canonical algorithm for B+ trees, + // but it is simpler and does not break the definition. 
+ bool was_split= false; + if( node->num_keys == N ) { + // Split + unsigned treshold= (N+1)/2; + InnerNode* new_sibling= new_inner_node(); + new_sibling->num_keys= node->num_keys -treshold; + for(unsigned i=0; i < new_sibling->num_keys; ++i) { + new_sibling->keys[i]= node->keys[treshold+i]; + new_sibling->children[i]= + node->children[treshold+i]; + } + new_sibling->children[new_sibling->num_keys]= + node->children[node->num_keys]; + node->num_keys= treshold-1; + // Set up the return variable + was_split= true; + result->key= node->keys[treshold-1]; + result->left= node; + result->right= new_sibling; + // Now insert in the appropriate sibling + if( key < result->key ) { + inner_insert_nonfull(node, current_depth, key, value); + } else { + inner_insert_nonfull(new_sibling, current_depth, key, + value); + } + } else { + // No split + inner_insert_nonfull(node, current_depth, key, value); + } + return was_split; + } + + void inner_insert_nonfull(InnerNode* node, unsigned current_depth, KEY& key, + VALUE& value) { + assert( node->type == NODE_INNER ); + assert( node->num_keys < N ); + assert( current_depth != 0 ); + // Simple linear search + unsigned index= inner_position_for(key, node->keys, + node->num_keys); + // GCC warns that this may be used uninitialized, even though that is untrue. + InsertionResult result = { KEY(), 0, 0 }; + bool was_split; + if( current_depth-1 == 0 ) { + // The children are leaf nodes + for(unsigned kk=0; kk < node->num_keys+1; ++kk) { + assert( *reinterpret_cast<NodeType*> + (node->children[kk]) == NODE_LEAF ); + } + was_split= leaf_insert(reinterpret_cast<LeafNode*> + (node->children[index]), key, value, &result); + } else { + // The children are inner nodes + for(unsigned kk=0; kk < node->num_keys+1; ++kk) { + assert( *reinterpret_cast<NodeType*> + (node->children[kk]) == NODE_INNER ); + } + InnerNode* child= reinterpret_cast<InnerNode*> + (node->children[index]); + was_split= inner_insert( child, current_depth-1, key, value, + &result); + } + if( was_split ) { + if( index == node->num_keys ) { + // Insertion at the rightmost key + node->keys[index]= result.key; + node->children[index]= result.left; + node->children[index+1]= result.right; + node->num_keys++; + } else { + // Insertion not at the rightmost key + node->children[node->num_keys+1]= + node->children[node->num_keys]; + for(unsigned i=node->num_keys; i!=index; --i) { + node->children[i]= node->children[i-1]; + node->keys[i]= node->keys[i-1]; } + node->children[index]= result.left; + node->children[index+1]= result.right; + node->keys[index]= result.key; + node->num_keys++; + } + } // else the current node is not affected + } - typedef AlignedMemoryAllocator<NODE_ALIGNMENT> AlignedAllocator; + typedef AlignedMemoryAllocator<NODE_ALIGNMENT> AlignedAllocator; - // Node memory allocators. IMPORTANT NOTE: they must be declared - // before the root to make sure that they are properly initialised - // before being used to allocate any node. - boost::object_pool<InnerNode, AlignedAllocator> innerPool; - boost::object_pool<LeafNode, AlignedAllocator> leafPool; - // Depth of the tree. A tree of depth 0 only has a leaf node. - unsigned depth; - // Pointer to the root node. It may be a leaf or an inner node, but - // it is never null. - void* root; - size_t size_; + // Node memory allocators. IMPORTANT NOTE: they must be declared + // before the root to make sure that they are properly initialised + // before being used to allocate any node. 
+ boost::object_pool<InnerNode, AlignedAllocator> innerPool; + boost::object_pool<LeafNode, AlignedAllocator> leafPool; + // Depth of the tree. A tree of depth 0 only has a leaf node. + unsigned depth; + // Pointer to the root node. It may be a leaf or an inner node, but + // it is never null. + void* root; + size_t size_; }; #endif // !defined BPLUSTREE_HPP_227824 This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
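Beyond the whitespace changes, the full listing shows the one range-style lookup this tree offers: findLastLessThan(), roughly STL's lower_bound(key) stepped back by one entry, which its own comment warns is only valid while nothing has been deleted. A small sketch of that contract, assuming the int/int instantiation used by the tests:

// Sketch: findLastLessThan returns the greatest entry strictly below the probe.
// Assumes btree.h and its dependencies build; no deletes are performed, per the
// warning in the code above.
#include "btree.h"
#include <cassert>

typedef BPlusTree<int, int, 8, 8> tree_t;

int main() {
    tree_t tree;
    tree.insert(10, 1);
    tree.insert(20, 2);
    tree.insert(30, 3);

    int value = 0, key = 0;
    // Probe 25: the last entry below it is (20, 2).
    assert(tree.findLastLessThan(25, &value, &key));
    assert(key == 20 && value == 2);

    // Probe 10: nothing is strictly below the smallest key.
    assert(!tree.findLastLessThan(10, &value, &key));
    return 0;
}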
From: <yan...@us...> - 2009-03-17 02:35:07
|
Revision: 1295 http://assorted.svn.sourceforge.net/assorted/?rev=1295&view=rev Author: yangzhang Date: 2009-03-17 02:34:49 +0000 (Tue, 17 Mar 2009) Log Message: ----------- added size-tracking to btree Modified Paths: -------------- ydb/trunk/README ydb/trunk/src/main.lzz.clamp ydb/trunk/src/tpcc/btree.h ydb/trunk/src/tpcc/tpcctables.cc ydb/trunk/src/tpcc/tpcctables.h Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-03-16 22:32:07 UTC (rev 1294) +++ ydb/trunk/README 2009-03-17 02:34:49 UTC (rev 1295) @@ -684,6 +684,59 @@ - stock 100k 306 - item 100k 82 +- DONE add size-tracking to btree + - new order seems to be causing the store to grow nearly linearly + - initially: + + |warehouses| = 1 + |stock| = 100000 + |districts| = 10 + |customers| = 30000 + |orders| = 30000 + |ordersBC| = 30000 + |orderlines| = 299484 + |cbn| = 30000 + |neworders| = 9000 + |history| = 30000 + + |warehouses| = 1 + |stock| = 100000 + |districts| = 10 + |customers| = 30000 + |orders| = 30000 + |ordersBC| = 30000 + |orderlines| = 299462 + |cbn| = 30000 + |neworders| = 9000 + |history| = 30000 + + - with 1000 txns: + + |warehouses| = 1 + |stock| = 100000 + |districts| = 10 + |customers| = 30000 + |orders| = 3099 (almost 30000+1000)1 + |ordersBC| = 30991 + |orderlines| = 309477 (almost 300000+1000*10) + |cbn| = 30000 + |neworders| = 9991 + |history| = 30000 + + - with 100000 txns (100x greater than 1000): + + |warehouses| = 1 + |stock| = 100000 + |districts| = 10 + |customers| = 30000 + |orders| = 128973 (almost 30000+100000) + |ordersBC| = 128973 + |orderlines| = 1288365 (almost 300000+100000*10) + |cbn| = 30000 + |neworders| = 107973 + |history| = 30000 + +- TODO add iteration to btree - TODO try integrating TPC-C; just get txns working for now - TODO implement TPC-C recovery Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-03-16 22:32:07 UTC (rev 1294) +++ ydb/trunk/src/main.lzz.clamp 2009-03-17 02:34:49 UTC (rev 1295) @@ -1758,21 +1758,7 @@ bind(issue_txns<Types>, ref(newreps), ref(seqno), ref(accept_joiner)); st_joining join_issue_txns(my_spawn(f, "issue_txns")); - finally fin(lambda () { - cout << "LEADER SUMMARY" << endl; - cout << "- total updates = " << updates << endl; - cout << "- final DB state: seqno = " << __ref(seqno) << ", size = " - << g_map.size() << endl; - string fname = string("/tmp/ydb") + lexical_cast<string>(getpid()); - if (dump) { - cout << "- dumping to " << fname << endl; - ofstream of(fname.c_str()); - of << "seqno: " << __ref(seqno) << endl; - foreach (const entry &p, g_map) { - of << p.first << ": " << p.second << endl; - } - } - }); + finally fin(bind(summarize, "LEADER", ref(seqno))); try { // Start handling responses. 
@@ -1825,6 +1811,36 @@ } } +void +summarize(const char *role, int seqno) +{ + cout << role << " SUMMARY\n"; + if (do_tpcc) { + cout << "seqno: " << seqno << endl; + if (g_tables != nullptr) { + cout << "state:\n"; + g_tables->show(); + string fname = string("/tmp/ydb") + lexical_cast<string>(getpid()); + if (dump) { + // XXX iterate & dump + } + } + } else { + cout << "- total updates = " << updates << "\n" + << "- final DB state: seqno = " << seqno << ", size = " + << g_map.size() << endl; + string fname = string("/tmp/ydb") + lexical_cast<string>(getpid()); + if (dump) { + cout << "- dumping to " << fname << endl; + ofstream of(fname.c_str()); + of << "seqno: " << seqno << endl; + foreach (const entry &p, g_map) { + of << p.first << ": " << p.second << endl; + } + } + } +} + /** * Run a replica. */ @@ -1840,25 +1856,8 @@ } // Initialize database state. + int seqno = -1; mii &map = g_map; - int seqno = -1; - finally f(lambda () { - cout << "REPLICA SUMMARY" << endl; - cout << "- total updates = " << updates << endl; - cout << "- final DB state: seqno = " << __ref(seqno) << ", size = " - << __ref(map).size() << endl; - string fname = string("/tmp/ydb") + lexical_cast<string>(getpid()); - if (dump) { - cout << "- dumping to " << fname << endl; - ofstream of(fname.c_str()); - of << "seqno: " << __ref(seqno) << endl; - foreach (const entry &p, __ref(map)) { - of << p.first << ": " << p.second << endl; - } - } - }); - st_channel<recovery_t> send_states; - if (do_tpcc) { TPCCTables *tables = new TPCCTables(); g_tables.reset(tables); @@ -1884,8 +1883,12 @@ } cout << "loaded " << nwarehouses << " warehouses in " << current_time_millis() - start_time << " ms" << endl; + tables->show(); } + finally f(bind(summarize, "REPLICA", ref(seqno))); + st_channel<recovery_t> send_states; + cout << "starting as replica on port " << listen_port << endl; // Listen for connections from other replicas. Modified: ydb/trunk/src/tpcc/btree.h =================================================================== --- ydb/trunk/src/tpcc/btree.h 2009-03-16 22:32:07 UTC (rev 1294) +++ ydb/trunk/src/tpcc/btree.h 2009-03-17 02:34:49 UTC (rev 1295) @@ -47,7 +47,8 @@ // Builds a new empty tree. BPlusTree() : depth(0), - root(new_leaf_node()) + root(new_leaf_node()), + size_(0) { // DEBUG // cout << "sizeof(LeafNode)==" << sizeof(LeafNode) << endl; @@ -140,6 +141,7 @@ index= leaf_position_for(key, leaf->keys, leaf->num_keys); if( leaf->keys[index] == key ) { leaf->values[index] = 0; + --size_; return true; } else { return false; @@ -201,6 +203,8 @@ unsigned sizeof_leaf_node() const { return sizeof(LeafNode); } + + size_t size() const { return size_; } private: @@ -378,7 +382,7 @@ return was_split; } - static void leaf_insert_nonfull(LeafNode* node, KEY& key, VALUE& value, + void leaf_insert_nonfull(LeafNode* node, KEY& key, VALUE& value, unsigned index) { assert( node->type == NODE_LEAF ); assert( node->num_keys < M ); @@ -398,6 +402,7 @@ node->num_keys++; node->keys[index]= key; node->values[index]= value; + ++size_; } } @@ -506,6 +511,7 @@ // Pointer to the root node. It may be a leaf or an inner node, but // it is never null. 
void* root; + size_t size_; }; #endif // !defined BPLUSTREE_HPP_227824 Modified: ydb/trunk/src/tpcc/tpcctables.cc =================================================================== --- ydb/trunk/src/tpcc/tpcctables.cc 2009-03-16 22:32:07 UTC (rev 1294) +++ ydb/trunk/src/tpcc/tpcctables.cc 2009-03-17 02:34:49 UTC (rev 1295) @@ -5,6 +5,7 @@ #include <vector> #include <commons/assert.h> +#include <iostream> #include "stlutil.h" using std::vector; @@ -605,3 +606,17 @@ History* h = new History(history); history_.push_back(h); } + +void TPCCTables::show() const { + using namespace std; + cout << "|warehouses| = " << warehouses_.size() << "\n" + << "|stock| = " << stock_.size() << "\n" + << "|districts| = " << districts_.size() << "\n" + << "|customers| = " << customers_.size() << "\n" + << "|orders| = " << orders_.size() << "\n" + << "|ordersBC| = " << orders_by_customer_.size() << "\n" + << "|orderlines| = " << orderlines_.size() << "\n" + << "|cbn| = " << customers_by_name_.size() << "\n" + << "|neworders| = " << neworders_.size() << "\n" + << "|history| = " << history_.size() << endl; +} Modified: ydb/trunk/src/tpcc/tpcctables.h =================================================================== --- ydb/trunk/src/tpcc/tpcctables.h 2009-03-16 22:32:07 UTC (rev 1294) +++ ydb/trunk/src/tpcc/tpcctables.h 2009-03-17 02:34:49 UTC (rev 1295) @@ -67,6 +67,8 @@ const std::vector<const History*>& history() const { return history_; } void insertHistory(const History& history); + void show() const; + static const int KEYS_PER_INTERNAL = 8; static const int KEYS_PER_LEAF = 8; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
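Because del() only zeroes the stored value and leaves the key in its leaf, the new size_ counter is what reflects logical membership: inserting a fresh key increments it, del() decrements it, and overwriting an existing key leaves it unchanged, which is what lets show() report per-table counts. A small sketch of those semantics, assuming the same int/int instantiation as the btree tests:

// Sketch: size() tracks logical entries; del() acts as a tombstone (value zeroed).
// Assumes btree.h and its dependencies build.
#include "btree.h"
#include <cassert>

typedef BPlusTree<int, int, 8, 8> tree_t;

int main() {
    tree_t tree;
    tree.insert(10, 100);
    tree.insert(20, 200);
    assert(tree.size() == 2);

    tree.insert(10, 111);          // overwrite: size unchanged
    assert(tree.size() == 2);

    int v = 0;
    assert(tree.find(10, &v) && v == 111);

    tree.del(20);                  // zeroes the value, leaves the key behind
    assert(!tree.find(20));        // find() treats a zero value as absent
    assert(tree.size() == 1);
    return 0;
}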
From: <yan...@us...> - 2009-03-16 22:32:32
|
Revision: 1294 http://assorted.svn.sourceforge.net/assorted/?rev=1294&view=rev Author: yangzhang Date: 2009-03-16 22:32:07 +0000 (Mon, 16 Mar 2009) Log Message: ----------- started integrating tpcc txn processing into ydb; currently only handles new order txns and no recovery Modified Paths: -------------- ydb/trunk/README ydb/trunk/src/main.lzz.clamp ydb/trunk/src/tpcc/tpccclient.cc ydb/trunk/src/ydb.proto Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-03-13 19:26:56 UTC (rev 1293) +++ ydb/trunk/README 2009-03-16 22:32:07 UTC (rev 1294) @@ -684,8 +684,10 @@ - stock 100k 306 - item 100k 82 -- TODO try integrating TPC-C +- TODO try integrating TPC-C; just get txns working for now +- TODO implement TPC-C recovery + - TODO faster disk logging using separate threads - TODO show aries-write Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-03-13 19:26:56 UTC (rev 1293) +++ ydb/trunk/src/main.lzz.clamp 2009-03-16 22:32:07 UTC (rev 1294) @@ -93,13 +93,14 @@ bool yield_during_build_up, yield_during_catch_up, dump, show_updates, count_updates, stop_on_recovery, general_txns, profile_threads, debug_threads, multirecover, disk, debug_memory, use_pwal, use_twal, - use_pb, use_pb_res, g_caught_up, rec_pwal, + use_pb, use_pb_res, g_caught_up, rec_pwal, do_tpcc, suppress_txn_msgs, fake_bcast, force_ser, fake_exec; long long timelim, read_thresh, write_thresh; // Control. st_intr_bool stop_hub, kill_hub; st_bool do_pause; +bool stopped_issuing; // Statistics. int updates; @@ -326,6 +327,17 @@ //typedef string ser_t; typedef ser_array ser_t; +template<typename T> +void +ser(writer &w, const T &msg) +{ + uint32_t len = msg.ByteSize(); + w.mark(); + w.reserve(len); + check(msg.SerializeToArray(w.cur(), len)); + w.skip(len); +} + /** * Serialization. 
* @@ -564,6 +576,14 @@ check(msg.ParseFromArray(src.read(len), len)); } +template<typename T> +inline void +readmsg(anchored_stream_reader &src, T &msg) +{ + uint32_t len = ntohl(src.read<uint32_t>()); + check(msg.ParseFromArray(src.read(len), len)); +} + enum { op_del, op_write, op_commit }; /** @@ -885,6 +905,145 @@ } #end +unique_ptr<TPCCTables> g_tables; + +void +process_tpcc(const TpccReq &req, int &seqno, TpccRes *res) +{ + checkeq(req.seqno(), seqno + 1); + ++seqno; + if (req.has_new_order()) { + const NewOrderMsg &no = req.new_order(); + vector<NewOrderItem> items(no.item_size()); + for (int i = 0; i < no.item_size(); ++i) { + NewOrderItem &dst = items[i]; + const NewOrderItemMsg &src = no.item(i); + dst.i_id = src.i_id(); + dst.ol_supply_w_id = src.ol_supply_w_id(); + dst.ol_quantity = src.ol_quantity(); + } + NewOrderOutput output; + g_tables->newOrder(no.warehouse_id(), no.district_id(), no.customer_id(), + items, no.now().c_str(), &output); + res->Clear(); + res->set_seqno(seqno); + NewOrderOutputMsg &outmsg = *res->mutable_new_order(); + outmsg.set_w_tax(output.w_tax); + outmsg.set_d_tax(output.d_tax); + outmsg.set_o_id(output.o_id); + outmsg.set_c_discount(output.c_discount); + outmsg.set_total(output.total); + foreach (const NewOrderOutput::ItemInfo &src, output.items) { + ItemInfoMsg &dst = *outmsg.add_item(); + dst.set_s_quantity(src.s_quantity); + dst.set_i_price(src.i_price); + dst.set_ol_amount(src.ol_amount); + dst.set_brand_generic(src.brand_generic); + dst.set_i_name(src.i_name); + } + outmsg.set_c_last(output.c_last); + outmsg.set_c_credit(output.c_credit); + outmsg.set_status(output.status); + } else { + throw_not_implemented(); + } +} + +void +process_tpccs(st_netfd_t leader, int &seqno, + st_channel<recovery_t> &send_states, + st_channel<chunk> &backlog, int init_seqno, + int mypos, int nnodes) +{ + bool caught_up = init_seqno == 0; + long long start_time = current_time_millis(), + time_caught_up = caught_up ? start_time : -1; + int seqno_caught_up = caught_up ? seqno : -1; + // Used by joiner only to tell where we actually started (init_seqno is just + // the seqno reported by the leader in the Init message, but it may have + // issued more since the Init message). 
+ int first_seqno = -1; + + function<void(anchored_stream_reader& reader)> overflow_fn = lambda(anchored_stream_reader &reader) { + sized_array<char> buf(new char[reader.buf().size()], reader.buf().size()); + memcpy(buf.get(), reader.start(), reader.unread()); + swap(buf, reader.buf()); + __ref(backlog).push(make_tuple(buf, reader.anchor(), reader.start())); + reader.reset_range(reader.buf().get(), reader.buf().get() + reader.unread()); + }; + + sized_array<char> rbuf(new char[read_buf_size], read_buf_size); + commons::array<char> wbuf(buf_size); + anchored_stream_reader reader(st_read_fn(leader), st_read_fully_fn(leader), + overflow_fn, rbuf.get(), rbuf.size()); + writer w(lambda(const void *buf, size_t len) { + checkeqnneg(st_write(__ref(leader), buf, len, ST_UTIME_NO_TIMEOUT), + static_cast<ssize_t>(len)); + }, wbuf.get(), wbuf.size()); + + finally f(lambda () { + long long now = current_time_millis(); + stopped_issuing = true; + showtput("processed", now, __ref(start_time), __ref(seqno), + __ref(init_seqno)); + if (!__ref(caught_up)) { + cout << "live-processing: never entered this phase (never caught up)" << endl; + } else { + showtput("live-processed", now, __ref(time_caught_up), __ref(seqno), + __ref(seqno_caught_up)); + } + __ref(send_states).push(recovery_t()); + __ref(w).mark_and_flush(); + }); + + TpccReq req; + TpccRes res; + + while (true) + { + { + st_intr intr(stop_hub); + readmsg(reader, req); + } + + if (req.seqno() == -1) { + // End of stream. + break; + } else if (req.seqno() == -2) { + // Prepare recovery msg. + send_states.push(make_tpcc_recovery(mypos, nnodes, seqno)); + } else { + // Backlog (auto-implicit) or process. + if (check_interval(req.seqno(), process_display)) + cout << "processing req " << req.seqno() << endl; + if (req.seqno() == seqno + 1) { + if (!caught_up) { + time_caught_up = current_time_millis(); + seqno_caught_up = seqno; + showtput("process_txns caught up; backlogged", + time_caught_up, start_time, seqno_caught_up, + first_seqno == -1 ? init_seqno - 1 : first_seqno); + caught_up = true; + } + // Process. + process_tpcc(req, seqno, &res); + ser(w, res); + reader.set_anchor(); + } + } + } + +} + +// XXX +recovery_t +make_tpcc_recovery(int mypos, int nnodes, int seqno) +{ + mypos = nnodes = seqno; + throw_not_implemented(); + return recovery_t(); +} + /** * Actually do the work of executing a transaction and sending back the reply. * @@ -936,7 +1095,6 @@ sized_array<char> rbuf(new char[read_buf_size], read_buf_size); commons::array<char> wbuf(buf_size); st_reader reader(leader, rbuf.get(), rbuf.size()); - vector<st_netfd_t> leader_v(1, leader); writer w(lambda(const void *buf, size_t len) { checkeqnneg(st_write(__ref(leader), buf, len, ST_UTIME_NO_TIMEOUT), static_cast<ssize_t>(len)); @@ -998,7 +1156,7 @@ } } else { st_intr intr(stop_hub); - prefix = reader.read<uint32_t>(); + prefix = ntohl(reader.read<uint32_t>()); check(prefix < 10000); batch.Clear(); } @@ -1103,7 +1261,7 @@ #if 0 template<typename mii> unique_ptr<Recovery> -make_recovery(const mii &map, int mypos, int nnodes, int &seqno) +make_recovery(const mii &map, int mypos, int nnodes, int seqno) { // TODO make this faster cout << "generating recovery..." 
<< endl; @@ -1134,7 +1292,7 @@ template<typename mii> recovery_t -make_recovery(const mii &map, int mypos, int nnodes, int &seqno) +make_recovery(const mii &map, int mypos, int nnodes, int seqno) { return recovery_t(); } @@ -1156,7 +1314,7 @@ template<> recovery_t -make_recovery(const snap_map<int, int> &map, int mypos, int nnodes, int &seqno) +make_recovery(const snap_map<int, int> &map, int mypos, int nnodes, int seqno) { const commons::array<entry> &src = map.get_table(); pair<size_t, size_t> range = recovery_range(src.size(), mypos, nnodes); @@ -1233,7 +1391,7 @@ try { st_intr intr(stop_hub); if (Types::is_pb()) readmsg(reader, batch); - else { prefix = reader.read<uint32_t>(); batch.Clear(); } + else { prefix = ntohl(reader.read<uint32_t>()); batch.Clear(); } } catch (...) { // TODO: only catch interruptions // This check on seqnos is OK for termination since the seqno will // never grow again if stop_hub is set. @@ -1251,7 +1409,7 @@ // to get all the acks back). st_intr intr(kill_hub); if (Types::is_pb()) readmsg(reader, batch); - else { prefix = reader.read<uint32_t>(); batch.Clear(); } + else { prefix = ntohl(reader.read<uint32_t>()); batch.Clear(); } } for (int i = 0; i < batch.res_size(); ++i) { @@ -1353,6 +1511,142 @@ h.run<Types>(); } +class tpcc_response_handler +{ +public: + tpcc_response_handler(st_netfd_t replica, const int &seqno, int rid, + st_multichannel<long long> &recover_signals, bool caught_up) + : + replica(replica), + seqno(seqno), + rid(rid), + recover_signals(recover_signals), + caught_up(caught_up), + sub(recover_signals.subscribe()), + start_time(current_time_millis()), + recovery_start_time(caught_up ? -1 : start_time), + recovery_end_time(-1), + start_seqno(seqno), + recovery_start_seqno(caught_up ? -1 : seqno), + recovery_end_seqno(-1), + last_seqno(-1) + {} + + void run() { + finally f(bind(&tpcc_response_handler::cleanup, this)); + + commons::array<char> rbuf(read_buf_size), wbuf(buf_size); + st_reader reader(replica, rbuf.get(), rbuf.size()); + writer w(lambda(const void*, size_t) { + throw not_supported_exception("response handler should not be writing"); + }, wbuf.get(), wbuf.size()); + stream s(reader,w); + + long long last_display_time = current_time_millis(); + + function<void()> loop_cleanup = + bind(&tpcc_response_handler::loop_cleanup, this); + + TpccRes res; + + while (true) { + finally f(loop_cleanup); + + // Read the message, but correctly respond to interrupts so that we can + // cleanly exit (slightly tricky). + if (stopped_issuing) { + st_intr intr(stop_hub); + readmsg(reader, res); + } else { + st_intr intr(kill_hub); + readmsg(reader, res); + } + + if (res.seqno() < last_seqno) + throw msg_exception(string("response seqno decreased from ") + + lexical_cast<string>(last_seqno) + " to " + + lexical_cast<string>(res.seqno())); + + if (!caught_up) { + long long now = current_time_millis(), time_diff = now - start_time; + caught_up = true; + recover_signals.push(now); + cout << rid << ": " << "recovering node caught up; took " + << time_diff << " ms" << endl; + // This will cause the program to exit eventually, but cleanly, such that + // the recovery time will be set first, before the eventual exit (which + // may not even happen in the current iteration). 
+ if (stop_on_recovery) { + cout << "stopping on recovery" << endl; + stop_hub.set(); + } + } + + if (check_interval(res.seqno(), handle_responses_display)) { + cout << rid << ": " << "got response " << res.seqno() << " from " + << replica << "; "; + long long display_time = current_time_millis(); + showtput("handling", display_time, last_display_time, res.seqno(), + res.seqno() - handle_responses_display); + last_display_time = display_time; + } + if (check_interval(res.seqno(), yield_interval)) { + st_sleep(0); + } + last_seqno = res.seqno(); + } + } + +private: + void loop_cleanup() { + // The first timestamp that comes down the subscription pipeline is the + // recovery start time, issued by the main thread. The second one is the + // recovery end time, issued by the response handler associated with the + // joiner. + if (recovery_start_time == -1 && !sub.empty()) { + recovery_start_time = sub.take(); + recovery_start_seqno = last_seqno; + cout << rid << ": "; + showtput("before recovery, finished", recovery_start_time, start_time, + recovery_start_seqno, 0); + } else if (recovery_end_time == -1 && !sub.empty()) { + recovery_end_time = sub.take(); + recovery_end_seqno = last_seqno; + cout << rid << ": "; + showtput("during recovery, finished roughly", recovery_end_time, + recovery_start_time, recovery_end_seqno, recovery_start_seqno); + } + } + + void cleanup() { + long long end_time = current_time_millis(); + cout << rid << ": "; + showtput("handled roughly", end_time, start_time, seqno, start_seqno); + if (recovery_end_time > -1) { + cout << rid << ": "; + showtput("after recovery, finished", end_time, recovery_end_time, + seqno, recovery_end_seqno); + } + } + + st_netfd_t replica; + const int &seqno; + int rid; + st_multichannel<long long> &recover_signals; + bool caught_up; + st_channel<long long> ⊂ + long long start_time, recovery_start_time, recovery_end_time; + int start_seqno, recovery_start_seqno, recovery_end_seqno, last_seqno; +}; + +void +handle_tpcc_responses(st_netfd_t replica, const int &seqno, int rid, + st_multichannel<long long> &recover_signals, bool caught_up) +{ + tpcc_response_handler h(replica, seqno, rid, recover_signals, caught_up); + h.run(); +} + /** * Help the recovering node. * @@ -1458,11 +1752,11 @@ st_bool accept_joiner; int seqno = 0; st_channel<replica_info> newreps; - const function<void()> f = + foreach (const replica_info &r, replicas) newreps.push(r); + const function<void()> f = do_tpcc ? + bind(issue_tpcc, ref(newreps), ref(seqno), ref(accept_joiner)) : bind(issue_txns<Types>, ref(newreps), ref(seqno), ref(accept_joiner)); - st_thread_t issue_txns_thread = my_spawn(f, "issue_txns"); - foreach (const replica_info &r, replicas) newreps.push(r); - st_joining join_issue_txns(issue_txns_thread); + st_joining join_issue_txns(my_spawn(f, "issue_txns")); finally fin(lambda () { cout << "LEADER SUMMARY" << endl; @@ -1485,9 +1779,14 @@ st_thread_group handlers; int rid = 0; foreach (replica_info r, replicas) { - handlers.insert(my_spawn(bind(handle_responses<RTypes>, r.fd(), ref(seqno), rid++, - ref(recover_signals), true), - "handle_responses")); + function<void()> fn; + if (do_tpcc) + fn = bind(handle_tpcc_responses, r.fd(), ref(seqno), rid++, + ref(recover_signals), true); + else + fn = bind(handle_responses<RTypes>, r.fd(), ref(seqno), rid++, + ref(recover_signals), true); + handlers.insert(my_spawn(fn, "handle_responses")); } // Accept the recovering node, and tell it about the online replicas. 
@@ -1560,6 +1859,33 @@ }); st_channel<recovery_t> send_states; + if (do_tpcc) { + TPCCTables *tables = new TPCCTables(); + g_tables.reset(tables); + SystemClock* clock = new SystemClock(); + + // Create a generator for filling the database. + RealRandomGenerator* random = new RealRandomGenerator(); + NURandC cLoad = NURandC::makeRandom(random); + random->setC(cLoad); + + // Generate the data + cout << "loading " << nwarehouses << " warehouses" << endl; + char now[Clock::DATETIME_SIZE+1]; + clock->getDateTimestamp(now); + TPCCGenerator generator(random, now, Item::NUM_ITEMS, + District::NUM_PER_WAREHOUSE, + Customer::NUM_PER_DISTRICT, + NewOrder::INITIAL_NUM_PER_DISTRICT); + long long start_time = current_time_millis(); + generator.makeItemsTable(tables); + for (int i = 0; i < nwarehouses; ++i) { + generator.makeWarehouse(tables, i+1); + } + cout << "loaded " << nwarehouses << " warehouses in " + << current_time_millis() - start_time << " ms" << endl; + } + cout << "starting as replica on port " << listen_port << endl; // Listen for connections from other replicas. @@ -1608,8 +1934,12 @@ // Process txns. st_channel<chunk> backlog; - const function<void()> process_fn = - bind(process_txns<Types, RTypes>, leader, ref(map), ref(seqno), + function<void()> process_fn; + if (do_tpcc) + process_fn = bind(process_tpccs, leader, ref(seqno), ref(send_states), + ref(backlog), init.txnseqno(), mypos, init.node_size()); + else + process_fn = bind(process_txns<Types, RTypes>, leader, ref(map), ref(seqno), ref(send_states), ref(backlog), init.txnseqno(), mypos, init.node_size()); st_joining join_proc(my_spawn(process_fn, "process_txns")); @@ -1775,7 +2105,7 @@ reader.reset_range(chunk.get<1>(), chunk.get<2>()); while (reader.start() < reader.end()) { char *start = reader.start(); - uint32_t prefix = reader.read<uint32_t>(); + uint32_t prefix = ntohl(reader.read<uint32_t>()); ASSERT(prefix < 10000); ASSERT(start + sizeof(uint32_t) + prefix <= reader.end()); batch.Clear(); @@ -1883,9 +2213,200 @@ } } +class st_tpcc : public TPCCDB +{ + private: + commons::array<char> a_; + writer writer_; + TpccReq req_; + int seqno_; + + public: + st_tpcc(const vector<st_netfd_t> &fds) : + a_(buf_size), + writer_(lambda(const void *buf, size_t len) { + foreach (st_netfd_t dst, __ref(fds)) + st_timed_write(dst, buf, len); + }, a_, buf_size) {} + + void flush() { writer_.mark_and_flush(); } + void set_seqno(int seqno) { seqno_ = seqno; } + + void sendRec() { + req_.Clear(); + req_.set_seqno(-2); + ser(writer_, req_); + writer_.mark_and_flush(); + } + + void sendStop() { + req_.Clear(); + req_.set_seqno(-1); + ser(writer_, req_); + writer_.mark_and_flush(); + } + + // Executes the TPC-C "slev" transaction. From the last 20 orders, returns the number of rows in + // the STOCK table that have S_QUANTITY < threshold. See TPC-C 2.8 (page 43). + int stockLevel(int32_t warehouse_id, int32_t district_id, int32_t threshold) { + warehouse_id = district_id = threshold = 0; + throw_not_implemented(); + } + + // Executes the TPC-C order status transaction. Find the customer's last order and check the + // delivery date of each item on the order. See TPC-C 2.6 (page 36). + void orderStatus(int32_t warehouse_id, int32_t district_id, int32_t customer_id, + OrderStatusOutput* output) { + warehouse_id = district_id = customer_id = 0; output = 0; + throw_not_implemented(); + } + + // Executes the TPC-C order status transaction. Find the customer's last order and check the + // delivery date of each item on the order. See TPC-C 2.6 (page 36). 
+ void orderStatus(int32_t warehouse_id, int32_t district_id, const char* c_last, + OrderStatusOutput* output) { + warehouse_id = district_id = 0; c_last = 0; output = 0; + throw_not_implemented(); + } + + // Executes the TPC-C new order transaction. Enter the new order for customer_id into the + // database. See TPC-C 2.4 (page 27). Returns true if the transaction commits. + bool newOrder(int32_t warehouse_id, int32_t district_id, int32_t customer_id, + const vector<NewOrderItem>& items, const char* now, + NewOrderOutput* output) { + req_.Clear(); + req_.set_seqno(seqno_); + + NewOrderMsg &new_order = *req_.mutable_new_order(); + new_order.set_warehouse_id(warehouse_id); + new_order.set_district_id(district_id); + new_order.set_customer_id(customer_id); + foreach (const NewOrderItem &item, items) { + NewOrderItemMsg &msg = *new_order.add_item(); + msg.set_i_id(item.i_id); + msg.set_ol_supply_w_id(item.ol_supply_w_id); + msg.set_ol_quantity(item.ol_quantity); + } + new_order.set_now(now); + + ser(writer_, req_); + output = nullptr; + return true; + } + + // Executes the TPC-C payment transaction. Add h_amount to the customer's account. + // See TPC-C 2.5 (page 32). + void payment(int32_t warehouse_id, int32_t district_id, int32_t c_warehouse_id, + int32_t c_district_id, int32_t customer_id, float h_amount, const char* now, + PaymentOutput* output) { + warehouse_id = district_id = c_district_id = c_warehouse_id = customer_id = 0; + h_amount = 0; now = 0; output = 0; + throw_not_implemented(); + } + + // Executes the TPC-C payment transaction. Add h_amount to the customer's account. + // See TPC-C 2.5 (page 32). + void payment(int32_t warehouse_id, int32_t district_id, int32_t c_warehouse_id, + int32_t c_district_id, const char* c_last, float h_amount, const char* now, + PaymentOutput* output) { + warehouse_id = district_id = c_warehouse_id = c_district_id = 0; + h_amount = 0; c_last = now = 0; output = 0; + throw_not_implemented(); + } + + // Executes the TPC-C delivery transaction. Delivers the oldest undelivered transaction in each + // district in warehouse_id. See TPC-C 2.7 (page 39). + void delivery(int32_t warehouse_id, int32_t carrier_id, const char* now, + vector<DeliveryOrderInfo>* orders) { + warehouse_id = carrier_id = 0; now = 0; orders = 0; + throw_not_implemented(); + } +}; + void -run_tpcc() +issue_tpcc(st_channel<replica_info> &newreps, int &seqno, + st_bool &accept_joiner) { + vector<st_netfd_t> fds; + long long start_time = current_time_millis(); + + finally f(lambda () { + showtput("issued", current_time_millis(), __ref(start_time), __ref(seqno), + 0); + }); + + SystemClock* clock = new SystemClock(); + + // Change the constants for run + RealRandomGenerator* random = new RealRandomGenerator(); + NURandC cLoad = NURandC::makeRandom(random); + random->setC(NURandC::makeRandomForRun(random, cLoad)); + + // Client owns all the parameters + st_tpcc &tables = *new st_tpcc(fds); + TPCCClient client(clock, random, &tables, Item::NUM_ITEMS, nwarehouses, + District::NUM_PER_WAREHOUSE, Customer::NUM_PER_DISTRICT); + while (!stop_hub) { + // Did we get a new member? If so, notify an arbitrary member (the first + // one) to prepare to send recovery information (by sending an + // empty/default Txn). + // XXX rec_pwal + if (!newreps.empty() && seqno > 0 && !rec_pwal) { + tables.sendRec(); + } + + // Bring in any new members. 
+ // TODO more efficient: copy/extend/append + while (!newreps.empty()) { + fds.push_back(newreps.take().fd()); + } + + tables.set_seqno(seqno); + client.doOne(); + + // Checkpoint. + if (check_interval(seqno, yield_interval)) st_sleep(0); + if (check_interval(seqno, issue_display)) { + cout << "issued txn " << seqno << endl; + if (timelim > 0 && current_time_millis() - start_time > timelim) { + cout << "time's up; issued " << seqno << " txns in " << timelim + << " ms" << endl; + stop_hub.set(); + } + } + + // For debugging purposes. + if (issuing_interval > 0) { + st_sleep(issuing_interval); + tables.flush(); + } + + // Are we to accept a new joiner? + if (seqno == accept_joiner_seqno) { + accept_joiner.set(); + } + + // Set the stopping seqno. + if (seqno == stop_on_seqno) { + cout << "stopping on issue of seqno " << seqno << endl; + stop_hub.set(); + } + + ++seqno; + + // Pause? + if (do_pause) + do_pause.waitreset(); + } + + if (!fds.empty()) { + tables.sendStop(); + } +} + +void +run_tpcc_demo() +{ TPCCTables* tables = new TPCCTables(); SystemClock* clock = new SystemClock(); @@ -1996,6 +2517,8 @@ "exit after the joiner fully recovers (for leader)") ("batch-size,b", po::value<int>(&batch_size)->default_value(100), "number of txns to batch up in each msg (for leader)") + ("tpcc", po::bool_switch(&do_tpcc), + "run the TPCC workload") ("exit-on-seqno,X",po::value<int>(&stop_on_seqno)->default_value(-1), "exit after txn seqno is issued (for leader)") ("accept-joiner-size,s", Modified: ydb/trunk/src/tpcc/tpccclient.cc =================================================================== --- ydb/trunk/src/tpcc/tpccclient.cc 2009-03-13 19:26:56 UTC (rev 1293) +++ ydb/trunk/src/tpcc/tpccclient.cc 2009-03-16 22:32:07 UTC (rev 1294) @@ -144,6 +144,8 @@ // This is not strictly accurate: The requirement is for certain *minimum* percentages to be // maintained. This is close to the right thing, but not precisely correct. // See TPC-C 5.2.4 (page 68). + doNewOrder(); +#if 0 int x = generator_->number(1, 100); if (x <= 4) { // 4% doStockLevel(); @@ -157,6 +159,7 @@ ASSERT(x > 100-45); doNewOrder(); } +#endif } int32_t TPCCClient::generateWarehouse() { Modified: ydb/trunk/src/ydb.proto =================================================================== --- ydb/trunk/src/ydb.proto 2009-03-13 19:26:56 UTC (rev 1293) +++ ydb/trunk/src/ydb.proto 2009-03-16 22:32:07 UTC (rev 1294) @@ -85,3 +85,63 @@ message ResponseBatch { repeated Response res = 1; } + +// +// TPCC messages +// + +message NewOrderItemMsg { + required int32 i_id = 1; + required int32 ol_supply_w_id = 2; + required int32 ol_quantity = 3; +} + +message NewOrderMsg { + required int32 warehouse_id = 1; + required int32 district_id = 2; + required int32 customer_id = 3; + repeated NewOrderItemMsg item = 4; + required string now = 5; +} + +message ItemInfoMsg { + required int32 s_quantity = 1; + required float i_price = 2; + // TODO: Client can compute this from other values. + required float ol_amount = 3; + // char + required int32 brand_generic = 4; + required string i_name = 5; +} + +message NewOrderOutputMsg { + required float w_tax = 1; + required float d_tax = 2; + + // From district d_next_o_id + required int32 o_id = 3; + + required float c_discount = 4; + + // TODO: Client can compute this from other values. 
+ required float total = 5; + + repeated ItemInfoMsg item = 6; + required string c_last = 7; + required string c_credit = 8; + + //static const int MAX_STATUS = 25; + //static const char INVALID_ITEM_STATUS[]; + //char status[MAX_STATUS+1]; + required string status = 9; +} + +message TpccReq { + required int32 seqno = 1; + optional NewOrderMsg new_order = 2; +} + +message TpccRes { + required int32 seqno = 1; + optional NewOrderOutputMsg new_order = 2; +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
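The r1294 changes above route the raw (non-protobuf) batch path's 4-byte length prefix through ntohl() before it is used as a byte count, on both the steady-state and drain paths. The following minimal, self-contained sketch shows that framing convention; the frame/unframe helpers are illustrative stand-ins and not ydb code, but they use the same big-endian uint32_t prefix that the reader now converts back to host byte order.

    // Sketch: length-prefixed framing with a big-endian uint32_t prefix.
    // frame()/unframe() are illustrative helpers, not part of ydb.
    #include <arpa/inet.h>   // htonl, ntohl
    #include <stdint.h>
    #include <cassert>
    #include <cstring>
    #include <string>
    #include <vector>

    // Put a payload on the wire as [4-byte network-order length][payload bytes].
    static std::vector<char> frame(const std::string &payload) {
        uint32_t prefix = htonl(static_cast<uint32_t>(payload.size()));
        std::vector<char> out(sizeof(prefix) + payload.size());
        std::memcpy(&out[0], &prefix, sizeof(prefix));
        std::memcpy(&out[sizeof(prefix)], payload.data(), payload.size());
        return out;
    }

    // Read one frame back: convert the prefix to host order before trusting it.
    static std::string unframe(const char *buf, size_t len) {
        assert(len >= sizeof(uint32_t));
        uint32_t prefix;
        std::memcpy(&prefix, buf, sizeof(prefix));
        size_t bodylen = ntohl(prefix);
        assert(sizeof(prefix) + bodylen <= len);
        return std::string(buf + sizeof(prefix), bodylen);
    }

    int main() {
        std::vector<char> wire = frame("hello");
        assert(unframe(&wire[0], wire.size()) == "hello");
        return 0;
    }

Reading the prefix without the conversion only works when sender and receiver happen to share a byte order, which is presumably why the ntohl() call was added around reader.read<uint32_t>() in both places.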
From: <yan...@us...> - 2009-03-13 19:27:05
|
Revision: 1293 http://assorted.svn.sourceforge.net/assorted/?rev=1293&view=rev Author: yangzhang Date: 2009-03-13 19:26:56 +0000 (Fri, 13 Mar 2009) Log Message: ----------- integrated tpcc into ydb Modified Paths: -------------- ydb/trunk/src/main.lzz.clamp Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-03-13 19:26:26 UTC (rev 1292) +++ ydb/trunk/src/main.lzz.clamp 2009-03-13 19:26:56 UTC (rev 1293) @@ -1,4 +1,5 @@ #hdr +#define __STDC_FORMAT_MACROS #include <boost/archive/binary_iarchive.hpp> #include <boost/archive/binary_oarchive.hpp> #include <boost/bind.hpp> @@ -19,10 +20,11 @@ #include <csignal> // sigaction etc. #include <cstdio> #include <cstring> // strsignal -#include <iostream> #include <fstream> // ofstream #include <google/dense_hash_map> #include <gtest/gtest.h> +#include <inttypes.h> // PRId64 +#include <iostream> #include <malloc.h> #include <map> #include <netinet/in.h> // in_addr etc. @@ -33,6 +35,11 @@ #include <unistd.h> // pipe, write #include <vector> #include "ser.h" +#include "tpcc/clock.h" +#include "tpcc/randomgenerator.h" +#include "tpcc/tpccclient.h" +#include "tpcc/tpccgenerator.h" +#include "tpcc/tpcctables.h" #include "ydb.pb.h" #define function boost::function @@ -80,7 +87,7 @@ st_utime_t timeout; int yield_interval, accept_joiner_seqno, issuing_interval, min_ops, max_ops, stop_on_seqno, batch_size, handle_responses_display, - catch_up_display, issue_display, + catch_up_display, issue_display, nwarehouses, process_display; size_t accept_joiner_size, buf_size, read_buf_size; bool yield_during_build_up, yield_during_catch_up, dump, show_updates, @@ -1876,6 +1883,51 @@ } } +void +run_tpcc() +{ + TPCCTables* tables = new TPCCTables(); + SystemClock* clock = new SystemClock(); + + // Create a generator for filling the database. + RealRandomGenerator* random = new RealRandomGenerator(); + NURandC cLoad = NURandC::makeRandom(random); + random->setC(cLoad); + + // Generate the data + printf("Loading %d warehouses... ", nwarehouses); + fflush(stdout); + char now[Clock::DATETIME_SIZE+1]; + clock->getDateTimestamp(now); + TPCCGenerator generator(random, now, Item::NUM_ITEMS, + District::NUM_PER_WAREHOUSE, + Customer::NUM_PER_DISTRICT, + NewOrder::INITIAL_NUM_PER_DISTRICT); + int64_t begin = clock->getMicroseconds(); + generator.makeItemsTable(tables); + for (int i = 0; i < nwarehouses; ++i) { + generator.makeWarehouse(tables, i+1); + } + int64_t end = clock->getMicroseconds(); + printf("%"PRId64" ms\n", (end-begin)/1000); + + // Change the constants for run + random = new RealRandomGenerator(); + random->setC(NURandC::makeRandomForRun(random, cLoad)); + + // Client owns all the parameters + TPCCClient client(clock, random, tables, Item::NUM_ITEMS, static_cast<int>(nwarehouses), + District::NUM_PER_WAREHOUSE, Customer::NUM_PER_DISTRICT); + printf("Running... "); + fflush(stdout); + begin = clock->getMicroseconds(); + for (int i = 0; i < 200000; ++i) { + client.doOne(); + } + end = clock->getMicroseconds(); + printf("%"PRId64" ms\n", (end-begin)/1000); +} + /** * Initialization and command-line parsing. 
*/ @@ -1971,6 +2023,9 @@ ("max-ops,O", po::value<int>(&max_ops)->default_value(5), "upper bound on randomly generated number of operations per txn (for leader)") + ("warehouses,w", + po::value<int>(&nwarehouses)->default_value(1), + "number of warehouses to model") ("accept-joiner-seqno,j", po::value<int>(&accept_joiner_seqno)->default_value(0), "accept recovering joiner (start recovery) after this seqno (for leader " This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
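r1293 also adds #define __STDC_FORMAT_MACROS ahead of <inttypes.h> because run_tpcc() prints int64_t microsecond timings with PRId64, and pre-C++11 compilers only expose those format macros when that define is present. A small stand-alone illustration of the same idiom (the timestamp values are stand-ins):

    // The PRId64 macros from <inttypes.h> are only visible to pre-C++11
    // compilers when __STDC_FORMAT_MACROS is defined first, as r1293 does.
    #define __STDC_FORMAT_MACROS
    #include <inttypes.h>   // PRId64, int64_t
    #include <cstdio>

    int main() {
        int64_t begin = 0, end = 1234567;               // stand-in usec timestamps
        printf("%" PRId64 " ms\n", (end - begin) / 1000);
        return 0;
    }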
From: <yan...@us...> - 2009-03-13 19:26:38
|
Revision: 1292 http://assorted.svn.sourceforge.net/assorted/?rev=1292&view=rev Author: yangzhang Date: 2009-03-13 19:26:26 +0000 (Fri, 13 Mar 2009) Log Message: ----------- integrated tpcc into build Modified Paths: -------------- ydb/trunk/src/Makefile Modified: ydb/trunk/src/Makefile =================================================================== --- ydb/trunk/src/Makefile 2009-03-13 19:26:08 UTC (rev 1291) +++ ydb/trunk/src/Makefile 2009-03-13 19:26:26 UTC (rev 1292) @@ -15,9 +15,12 @@ GENSRCS := $(LZZSRCS) $(PBSRCS) GENOBJS := $(LZZOBJS) $(PBOBJS) +TPCC_OBJS:= clock randomgenerator tpccclient tpccdb tpccgenerator tpcctables +TPCC_OBJS:= $(foreach i,$(TPCC_OBJS),tpcc/$(i).o) + HDRS := $(GENHDRS) SRCS := $(GENSRCS) -OBJS := $(GENOBJS) +OBJS := $(GENOBJS) $(TPCC_OBJS) ifneq ($(GPROF),) GPROF := -pg @@ -96,7 +99,12 @@ %.pb.o: %.pb.cc %.pb.h $(CXX) -c $(PBCXXFLAGS) $(OUTPUT_OPTION) $< -%.o: %.cc $(PBHDRS) +main.o: main.cc $(PBHDRS) + +tpcc/%.o: tpcc/%.cc + make -C tpcc/ + +%.o: %.cc $(COMPILE.cc) $(OUTPUT_OPTION) $< %.cc %.hh: %.lzz This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-03-13 19:26:30
|
Revision: 1291 http://assorted.svn.sourceforge.net/assorted/?rev=1291&view=rev Author: yangzhang Date: 2009-03-13 19:26:08 +0000 (Fri, 13 Mar 2009) Log Message: ----------- simplified tpcc build; removed tpcc.cc Modified Paths: -------------- ydb/trunk/src/tpcc/Makefile Removed Paths: ------------- ydb/trunk/src/tpcc/tpcc.cc Modified: ydb/trunk/src/tpcc/Makefile =================================================================== --- ydb/trunk/src/tpcc/Makefile 2009-03-12 19:54:39 UTC (rev 1290) +++ ydb/trunk/src/tpcc/Makefile 2009-03-13 19:26:08 UTC (rev 1291) @@ -8,17 +8,10 @@ # Link withthe C++ standard library LDFLAGS=-lstdc++ -BINARIES = tpcc # btree_test randomgenerator_test tpccclient_test tpcctables_test tpccgenerator_test tpcc +BINARIES = tpccclient.o tpccgenerator.o tpcctables.o tpccdb.o clock.o randomgenerator.o all: $(BINARIES) -#btree_test: btree_test.o stupidunit.o -#randomgenerator_test: randomgenerator_test.o randomgenerator.o stupidunit.o -#tpccclient_test: tpccclient_test.o tpccclient.o randomgenerator.o stupidunit.o -#tpcctables_test: tpcctables_test.o tpcctables.o tpccdb.o randomgenerator.o stupidunit.o -#tpccgenerator_test: tpccgenerator_test.o tpccgenerator.o tpcctables.o tpccdb.o randomgenerator.o stupidunit.o -tpcc: tpcc.o tpccclient.o tpccgenerator.o tpcctables.o tpccdb.o clock.o randomgenerator.o - clean : rm -f *.o *.d $(BINARIES) Deleted: ydb/trunk/src/tpcc/tpcc.cc =================================================================== --- ydb/trunk/src/tpcc/tpcc.cc 2009-03-12 19:54:39 UTC (rev 1290) +++ ydb/trunk/src/tpcc/tpcc.cc 2009-03-13 19:26:08 UTC (rev 1291) @@ -1,72 +0,0 @@ -#define __STDC_FORMAT_MACROS -#include <climits> -#include <inttypes.h> - -#include "clock.h" -#include "randomgenerator.h" -#include "tpccclient.h" -#include "tpccgenerator.h" -#include "tpcctables.h" - - -int main(int argc, const char* argv[]) { - if (argc != 2) { - fprintf(stderr, "tpcc [num warehouses]\n"); - exit(1); - } - - long num_warehouses = strtol(argv[1], NULL, 10); - if (num_warehouses == LONG_MIN || num_warehouses == LONG_MAX) { - fprintf(stderr, "Bad warehouse number (%s)\n", argv[1]); - exit(1); - } - if (num_warehouses <= 0) { - fprintf(stderr, "Number of warehouses must be > 0 (was %ld)\n", num_warehouses); - exit(1); - } - if (num_warehouses > Warehouse::MAX_WAREHOUSE_ID) { - fprintf(stderr, "Number of warehouses must be <= %d (was %ld)\n", Warehouse::MAX_WAREHOUSE_ID, num_warehouses); - exit(1); - } - - TPCCTables* tables = new TPCCTables(); - SystemClock* clock = new SystemClock(); - - // Create a generator for filling the database. - RealRandomGenerator* random = new RealRandomGenerator(); - NURandC cLoad = NURandC::makeRandom(random); - random->setC(cLoad); - - // Generate the data - printf("Loading %ld warehouses... 
", num_warehouses); - fflush(stdout); - char now[Clock::DATETIME_SIZE+1]; - clock->getDateTimestamp(now); - TPCCGenerator generator(random, now, Item::NUM_ITEMS, District::NUM_PER_WAREHOUSE, - Customer::NUM_PER_DISTRICT, NewOrder::INITIAL_NUM_PER_DISTRICT); - int64_t begin = clock->getMicroseconds(); - generator.makeItemsTable(tables); - for (int i = 0; i < num_warehouses; ++i) { - generator.makeWarehouse(tables, i+1); - } - int64_t end = clock->getMicroseconds(); - printf("%"PRId64" ms\n", (end-begin)/1000); - - // Change the constants for run - random = new RealRandomGenerator(); - random->setC(NURandC::makeRandomForRun(random, cLoad)); - - // Client owns all the parameters - TPCCClient client(clock, random, tables, Item::NUM_ITEMS, static_cast<int>(num_warehouses), - District::NUM_PER_WAREHOUSE, Customer::NUM_PER_DISTRICT); - printf("Running... "); - fflush(stdout); - begin = clock->getMicroseconds(); - for (int i = 0; i < 200000; ++i) { - client.doOne(); - } - end = clock->getMicroseconds(); - printf("%"PRId64" ms\n", (end-begin)/1000); - - return 0; -} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-03-12 19:54:53
|
Revision: 1290 http://assorted.svn.sourceforge.net/assorted/?rev=1290&view=rev Author: yangzhang Date: 2009-03-12 19:54:39 +0000 (Thu, 12 Mar 2009) Log Message: ----------- using ASSERT instead of assert; using snap_map instead of fast_map Modified Paths: -------------- ydb/trunk/src/main.lzz.clamp ydb/trunk/src/tpcc/clock.cc ydb/trunk/src/tpcc/randomgenerator.cc ydb/trunk/src/tpcc/tpccclient.cc ydb/trunk/src/tpcc/tpcctables.cc Removed Paths: ------------- ydb/trunk/src/tpcc/assert.h Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-03-12 19:54:31 UTC (rev 1289) +++ ydb/trunk/src/main.lzz.clamp 2009-03-12 19:54:39 UTC (rev 1290) @@ -8,10 +8,11 @@ #include <boost/scoped_array.hpp> #include <boost/shared_ptr.hpp> #include <boost/tuple/tuple.hpp> -#include <commons/fast_map.h> +#include <commons/assert.h> #include <commons/memory.h> #include <commons/nullptr.h> #include <commons/rand.h> +#include <commons/snap_map.h> #include <commons/st/st.h> #include <commons/time.h> #include <commons/unique_ptr.h> @@ -40,7 +41,6 @@ #define ref boost::ref #define tuple boost::tuple #define make_tuple boost::make_tuple -#define unused __attribute__((unused)) using namespace boost; using namespace boost::archive; @@ -57,7 +57,7 @@ //#define map_t unordered_map //#define map_t map //#define map_t dense_hash_map -#define map_t fast_map +#define map_t snap_map typedef map_t<int, int> mii; typedef mii::value_type entry; @@ -71,7 +71,7 @@ map.set_empty_key(-1); map.set_deleted_key(-2); } -template<> void init_map(fast_map<int, int> &map) { +template<> void init_map(snap_map<int, int> &map) { map.set_empty_key(-1); map.set_deleted_key(-2); } @@ -758,6 +758,16 @@ } } +#if 0 +template<typename Types, typename RTypes> +void +process_txn_ext(mii &map, const typename Types::Txn &txn, int &seqno, + typename RTypes::Response *res, ext_map ext) +{ + response +} +#endif + /** * Process a transaction: update DB state (incl. seqno) and send response to * leader. @@ -947,7 +957,7 @@ ResponseBatch &resbatch = *presbatch; ser_t serbuf; char *first_start = reader.start(); - assert(first_start == rbuf.get()); + ASSERT(first_start == rbuf.get()); const size_t headerlen = sizeof(uint32_t) + sizeof(short) + sizeof(int); while (true) { uint32_t prefix = 0; @@ -1019,16 +1029,16 @@ // Swap the buffers. swap(tmp, reader.buf()); reader.reset_range(reader.buf().get() + headerlen, reader.buf().get() + headerlen + reader.unread()); - assert(tmp.get() <= first_start && first_start < tmp.end()); - assert(tmp.get() < start && start < tmp.end()); - assert(first_start < start); + ASSERT(tmp.get() <= first_start && first_start < tmp.end()); + ASSERT(tmp.get() < start && start < tmp.end()); + ASSERT(first_start < start); backlog.push(make_tuple(tmp, first_start, start)); first_start = reader.buf().get(); first_seqno = first_txn.seqno(); } // Fill up rest of the message - assert(reader.unread() + reader.rem() >= prefix + sizeof(uint32_t) - headerlen); + ASSERT(reader.unread() + reader.rem() >= prefix + sizeof(uint32_t) - headerlen); check0x(reader.accum(prefix + sizeof(uint32_t) - headerlen)); } else { // Regular transaction batch. @@ -1047,6 +1057,13 @@ const Txn &txn = t == 0 ? 
first_txn : batch.txn(t); Response *res = resbatch.add_res(); process_txn<Types, RTypes>(map, txn, seqno, res); +#if 0 + if (!sending_recovery) { + process_txn<Types, RTypes>(map, txn, seqno, res); + } else { + process_txn_ext(map, txn, seqno, res, ext); + } +#endif if (fake_exec && !Types::is_pb()) { reader.skip(txn.op_size() * Op_Size); } @@ -1132,12 +1149,12 @@ template<> recovery_t -make_recovery(const fast_map<int, int> &map, int mypos, int nnodes, int &seqno) +make_recovery(const snap_map<int, int> &map, int mypos, int nnodes, int &seqno) { const commons::array<entry> &src = map.get_table(); pair<size_t, size_t> range = recovery_range(src.size(), mypos, nnodes); size_t begin = range.first, end = range.second; - assert(end > begin); + ASSERT(end > begin); recovery_header hdr = { seqno, end - begin, src.size(), map.size() }; size_t bodylen = sizeof(entry) * hdr.count; cout << "generating recovery of " << hdr.size << " records in " @@ -1637,7 +1654,7 @@ long long before_recv = current_time_millis(); vector<st_thread_t> recovery_builders; - assert(seqno == -1); + ASSERT(seqno == -1); bool first = true; for (int i = 0; i < (multirecover ? init.node_size() : 1); ++i) { recovery_builders.push_back(my_spawn(lambda() { @@ -1744,16 +1761,16 @@ while (!backlog.empty()) { chunk chunk = backlog.take(); sized_array<char> &buf = chunk.get<0>(); - assert(buf.get() <= chunk.get<1>() && chunk.get<1>() < buf.end()); - assert(buf.get() < chunk.get<2>() && chunk.get<2>() < buf.end()); - assert(chunk.get<1>() < chunk.get<2>()); + ASSERT(buf.get() <= chunk.get<1>() && chunk.get<1>() < buf.end()); + ASSERT(buf.get() < chunk.get<2>() && chunk.get<2>() < buf.end()); + ASSERT(chunk.get<1>() < chunk.get<2>()); swap(buf, reader.buf()); reader.reset_range(chunk.get<1>(), chunk.get<2>()); while (reader.start() < reader.end()) { - unused char *start = reader.start(); - unused uint32_t prefix = reader.read<uint32_t>(); - assert(prefix < 10000); - assert(start + sizeof(uint32_t) + prefix <= reader.end()); + char *start = reader.start(); + uint32_t prefix = reader.read<uint32_t>(); + ASSERT(prefix < 10000); + ASSERT(start + sizeof(uint32_t) + prefix <= reader.end()); batch.Clear(); for (int t = 0; t < batch.txn_size(); ++t) { const Txn &txn = batch.txn(t); @@ -1771,7 +1788,7 @@ << "; backlog.size = " << backlog.queue().size() << endl; } } - assert(start + sizeof(uint32_t) + prefix == reader.start()); + ASSERT(start + sizeof(uint32_t) + prefix == reader.start()); } } g_caught_up = true; Deleted: ydb/trunk/src/tpcc/assert.h =================================================================== --- ydb/trunk/src/tpcc/assert.h 2009-03-12 19:54:31 UTC (rev 1289) +++ ydb/trunk/src/tpcc/assert.h 2009-03-12 19:54:39 UTC (rev 1290) @@ -1,16 +0,0 @@ -#ifndef ASSERT_H__ -#define ASSERT_H__ - -#include <cassert> - -// Wraps the standard assert macro to avoids "unused variable" warnings when compiled away. -// Inspired by: http://powerof2games.com/node/10 -// This is not the "default" because it does not conform to the requirements of the C standard, -// which requires that the NDEBUG version be ((void) 0). 
-#ifdef NDEBUG -#define ASSERT(x) do { (void)sizeof(x); } while(0) -#else -#define ASSERT(x) assert(x) -#endif - -#endif Modified: ydb/trunk/src/tpcc/clock.cc =================================================================== --- ydb/trunk/src/tpcc/clock.cc 2009-03-12 19:54:31 UTC (rev 1289) +++ ydb/trunk/src/tpcc/clock.cc 2009-03-12 19:54:39 UTC (rev 1290) @@ -5,7 +5,7 @@ #include <cstdio> #include <ctime> -#include "assert.h" +#include <commons/assert.h> // Fills output with the base-10 ASCII representation of value, using digits digits. static char* makeInt(char* output, int value, int digits) { Modified: ydb/trunk/src/tpcc/randomgenerator.cc =================================================================== --- ydb/trunk/src/tpcc/randomgenerator.cc 2009-03-12 19:54:31 UTC (rev 1289) +++ ydb/trunk/src/tpcc/randomgenerator.cc 2009-03-12 19:54:39 UTC (rev 1290) @@ -6,7 +6,7 @@ #include <cstring> #include <ctime> -#include "assert.h" +#include <commons/assert.h> NURandC NURandC::makeRandom(RandomGenerator* generator) { NURandC c; Modified: ydb/trunk/src/tpcc/tpccclient.cc =================================================================== --- ydb/trunk/src/tpcc/tpccclient.cc 2009-03-12 19:54:31 UTC (rev 1289) +++ ydb/trunk/src/tpcc/tpccclient.cc 2009-03-12 19:54:39 UTC (rev 1290) @@ -3,7 +3,7 @@ #include <cstdio> #include <vector> -#include "assert.h" +#include <commons/assert.h> #include "clock.h" #include "randomgenerator.h" #include "tpccdb.h" Modified: ydb/trunk/src/tpcc/tpcctables.cc =================================================================== --- ydb/trunk/src/tpcc/tpcctables.cc 2009-03-12 19:54:31 UTC (rev 1289) +++ ydb/trunk/src/tpcc/tpcctables.cc 2009-03-12 19:54:39 UTC (rev 1290) @@ -4,7 +4,7 @@ #include <limits> #include <vector> -#include "assert.h" +#include <commons/assert.h> #include "stlutil.h" using std::vector; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
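r1290's make_recovery() specialization hands each helper node a contiguous slice of snap_map's backing table (chosen by recovery_range()) and ships it with a flat copy, which is why the map exposes get_table(). The sketch below shows the general shape of that idea only; the even-split formula is an assumption made for illustration and is not necessarily what ydb's recovery_range() computes.

    // Sketch of range-based state transfer over a flat table of POD entries:
    // pick this node's contiguous slice, then memcpy it into an outgoing
    // buffer with no per-record encoding. Split formula is an assumption.
    #include <cassert>
    #include <cstring>
    #include <utility>
    #include <vector>

    struct entry { int key; int value; };   // stand-in for snap_map's value_type

    // Divide [0, size) into nnodes contiguous ranges; return the mypos-th one.
    static std::pair<size_t, size_t>
    recovery_range_sketch(size_t size, int mypos, int nnodes) {
        size_t begin = size * mypos / nnodes;
        size_t end   = size * (mypos + 1) / nnodes;
        return std::make_pair(begin, end);
    }

    int main() {
        std::vector<entry> table(1024);                  // pretend backing table
        std::pair<size_t, size_t> r = recovery_range_sketch(table.size(), 1, 4);
        assert(r.second > r.first);
        std::vector<char> out((r.second - r.first) * sizeof(entry));
        // POD entries can be shipped with a single memcpy.
        std::memcpy(&out[0], &table[r.first], out.size());
        return 0;
    }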
From: <yan...@us...> - 2009-03-12 19:54:40
|
Revision: 1289 http://assorted.svn.sourceforge.net/assorted/?rev=1289&view=rev Author: yangzhang Date: 2009-03-12 19:54:31 +0000 (Thu, 12 Mar 2009) Log Message: ----------- using ASSERT instead of assert Modified Paths: -------------- cpp-commons/trunk/src/commons/fast_map.h cpp-commons/trunk/src/commons/rand.h cpp-commons/trunk/src/commons/st/st.h cpp-commons/trunk/src/commons/streamreader.h cpp-commons/trunk/src/commons/streamwriter.h Modified: cpp-commons/trunk/src/commons/fast_map.h =================================================================== --- cpp-commons/trunk/src/commons/fast_map.h 2009-03-12 19:53:53 UTC (rev 1288) +++ cpp-commons/trunk/src/commons/fast_map.h 2009-03-12 19:54:31 UTC (rev 1289) @@ -2,8 +2,8 @@ #define COMMONS_DENSE_HASH_MAP_H #include <boost/functional/hash.hpp> -#include <cassert> #include <commons/array.h> +#include <commons/assert.h> #include <commons/exceptions.h> #include <utility> @@ -167,7 +167,7 @@ for (size_t probe = 0; newtab[pos].first != empty_key; pos = (pos + ++probe) & mask) { - assert(probe < newtab.size()); + ASSERT(probe < newtab.size()); } newtab[pos] = table[i]; } @@ -177,7 +177,7 @@ void grow() { resize(table.size() << 1); } void shrink() { resize(table.size() >> 1); } - void assert_init() const { assert(has_empty_key && has_deleted_key); } + void assert_init() const { ASSERT(has_empty_key && has_deleted_key); } public: fast_map() : @@ -229,7 +229,7 @@ if (table[pos].first == empty_key) return end(); if (table[pos].first == k) return const_iterator(*this, &table[pos]); pos = (pos + ++probe) & mask; - assert(probe < table.size()); + ASSERT(probe < table.size()); } } @@ -240,13 +240,13 @@ if (table[pos].first == empty_key) return end(); if (table[pos].first == k) return iterator(*this, &table[pos]); pos = (pos + ++probe) & mask; - assert(probe < table.size()); + ASSERT(probe < table.size()); } #if 0 for (; table[pos].first != empty_key && table[pos].first != k; pos = (pos + ++probe) & mask) { - assert(probe < table.size()); + ASSERT(probe < table.size()); } if (table[pos].first == empty_key) return end(); else return &table[pos]; @@ -265,14 +265,14 @@ table[pos].first != empty_key && table[pos].first != k; pos = (pos + ++probe) & mask) { - assert(probe < table.size()); + ASSERT(probe < table.size()); } if (table[pos].first == deleted_key) { size_t first_deleted = pos; for (; table[pos].first != empty_key && table[pos].first != k; pos = (pos + ++probe) & mask) { - assert(probe < table.size()); + ASSERT(probe < table.size()); } if (table[pos].first == empty_key) { // Inserting new value_type. Grow table if necessary. 
Modified: cpp-commons/trunk/src/commons/rand.h =================================================================== --- cpp-commons/trunk/src/commons/rand.h 2009-03-12 19:53:53 UTC (rev 1288) +++ cpp-commons/trunk/src/commons/rand.h 2009-03-12 19:54:31 UTC (rev 1289) @@ -1,7 +1,6 @@ #ifndef COMMONS_RAND_H #define COMMONS_RAND_H -#include <cassert> #include <cstdlib> // random, RAND_MAX namespace commons Modified: cpp-commons/trunk/src/commons/st/st.h =================================================================== --- cpp-commons/trunk/src/commons/st/st.h 2009-03-12 19:53:53 UTC (rev 1288) +++ cpp-commons/trunk/src/commons/st/st.h 2009-03-12 19:54:31 UTC (rev 1289) @@ -6,6 +6,7 @@ #include <boost/function.hpp> #include <boost/shared_ptr.hpp> #include <commons/array.h> +#include <commons/assert.h> #include <commons/delegates.h> #include <commons/nullptr.h> #include <commons/streamreader.h> @@ -297,7 +298,7 @@ void reset() { // If b is true, then any threads that join are immediately // interrupted, so the set must be empty. - assert(!b || threads.empty()); + ASSERT(!b || threads.empty()); b = false; } operator bool() const { return b; } @@ -350,7 +351,7 @@ NONCOPYABLE(st_joining) public: st_joining(st_thread_t t) : t_(t) {} - ~st_joining() { st_join(t_); } + ~st_joining() { if (t_ != nullptr) st_join(t_); } private: st_thread_t t_; }; @@ -375,6 +376,11 @@ std::set<st_thread_t> ts; }; +#if 0 +/// XXX + int count = 0; + size_t glen = 0; + class st_read_fn { private: @@ -384,9 +390,28 @@ st_read_fn(st_netfd_t fd, st_utime_t to = ST_UTIME_NO_TIMEOUT) : fd_(fd), to_(to) {} size_t operator()(char *buf, size_t len) { + size_t x = size_t(checknnegerr(st_read(fd_, buf, len, to_))); + glen += x; + if ((++count & 0xf) == 0xf) + cout << "count " << count << " len " << len << " read " << x << " glen " << glen << endl; + return x; + // return size_t(checknnegerr(st_read(fd_, buf, len, to_))); + } + }; +#else + class st_read_fn + { + private: + st_netfd_t fd_; + st_utime_t to_; + public: + st_read_fn(st_netfd_t fd, st_utime_t to = ST_UTIME_NO_TIMEOUT) + : fd_(fd), to_(to) {} + size_t operator()(char *buf, size_t len) { return size_t(checknnegerr(st_read(fd_, buf, len, to_))); } }; +#endif class st_read_fully_fn { Modified: cpp-commons/trunk/src/commons/streamreader.h =================================================================== --- cpp-commons/trunk/src/commons/streamreader.h 2009-03-12 19:53:53 UTC (rev 1288) +++ cpp-commons/trunk/src/commons/streamreader.h 2009-03-12 19:54:31 UTC (rev 1289) @@ -3,6 +3,7 @@ #include <boost/function.hpp> #include <commons/array.h> +#include <commons/assert.h> #include <cstring> namespace commons { @@ -31,7 +32,7 @@ res += reader_(buf + res, len - res); if (res == 0) throw eof_exception(); } - assert(res == len); + ASSERT(res == len); } }; @@ -205,7 +206,7 @@ return x; } - assert(req <= buf_.size()); + ASSERT(req <= buf_.size()); // Shift things down if necessary. 
if (req > static_cast<size_t>(buf_.end() - end_)) Modified: cpp-commons/trunk/src/commons/streamwriter.h =================================================================== --- cpp-commons/trunk/src/commons/streamwriter.h 2009-03-12 19:53:53 UTC (rev 1288) +++ cpp-commons/trunk/src/commons/streamwriter.h 2009-03-12 19:54:31 UTC (rev 1289) @@ -3,6 +3,7 @@ #include <boost/function.hpp> #include <commons/array.h> +#include <commons/assert.h> #include <cstring> #include <iostream> #include <iomanip> @@ -25,7 +26,7 @@ char *reserve(int n, char *p) { if (p + n > a_.end()) { // check that the reserved space will fit - assert(size_t(p - mark_ + n + sizeof(uint32_t)) <= a_.size()); + ASSERT(size_t(p - mark_ + n + sizeof(uint32_t)) <= a_.size()); // get rid of what we have flush(); size_t diff = mark_ - (a_.get() + sizeof(uint32_t)); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
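One small behavioural change in r1289 is that st_joining's destructor now skips the join when it holds a null thread handle. The same scope-exit join pattern is sketched below against plain pthreads so it stands alone; ydb's class wraps the State Threads library instead, and the names here are illustrative.

    // RAII "join on scope exit" guard, including the null-handle check that
    // r1289 adds to st_joining. Build with -lpthread.
    #include <pthread.h>
    #include <cstdio>

    class joining {
    public:
        explicit joining(pthread_t *t) : t_(t) {}
        ~joining() { if (t_ != NULL) pthread_join(*t_, NULL); }  // skip if no thread
    private:
        joining(const joining &);             // noncopyable, as in ydb's classes
        joining &operator=(const joining &);
        pthread_t *t_;
    };

    static void *work(void *) { puts("worker ran"); return NULL; }

    int main() {
        pthread_t t;
        if (pthread_create(&t, NULL, work, NULL) != 0) return 1;
        joining guard(&t);    // joined automatically when main's scope ends
        return 0;
    }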
From: <yan...@us...> - 2009-03-12 19:54:06
|
Revision: 1288 http://assorted.svn.sourceforge.net/assorted/?rev=1288&view=rev Author: yangzhang Date: 2009-03-12 19:53:53 +0000 (Thu, 12 Mar 2009) Log Message: ----------- added snap_map Added Paths: ----------- cpp-commons/trunk/src/commons/snap_map.h Copied: cpp-commons/trunk/src/commons/snap_map.h (from rev 1284, cpp-commons/trunk/src/commons/fast_map.h) =================================================================== --- cpp-commons/trunk/src/commons/snap_map.h (rev 0) +++ cpp-commons/trunk/src/commons/snap_map.h 2009-03-12 19:53:53 UTC (rev 1288) @@ -0,0 +1,323 @@ +#ifndef COMMONS_DENSE_HASH_MAP_H +#define COMMONS_DENSE_HASH_MAP_H + +#include <boost/functional/hash.hpp> +#include <commons/array.h> +#include <commons/assert.h> +#include <commons/exceptions.h> +#include <utility> + +namespace commons { + + using namespace boost; + using namespace std; + + // TODO write unit tests + + template<typename Key, typename Data> class snap_map; + template<typename map_traits> class snap_map_iterator; + template<typename map_traits> class snap_map_const_iterator; + + template<typename Key, typename Data> + struct snap_map_traits + { + typedef snap_map<Key, Data> map_type; + typedef Key key_type; + typedef Data data_type; + typedef pair<key_type, data_type> value_type; + typedef snap_map_traits<Key, Data> traits; + + typedef value_type &reference; + typedef const value_type &const_reference; + typedef value_type *pointer; + typedef const value_type *const_pointer; + typedef snap_map_iterator<traits> iterator; + typedef snap_map_const_iterator<traits> const_iterator; + typedef size_t size_type; + typedef ptrdiff_t difference_type; + }; + + template<typename map_traits> + class snap_map_const_iterator + { + public: + typedef typename map_traits::map_type map_type; + typedef typename map_traits::iterator iterator; + typedef typename map_traits::const_iterator const_iterator; + typedef typename map_traits::value_type value_type; + typedef typename map_traits::reference reference; + typedef typename map_traits::const_reference const_reference; + typedef typename map_traits::pointer pointer; + typedef typename map_traits::const_pointer const_pointer; + typedef typename map_traits::size_type size_type; + typedef typename map_traits::difference_type difference_type; + typedef forward_iterator_tag iterator_category; + private: + const map_type &m; + const value_type *p; + const value_type *end; + public: + snap_map_const_iterator(const map_type &m, const value_type *p) + : m(m), p(p), end(m.get_table().end()) { increment(); } + void increment() { for (; p != end && m.empty_or_deleted(p->first); ++p) {} } + const_iterator &operator++() { + ++p; + increment(); + return *this; + } + const_iterator operator++(int) { + snap_map_const_iterator copy = *this; + ++*this; + return copy; + } + const_reference operator*() const { return *p; } + const_pointer operator->() const { return p; } + bool operator==(const const_iterator &it) const { return p == it.p; } + bool operator!=(const const_iterator &it) const { return p != it.p; } + }; + + template<typename map_traits> + class snap_map_iterator + { + public: + typedef typename map_traits::map_type map_type; + typedef typename map_traits::iterator iterator; + typedef typename map_traits::const_iterator const_iterator; + typedef typename map_traits::value_type value_type; + typedef typename map_traits::reference reference; + typedef typename map_traits::const_reference const_reference; + typedef typename map_traits::pointer pointer; + typedef typename 
map_traits::const_pointer const_pointer; + typedef typename map_traits::size_type size_type; + typedef typename map_traits::difference_type difference_type; + typedef forward_iterator_tag iterator_category; + private: + map_type &m; + value_type *p; + value_type *end; + public: + snap_map_iterator(map_type &m, value_type *p) + : m(m), p(p), end(m.get_table().end()) { increment(); } + void increment() { for (; p != end && m.empty_or_deleted(p->first); ++p) {} } + iterator &operator++() { + ++p; + increment(); + return *this; + } + iterator operator++(int) { + snap_map_iterator copy = *this; + ++*this; + return copy; + } + reference operator*() const { return *p; } + pointer operator->() const { return p; } + bool operator==(const iterator &it) const { return p == it.p; } + bool operator!=(const iterator &it) const { return p != it.p; } + }; + + // XXX snapshotting is complete hack. based on fast_map. + /** + * Quadratic internal probing with triangular numbers, a la + * google::dense_hash_map. Took the overall design from + * google::dense_hash_map, and also a few speed-up techniques from peeking at + * their source. + * + * This was originally written for ydb, where the map was the bottleneck in + * the application, but more importantly, the interface required the ability + * to serialize the map, and partially. The internals have to be exposed in + * order for fast serialization (i.e. memcpy) to be possible; also, the + * ability to serialize just a certain range of the snap_map was required. + */ + template <typename Key, typename Data> + class snap_map + { + public: + typedef snap_map_traits<Key, Data> traits; + typedef typename traits::map_type map_type; + typedef typename traits::iterator iterator; + typedef typename traits::const_iterator const_iterator; + typedef typename traits::key_type key_type; + typedef typename traits::data_type data_type; + typedef typename traits::value_type value_type; + typedef typename traits::reference reference; + typedef typename traits::const_reference const_reference; + typedef typename traits::pointer pointer; + typedef typename traits::const_pointer const_pointer; + typedef map<size_t, unique_ptr<array<value_type> > > shadowmap; + + private: + static const size_t initsize = 1 << 20, pagesize = 1 << 15, pagemask = pagesize - 1; + commons::array<value_type> table; + key_type empty_key, deleted_key; + size_t count; + boost::hash<key_type> hasher; + shadowmap shadows; + bool has_empty_key, has_deleted_key, snapshot; + + void resize(size_t size) { + ASSERT((size & (size - 1)) == 0); + commons::array<value_type> newtab(0); + char *newtabarr = new char[size * sizeof(value_type)]; + newtab.reset(reinterpret_cast<value_type*>(newtabarr), size); + for (size_t i = 0; i < size; ++i) + newtab[i].first = empty_key; + // Rehash old values over into new table. + size_t mask = newtab.size() - 1; + for (size_t i = 0; i < table.size(); ++i) { + if (table[i].first != empty_key && table[i].first != deleted_key) { + // Don't want to simply reuse the general lookup, since we know + // we're starting with a blank slate. 
+ size_t pos = hasher(table[i].first) & mask; + for (size_t probe = 0; + newtab[pos].first != empty_key; + pos = (pos + ++probe) & mask) { + ASSERT(probe < newtab.size()); + } + newtab[pos] = table[i]; + } + } + swap(newtab, table); + } + + void grow() { resize(table.size() << 1); } + void shrink() { resize(table.size() >> 1); } + void assert_init() const { ASSERT(has_empty_key && has_deleted_key); } + size_t page(size_t pos) { return pagemask & (pos / sizeof(value_type)); } + + public: + snap_map() : + table(0), empty_key(0), deleted_key(0), count(0), has_empty_key(false), + has_deleted_key(false), snapshot(false) {} + void set_empty_key(key_type k) { + has_empty_key = true; + empty_key = k; + resize(initsize); + } + void set_deleted_key(key_type k) { + has_deleted_key = true; + deleted_key = k; + resize(initsize); + } + size_t size() const { return count; } + void set_size(size_t size) { count = size; } + void erase(iterator) { throw_not_implemented(); } + array<value_type> &get_table() { return table; } + const array<value_type> &get_table() const { return table; } + bool empty_or_deleted(key_type k) const { + return k == empty_key || k == deleted_key; + } + + iterator begin() { + assert_init(); + if (count == 0) return end(); + else return iterator(*this, table.begin()); + } + const_iterator begin() const { + assert_init(); + if (count == 0) return end(); + else return const_iterator(*this, table.begin()); + } + + iterator end() { + assert_init(); + return iterator(*this, table.end()); + } + const_iterator end() const { + assert_init(); + return const_iterator(*this, table.end()); + } + + const_iterator find(key_type k) const { + assert_init(); + size_t probe = 0, mask = table.size() - 1, pos = hasher(k) & mask; + while (true) { + if (table[pos].first == empty_key) return end(); + if (table[pos].first == k) return const_iterator(*this, &table[pos]); + pos = (pos + ++probe) & mask; + ASSERT(probe < table.size()); + } + } + + value_type &get(size_t pos) { + typename shadowmap::iterator it = shadows.find(page(pos)); + value_type &v = it == shadows.end() ? table[pos] : (*(it->second))[pos]; + return v; + } + + iterator find(key_type k) { + assert_init(); + size_t probe = 0, mask = table.size() - 1, pos = hasher(k) & mask; + if (snapshot) { + while (true) { + value_type &v = get(pos); + if (v.first == empty_key) return end(); + if (v.first == k) return iterator(*this, &v); + pos = (pos + ++probe) & mask; + ASSERT(probe < table.size()); + } + } else { + while (true) { + if (table[pos].first == empty_key) return end(); + if (table[pos].first == k) return iterator(*this, &table[pos]); + pos = (pos + ++probe) & mask; + ASSERT(probe < table.size()); + } + } +#if 0 + for (; + table[pos].first != empty_key && table[pos].first != k; + pos = (pos + ++probe) & mask) { + ASSERT(probe < table.size()); + } + if (table[pos].first == empty_key) return end(); + else return &table[pos]; +#endif + } + + data_type &operator[](key_type k) { + assert_init(); + size_t probe = 0, mask = table.size() - 1, pos = hasher(k) & mask; + // Lookup. Skip over deleted entries (but remembering the first one + // seen as an open slot if we later decide to insert). If we find an + // empty spot, try returning the earlier deleted spot first. If we + // find the key, return that. 
+ for (; + table[pos].first != deleted_key && + table[pos].first != empty_key && + table[pos].first != k; + pos = (pos + ++probe) & mask) { + ASSERT(probe < table.size()); + } + if (table[pos].first == deleted_key) { + size_t first_deleted = pos; + for (; + table[pos].first != empty_key && table[pos].first != k; + pos = (pos + ++probe) & mask) { + ASSERT(probe < table.size()); + } + if (table[pos].first == empty_key) { + // Inserting new value_type. Grow table if necessary. + if (++count > table.size() * 3 / 4) { + --count; + grow(); + return (*this)[k]; + } + pos = first_deleted; + table[pos].first = k; + } + } else if (table[pos].first == empty_key) { + // Inserting new value_type. Grow table if necessary. + if (++count > table.size() * 3 / 4) { + --count; + grow(); + return (*this)[k]; + } + table[pos].first = k; + } + return table[pos].second; + } + }; + +} + +#endif Property changes on: cpp-commons/trunk/src/commons/snap_map.h ___________________________________________________________________ Added: svn:mergeinfo + This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
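snap_map, like fast_map before it, probes with triangular-number increments: pos = (pos + ++probe) & mask. With a power-of-two table this sequence visits every slot exactly once before repeating, which is what makes the ASSERT(probe < table.size()) checks in find() and operator[] safe as long as the table never fills. A toy demonstration of the probe sequence (table size and start position chosen arbitrarily):

    // Triangular-number probing over a power-of-two table visits all slots.
    #include <cassert>
    #include <cstdio>
    #include <set>

    int main() {
        const size_t size = 16, mask = size - 1;   // must be a power of two
        std::set<size_t> visited;
        size_t pos = 5;                            // pretend hash(k) & mask == 5
        for (size_t probe = 0; probe < size; ) {
            visited.insert(pos);
            pos = (pos + ++probe) & mask;          // triangular-number step
        }
        // Every one of the 16 slots was reached exactly once.
        assert(visited.size() == size);
        printf("visited %zu distinct slots\n", visited.size());
        return 0;
    }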
From: <yan...@us...> - 2009-03-12 19:43:17
|
Revision: 1287 http://assorted.svn.sourceforge.net/assorted/?rev=1287&view=rev Author: yangzhang Date: 2009-03-12 19:43:12 +0000 (Thu, 12 Mar 2009) Log Message: ----------- added assert.h Added Paths: ----------- cpp-commons/trunk/src/commons/assert.h Added: cpp-commons/trunk/src/commons/assert.h =================================================================== --- cpp-commons/trunk/src/commons/assert.h (rev 0) +++ cpp-commons/trunk/src/commons/assert.h 2009-03-12 19:43:12 UTC (rev 1287) @@ -0,0 +1,16 @@ +#ifndef COMMONS_ASSERT_H +#define COMMONS_ASSERT_H + +#include <cassert> + +// Wraps the standard assert macro to avoids "unused variable" warnings when compiled away. +// Inspired by: http://powerof2games.com/node/10 +// This is not the "default" because it does not conform to the requirements of the C standard, +// which requires that the NDEBUG version be ((void) 0). +#ifdef NDEBUG +#define ASSERT(x) do { (void)sizeof(x); } while(0) +#else +#define ASSERT(x) assert(x) +#endif + +#endif This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
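The point of the ASSERT wrapper added in r1287 is that its NDEBUG expansion, (void)sizeof(x), still references the tested expression, so variables that exist only to be asserted on do not produce unused-variable warnings in release builds. A compile-and-run example of the same macro:

    // ASSERT from r1287: under NDEBUG it expands to (void)sizeof(x), which
    // still mentions the tested expression, so variables used only in
    // assertions do not become "unused" in release builds.
    #include <cassert>

    #ifdef NDEBUG
    #define ASSERT(x) do { (void)sizeof(x); } while (0)
    #else
    #define ASSERT(x) assert(x)
    #endif

    int main() {
        const int expected = 4;   // only referenced by the assertion below
        ASSERT(expected == 4);    // no -Wunused-variable noise under -DNDEBUG
        return 0;
    }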
From: <yan...@us...> - 2009-03-12 19:35:53
|
Revision: 1286 http://assorted.svn.sourceforge.net/assorted/?rev=1286&view=rev Author: yangzhang Date: 2009-03-12 19:35:43 +0000 (Thu, 12 Mar 2009) Log Message: ----------- added notes from second meeting with sam Modified Paths: -------------- ydb/trunk/README Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-03-12 19:34:50 UTC (rev 1285) +++ ydb/trunk/README 2009-03-12 19:35:43 UTC (rev 1286) @@ -643,6 +643,49 @@ - implement the chunking/snapshotting - try to get tpcc working +Thu Mar 12 14:13:18 EDT 2009 +meeting with sam + +- experiments + - variables: + - recovery methods: + - txn log + - phy log + - state xfer (+ variants) + - hybrid (choosing between disk/net) + - workloads: + - TPC-C + - simple + - txn mix: + - % read-only txns + - crash time: + - 0 + - ? + - # replicas + - runtime performance: baseline throughput vs. mechanism, baseline throughput vs. db size + - recovery experiments: + - crash time (MB accum or sec passed) + - insertion rate (%) + - DB size (MB) + - xact mix (% r/o) + - system tput during various phases + +- implement incremental snapshotting of the hash table + +- tpcc + - typical row counts and sizes + - warehouse 1 89 + - district 10 95 + - customer 30k 655 + - history 30k 46 + - order 30k 24 + - new-order 9k 8 + - order-line 300k 54 + - stock 100k 306 + - item 100k 82 + +- TODO try integrating TPC-C + - TODO faster disk logging using separate threads - TODO show aries-write This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
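The r1286 notes list "implement incremental snapshotting of the hash table" as a TODO. One possible shape for such a scheme is sketched below purely for illustration: the first write to a page during a snapshot copies that page aside, so a frozen image can be streamed out while normal processing continues. The class, member names, and page size are invented for this sketch and are not necessarily how ydb's snap_map ends up doing it.

    // Illustration only: copy-on-write page snapshotting of a flat table.
    #include <cstring>
    #include <map>
    #include <vector>

    struct entry { int key; int value; };

    class cow_table {
    public:
        explicit cow_table(size_t n) : table_(n), snapshotting_(false) {}

        void begin_snapshot() { snapshotting_ = true; }
        void end_snapshot()   { snapshotting_ = false; shadows_.clear(); }

        // Writer path: preserve a page's pre-write image the first time it is
        // touched during a snapshot, then update the live table in place.
        void put(size_t pos, const entry &e) {
            size_t pg = page(pos);
            if (snapshotting_ && shadows_.find(pg) == shadows_.end())
                shadows_[pg] = std::vector<entry>(table_.begin() + pg * kPage,
                                                  table_.begin() + (pg + 1) * kPage);
            table_[pos] = e;
        }

        // Snapshot reader: shadowed pages hold the frozen, pre-write image.
        entry snapshot_get(size_t pos) const {
            std::map<size_t, std::vector<entry> >::const_iterator it =
                shadows_.find(page(pos));
            return it != shadows_.end() ? it->second[pos % kPage] : table_[pos];
        }

    private:
        enum { kPage = 512 };                      // entries per page (arbitrary)
        static size_t page(size_t pos) { return pos / kPage; }

        std::vector<entry> table_;
        std::map<size_t, std::vector<entry> > shadows_;
        bool snapshotting_;
    };

    int main() {
        cow_table t(4 * 512);
        entry a = { 1, 10 };
        t.put(0, a);                     // before the snapshot
        t.begin_snapshot();
        entry b = { 1, 20 };
        t.put(0, b);                     // page 0 gets shadowed first
        return t.snapshot_get(0).value == 10 ? 0 : 1;   // snapshot still sees 10
    }

The appeal over a full-table copy is that only pages actually written during the snapshot window cost extra memory and copying, which matters when the transfer is large relative to the write rate.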
From: <yan...@us...> - 2009-03-12 19:35:07
|
Revision: 1285 http://assorted.svn.sourceforge.net/assorted/?rev=1285&view=rev Author: yangzhang Date: 2009-03-12 19:34:50 +0000 (Thu, 12 Mar 2009) Log Message: ----------- added tpcc Added Paths: ----------- ydb/trunk/src/tpcc/ ydb/trunk/src/tpcc/Makefile ydb/trunk/src/tpcc/assert.h ydb/trunk/src/tpcc/btree.h ydb/trunk/src/tpcc/clock.cc ydb/trunk/src/tpcc/clock.h ydb/trunk/src/tpcc/randomgenerator.cc ydb/trunk/src/tpcc/randomgenerator.h ydb/trunk/src/tpcc/stlutil.h ydb/trunk/src/tpcc/tpcc.cc ydb/trunk/src/tpcc/tpccclient.cc ydb/trunk/src/tpcc/tpccclient.h ydb/trunk/src/tpcc/tpccdb.cc ydb/trunk/src/tpcc/tpccdb.h ydb/trunk/src/tpcc/tpccgenerator.cc ydb/trunk/src/tpcc/tpccgenerator.h ydb/trunk/src/tpcc/tpcctables.cc ydb/trunk/src/tpcc/tpcctables.h Added: ydb/trunk/src/tpcc/Makefile =================================================================== --- ydb/trunk/src/tpcc/Makefile (rev 0) +++ ydb/trunk/src/tpcc/Makefile 2009-03-12 19:34:50 UTC (rev 1285) @@ -0,0 +1,25 @@ +WARNINGS = -Werror -Wall -Wextra -Wconversion -Wpointer-arith -Wcast-qual -Wcast-align -Wwrite-strings -Woverloaded-virtual -Wno-sign-compare -Wno-unused-parameter + +# Debug flags +CXXFLAGS = -g -MD $(WARNINGS) -I.. +# Optimization flags +#CXXFLAGS = -g -O3 -DNDEBUG -MD $(WARNINGS) + +# Link withthe C++ standard library +LDFLAGS=-lstdc++ + +BINARIES = tpcc # btree_test randomgenerator_test tpccclient_test tpcctables_test tpccgenerator_test tpcc + +all: $(BINARIES) + +#btree_test: btree_test.o stupidunit.o +#randomgenerator_test: randomgenerator_test.o randomgenerator.o stupidunit.o +#tpccclient_test: tpccclient_test.o tpccclient.o randomgenerator.o stupidunit.o +#tpcctables_test: tpcctables_test.o tpcctables.o tpccdb.o randomgenerator.o stupidunit.o +#tpccgenerator_test: tpccgenerator_test.o tpccgenerator.o tpcctables.o tpccdb.o randomgenerator.o stupidunit.o +tpcc: tpcc.o tpccclient.o tpccgenerator.o tpcctables.o tpccdb.o clock.o randomgenerator.o + +clean : + rm -f *.o *.d $(BINARIES) + +-include *.d Added: ydb/trunk/src/tpcc/assert.h =================================================================== --- ydb/trunk/src/tpcc/assert.h (rev 0) +++ ydb/trunk/src/tpcc/assert.h 2009-03-12 19:34:50 UTC (rev 1285) @@ -0,0 +1,16 @@ +#ifndef ASSERT_H__ +#define ASSERT_H__ + +#include <cassert> + +// Wraps the standard assert macro to avoids "unused variable" warnings when compiled away. +// Inspired by: http://powerof2games.com/node/10 +// This is not the "default" because it does not conform to the requirements of the C standard, +// which requires that the NDEBUG version be ((void) 0). +#ifdef NDEBUG +#define ASSERT(x) do { (void)sizeof(x); } while(0) +#else +#define ASSERT(x) assert(x) +#endif + +#endif Added: ydb/trunk/src/tpcc/btree.h =================================================================== --- ydb/trunk/src/tpcc/btree.h (rev 0) +++ ydb/trunk/src/tpcc/btree.h 2009-03-12 19:34:50 UTC (rev 1285) @@ -0,0 +1,511 @@ +#if !defined BPLUSTREE_HPP_227824 +#define BPLUSTREE_HPP_227824 + +// This is required for glibc to define std::posix_memalign +#if !defined(_XOPEN_SOURCE) || (_XOPEN_SOURCE < 600) +#define _XOPEN_SOURCE 600 +#endif + +#include <assert.h> +#include <stdlib.h> +#include <string.h> + +#include <boost/static_assert.hpp> +#include <boost/pool/object_pool.hpp> + +// DEBUG +#include <iostream> +using std::cout; +using std::endl; + +#ifdef __linux__ +#define HAVE_POSIX_MEMALIGN +#endif + +#ifdef HAVE_POSIX_MEMALIGN +// Nothing to do +#else +// TODO: This is not aligned! 
It doesn't matter for this implementation, but it could +static inline int posix_memalign(void** result, int, size_t bytes) { + *result = malloc(bytes); + return *result == NULL; +} +#endif + +template <typename KEY, typename VALUE, unsigned N, unsigned M, + unsigned INNER_NODE_PADDING= 0, unsigned LEAF_NODE_PADDING= 0, + unsigned NODE_ALIGNMENT= 64> +class BPlusTree +{ +public: + // N must be greater than two to make the split of + // two inner nodes sensible. + BOOST_STATIC_ASSERT(N>2); + // Leaf nodes must be able to hold at least one element + BOOST_STATIC_ASSERT(M>0); + + // Builds a new empty tree. + BPlusTree() + : depth(0), + root(new_leaf_node()) + { + // DEBUG + // cout << "sizeof(LeafNode)==" << sizeof(LeafNode) << endl; + // cout << "sizeof(InnerNode)==" << sizeof(InnerNode) << endl; + } + + ~BPlusTree() { + // Empty. Memory deallocation is done automatically + // when innerPool and leafPool are destroyed. + } + + // Inserts a pair (key, value). If there is a previous pair with + // the same key, the old value is overwritten with the new one. + void insert(KEY key, VALUE value) { + // GCC warns that this may be used uninitialized, even though that is untrue. + InsertionResult result = { KEY(), 0, 0 }; + bool was_split; + if( depth == 0 ) { + // The root is a leaf node + assert( *reinterpret_cast<NodeType*>(root) == + NODE_LEAF); + was_split= leaf_insert(reinterpret_cast<LeafNode*> + (root), key, value, &result); + } else { + // The root is an inner node + assert( *reinterpret_cast<NodeType*> + (root) == NODE_INNER ); + was_split= inner_insert(reinterpret_cast<InnerNode*> + (root), depth, key, value, &result); + } + if( was_split ) { + // The old root was splitted in two parts. + // We have to create a new root pointing to them + depth++; + root= new_inner_node(); + InnerNode* rootProxy= + reinterpret_cast<InnerNode*>(root); + rootProxy->num_keys= 1; + rootProxy->keys[0]= result.key; + rootProxy->children[0]= result.left; + rootProxy->children[1]= result.right; + } + } + +// Looks for the given key. If it is not found, it returns false, +// if it is found, it returns true and copies the associated value +// unless the pointer is null. +bool find(const KEY& key, VALUE* value= 0) const { + const InnerNode* inner; + register const void* node= root; + register unsigned d= depth, index; + while( d-- != 0 ) { + inner= reinterpret_cast<const InnerNode*>(node); + assert( inner->type == NODE_INNER ); + index= inner_position_for(key, inner->keys, inner->num_keys); + node= inner->children[index]; + } + const LeafNode* leaf= reinterpret_cast<const LeafNode*>(node); + assert( leaf->type == NODE_LEAF ); + index= leaf_position_for(key, leaf->keys, leaf->num_keys); + if( leaf->keys[index] == key ) { + if( value != 0 ) { + *value= leaf->values[index]; + } + if (leaf->values[index]) + return true; + else return false; + } else { + return false; + } +} + + +// Looks for the given key. If it is not found, it returns false, +// if it is found, it returns true and sets +// the associated value to NULL +// Note: del currently leaks memory. Fix later. 
+bool del(const KEY& key) { + InnerNode* inner; + register void* node= root; + register unsigned d= depth, index; + while( d-- != 0 ) { + inner= reinterpret_cast<InnerNode*>(node); + assert( inner->type == NODE_INNER ); + index= inner_position_for(key, inner->keys, inner->num_keys); + node= inner->children[index]; + } + LeafNode* leaf= reinterpret_cast<LeafNode*>(node); + assert( leaf->type == NODE_LEAF ); + index= leaf_position_for(key, leaf->keys, leaf->num_keys); + if( leaf->keys[index] == key ) { + leaf->values[index] = 0; + return true; + } else { + return false; + } +} + +// Finds the LAST item that is < key. That is, the next item in the tree is not < key, but this +// item is. If we were to insert key into the tree, it would go after this item. This is weird, +// but is easier than implementing iterators. In STL terms, this would be "lower_bound(key)--" +// WARNING: This does *not* work when values are deleted. Thankfully, TPC-C does not use deletes. +bool findLastLessThan(const KEY& key, VALUE* value = 0, KEY* out_key = 0) const { + const void* node = root; + unsigned int d = depth; + while( d-- != 0 ) { + const InnerNode* inner = reinterpret_cast<const InnerNode*>(node); + assert( inner->type == NODE_INNER ); + unsigned int pos = inner_position_for(key, inner->keys, inner->num_keys); + // We need to rewind in the case where they are equal + if (pos > 0 && key == inner->keys[pos-1]) { + pos -= 1; + } + assert(pos == 0 || inner->keys[pos-1] < key); + node = inner->children[pos]; + } + const LeafNode* leaf= reinterpret_cast<const LeafNode*>(node); + assert( leaf->type == NODE_LEAF ); + unsigned int pos = leaf_position_for(key, leaf->keys, leaf->num_keys); + if (pos <= leaf->num_keys) { + pos -= 1; + if (pos < leaf->num_keys && key == leaf->keys[pos]) { + pos -= 1; + } + + if (pos < leaf->num_keys) { + assert(leaf->keys[pos] < key); + if (leaf->values[pos]) { + if (value != NULL) { + *value = leaf->values[pos]; + } + if (out_key != NULL) { + *out_key = leaf->keys[pos]; + } + return true; + } + } + } + + return false; +} + + // Returns the size of an inner node + // It is useful when optimizing performance with cache alignment. + unsigned sizeof_inner_node() const { + return sizeof(InnerNode); + } + + // Returns the size of a leaf node. + // It is useful when optimizing performance with cache alignment. + unsigned sizeof_leaf_node() const { + return sizeof(LeafNode); + } + + +private: + // Used when debugging + enum NodeType {NODE_INNER=0xDEADBEEF, NODE_LEAF=0xC0FFEE}; + + // Leaf nodes store pairs of keys and values. + struct LeafNode { +#ifndef NDEBUG + LeafNode() : type(NODE_LEAF), num_keys(0) {memset(keys,0,sizeof(KEY)*M);} + const NodeType type; +#else + LeafNode() : num_keys(0) {memset(keys,0,sizeof(KEY)*M);} +#endif + unsigned num_keys; + KEY keys[M]; + VALUE values[M]; + // unsigned char _pad[LEAF_NODE_PADDING]; + }; + + // Inner nodes store pointers to other nodes interleaved with keys. 
+ struct InnerNode { +#ifndef NDEBUG + InnerNode() : type(NODE_INNER), num_keys(0) {memset(keys,0,sizeof(KEY)*M);} + const NodeType type; +#else + InnerNode() : num_keys(0) {memset(keys,0,sizeof(KEY)*M);} +#endif + unsigned num_keys; + KEY keys[N]; + void* children[N+1]; + // unsigned char _pad[INNER_NODE_PADDING]; + }; + + // Custom allocator that returns aligned blocks of memory + template <unsigned ALIGNMENT> + struct AlignedMemoryAllocator { + typedef std::size_t size_type; + typedef std::ptrdiff_t difference_type; + + static char* malloc(const size_type bytes) + { + void* result; + if( posix_memalign(&result, ALIGNMENT, bytes) != 0 ) { + result= 0; + } + // Alternative: result= std::malloc(bytes); + return reinterpret_cast<char*>(result); + } + static void free(char* const block) + { std::free(block); } + }; + + // Returns a pointer to a fresh leaf node. + LeafNode* new_leaf_node() { + LeafNode* result; + //result= new LeafNode(); + result= leafPool.construct(); + //cout << "New LeafNode at " << result << endl; + return result; + } + + // Frees a leaf node previously allocated with new_leaf_node() + void delete_leaf_node(LeafNode* node) { + assert( node->type == NODE_LEAF ); + //cout << "Deleting LeafNode at " << node << endl; + // Alternatively: delete node; + leafPool.destroy(node); + } + + // Returns a pointer to a fresh inner node. + InnerNode* new_inner_node() { + InnerNode* result; + // Alternatively: result= new InnerNode(); + result= innerPool.construct(); + //cout << "New InnerNode at " << result << endl; + return result; + } + + // Frees an inner node previously allocated with new_inner_node() + void delete_inner_node(InnerNode* node) { + assert( node->type == NODE_INNER ); + //cout << "Deleting InnerNode at " << node << endl; + // Alternatively: delete node; + innerPool.destroy(node); + } + + // Data type returned by the private insertion methods. + struct InsertionResult { + KEY key; + void* left; + void* right; + }; + + // Returns the position where 'key' should be inserted in a leaf node + // that has the given keys. + static unsigned leaf_position_for(const KEY& key, const KEY* keys, + unsigned num_keys) { + // Simple linear search. Faster for small values of N or M + unsigned k= 0; + while((k < num_keys) && (keys[k]<key)) { + ++k; + } + return k; + /* + // Binary search. It is faster when N or M is > 100, + // but the cost of the division renders it useless + // for smaller values of N or M. + XXX--- needs to be re-checked because the linear search + has changed since the last update to the following ---XXX + int left= -1, right= num_keys, middle; + while( right -left > 1 ) { + middle= (left+right)/2; + if( keys[middle] < key) { + left= middle; + } else { + right= middle; + } + } + //assert( right == k ); + return unsigned(right); + */ + } + + // Returns the position where 'key' should be inserted in an inner node + // that has the given keys. + static inline unsigned inner_position_for(const KEY& key, const KEY* keys, + unsigned num_keys) { + // Simple linear search. Faster for small values of N or M + unsigned k= 0; + while((k < num_keys) && ((keys[k]<key) || (keys[k]==key))) { + ++k; + } + return k; + // Binary search is faster when N or M is > 100, + // but the cost of the division renders it useless + // for smaller values of N or M. 
+ } + + bool leaf_insert(LeafNode* node, KEY& key, + VALUE& value, InsertionResult* result) { + assert( node->type == NODE_LEAF ); + assert( node->num_keys <= M ); + bool was_split= false; + // Simple linear search + unsigned i= leaf_position_for(key, node->keys, node->num_keys); + if( node->num_keys == M ) { + // The node was full. We must split it + unsigned treshold= (M+1)/2; + LeafNode* new_sibling= new_leaf_node(); + new_sibling->num_keys= node->num_keys -treshold; + for(unsigned j=0; j < new_sibling->num_keys; ++j) { + new_sibling->keys[j]= node->keys[treshold+j]; + new_sibling->values[j]= + node->values[treshold+j]; + } + node->num_keys= treshold; + if( i < treshold ) { + // Inserted element goes to left sibling + leaf_insert_nonfull(node, key, value, i); + } else { + // Inserted element goes to right sibling + leaf_insert_nonfull(new_sibling, key, value, + i-treshold); + } + // Notify the parent about the split + was_split= true; + result->key= new_sibling->keys[0]; + result->left= node; + result->right= new_sibling; + } else { + // The node was not full + leaf_insert_nonfull(node, key, value, i); + } + return was_split; + } + + static void leaf_insert_nonfull(LeafNode* node, KEY& key, VALUE& value, + unsigned index) { + assert( node->type == NODE_LEAF ); + assert( node->num_keys < M ); + assert( index <= M ); + assert( index <= node->num_keys ); + if( (index < M) && + (node->keys[index] == key) ) { + // We are inserting a duplicate value. + // Simply overwrite the old one + node->values[index]= value; + } else { + // The key we are inserting is unique + for(unsigned i=node->num_keys; i > index; --i) { + node->keys[i]= node->keys[i-1]; + node->values[i]= node->values[i-1]; + } + node->num_keys++; + node->keys[index]= key; + node->values[index]= value; + } + } + + bool inner_insert(InnerNode* node, unsigned current_depth, KEY& key, + VALUE& value, InsertionResult* result) { + assert( node->type == NODE_INNER ); + assert( current_depth != 0 ); + // Early split if node is full. + // This is not the canonical algorithm for B+ trees, + // but it is simpler and does not break the definition. + bool was_split= false; + if( node->num_keys == N ) { + // Split + unsigned treshold= (N+1)/2; + InnerNode* new_sibling= new_inner_node(); + new_sibling->num_keys= node->num_keys -treshold; + for(unsigned i=0; i < new_sibling->num_keys; ++i) { + new_sibling->keys[i]= node->keys[treshold+i]; + new_sibling->children[i]= + node->children[treshold+i]; + } + new_sibling->children[new_sibling->num_keys]= + node->children[node->num_keys]; + node->num_keys= treshold-1; + // Set up the return variable + was_split= true; + result->key= node->keys[treshold-1]; + result->left= node; + result->right= new_sibling; + // Now insert in the appropriate sibling + if( key < result->key ) { + inner_insert_nonfull(node, current_depth, key, value); + } else { + inner_insert_nonfull(new_sibling, current_depth, key, + value); + } + } else { + // No split + inner_insert_nonfull(node, current_depth, key, value); + } + return was_split; + } + + void inner_insert_nonfull(InnerNode* node, unsigned current_depth, KEY& key, + VALUE& value) { + assert( node->type == NODE_INNER ); + assert( node->num_keys < N ); + assert( current_depth != 0 ); + // Simple linear search + unsigned index= inner_position_for(key, node->keys, + node->num_keys); + // GCC warns that this may be used uninitialized, even though that is untrue. 
+ InsertionResult result = { KEY(), 0, 0 }; + bool was_split; + if( current_depth-1 == 0 ) { + // The children are leaf nodes + for(unsigned kk=0; kk < node->num_keys+1; ++kk) { + assert( *reinterpret_cast<NodeType*> + (node->children[kk]) == NODE_LEAF ); + } + was_split= leaf_insert(reinterpret_cast<LeafNode*> + (node->children[index]), key, value, &result); + } else { + // The children are inner nodes + for(unsigned kk=0; kk < node->num_keys+1; ++kk) { + assert( *reinterpret_cast<NodeType*> + (node->children[kk]) == NODE_INNER ); + } + InnerNode* child= reinterpret_cast<InnerNode*> + (node->children[index]); + was_split= inner_insert( child, current_depth-1, key, value, + &result); + } + if( was_split ) { + if( index == node->num_keys ) { + // Insertion at the rightmost key + node->keys[index]= result.key; + node->children[index]= result.left; + node->children[index+1]= result.right; + node->num_keys++; + } else { + // Insertion not at the rightmost key + node->children[node->num_keys+1]= + node->children[node->num_keys]; + for(unsigned i=node->num_keys; i!=index; --i) { + node->children[i]= node->children[i-1]; + node->keys[i]= node->keys[i-1]; + } + node->children[index]= result.left; + node->children[index+1]= result.right; + node->keys[index]= result.key; + node->num_keys++; + } + } // else the current node is not affected + } + + typedef AlignedMemoryAllocator<NODE_ALIGNMENT> AlignedAllocator; + + // Node memory allocators. IMPORTANT NOTE: they must be declared + // before the root to make sure that they are properly initialised + // before being used to allocate any node. + boost::object_pool<InnerNode, AlignedAllocator> innerPool; + boost::object_pool<LeafNode, AlignedAllocator> leafPool; + // Depth of the tree. A tree of depth 0 only has a leaf node. + unsigned depth; + // Pointer to the root node. It may be a leaf or an inner node, but + // it is never null. + void* root; +}; + +#endif // !defined BPLUSTREE_HPP_227824 Added: ydb/trunk/src/tpcc/clock.cc =================================================================== --- ydb/trunk/src/tpcc/clock.cc (rev 0) +++ ydb/trunk/src/tpcc/clock.cc 2009-03-12 19:34:50 UTC (rev 1285) @@ -0,0 +1,58 @@ +#include "clock.h" + +#include <sys/time.h> + +#include <cstdio> +#include <ctime> + +#include "assert.h" + +// Fills output with the base-10 ASCII representation of value, using digits digits. +static char* makeInt(char* output, int value, int digits) { + char* last = output + digits; + char* next = last; + for (int i = 0; i < digits; ++i) { + int digit = value % 10; + value = value / 10; + next -= 1; + *next = static_cast<char>('0' + digit); + } + assert(value == 0); + return last; +} + +void SystemClock::getDateTimestamp(char* now) { + // Get the system time. Convert it to local time + time_t seconds_since_epoch = time(NULL); + assert(seconds_since_epoch != -1); + + struct tm local_calendar; + struct tm* result = localtime_r(&seconds_since_epoch, &local_calendar); + ASSERT(result == &local_calendar); + + // Format the time + // strftime is slow: it ends up consulting timezone info + // snprintf is also slow, since it needs to parse the input string. This is significantly + // faster, saving ~10% of the run time. 
+ //~ int bytes = snprintf(now, DATETIME_SIZE+1, "%04d%02d%02d%02d%02d%02d", + //~ local_calendar.tm_year+1900, local_calendar.tm_mon+1, local_calendar.tm_mday, + //~ local_calendar.tm_hour, local_calendar.tm_min, local_calendar.tm_sec); + //~ int bytes = strftime(now, DATETIME_SIZE+1, "%Y%m%d%H%M%S", &broken_down_local_time); + char* next = makeInt(now, local_calendar.tm_year+1900, 4); + next = makeInt(next, local_calendar.tm_mon+1, 2); + next = makeInt(next, local_calendar.tm_mday, 2); + next = makeInt(next, local_calendar.tm_hour, 2); + next = makeInt(next, local_calendar.tm_min, 2); + next = makeInt(next, local_calendar.tm_sec, 2); + *next = '\0'; + assert(next == now + DATETIME_SIZE); +} + +int64_t SystemClock::getMicroseconds() { + struct timeval time; + int error = gettimeofday(&time, NULL); + ASSERT(error == 0); + int64_t result = time.tv_sec * 1000000; + result += time.tv_usec; + return result; +} Added: ydb/trunk/src/tpcc/clock.h =================================================================== --- ydb/trunk/src/tpcc/clock.h (rev 0) +++ ydb/trunk/src/tpcc/clock.h 2009-03-12 19:34:50 UTC (rev 1285) @@ -0,0 +1,28 @@ +#ifndef CLOCK_H__ +#define CLOCK_H__ + +//~ #include <cstdint> +#include <stdint.h> + +// Interface to the real time system clock. +class Clock { +public: + virtual ~Clock() {} + + static const int DATETIME_SIZE = 14; + + // now must have at least DATETIME_SIZE+1 bytes. + virtual void getDateTimestamp(char* now) = 0; + + // Returns the number of microseconds since the epoch. + virtual int64_t getMicroseconds() = 0; +}; + +// Uses gettimeofday. +class SystemClock : public Clock { +public: + virtual void getDateTimestamp(char* now); + virtual int64_t getMicroseconds(); +}; + +#endif Added: ydb/trunk/src/tpcc/randomgenerator.cc =================================================================== --- ydb/trunk/src/tpcc/randomgenerator.cc (rev 0) +++ ydb/trunk/src/tpcc/randomgenerator.cc 2009-03-12 19:34:50 UTC (rev 1285) @@ -0,0 +1,178 @@ +#include "randomgenerator.h" + +#include <algorithm> +#include <cstdio> +#include <cstdlib> +#include <cstring> +#include <ctime> + +#include "assert.h" + +NURandC NURandC::makeRandom(RandomGenerator* generator) { + NURandC c; + c.c_last_ = generator->number(0, 255); + c.c_id_ = generator->number(0, 1023); + c.ol_i_id_ = generator->number(0, 8191); + return c; +} + +// Returns true if the C-Run value is valid. See TPC-C 2.1.6.1 (page 20). 
+static bool validCRun(int cRun, int cLoad) { + int cDelta = abs(cRun - cLoad); + return 65 <= cDelta && cDelta <= 119 && cDelta != 96 && cDelta != 112; +} + +NURandC NURandC::makeRandomForRun(RandomGenerator* generator, const NURandC& c_load) { + NURandC c = makeRandom(generator); + + while (!validCRun(c.c_last_, c_load.c_last_)) { + c.c_last_ = generator->number(0, 255); + } + ASSERT(validCRun(c.c_last_, c_load.c_last_)); + + return c; +} + +int RandomGenerator::numberExcluding(int lower, int upper, int excluding) { + ASSERT(lower < upper); + ASSERT(lower <= excluding && excluding <= upper); + + // Generate 1 less number than the range + int num = number(lower, upper-1); + + // Adjust the numbers to remove excluding + if (num >= excluding) { + num += 1; + } + ASSERT(lower <= num && num <= upper && num != excluding); + return num; +} + +static void generateString(RandomGenerator* generator, char* s, int lower_length, int upper_length, + char base_character, int num_characters) { + int length = generator->number(lower_length, upper_length); + for (int i = 0; i < length; ++i) { + s[i] = static_cast<char>(base_character + generator->number(0, num_characters-1)); + } + s[length] = '\0'; +} + +void RandomGenerator::astring(char* s, int lower_length, int upper_length) { + generateString(this, s, lower_length, upper_length, 'a', 26); +} + +void RandomGenerator::nstring(char* s, int lower_length, int upper_length) { + generateString(this, s, lower_length, upper_length, '0', 10); +} + +void RandomGenerator::lastName(char* c_last, int max_cid) { + makeLastName(NURand(255, 0, std::min(999, max_cid-1)), c_last); +} + +float RandomGenerator::fixedPoint(int digits, float lower, float upper) { + int multiplier = 1; + for (int i = 0; i < digits; ++i) { + multiplier *= 10; + } + + int int_lower = static_cast<int>(lower * static_cast<double>(multiplier) + 0.5); + int int_upper = static_cast<int>(upper * static_cast<double>(multiplier) + 0.5); + return (float) number(int_lower, int_upper) / (float) multiplier; +} + +int RandomGenerator::NURand(int A, int x, int y) { + int C = 0; + switch(A) { + case 255: + C = c_values_.c_last_; + break; + case 1023: + C = c_values_.c_id_; + break; + case 8191: + C = c_values_.ol_i_id_; + break; + default: + fprintf(stderr, "Error: NURand: A = %d not supported\n", A); + exit(1); + } + return (((number(0, A) | number(x, y)) + C) % (y - x + 1)) + x; +} + +int* RandomGenerator::makePermutation(int lower, int upper) { + // initialize with consecutive values + int* array = new int[upper - lower + 1]; + for (int i = 0; i <= upper - lower; ++i) { + array[i] = lower + i; + } + + for (int i = 0; i < upper - lower; ++i) { + // choose a value to go into this position, including this position + int index = number(i, upper - lower); + int temp = array[i]; + array[i] = array[index]; + array[index] = temp; + } + + return array; +} + +// Defined by TPC-C 4.3.2.3. 
+void makeLastName(int num, char* name) { + static const char* const SYLLABLES[] = { + "BAR", "OUGHT", "ABLE", "PRI", "PRES", "ESE", "ANTI", "CALLY", "ATION", "EING", }; + static const int LENGTHS[] = { 3, 5, 4, 3, 4, 3, 4, 5, 5, 4, }; + + ASSERT(0 <= num && num <= 999); + int indicies[] = { num/100, (num/10)%10, num%10 }; + + int offset = 0; + for (int i = 0; i < sizeof(indicies)/sizeof(*indicies); ++i) { + ASSERT(strlen(SYLLABLES[indicies[i]]) == LENGTHS[indicies[i]]); + memcpy(name + offset, SYLLABLES[indicies[i]], LENGTHS[indicies[i]]); + offset += LENGTHS[indicies[i]]; + } + name[offset] = '\0'; +} + +RealRandomGenerator::RealRandomGenerator() { +#ifdef HAVE_RANDOM_R + // Set the random state to zeros. glibc will attempt to access the old state if not NULL. + memset(&state, 0, sizeof(state)); + int result = initstate_r(static_cast<unsigned int>(time(NULL)), state_array, + sizeof(state_array), &state); + ASSERT(result == 0); +#else + seed(time(NULL)); +#endif +} + +int RealRandomGenerator::number(int lower, int upper) { + int rand_int; +#ifdef HAVE_RANDOM_R + int error = random_r(&state, &rand_int); + ASSERT(error == 0); +#else + rand_int = nrand48(state); +#endif + ASSERT(0 <= rand_int && rand_int <= RAND_MAX); + + // Select a number in [0, range_size-1] + int range_size = upper - lower + 1; + rand_int %= range_size; + ASSERT(0 <= rand_int && rand_int < range_size); + + // Shift the range to [lower, upper] + rand_int += lower; + ASSERT(lower <= rand_int && rand_int <= upper); + return rand_int; +} + +void RealRandomGenerator::seed(unsigned int seed) { +#ifdef HAVE_RANDOM_R + int error = srandom_r(seed, &state); + ASSERT(error == 0); +#else + memcpy(state, &seed, std::min(sizeof(seed), sizeof(state))); +#endif +} Added: ydb/trunk/src/tpcc/randomgenerator.h =================================================================== --- ydb/trunk/src/tpcc/randomgenerator.h (rev 0) +++ ydb/trunk/src/tpcc/randomgenerator.h 2009-03-12 19:34:50 UTC (rev 1285) @@ -0,0 +1,102 @@ +#ifndef RANDOMGENERATOR_H__ +#define RANDOMGENERATOR_H__ + +#include <cstdlib> // for struct random_data + +#ifdef __linux__ +#define HAVE_RANDOM_R +#endif + +class RandomGenerator; + +// Constant C values for the NURand function. +struct NURandC { + NURandC() : c_last_(0), c_id_(0), ol_i_id_(0) {} + + int c_last_; + int c_id_; + int ol_i_id_; + + // Sets the fields randomly. + static NURandC makeRandom(RandomGenerator* generator); + + // Sets the fields randomly, in a fashion acceptable for a test run. c_load is the value of + // c_last_ that was used to generate the tables. See TPC-C 2.1.6.1. (page 20). + static NURandC makeRandomForRun(RandomGenerator* generator, const NURandC& c_load); +}; + +class RandomGenerator { +public: + RandomGenerator() : c_values_(NURandC()) {} + virtual ~RandomGenerator() {} + + // Return a random integer in the range [lower, upper]. The range is inclusive. + virtual int number(int lower, int upper) = 0; + + // Return a random integer in the range [lower, upper] excluding excluded. The range is + // inclusive. + int numberExcluding(int lower, int upper, int excluding); + + void astring(char* s, int lower_length, int upper_length); + void nstring(char* s, int lower_length, int upper_length); + + // Fill name with a random last name, generated according to TPC-C rules. Limits the customer + // id for the generated name to cid. + void lastName(char* name, int max_cid); + + float fixedPoint(int digits, float lower, float upper); + + // Non-uniform random number function from TPC-C 2.1.6. (page 20). 
+ int NURand(int A, int x, int y); + + int* makePermutation(int lower, int upper); + + void setC(const NURandC& c) { + c_values_ = c; + } + +private: + NURandC c_values_; +}; + +// A mock RandomGenerator for unit testing. +class MockRandomGenerator : public RandomGenerator { +public: + MockRandomGenerator() : minimum_(true) {} + + virtual int number(int lower, int upper) { + if (minimum_) return lower; + else return upper; + } + + bool minimum_; +}; + +static const int MAX_LAST_NAME = 16; + +// Generate a last name as defined by TPC-C 4.3.2.3. name must be at least MAX_LAST_NAME+1 bytes. +void makeLastName(int num, char* name); + +// A real RandomGenerator that uses random_r. +class RealRandomGenerator : public RandomGenerator { +public: + // Seeds the generator with the current time. + RealRandomGenerator(); + + virtual int number(int lower, int upper); + + // Seed the generator with seed. + void seed(unsigned int seed); + +private: +#ifdef HAVE_RANDOM_R + // man random says optimal sizes are 8, 32, 64, 128, 256 bytes + static const int RANDOM_STATE_SIZE = 64; + char state_array[RANDOM_STATE_SIZE]; + struct random_data state; +#else + unsigned short state[3]; +#endif +}; + +#endif Added: ydb/trunk/src/tpcc/stlutil.h =================================================================== --- ydb/trunk/src/tpcc/stlutil.h (rev 0) +++ ydb/trunk/src/tpcc/stlutil.h 2009-03-12 19:34:50 UTC (rev 1285) @@ -0,0 +1,24 @@ +#ifndef STLUTIL_H__ +#define STLUTIL_H__ + +// Deletes all elements in STL container. +template <typename T> +static void STLDeleteElements(T* container) { + const typename T::iterator end = container->end(); + for (typename T::iterator i = container->begin(); i != end; ++i) { + delete *i; + } + container->clear(); +}; + +// Deletes all values (iterator->second) in STL container. +template <typename T> +static void STLDeleteValues(T* container) { + const typename T::iterator end = container->end(); + for (typename T::iterator i = container->begin(); i != end; ++i) { + delete i->second; + } + container->clear(); +}; + +#endif Added: ydb/trunk/src/tpcc/tpcc.cc =================================================================== --- ydb/trunk/src/tpcc/tpcc.cc (rev 0) +++ ydb/trunk/src/tpcc/tpcc.cc 2009-03-12 19:34:50 UTC (rev 1285) @@ -0,0 +1,72 @@ +#define __STDC_FORMAT_MACROS +#include <climits> +#include <inttypes.h> + +#include "clock.h" +#include "randomgenerator.h" +#include "tpccclient.h" +#include "tpccgenerator.h" +#include "tpcctables.h" + + +int main(int argc, const char* argv[]) { + if (argc != 2) { + fprintf(stderr, "tpcc [num warehouses]\n"); + exit(1); + } + + long num_warehouses = strtol(argv[1], NULL, 10); + if (num_warehouses == LONG_MIN || num_warehouses == LONG_MAX) { + fprintf(stderr, "Bad warehouse number (%s)\n", argv[1]); + exit(1); + } + if (num_warehouses <= 0) { + fprintf(stderr, "Number of warehouses must be > 0 (was %ld)\n", num_warehouses); + exit(1); + } + if (num_warehouses > Warehouse::MAX_WAREHOUSE_ID) { + fprintf(stderr, "Number of warehouses must be <= %d (was %ld)\n", Warehouse::MAX_WAREHOUSE_ID, num_warehouses); + exit(1); + } + + TPCCTables* tables = new TPCCTables(); + SystemClock* clock = new SystemClock(); + + // Create a generator for filling the database. + RealRandomGenerator* random = new RealRandomGenerator(); + NURandC cLoad = NURandC::makeRandom(random); + random->setC(cLoad); + + // Generate the data + printf("Loading %ld warehouses... 
", num_warehouses); + fflush(stdout); + char now[Clock::DATETIME_SIZE+1]; + clock->getDateTimestamp(now); + TPCCGenerator generator(random, now, Item::NUM_ITEMS, District::NUM_PER_WAREHOUSE, + Customer::NUM_PER_DISTRICT, NewOrder::INITIAL_NUM_PER_DISTRICT); + int64_t begin = clock->getMicroseconds(); + generator.makeItemsTable(tables); + for (int i = 0; i < num_warehouses; ++i) { + generator.makeWarehouse(tables, i+1); + } + int64_t end = clock->getMicroseconds(); + printf("%"PRId64" ms\n", (end-begin)/1000); + + // Change the constants for run + random = new RealRandomGenerator(); + random->setC(NURandC::makeRandomForRun(random, cLoad)); + + // Client owns all the parameters + TPCCClient client(clock, random, tables, Item::NUM_ITEMS, static_cast<int>(num_warehouses), + District::NUM_PER_WAREHOUSE, Customer::NUM_PER_DISTRICT); + printf("Running... "); + fflush(stdout); + begin = clock->getMicroseconds(); + for (int i = 0; i < 200000; ++i) { + client.doOne(); + } + end = clock->getMicroseconds(); + printf("%"PRId64" ms\n", (end-begin)/1000); + + return 0; +} Added: ydb/trunk/src/tpcc/tpccclient.cc =================================================================== --- ydb/trunk/src/tpcc/tpccclient.cc (rev 0) +++ ydb/trunk/src/tpcc/tpccclient.cc 2009-03-12 19:34:50 UTC (rev 1285) @@ -0,0 +1,175 @@ +#include "tpccclient.h" + +#include <cstdio> +#include <vector> + +#include "assert.h" +#include "clock.h" +#include "randomgenerator.h" +#include "tpccdb.h" + +using std::vector; + +// Non-integral constants must be defined in a .cc file. Needed for Mac OS X. +// http://www.research.att.com/~bs/bs_faq2.html#in-class +const float TPCCClient::MIN_PAYMENT_AMOUNT; +const float TPCCClient::MAX_PAYMENT_AMOUNT; + +TPCCClient::TPCCClient(Clock* clock, RandomGenerator* generator, TPCCDB* db, int num_items, + int num_warehouses, int districts_per_warehouse, int customers_per_district) : + clock_(clock), + generator_(generator), + db_(db), + num_items_(num_items), + num_warehouses_(num_warehouses), + districts_per_warehouse_(districts_per_warehouse), + customers_per_district_(customers_per_district) { + ASSERT(clock_ != NULL); + ASSERT(generator_ != NULL); + ASSERT(db_ != NULL); + ASSERT(1 <= num_items_ && num_items_ <= Item::NUM_ITEMS); + ASSERT(1 <= num_warehouses_ && num_warehouses_ <= Warehouse::MAX_WAREHOUSE_ID); + ASSERT(1 <= districts_per_warehouse_ && + districts_per_warehouse_ <= District::NUM_PER_WAREHOUSE); + ASSERT(1 <= customers_per_district_ && customers_per_district_ <= Customer::NUM_PER_DISTRICT); +} + +TPCCClient::~TPCCClient() { + delete clock_; + delete generator_; + delete db_; +} + +void TPCCClient::doStockLevel() { + int32_t threshold = generator_->number(MIN_STOCK_LEVEL_THRESHOLD, MAX_STOCK_LEVEL_THRESHOLD); + int result = db_->stockLevel(generateWarehouse(), generateDistrict(), threshold); + ASSERT(result >= 0); +} + +void TPCCClient::doOrderStatus() { + OrderStatusOutput output; + int y = generator_->number(1, 100); + if (y <= 60) { + // 60%: order status by last name + char c_last[Customer::MAX_LAST+1]; + generator_->lastName(c_last, customers_per_district_); + db_->orderStatus(generateWarehouse(), generateDistrict(), c_last, &output); + } else { + // 40%: order status by id + ASSERT(y > 60); + db_->orderStatus(generateWarehouse(), generateDistrict(), generateCID(), &output); + } +} + +void TPCCClient::doDelivery() { + int carrier = generator_->number(Order::MIN_CARRIER_ID, Order::MAX_CARRIER_ID); + char now[Clock::DATETIME_SIZE+1]; + clock_->getDateTimestamp(now); + + 
vector<DeliveryOrderInfo> orders; + db_->delivery(generateWarehouse(), carrier, now, &orders); + if (orders.size() != District::NUM_PER_WAREHOUSE) { + printf("Only delivered from %zd districts\n", orders.size()); + } +} + +void TPCCClient::doPayment() { + PaymentOutput output; + int x = generator_->number(1, 100); + int y = generator_->number(1, 100); + + int32_t w_id = generateWarehouse(); + int32_t d_id = generateDistrict(); + + int32_t c_w_id; + int32_t c_d_id; + if (num_warehouses_ == 1 || x <= 85) { + // 85%: paying through own warehouse (or there is only 1 warehouse) + c_w_id = w_id; + c_d_id = d_id; + } else { + // 15%: paying through another warehouse: + // select in range [1, num_warehouses] excluding w_id + c_w_id = generator_->numberExcluding(1, num_warehouses_, w_id); + ASSERT(c_w_id != w_id); + c_d_id = generateDistrict(); + } + float h_amount = generator_->fixedPoint(2, MIN_PAYMENT_AMOUNT, MAX_PAYMENT_AMOUNT); + + char now[Clock::DATETIME_SIZE+1]; + clock_->getDateTimestamp(now); + if (y <= 60) { + // 60%: payment by last name + char c_last[Customer::MAX_LAST+1]; + generator_->lastName(c_last, customers_per_district_); + db_->payment(w_id, d_id, c_w_id, c_d_id, c_last, h_amount, now, &output); + } else { + // 40%: payment by id + ASSERT(y > 60); + db_->payment(w_id, d_id, c_w_id, c_d_id, generateCID(), h_amount, now, &output); + } +} + +void TPCCClient::doNewOrder() { + int32_t w_id = generateWarehouse(); + int ol_cnt = generator_->number(Order::MIN_OL_CNT, Order::MAX_OL_CNT); + + // 1% of transactions roll back + bool rollback = generator_->number(1, 100) == 1; + + vector<NewOrderItem> items(ol_cnt); + for (int i = 0; i < ol_cnt; ++i) { + if (rollback && i+1 == ol_cnt) { + items[i].i_id = Item::NUM_ITEMS + 1; + } else { + items[i].i_id = generateItemID(); + } + + bool remote = generator_->number(1, 100) == 1; + if (num_warehouses_ > 1 && remote) { + items[i].ol_supply_w_id = generator_->numberExcluding(1, num_warehouses_, w_id); + } else { + items[i].ol_supply_w_id = w_id; + } + items[i].ol_quantity = generator_->number(1, MAX_OL_QUANTITY); + } + + char now[Clock::DATETIME_SIZE+1]; + clock_->getDateTimestamp(now); + NewOrderOutput output; + db_->newOrder(w_id, generateDistrict(), generateCID(), items, now, &output); +} + +void TPCCClient::doOne() { + // This is not strictly accurate: The requirement is for certain *minimum* percentages to be + // maintained. This is close to the right thing, but not precisely correct. + // See TPC-C 5.2.4 (page 68). 
+ int x = generator_->number(1, 100); + if (x <= 4) { // 4% + doStockLevel(); + } else if (x <= 8) { // 4% + doDelivery(); + } else if (x <= 12) { // 4% + doOrderStatus(); + } else if (x <= 12+43) { // 43% + doPayment(); + } else { // 45% + ASSERT(x > 100-45); + doNewOrder(); + } +} + +int32_t TPCCClient::generateWarehouse() { + return generator_->number(1, num_warehouses_); +} + +int32_t TPCCClient::generateDistrict() { + return generator_->number(1, districts_per_warehouse_); +} +int32_t TPCCClient::generateCID() { + return generator_->NURand(1023, 1, customers_per_district_); +} + +int32_t TPCCClient::generateItemID() { + return generator_->NURand(8191, 1, num_items_); +} Added: ydb/trunk/src/tpcc/tpccclient.h =================================================================== --- ydb/trunk/src/tpcc/tpccclient.h (rev 0) +++ ydb/trunk/src/tpcc/tpccclient.h 2009-03-12 19:34:50 UTC (rev 1285) @@ -0,0 +1,51 @@ +#ifndef TPCCCLIENT_H__ +#define TPCCCLIENT_H__ + +//~ #include <cstdint> +#include <stdint.h> + +class Clock; +class RandomGenerator; +class TPCCDB; + +// Generates transactions according to the TPC-C specification. This ignores the fact that +// terminals have a fixed w_id, d_id, and that requests should be made after a minimum keying time +// and a think time. +class TPCCClient { +public: + // Owns clock, generator and db. + TPCCClient(Clock* clock, RandomGenerator* generator, TPCCDB* db, int num_items, + int num_warehouses, int districts_per_warehouse, int customers_per_district); + ~TPCCClient(); + + void doStockLevel(); + void doOrderStatus(); + void doDelivery(); + void doPayment(); + void doNewOrder(); + + void doOne(); + + static const int32_t MIN_STOCK_LEVEL_THRESHOLD = 10; + static const int32_t MAX_STOCK_LEVEL_THRESHOLD = 20; + // TODO: Should these constants be part of tpccdb.h? + static const float MIN_PAYMENT_AMOUNT = 1.00; + static const float MAX_PAYMENT_AMOUNT = 5000.00; + static const int32_t MAX_OL_QUANTITY = 10; + +private: + int32_t generateWarehouse(); + int32_t generateDistrict(); + int32_t generateCID(); + int32_t generateItemID(); + + Clock* clock_; + RandomGenerator* generator_; + TPCCDB* db_; + int num_items_; + int num_warehouses_; + int districts_per_warehouse_; + int customers_per_district_; +}; + +#endif Added: ydb/trunk/src/tpcc/tpccdb.cc =================================================================== --- ydb/trunk/src/tpcc/tpccdb.cc (rev 0) +++ ydb/trunk/src/tpcc/tpccdb.cc 2009-03-12 19:34:50 UTC (rev 1285) @@ -0,0 +1,23 @@ +#include "tpccdb.h" + +// Non-integral constants must be defined in a .cc file. Needed for Mac OS X. 
+// http://www.research.att.com/~bs/bs_faq2.html#in-class +const float Item::MIN_PRICE; +const float Item::MAX_PRICE; +const float Warehouse::MIN_TAX; +const float Warehouse::MAX_TAX; +const float Warehouse::INITIAL_YTD; +const float District::MIN_TAX; +const float District::MAX_TAX; +const float District::INITIAL_YTD; // different from Warehouse +const float Customer::MIN_DISCOUNT; +const float Customer::MAX_DISCOUNT; +const float Customer::INITIAL_BALANCE; +const float Customer::INITIAL_CREDIT_LIM; +const float Customer::INITIAL_YTD_PAYMENT; +const char Customer::GOOD_CREDIT[] = "GC"; +const char Customer::BAD_CREDIT[] = "BC"; +const float OrderLine::MIN_AMOUNT; +const float OrderLine::MAX_AMOUNT; +const char NewOrderOutput::INVALID_ITEM_STATUS[] = "Item number is not valid"; +const float History::INITIAL_AMOUNT; Added: ydb/trunk/src/tpcc/tpccdb.h =================================================================== --- ydb/trunk/src/tpcc/tpccdb.h (rev 0) +++ ydb/trunk/src/tpcc/tpccdb.h 2009-03-12 19:34:50 UTC (rev 1285) @@ -0,0 +1,341 @@ +#ifndef TPCCDB_H__ +#define TPCCDB_H__ + +#include <stdint.h> +#include <vector> + +struct Item { + static const int MIN_IM = 1; + static const int MAX_IM = 10000; + static const float MIN_PRICE = 1.00; + static const float MAX_PRICE = 100.00; + static const int MIN_NAME = 14; + static const int MAX_NAME = 24; + static const int MIN_DATA = 26; + static const int MAX_DATA = 50; + static const int NUM_ITEMS = 100000; + + int32_t i_id; + int32_t i_im_id; + float i_price; + char i_name[MAX_NAME+1]; + char i_data[MAX_DATA+1]; +}; + +struct Warehouse { + static const float MIN_TAX = 0; + static const float MAX_TAX = 0.2000f; + static const float INITIAL_YTD = 300000.00f; + static const int MIN_NAME = 6; + static const int MAX_NAME = 10; + static const int MIN_STREET = 10; + static const int MAX_STREET = 20; + static const int MIN_CITY = 10; + static const int MAX_CITY = 20; + static const int STATE = 2; + static const int ZIP = 9; + // TPC-C 1.3.1 (page 11) requires 2*W. This permits testing up to 50 warehouses. This is an + // arbitrary limit created to pack ids into integers. 
+ static const int MAX_WAREHOUSE_ID = 100; + + int32_t w_id; + float w_tax; + float w_ytd; + char w_name[MAX_NAME+1]; + char w_street_1[MAX_STREET+1]; + char w_street_2[MAX_STREET+1]; + char w_city[MAX_CITY+1]; + char w_state[STATE+1]; + char w_zip[ZIP+1]; +}; + +struct District { + static const float MIN_TAX = 0; + static const float MAX_TAX = 0.2000f; + static const float INITIAL_YTD = 30000.00; // different from Warehouse + static const int INITIAL_NEXT_O_ID = 3001; + static const int MIN_NAME = 6; + static const int MAX_NAME = 10; + static const int MIN_STREET = 10; + static const int MAX_STREET = 20; + static const int MIN_CITY = 10; + static const int MAX_CITY = 20; + static const int STATE = 2; + static const int ZIP = 9; + static const int NUM_PER_WAREHOUSE = 10; + + int32_t d_id; + int32_t d_w_id; + float d_tax; + float d_ytd; + int32_t d_next_o_id; + char d_name[MAX_NAME+1]; + char d_street_1[MAX_STREET+1]; + char d_street_2[MAX_STREET+1]; + char d_city[MAX_CITY+1]; + char d_state[STATE+1]; + char d_zip[ZIP+1]; +}; + +struct Stock { + static const int MIN_QUANTITY = 10; + static const int MAX_QUANTITY = 100; + static const int DIST = 24; + static const int MIN_DATA = 26; + static const int MAX_DATA = 50; + static const int NUM_STOCK_PER_WAREHOUSE = 100000; + + int32_t s_i_id; + int32_t s_w_id; + int32_t s_quantity; + int32_t s_ytd; + int32_t s_order_cnt; + int32_t s_remote_cnt; + char s_dist[District::NUM_PER_WAREHOUSE][DIST+1]; + char s_data[MAX_DATA+1]; +}; + +// YYYY-MM-DD HH:MM:SS This is supposed to be a date/time field from Jan 1st 1900 - +// Dec 31st 2100 with a resolution of 1 second. See TPC-C 1.3.1. +static const int DATETIME_SIZE = 14; + +struct Customer { + static const float INITIAL_CREDIT_LIM = 50000.00; + static const float MIN_DISCOUNT = 0.0000; + static const float MAX_DISCOUNT = 0.5000; + static const float INITIAL_BALANCE = -10.00; + static const float INITIAL_YTD_PAYMENT = 10.00; + static const int INITIAL_PAYMENT_CNT = 1; + static const int INITIAL_DELIVERY_CNT = 0; + static const int MIN_FIRST = 6; + static const int MAX_FIRST = 10; + static const int MIDDLE = 2; + static const int MAX_LAST = 16; + static const int MIN_STREET = 10; + static const int MAX_STREET = 20; + static const int MIN_CITY = 10; + static const int MAX_CITY = 20; + static const int STATE = 2; + static const int ZIP = 9; + static const int PHONE = 16; + static const int CREDIT = 2; + static const int MIN_DATA = 300; + static const int MAX_DATA = 500; + static const int NUM_PER_DISTRICT = 3000; + static const char GOOD_CREDIT[]; + static const char BAD_CREDIT[]; + + int32_t c_id; + int32_t c_d_id; + int32_t c_w_id; + float c_credit_lim; + float c_discount; + float c_balance; + float c_ytd_payment; + int32_t c_payment_cnt; + int32_t c_delivery_cnt; + char c_first[MAX_FIRST+1]; + char c_middle[MIDDLE+1]; + char c_last[MAX_LAST+1]; + char c_street_1[MAX_STREET+1]; + char c_street_2[MAX_STREET+1]; + char c_city[MAX_CITY+1]; + char c_state[STATE+1]; + char c_zip[ZIP+1]; + char c_phone[PHONE+1]; + char c_since[DATETIME_SIZE+1]; + char c_credit[CREDIT+1]; + char c_data[MAX_DATA+1]; +}; + +struct Order { + static const int MIN_CARRIER_ID = 1; + static const int MAX_CARRIER_ID = 10; + // HACK: This is not strictly correct, but it works + static const int NULL_CARRIER_ID = 0; + // Less than this value, carrier != null, >= -> carrier == null + static const int NULL_CARRIER_LOWER_BOUND = 2101; + static const int MIN_OL_CNT = 5; + static const int MAX_OL_CNT = 15; + static const int INITIAL_ALL_LOCAL = 
1; + static const int INITIAL_ORDERS_PER_DISTRICT = 3000; + // See TPC-C 1.3.1 (page 15) + static const int MAX_ORDER_ID = 10000000; + + int32_t o_id; + int32_t o_c_id; + int32_t o_d_id; + int32_t o_w_id; + int32_t o_carrier_id; + int32_t o_ol_cnt; + int32_t o_all_local; + char o_entry_d[DATETIME_SIZE+1]; +}; + +struct OrderLine { + static const int MIN_I_ID = 1; + static const int MAX_I_ID = 100000; // Item::NUM_ITEMS + static const int INITIAL_QUANTITY = 5; + static const float MIN_AMOUNT = 0.01f; + static const float MAX_AMOUNT = 9999.99f; + + int32_t ol_o_id; + int32_t ol_d_id; + int32_t ol_w_id; + int32_t ol_number; + int32_t ol_i_id; + int32_t ol_supply_w_id; + int32_t ol_quantity; + float ol_amount; + char ol_delivery_d[DATETIME_SIZE+1]; + char ol_dist_info[Stock::DIST+1]; +}; + +struct NewOrder { + static const int INITIAL_NUM_PER_DISTRICT = 900; + + int32_t no_w_id; + int32_t no_d_id; + int32_t no_o_id; +}; + +struct History { + static const int MIN_DATA = 12; + static const int MAX_DATA = 24; + static const float INITIAL_AMOUNT = 10.00f; + + int32_t h_c_id; + int32_t h_c_d_id; + int32_t h_c_w_id; + int32_t h_d_id; + int32_t h_w_id; + float h_amount; + char h_date[DATETIME_SIZE+1]; + char h_data[MAX_DATA]; +}; + +// Data returned by the "order status" transaction. +struct OrderStatusOutput { + // From customer + int32_t c_id; // unclear if this needs to be returned + float c_balance; + + // From order + int32_t o_id; + int32_t o_carrier_id; + + struct OrderLineSubset { + int32_t ol_i_id; + int32_t ol_supply_w_id; + int32_t ol_quantity; + float ol_amount; + char ol_delivery_d[DATETIME_SIZE+1]; + }; + + std::vector<OrderLineSubset> lines; + + // From customer + char c_first[Customer::MAX_FIRST+1]; + char c_middle[Customer::MIDDLE+1]; + char c_last[Customer::MAX_LAST+1]; + + // From order + char o_entry_d[DATETIME_SIZE+1]; +}; + +struct NewOrderItem { + int32_t i_id; + int32_t ol_supply_w_id; + int32_t ol_quantity; +}; + +struct NewOrderOutput { + float w_tax; + float d_tax; + + // From district d_next_o_id + int32_t o_id; + + float c_discount; + + // TODO: Client can compute this from other values. + float total; + + struct ItemInfo { + static const char BRAND = 'B'; + static const char GENERIC = 'G'; + + int32_t s_quantity; + float i_price; + // TODO: Client can compute this from other values. + float ol_amount; + char brand_generic; + char i_name[Item::MAX_NAME+1]; + }; + + std::vector<ItemInfo> items; + char c_last[Customer::MAX_LAST+1]; + char c_credit[Customer::CREDIT+1]; + + static const int MAX_STATUS = 25; + static const char INVALID_ITEM_STATUS[]; + char status[MAX_STATUS+1]; +}; + +struct PaymentOutput { + // Return entire tuples since Payment requires most of the data. This returns more than + // necessary, but is easy. + Warehouse warehouse; + District district; + Customer customer; +}; + +struct DeliveryOrderInfo { + int32_t d_id; + int32_t o_id; +}; + +// Interface to the TPC-C transaction implementation. +class TPCCDB { +public: + virtual ~TPCCDB() {} + + // Executes the TPC-C "slev" transaction. From the last 20 orders, returns the number of rows in + // the STOCK table that have S_QUANTITY < threshold. See TPC-C 2.8 (page 43). + virtual int stockLevel(int32_t warehouse_id, int32_t district_id, int32_t threshold) = 0; + + // Executes the TPC-C order status transaction. Find the customer's last order and check the + // delivery date of each item on the order. See TPC-C 2.6 (page 36). 
+ virtual void orderStatus(int32_t warehouse_id, int32_t district_id, int32_t customer_id, + OrderStatusOutput* output) = 0; + + // Executes the TPC-C order status transaction. Find the customer's last order and check the + // delivery date of each item on the order. See TPC-C 2.6 (page 36). + virtual void orderStatus(int32_t warehouse_id, int32_t district_id, const char* c_last, + OrderStatusOutput* output) = 0; + + // Executes the TPC-C new order transaction. Enter the new order for customer_id into the + // database. See TPC-C 2.4 (page 27). Returns true if the transaction commits. + virtual bool newOrder(int32_t warehouse_id, int32_t district_id, int32_t customer_id, + const std::vector<NewOrderItem>& items, const char* now, + NewOrderOutput* output) = 0; + + // Executes the TPC-C payment transaction. Add h_amount to the customer's account. + // See TPC-C 2.5 (page 32). + virtual void payment(int32_t warehouse_id, int32_t district_id, int32_t c_warehouse_id, + int32_t c_district_id, int32_t customer_id, float h_amount, const char* now, + PaymentOutput* output) = 0; + + // Executes the TPC-C payment transaction. Add h_amount to the customer's account. + // See TPC-C 2.5 (page 32). + virtual void payment(int32_t warehouse_id, int32_t district_id, int32_t c_warehouse_id, + int32_t c_district_id, const char* c_last, float h_amount, const char* now, + PaymentOutput* output) = 0; + + // Executes the TPC-C delivery transaction. Delivers the oldest undelivered transaction in each + // district in warehouse_id. See TPC-C 2.7 (page 39). + virtual void deli... [truncated message content] |
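A minimal usage sketch for the BPlusTree template added in this revision. The header file name bplustree.hpp is an assumption (the diff only shows the include guard BPLUSTREE_HPP_227824), and the driver below is illustrative rather than part of the commit; it exercises insert(), find(), del() and findLastLessThan() with N (inner-node keys) and M (leaf slots) both set to 8.

// Hypothetical driver for the BPlusTree posted above; "bplustree.hpp" is an
// assumed file name for that header.
#include <cassert>
#include <cstdio>
#include "bplustree.hpp"

int main() {
    // N = 8 keys per inner node, M = 8 slots per leaf; both satisfy the
    // static asserts (N > 2, M > 0). Padding and alignment use the defaults.
    BPlusTree<int, int, 8, 8> tree;

    // This tree treats a zero VALUE as deleted (del() just nulls the value),
    // so only non-zero values are stored here.
    for (int i = 1; i <= 1000; ++i) {
        tree.insert(i, i * 10);   // inserting an existing key overwrites its value
    }

    int value = 0;
    assert(tree.find(42, &value) && value == 420);

    // del() nulls the stored value; a subsequent find() reports the key as absent.
    assert(tree.del(42));
    assert(!tree.find(42));

    // Largest key strictly less than 500 (per the header's warning, only
    // reliable while deletes have not touched the relevant range).
    int out_key = 0;
    if (tree.findLastLessThan(500, &value, &out_key)) {
        printf("predecessor of 500: key=%d value=%d\n", out_key, value);
    }

    printf("inner node: %u bytes, leaf node: %u bytes\n",
           tree.sizeof_inner_node(), tree.sizeof_leaf_node());
    return 0;
}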
From: <yan...@us...> - 2009-03-12 17:24:01
Revision: 1284 http://assorted.svn.sourceforge.net/assorted/?rev=1284&view=rev Author: yangzhang Date: 2009-03-12 17:23:53 +0000 (Thu, 12 Mar 2009) Log Message: ----------- reformatting Modified Paths: -------------- cpp-commons/trunk/src/commons/fast_map.h Modified: cpp-commons/trunk/src/commons/fast_map.h =================================================================== --- cpp-commons/trunk/src/commons/fast_map.h 2009-03-12 02:13:29 UTC (rev 1283) +++ cpp-commons/trunk/src/commons/fast_map.h 2009-03-12 17:23:53 UTC (rev 1284) @@ -153,7 +153,8 @@ void resize(size_t size) { commons::array<value_type> newtab(0); - newtab.reset(reinterpret_cast<value_type*>(new char[size * sizeof(value_type)]), size); + char *newtabarr = new char[size * sizeof(value_type)]; + newtab.reset(reinterpret_cast<value_type*>(newtabarr), size); for (size_t i = 0; i < size; ++i) newtab[i].first = empty_key; // Rehash old values over into new table. @@ -197,7 +198,9 @@ void erase(iterator) { throw_not_implemented(); } array<value_type> &get_table() { return table; } const array<value_type> &get_table() const { return table; } - bool empty_or_deleted(key_type k) const { return k == empty_key || k == deleted_key; } + bool empty_or_deleted(key_type k) const { + return k == empty_key || k == deleted_key; + } iterator begin() { assert_init(); @@ -258,7 +261,9 @@ // empty spot, try returning the earlier deleted spot first. If we // find the key, return that. for (; - table[pos].first != deleted_key && table[pos].first != empty_key && table[pos].first != k; + table[pos].first != deleted_key && + table[pos].first != empty_key && + table[pos].first != k; pos = (pos + ++probe) & mask) { assert(probe < table.size()); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
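The loop rewrapped above is the probe sequence of the open-addressing hash table in fast_map.h: sentinel keys mark empty and deleted slots, the table size is a power of two, and collisions are resolved with triangular increments (+1, +2, +3, ...) masked back into range. The following standalone sketch illustrates that scheme; empty_key, deleted_key and the mask follow the diff, but the hash and table layout are simplified stand-ins, not the actual commons::fast_map code.

// Simplified sketch of the probing scheme shown in the fast_map.h diff above.
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

typedef int key_type;
typedef std::pair<key_type, int> value_type;

static const key_type empty_key   = -1;   // slot never used
static const key_type deleted_key = -2;   // tombstone left by erase

// Returns the slot holding k, or the first free slot if k is absent.
static size_t probe_for(const std::vector<value_type>& table, key_type k) {
    const size_t mask = table.size() - 1;        // size must be a power of two
    size_t pos = static_cast<size_t>(k) & mask;  // stand-in for the real hash
    size_t probe = 0;
    while (table[pos].first != deleted_key &&
           table[pos].first != empty_key &&
           table[pos].first != k) {
        pos = (pos + ++probe) & mask;   // triangular probing: +1, +2, +3, ...
        assert(probe < table.size());   // the table is never allowed to fill up
    }
    return pos;
}

int main() {
    std::vector<value_type> table(8, value_type(empty_key, 0));
    // Keys 3 and 11 both land in slot 3 of an 8-slot table, so 11 probes on.
    table[probe_for(table, 3)]  = value_type(3, 30);
    table[probe_for(table, 11)] = value_type(11, 110);
    assert(table[probe_for(table, 11)].second == 110);
    return 0;
}

(The real map additionally remembers the first tombstone it passes so an insert can reuse that slot; that detail is omitted here.)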
From: <yan...@us...> - 2009-03-12 02:13:52
Revision: 1283 http://assorted.svn.sourceforge.net/assorted/?rev=1283&view=rev Author: yangzhang Date: 2009-03-12 02:13:29 +0000 (Thu, 12 Mar 2009) Log Message: ----------- added a bunch of notes/todos Modified Paths: -------------- ydb/trunk/README Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-03-12 02:13:21 UTC (rev 1282) +++ ydb/trunk/README 2009-03-12 02:13:29 UTC (rev 1283) @@ -545,6 +545,106 @@ - decided to leave this as-is; getting an accurate number is too much effort - relabeled the messages to be "roughly" +Wed Mar 11 00:46:35 EDT 2009 + +- DONE experiment on the new set of machines, farm1-4 + - scaling + - 3: 401Ktps + - 2: 418Ktps + - 1: 425Ktps + - 0: 401Ktps + - twal: 360Ktps + - pwal: 264Ktps + - pwal on rep: 240Ktps + - recovery + - 1 rep + - before: 413Ktps + - during: 220-250Ktps + - serializing: 203ms + - recv: 1955ms + - catchup: 5133ms 300Ktps + - 2 reps + - before: 419Ktps + - during: 236Ktps + - serializing: 106ms + - recv: 1500ms + - catchup: 5023ms 311Ktps + - problem: noticed that there's a lot of long blocking during the + - blocking to replica 0: why? serialization only takes 200ms... + - blocking to replica 1: why? we should not be blocked on anything.... + +- current state of the system: + - replicas can do pwal + - solo can do twal; may also implement for replicas + - strange slowdown + - replicas can recover over network + - cannot recover from disk + +- DONE get to the bottom of the blocking issues + - turns out this really is due to the CPU load + - when copying, the CPU is distracted from process_txns by the recover_joiner + thread, which is repeatedly making write syscalls to send a large message + - i can add a backlog, but not sure what good that will do, really + - the problem is prioritizing the live txn processing over the sending, but + then nothing will ever be sent, because even in normal (non-recovery) + operation, the issuer is faster than the processor + - need to apply *some* back-pressure, and TCP's buffer limits are a natural + and easy way to achieve that + +- DONE implement recovery (replay) from disk + - 2 + - before: 419Ktps + - during: 191Ktps + - replay: 3260ms (19MB/s) + - catchup: 4908ms (319Ktps) + +Wed Mar 11 16:38:24 EDT 2009 + +- meeting with sam +- got the numbers + - fixed up/understood various issues with earlier implementation + - pwal, twal, on replicas + - new machines + - proper recovery/backlogging + - custom map storage structure + - the blocking problem + - got new numbers + - disk is slow; still some things i can do to speed up + - try multiprocessing? +- outline + - two approaches to paper: + - "here are some recovery methods" + - "network based recovery is the new method, better than (or at least as + good as) disk" + - intro + - main-memory distributed databases for oltp [h-store] + - replication for high availability and durability [harbor] + - recovery methods for main-memory database + - disk logging vs network replication + - network bandwidth > disk bandwidth + - flexibility: bringing back orig node + - new system is cpu bound + - snapshots + - mechanisms/design + - network recovery + - pwal recovery + - no redo (no steal) + - twal recovery + - hybrid recovery + - prelim experiments show: all pages become dirty, fast + - realistic workload? 
+ - evaluation + - each of the above mechanisms + - related work + - aries + - harbor + - harp +- new goals + - implement the chunking/snapshotting + - try to get tpcc working + +- TODO faster disk logging using separate threads + - TODO show aries-write - TODO checkpointing + replaying log from replicas (not from disk) - TODO scale-up on multicore This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-03-12 02:13:38
Revision: 1282 http://assorted.svn.sourceforge.net/assorted/?rev=1282&view=rev Author: yangzhang Date: 2009-03-12 02:13:21 +0000 (Thu, 12 Mar 2009) Log Message: ----------- - added pwal recovery (--rec-pwal) - added show_sockaddr for more useful debug outputs in timed writes - changed behavior of logging; by default does not clobber the log file, but writes instead to /dev/null - using timed writes in process_txns - added showdatarate - fixed recovery_start_seqno/recovery_end_seqno setting in response_handler Modified Paths: -------------- ydb/trunk/src/main.lzz.clamp Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-03-12 02:08:04 UTC (rev 1281) +++ ydb/trunk/src/main.lzz.clamp 2009-03-12 02:13:21 UTC (rev 1282) @@ -86,7 +86,7 @@ bool yield_during_build_up, yield_during_catch_up, dump, show_updates, count_updates, stop_on_recovery, general_txns, profile_threads, debug_threads, multirecover, disk, debug_memory, use_pwal, use_twal, - use_pb, use_pb_res, g_caught_up, + use_pb, use_pb_res, g_caught_up, rec_pwal, suppress_txn_msgs, fake_bcast, force_ser, fake_exec; long long timelim, read_thresh, write_thresh; @@ -219,12 +219,32 @@ return t; } +char * +show_sockaddr(st_netfd_t fd) +{ + sockaddr_in sa; + socklen_t salen = sizeof sa; + check0x(getpeername(st_netfd_fileno(fd), + reinterpret_cast<sockaddr*>(&sa), + &salen)); + return inet_ntoa(sa.sin_addr); +} + +map<st_netfd_t, string> nfdnames; + +inline const string& +nfd2name(st_netfd_t fd) +{ + return nfdnames[fd]; +} + /** * Used by the leader to bookkeep information about replicas. */ class replica_info { public: + /** port is the replica's listen port, not the port bound to the fd socket. */ replica_info(st_netfd_t fd, uint16_t port) : fd_(fd), port_(port) {} st_netfd_t fd() const { return fd_; } /** The port on which the replica is listening. */ @@ -404,7 +424,8 @@ long long write_time = current_time_millis() - before_write; if (write_time > write_thresh) { cout << "thread " << threadname() << " write of " << len - << " bytes took " << write_time << " ms" << endl; + << " bytes to dst " << show_sockaddr(dst) << " blocked for " + << write_time << " ms" << endl; } } } @@ -536,13 +557,15 @@ check(msg.ParseFromArray(src.read(len), len)); } +enum { op_del, op_write, op_commit }; + /** * ARIES write-ahead log. No undo logging necessary (no steal). */ class wal { public: - wal() : of("wal"), out(of) {} + wal(const string &fname) : of(fname.c_str()), out(of) {} template <typename T> void log(const T &msg) { ser(of, msg); } void logbuf(const ser_t &s) { logbuf(s.data(), s.size()); } @@ -562,7 +585,6 @@ out & op; } private: - enum { op_del, op_write, op_commit }; ofstream of; binary_oarchive out; }; @@ -600,8 +622,7 @@ else fn = lambda(const void *buf, size_t len) { foreach (st_netfd_t dst, __ref(fds)) - checkeqnneg(st_write(dst, buf, len, ST_UTIME_NO_TIMEOUT), - static_cast<ssize_t>(len)); + st_timed_write(dst, buf, len); }; char *real_wbuf = newreps.empty() ? rbuf.get() : wbuf.get(); @@ -622,7 +643,8 @@ // Did we get a new member? If so, notify an arbitrary member (the first // one) to prepare to send recovery information (by sending an // empty/default Txn). 
- if (!newreps.empty() && seqno > 0) { + // XXX rec_pwal + if (!newreps.empty() && seqno > 0 && !rec_pwal) { start_txn(batch); fin_txn(batch); // TODO: verify that this made the catch-up stream more efficient, @@ -796,7 +818,21 @@ } void -showtput(const string &action, long long stop_time, long long start_time, +showdatarate(const char *action, streamoff len, long long time) +{ + cout << action << " of " << len << " bytes in " << time << " ms (" + << len / time / 1000 << " MB/s)" << endl; +} + +void +showdatarate(const char *action, size_t len, long long time) +{ + cout << action << " of " << len << " bytes in " << time << " ms (" + << len / time / 1000 << " MB/s)" << endl; +} + +void +showtput(const char *action, long long stop_time, long long start_time, int stop_count, int start_count) { long long time_diff = stop_time - start_time; @@ -918,7 +954,7 @@ char *start = reader.start(); // Will overflow on next few reads ("header")? - if (reader.unread() + reader.rem() < headerlen) { + if (!caught_up && reader.unread() + reader.rem() < headerlen) { sized_array<char> buf(new char[read_buf_size], read_buf_size); memcpy(buf.get(), reader.start(), reader.unread()); swap(buf, reader.buf()); @@ -1108,11 +1144,14 @@ << hdr.count << " slots (" << bodylen << " bytes); range is [" << begin << ".." << end << "]; seqno is " << hdr.seqno << endl; + long long start_time = current_time_millis(); commons::array<char> recovery(sizeof(size_t) + sizeof hdr + bodylen); raw_writer ser(recovery.begin()); ser.write(recovery.size()); ser.write(hdr); memcpy(ser.ptr(), src.begin() + begin, bodylen); + showdatarate("serialized recovery", recovery.size(), + current_time_millis() - start_time); return recovery; } @@ -1206,12 +1245,12 @@ cout << rseqno << last_seqno << res.result_size() << " " << r << " " << res.result(r) << endl; } if (!caught_up && rcaught_up) { - long long now = current_time_millis(), timediff = now - start_time; + long long now = current_time_millis(), time_diff = now - start_time; caught_up = true; recover_signals.push(now); cout << rid << ": "; cout << "recovering node caught up; took " - << timediff << " ms" << endl; + << time_diff << " ms" << endl; // This will cause the program to exit eventually, but cleanly, such that // the recovery time will be set first, before the eventual exit (which // may not even happen in the current iteration). @@ -1244,13 +1283,13 @@ // joiner. if (recovery_start_time == -1 && !sub.empty()) { recovery_start_time = sub.take(); - recovery_start_seqno = seqno; + recovery_start_seqno = last_seqno; cout << rid << ": "; showtput("before recovery, finished", recovery_start_time, start_time, recovery_start_seqno, 0); } else if (recovery_end_time == -1 && !sub.empty()) { recovery_end_time = sub.take(); - recovery_end_seqno = seqno; + recovery_end_seqno = last_seqno; cout << rid << ": "; showtput("during recovery, finished roughly", recovery_end_time, recovery_start_time, recovery_end_seqno, recovery_start_seqno); @@ -1332,7 +1371,7 @@ #if 0 sendmsg(joiner, *recovery); #endif - cout << "sent recovery in " << diff << " ms" << endl; + showdatarate("sent recovery", recovery.size(), diff); } void @@ -1354,8 +1393,8 @@ cout << "starting as leader" << endl; st_multichannel<long long> recover_signals; - scoped_ptr<wal> pwal(new wal); - g_wal = pwal.get(); + scoped_ptr<wal> twal(new wal(use_twal ? "twal" : "/dev/null")); + g_wal = twal.get(); // Wait until all replicas have joined. 
st_netfd_t listener = st_tcp_listen(leader_port); @@ -1539,8 +1578,11 @@ } } + // Initialize physical log. + scoped_ptr<wal> pwal(new wal(use_pwal ? "pwal" : "/dev/null")); + g_wal = pwal.get(); + // Process txns. - // XXX st_channel<shared_ptr<pb::Txn> > backlog; st_channel<chunk> backlog; const function<void()> process_fn = bind(process_txns<Types, RTypes>, leader, ref(map), ref(seqno), @@ -1554,103 +1596,140 @@ try { // If there's anything to recover. if (init.txnseqno() > 0) { - cout << "waiting for recovery message" << (multirecover ? "s" : "") - << endl; - long long before_recv = current_time_millis(); + if (rec_pwal) { + // Recover from physical log. + cout << "recovering from pwal" << endl; + long long start_time = current_time_millis(); + ifstream inf("pwal"); + binary_iarchive in(inf); + int rseqno = -1; + while (inf.peek() != ifstream::traits_type::eof()) { + int op; + in & op; + switch (op) { + case op_del: + { + int key; + in & key; + mii::iterator it = map.find(key); + map.erase(it); + break; + } + case op_write: + { + int key, val; + in & key & val; + map[key] = val; + break; + } + case op_commit: + ++rseqno; + break; + } + if (check_interval(rseqno, yield_interval)) st_sleep(0); + } + seqno = init.txnseqno() - 1; + showdatarate("recovered from pwal", inf.tellg(), current_time_millis() - start_time); + cout << "now at seqno " << rseqno << " (really: " << seqno << ")" << endl; + } else { + cout << "waiting for recovery message" << (multirecover ? "s" : "") + << endl; + long long before_recv = current_time_millis(); - vector<st_thread_t> recovery_builders; - assert(seqno == -1); - bool first = true; - for (int i = 0; i < (multirecover ? init.node_size() : 1); ++i) { - recovery_builders.push_back(my_spawn(lambda() { - // Read the recovery message length and header. - size_t len; - recovery_header hdr; - char buf[sizeof len + sizeof hdr]; - //try { - checkeqnneg(st_read_fully(__ref(replicas[i]), - buf, sizeof len + sizeof hdr, - ST_UTIME_NO_TIMEOUT), - ssize_t(sizeof len + sizeof hdr)); - //} catch (...) { // TODO just catch "Connection reset by peer" - //return; - //} - raw_reader rdr(buf); - rdr.read(len); - rdr.read(hdr); - check(hdr.seqno >= 0); + vector<st_thread_t> recovery_builders; + assert(seqno == -1); + bool first = true; + for (int i = 0; i < (multirecover ? init.node_size() : 1); ++i) { + recovery_builders.push_back(my_spawn(lambda() { + // Read the recovery message length and header. + size_t len; + recovery_header hdr; + char buf[sizeof len + sizeof hdr]; + //try { + checkeqnneg(st_read_fully(__ref(replicas[i]), + buf, sizeof len + sizeof hdr, + ST_UTIME_NO_TIMEOUT), + ssize_t(sizeof len + sizeof hdr)); + //} catch (...) { // TODO just catch "Connection reset by peer" + //return; + //} + raw_reader rdr(buf); + rdr.read(len); + rdr.read(hdr); + check(hdr.seqno >= 0); - // Resize the table if necessary. - commons::array<entry> &table = __ref(map).get_table(); - if (!__ref(first)) { - checkeq(table.size(), hdr.total); - checkeq(__ref(map).size(), hdr.size); - } else { - __ref(map).set_size(hdr.size); - if (table.size() != hdr.total) { - table.reset(new entry[hdr.total], hdr.total); + // Resize the table if necessary. + commons::array<entry> &table = __ref(map).get_table(); + if (!__ref(first)) { + checkeq(table.size(), hdr.total); + checkeq(__ref(map).size(), hdr.size); + } else { + __ref(map).set_size(hdr.size); + if (table.size() != hdr.total) { + table.reset(new entry[hdr.total], hdr.total); + } } - } - // Receive straight into the table. 
- pair<size_t, size_t> range = - recovery_range(table.size(), __ctx(i), __ref(init).node_size()); - // Check that we agree on the number of entries. - checkeq(range.second - range.first, hdr.count); - // Check that the count is a power of two. - checkeq(hdr.count & (hdr.count - 1), size_t(0)); - size_t rangelen = sizeof(entry) * hdr.count; - // Read an extra char to ensure that we're at the EOF. - checkeqnneg(st_read_fully(__ref(replicas[i]), - table.begin() + range.first, rangelen + 1, - ST_UTIME_NO_TIMEOUT), - ssize_t(rangelen)); + // Receive straight into the table. + pair<size_t, size_t> range = + recovery_range(table.size(), __ctx(i), __ref(init).node_size()); + // Check that we agree on the number of entries. + checkeq(range.second - range.first, hdr.count); + // Check that the count is a power of two. + checkeq(hdr.count & (hdr.count - 1), size_t(0)); + size_t rangelen = sizeof(entry) * hdr.count; + // Read an extra char to ensure that we're at the EOF. + long long start_time = current_time_millis(); + checkeqnneg(st_read_fully(__ref(replicas[i]), + table.begin() + range.first, rangelen + 1, + ST_UTIME_NO_TIMEOUT), + ssize_t(rangelen)); + long long end_time = current_time_millis(); - long long tm = current_time_millis(); - if (__ref(seqno) != -1) - checkeq(__ref(seqno), hdr.seqno); - __ref(seqno) = hdr.seqno; - cout << "got recovery message of " << len << " bytes (" - << hdr.size << " records in " << hdr.count << " slots) in " - << tm - __ref(before_recv) << " ms; now at seqno " - << hdr.seqno << endl; + if (__ref(seqno) != -1) + checkeq(__ref(seqno), hdr.seqno); + __ref(seqno) = hdr.seqno; + showdatarate("got recovery message", len, end_time - start_time); + cout << "receive took " << end_time - __ref(before_recv) + << " ms total; now at seqno " << hdr.seqno << endl; #if 0 - Recovery recovery; - long long receive_start = 0, receive_end = 0; - size_t len = 0; - { - st_intr intr(stop_hub); - len = readmsg(__ref(replicas)[__ctx(i)], recovery, &receive_start, - &receive_end); - } - long long build_start = current_time_millis(); - cout << "got recovery message of " << len << " bytes in " - << build_start - __ref(before_recv) << " ms: xfer took " - << receive_end - receive_start << " ms, deserialization took " - << build_start - receive_end << " ms" << endl; - for (int i = 0; i < recovery.pair_size(); ++i) { - const Recovery_Pair &p = recovery.pair(i); - __ref(map)[p.key()] = p.value(); - if (i % yield_interval == 0) { - if (yield_during_build_up) st_sleep(0); + Recovery recovery; + long long receive_start = 0, receive_end = 0; + size_t len = 0; + { + st_intr intr(stop_hub); + len = readmsg(__ref(replicas)[__ctx(i)], recovery, &receive_start, + &receive_end); } - } - check(recovery.seqno() >= 0); - int seqno = __ref(seqno) = recovery.seqno(); - long long build_end = current_time_millis(); - cout << "receive and build-up took " - << build_end - __ref(before_recv) - << " ms; built up map of " << recovery.pair_size() - << " records in " << build_end - build_start - << " ms; now at seqno " << seqno << endl; + long long build_start = current_time_millis(); + cout << "got recovery message of " << len << " bytes in " + << build_start - __ref(before_recv) << " ms: xfer took " + << receive_end - receive_start << " ms, deserialization took " + << build_start - receive_end << " ms" << endl; + for (int i = 0; i < recovery.pair_size(); ++i) { + const Recovery_Pair &p = recovery.pair(i); + __ref(map)[p.key()] = p.value(); + if (i % yield_interval == 0) { + if (yield_during_build_up) st_sleep(0); + } + 
} + check(recovery.seqno() >= 0); + int seqno = __ref(seqno) = recovery.seqno(); + long long build_end = current_time_millis(); + cout << "receive and build-up took " + << build_end - __ref(before_recv) + << " ms; built up map of " << recovery.pair_size() + << " records in " << build_end - build_start + << " ms; now at seqno " << seqno << endl; #endif - }, "recovery_builder" + lexical_cast<string>(i))); + }, "recovery_builder" + lexical_cast<string>(i))); + } + foreach (st_thread_t t, recovery_builders) { + st_join(t); + } } - foreach (st_thread_t t, recovery_builders) { - st_join(t); - } + long long mid_time = current_time_millis(); - int mid_seqno = seqno; // XXX using msg::TxnBatch; @@ -1678,6 +1757,7 @@ batch.Clear(); for (int t = 0; t < batch.txn_size(); ++t) { const Txn &txn = batch.txn(t); + if (rec_pwal) seqno = txn.seqno() - 1; process_txn<rb_traits, rb_traits>(map, txn, seqno, nullptr); if (fake_exec && !Types::is_pb()) { reader.skip(txn.op_size() * Op_Size); @@ -1812,6 +1892,8 @@ "yield periodically during catch-up phase of recovery (for recoverer)") ("multirecover,m", po::bool_switch(&multirecover), "recover from multiple hosts, instead of just one (specified via leader)") + ("rec-pwal", po::bool_switch(&rec_pwal), + "recover from pwal") ("disk,k", po::bool_switch(&disk), "use disk-based recovery") ("dump,D", po::bool_switch(&dump), @@ -2026,9 +2108,3 @@ return 1; } } - -/* - * Compile-time options: - * - * - map, unordered_map, dense_hash_map - */ This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
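The rec_pwal branch above replays the physical log by reading (op, arguments) records from a Boost binary archive until EOF, folding each record back into the map and bumping the sequence number on every commit marker. The following is a minimal standalone sketch of that record format and replay loop, not the ydb source: the op-code values are assumptions (the real enum lives in main.lzz.clamp), a plain std::map stands in for ydb's hash map, and it links with -lboost_serialization.

// pwal_replay_demo.cc
#include <fstream>
#include <iostream>
#include <map>
#include <boost/archive/binary_iarchive.hpp>
#include <boost/archive/binary_oarchive.hpp>

enum { op_del = 0, op_write = 1, op_commit = 2 };  // assumed values

int main() {
  // Write a tiny physical log: two writes, a delete, then a commit marker.
  {
    std::ofstream ofs("pwal_demo", std::ios::binary);
    boost::archive::binary_oarchive out(ofs);
    int op, key, val;
    op = op_write; key = 1; val = 10; out & op & key & val;
    op = op_write; key = 2; val = 20; out & op & key & val;
    op = op_del;   key = 1;           out & op & key;
    op = op_commit;                   out & op;
  }

  // Replay it, mirroring the recovery loop in the diff above.
  std::map<int, int> map;
  int rseqno = -1;
  std::ifstream inf("pwal_demo", std::ios::binary);
  boost::archive::binary_iarchive in(inf);
  while (inf.peek() != std::ifstream::traits_type::eof()) {
    int op;
    in & op;
    switch (op) {
      case op_del:    { int key;      in & key;       map.erase(key);  break; }
      case op_write:  { int key, val; in & key & val; map[key] = val;  break; }
      case op_commit: ++rseqno; break;
    }
  }
  // Expect seqno 0 and one surviving record ({2: 20}).
  std::cout << "replayed to seqno " << rseqno
            << "; map size " << map.size() << std::endl;
  return 0;
}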
From: <yan...@us...> - 2009-03-12 02:08:41
Revision: 1281 http://assorted.svn.sourceforge.net/assorted/?rev=1281&view=rev Author: yangzhang Date: 2009-03-12 02:08:04 +0000 (Thu, 12 Mar 2009) Log Message: ----------- tweaks; restore rec to normal Modified Paths: -------------- ydb/trunk/tools/test.bash Modified: ydb/trunk/tools/test.bash =================================================================== --- ydb/trunk/tools/test.bash 2009-03-11 16:56:20 UTC (rev 1280) +++ ydb/trunk/tools/test.bash 2009-03-12 02:08:04 UTC (rev 1281) @@ -289,8 +289,8 @@ rec-helper() { local leader=$1 shift - : ${seqno:=1000000} - tagssh $leader "ydb/src/ydb -l -x --accept-joiner-seqno $seqno -n $(( $# - 1 )) -o 1 -O 1 ${extraargs:-}" & + : ${seqno:=1000000} ${extraargs:=} + tagssh $leader "ydb/src/ydb -l --exit-on-recovery --accept-joiner-seqno $seqno -n $(( $# - 1 )) $extraargs" & sleep .1 # Run initial replicas. while (( $# > 1 )) ; do @@ -299,7 +299,7 @@ done sleep .1 # Run joiner. - tagssh $1 "ydb/src/ydb -H $leader --yield-build-up --yield-catch-up ${extraargs:-}" & + tagssh $1 "ydb/src/ydb -H $leader --yield-build-up --yield-catch-up $extraargs" & if false ; then if [[ ${wait2:-} ]] then sleep $wait2 This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-03-11 16:56:33
Revision: 1280 http://assorted.svn.sourceforge.net/assorted/?rev=1280&view=rev Author: yangzhang Date: 2009-03-11 16:56:20 +0000 (Wed, 11 Mar 2009) Log Message: ----------- updated boost_serialization Modified Paths: -------------- sandbox/trunk/src/cc/boost_serialization.cc sandbox/trunk/src/cc/boost_serialization.mk Modified: sandbox/trunk/src/cc/boost_serialization.cc =================================================================== --- sandbox/trunk/src/cc/boost_serialization.cc 2009-03-10 23:32:14 UTC (rev 1279) +++ sandbox/trunk/src/cc/boost_serialization.cc 2009-03-11 16:56:20 UTC (rev 1280) @@ -69,6 +69,10 @@ binary_iarchive ia(sb); ia & x; + // This will throw an archive_exception. + // long long y; + // ia & y; + cout << x << endl; } Modified: sandbox/trunk/src/cc/boost_serialization.mk =================================================================== --- sandbox/trunk/src/cc/boost_serialization.mk 2009-03-10 23:32:14 UTC (rev 1279) +++ sandbox/trunk/src/cc/boost_serialization.mk 2009-03-11 16:56:20 UTC (rev 1280) @@ -4,5 +4,5 @@ g++ -O3 -Wall -pipe -o boost_serialization \ -I/opt/boost-head-2007.03.25/include \ -L/opt/boost-head-2007.03.25/lib \ - -lboost_serialization-gcc41-mt \ + -lboost_serialization-gcc43-mt \ boost_serialization.cc This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
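The new comment in boost_serialization.cc points out that reading past what was written raises an archive_exception: binary archives carry no per-field type tags, so pulling a long long out of an exhausted archive fails on the underlying stream rather than as a type mismatch. A small sketch of that behaviour, with a stringstream standing in for the original's sb buffer and the library name left generic (the -gcc43-mt suffix above is specific to that Boost build):

// archive_exception_demo.cc  (g++ demo.cc -lboost_serialization)
#include <iostream>
#include <sstream>
#include <boost/archive/archive_exception.hpp>
#include <boost/archive/binary_iarchive.hpp>
#include <boost/archive/binary_oarchive.hpp>

int main() {
  std::stringstream sb;
  {
    boost::archive::binary_oarchive oa(sb);
    int x = 42;
    oa & x;                       // archive now holds exactly one int
  }
  boost::archive::binary_iarchive ia(sb);
  int x = 0;
  ia & x;                         // round-trips fine
  std::cout << x << std::endl;
  try {
    long long y;
    ia & y;                       // nothing left to read
  } catch (const boost::archive::archive_exception &e) {
    std::cout << "caught archive_exception: " << e.what() << std::endl;
  }
  return 0;
}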
From: <yan...@us...> - 2009-03-10 23:32:32
Revision: 1279 http://assorted.svn.sourceforge.net/assorted/?rev=1279&view=rev Author: yangzhang Date: 2009-03-10 23:32:14 +0000 (Tue, 10 Mar 2009) Log Message: ----------- - fixed optimized build issues with unused variables - fixed bug: issue_txns not updating seqno correctly on final batch - renamed methods of and reinstated pwal - removed unnecessary sleep at end of process_txns - relabeled messages to make explicit their approximating nature - added clamp to setup-deps - updated notes/todos Modified Paths: -------------- ydb/trunk/README ydb/trunk/src/main.lzz.clamp ydb/trunk/tools/test.bash Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-03-09 23:02:12 UTC (rev 1278) +++ ydb/trunk/README 2009-03-10 23:32:14 UTC (rev 1279) @@ -536,6 +536,15 @@ - TODO async (threaded) wal - TODO 0-node 0-copy (don't need to use threads, just process each batch immed) +- DONE figure out why i'm not getting all the responses + - because issue_txns was not incrementing seqno correctly on the last batch + - fixed + - because start_seqno in handle_responses was not actually being initialized + to the correct starting seqno (there is some delay between the time the + issue_txns begins issuing and the time the response_handler thread starts) + - decided to leave this as-is; getting an accurate number is too much effort + - relabeled the messages to be "roughly" + - TODO show aries-write - TODO checkpointing + replaying log from replicas (not from disk) - TODO scale-up on multicore Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-03-09 23:02:12 UTC (rev 1278) +++ ydb/trunk/src/main.lzz.clamp 2009-03-10 23:32:14 UTC (rev 1279) @@ -40,6 +40,7 @@ #define ref boost::ref #define tuple boost::tuple #define make_tuple boost::make_tuple +#define unused __attribute__((unused)) using namespace boost; using namespace boost::archive; @@ -548,15 +549,15 @@ void logbuf(const void *buf, size_t len) { of.write(reinterpret_cast<const char*>(buf), len); } - void del(int key) { + void logdel(int key) { int op = op_del; // TODO: is this really necessary? 
out & op & key; } - void write(int key, int val) { + void logwrite(int key, int val) { int op = op_write; out & op & key & val; } - void commit() { + void logcommit() { int op = op_commit; out & op; } @@ -691,7 +692,6 @@ if (seqno == stop_on_seqno) { cout << "stopping on issue of seqno " << seqno << endl; stop_hub.set(); - break; } ++seqno; @@ -776,14 +776,14 @@ case Op::write: { int value = op.value(); - if (use_pwal) g_wal->write(key, value); + if (use_pwal) g_wal->logwrite(key, value); if (it == map.end()) map[key] = value; else it->second = value; break; } case Op::del: if (it != map.end()) { - if (use_pwal) g_wal->del(key); + if (use_pwal) g_wal->logdel(key); map.erase(it); } break; @@ -792,7 +792,7 @@ } if (res != nullptr) fin_result(*res); - if (use_pwal) g_wal->commit(); + if (use_pwal) g_wal->logcommit(); } void @@ -902,7 +902,6 @@ } __ref(send_states).push(recovery_t()); __ref(w).mark_and_flush(); - st_sleep(1); }); try { @@ -1253,7 +1252,7 @@ recovery_end_time = sub.take(); recovery_end_seqno = seqno; cout << rid << ": "; - showtput("during recovery, finished", recovery_end_time, + showtput("during recovery, finished roughly", recovery_end_time, recovery_start_time, recovery_end_seqno, recovery_start_seqno); } } @@ -1261,7 +1260,7 @@ void cleanup() { long long end_time = current_time_millis(); cout << rid << ": "; - showtput("handled", end_time, start_time, seqno, start_seqno); + showtput("handled roughly", end_time, start_time, seqno, start_seqno); if (recovery_end_time > -1) { cout << rid << ": "; showtput("after recovery, finished", end_time, recovery_end_time, @@ -1672,8 +1671,8 @@ swap(buf, reader.buf()); reader.reset_range(chunk.get<1>(), chunk.get<2>()); while (reader.start() < reader.end()) { - char *start = reader.start(); - uint32_t prefix = reader.read<uint32_t>(); + unused char *start = reader.start(); + unused uint32_t prefix = reader.read<uint32_t>(); assert(prefix < 10000); assert(start + sizeof(uint32_t) + prefix <= reader.end()); batch.Clear(); Modified: ydb/trunk/tools/test.bash =================================================================== --- ydb/trunk/tools/test.bash 2009-03-09 23:02:12 UTC (rev 1278) +++ ydb/trunk/tools/test.bash 2009-03-10 23:32:14 UTC (rev 1279) @@ -214,6 +214,7 @@ /tmp/st-1.8.tar.gz \ /tmp/protobuf-2.0.2.tar.bz2 \ /tmp/boost_1_37_0.tar.bz2 \ + /tmp/clamp_053_src.tar.gz \ clamp.patch \ ^:/tmp/ This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
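The `unused` macro added here addresses the first item in the log ("fixed optimized build issues with unused variables"): `start` and `prefix` in the catch-up loop are read only by asserts, so once the optimized build compiles the asserts away (presumably via -DNDEBUG) they trip -Wall's unused-variable warnings. A minimal GCC/Clang-only sketch of the pattern, with made-up values:

// unused_demo.cc  (builds cleanly with and without -DNDEBUG under -Wall)
#include <cassert>
#include <cstdint>

#define unused __attribute__((unused))

int main() {
  char buf[16] = {0};
  // Both variables only feed the sanity asserts below; without the
  // attribute, an NDEBUG build would flag them as unused.
  unused char *start = buf;
  unused uint32_t prefix = 4;
  assert(prefix < 10000);
  assert(start + sizeof(uint32_t) + prefix <= buf + sizeof buf);
  return 0;
}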
From: <yan...@us...> - 2009-03-09 23:02:19
Revision: 1278 http://assorted.svn.sourceforge.net/assorted/?rev=1278&view=rev Author: yangzhang Date: 2009-03-09 23:02:12 +0000 (Mon, 09 Mar 2009) Log Message: ----------- updated readme Modified Paths: -------------- cpp-commons/trunk/README Modified: cpp-commons/trunk/README =================================================================== --- cpp-commons/trunk/README 2009-03-09 22:59:57 UTC (rev 1277) +++ cpp-commons/trunk/README 2009-03-09 23:02:12 UTC (rev 1278) @@ -26,21 +26,25 @@ - error handling, such as `die()`, which leverages `strerror` - file I/O utilities, such as reading complete files - function delegates (for use with C functions that take `(void*)(void*)`) +- generic binary stream readers and writers that are more efficient than + std::streambuf - hash functions - low-level system information from `cpuid` +- micro-utilities: noncopyable, expander annotations - `nullptr`: from C++0x +- portable re-implementations of pthread primitives such as barriers - pseudo-random number generators +- raw memory buffer readers and writers for simple serialization - region-based memory management - socket utilities -- portable re-implementations of pthread primitives such as barriers - time utilities, including timers and simpler interfaces to system clocks - utilities for streams - utilities for [tamer] -- micro-utilities: noncopyable, expander annotations - x86 architecture-specific tools Third-party code: +- GCC's [C++0x `unique_ptr`], backported from 144716 - Howard Hinnant's [C++03-emulated TR1 `unique_ptr.hpp`] - [Yonat's STL extensions] - pointainer: auto-cleaning STL container of pointers. Many times this is a @@ -50,6 +54,7 @@ iterating pointainers and containers of smart pointers. - stringizer: turns an object into a std::string. +[C++0x `unique_ptr`]: svn://gcc.gnu.org/svn/gcc/trunk/libstdc++-v3/include/bits/unique_ptr.h [C++03-emulated TR1 `unique_ptr.hpp`]: http://home.roadrunner.com/~hinnant/unique_ptr03.html [Yonat's STL extensions]: http://ootips.org/yonat/4dev/ This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
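Among the README entries, the "raw memory buffer readers and writers" item is what the ydb diffs earlier in this archive lean on (raw_writer ser(...) / raw_reader rdr(...)). Purely as an illustration, not the cpp-commons source, this is one way such a cursor-over-a-byte-buffer pair can look; the class and method names follow the usage in those diffs, while the bodies and the sample header type are assumptions:

// raw_rw_sketch.cc
#include <cstddef>
#include <cstring>
#include <iostream>

class raw_writer {
 public:
  explicit raw_writer(char *p) : p_(p) {}
  // Copy a fixed-layout value to the cursor and advance.
  template <typename T> void write(const T &v) {
    std::memcpy(p_, &v, sizeof v);
    p_ += sizeof v;
  }
  char *ptr() const { return p_; }   // current cursor, e.g. for a bulk memcpy
 private:
  char *p_;
};

class raw_reader {
 public:
  explicit raw_reader(const char *p) : p_(p) {}
  // Copy a fixed-layout value out of the buffer and advance.
  template <typename T> void read(T &v) {
    std::memcpy(&v, p_, sizeof v);
    p_ += sizeof v;
  }
 private:
  const char *p_;
};

struct recovery_header { int seqno; std::size_t count; };  // hypothetical layout

int main() {
  char buf[sizeof(std::size_t) + sizeof(recovery_header)];

  raw_writer w(buf);
  std::size_t len = sizeof buf;
  recovery_header hdr = {42, 1024};
  w.write(len);
  w.write(hdr);

  raw_reader r(buf);
  std::size_t len2;
  recovery_header hdr2;
  r.read(len2);
  r.read(hdr2);
  std::cout << len2 << " " << hdr2.seqno << " " << hdr2.count << std::endl;
  return 0;
}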