[Assorted-commits] SF.net SVN: assorted:[1301] ydb/trunk
Brought to you by:
yangzhang
From: <yan...@us...> - 2009-03-18 09:17:15
|
Revision: 1301 http://assorted.svn.sourceforge.net/assorted/?rev=1301&view=rev Author: yangzhang Date: 2009-03-18 09:17:01 +0000 (Wed, 18 Mar 2009) Log Message: ----------- - added recovery, backlogging, catchup to ydb for tpcc - added ser(), deser() to TPCCTables - added separate (and simple) recovery build-up/catch-up logic to the main thread - added marker, first_seqno_in_chunk, proper overflow_fn (can still be simplified/refactored in conj with "anchored_stream_reader" design) - cleaned up memory management so that we don't double-free things that are backed by a recovery message buffer - added tpcc_recovery_header - added process_buf for catching up (replaying) tpccs from a buffer - added yielding to process_tpccs and reduced yield interval so that other things get a chance to run besides the process_tpccs thread - build system - added cog; used extensively for ser/deser - using gnu++0x for tpcc as well - added SECONDARY targets - added null-read-checking in readmsg - ignore null res in process_tpcc - added some typedefs to bplustree - updated notes/todos Modified Paths: -------------- ydb/trunk/README ydb/trunk/src/Makefile ydb/trunk/src/main.lzz.clamp ydb/trunk/src/tpcc/Makefile ydb/trunk/src/tpcc/btree.h ydb/trunk/src/tpcc/tpcctables.h Added Paths: ----------- ydb/trunk/src/tpcc/tpcctables.cc.cog Removed Paths: ------------- ydb/trunk/src/tpcc/tpcctables.cc Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-03-17 17:24:21 UTC (rev 1300) +++ ydb/trunk/README 2009-03-18 09:17:01 UTC (rev 1301) @@ -716,7 +716,7 @@ |stock| = 100000 |districts| = 10 |customers| = 30000 - |orders| = 3099 (almost 30000+1000)1 + |orders| = 30991 (almost 30000+1000) |ordersBC| = 30991 |orderlines| = 309477 (almost 300000+1000*10) |cbn| = 30000 @@ -736,12 +736,48 @@ |neworders| = 107973 |history| = 30000 +- DONE try integrating TPC-C; just get txns working for now + - DONE add iteration to btree - added btree_test 
for this -- TODO try integrating TPC-C; just get txns working for now +- DONE make process_tpccs switch into shifting mode (instead of buffer-swapping + mode) after catch-up +- DONE make process_tpccs rewind & process the accumulated buffer once seqno + has caught up to the first seqno in the buffer +- DONE implement TPC-C network recovery -- TODO implement TPC-C recovery +- TODO full TPC-C txn workload +- TODO prelim measurements on cluster +- TODO PAPER!!! + +- TODO related work + + [HST+91] Svein-Olaf Hvasshovd, Tore Saeter, Oystein Torbjørnsen, Petter + Moe, and Oddvar Risnes. A continuously available and highly scalable + transaction server: Design experience from the HypRa project. In + Proceedings of the 4th International Workshop on High Performance + Transaction Systems, September 1991. + + Ricardo Jimenez-Peris, M. Patino-Martinez, Gustavo Alonso, Bettina + Kemme, Are quorums an alternative for data replication?, ACM Transactions + on Database Systems (TODS), v.28 n.3, p.257-294, September 2003 + + ~\cite{peris-srds202}R. Jimenez-Peris, M. Patino-Martinez, G. Alonso, + An algorithm for non-intrusive, parallel recovery of replicated data and + its correctness, Proceedings of the 21st IEEE Symposium on Reliable + Distributed Systems (SRDS'02), p.150, October 13-16, 2002 + + ~\cite{bartoli-dsn01}B. Kemme, A. Bartoli, and O. Babaoglu. Online + Reconfiguration in Replicated Databases Based on Group Communication. In + Proc. of the Int. Conf. on Dependable Systems and Networks (DSN 2001), + Goteborg, Sweden, June 2001. + + ~\cite{amir-thesis} Y. Amir. Replication Using Group Communication Over a + Dynamic Network. PhD thesis, Institute of Computer Science, The Hebrew + University of Jerusalem, Israel. Also available at + http://www.dsn.jhu.edu/~yairamir/Yair_phd.pdf, 1995. 
+ - TODO faster disk logging using separate threads - TODO show aries-write Modified: ydb/trunk/src/Makefile =================================================================== --- ydb/trunk/src/Makefile 2009-03-17 17:24:21 UTC (rev 1300) +++ ydb/trunk/src/Makefile 2009-03-18 09:17:01 UTC (rev 1301) @@ -104,6 +104,11 @@ tpcc/%.o: tpcc/%.cc make -C tpcc/ +tpcc/%.cc: tpcc/%.cc.cog + make -C tpcc/ + +.SECONDARY: tpcc/tpcctables.cc tpcc/tpcctables.o + %.o: %.cc $(COMPILE.cc) $(OUTPUT_OPTION) $< Modified: ydb/trunk/src/main.lzz.clamp =================================================================== --- ydb/trunk/src/main.lzz.clamp 2009-03-17 17:24:21 UTC (rev 1300) +++ ydb/trunk/src/main.lzz.clamp 2009-03-18 09:17:01 UTC (rev 1301) @@ -581,7 +581,7 @@ readmsg(anchored_stream_reader &src, T &msg) { uint32_t len = ntohl(src.read<uint32_t>()); - check(msg.ParseFromArray(src.read(len), len)); + check(msg.ParseFromArray(checkpass(src.read(len)), len)); } enum { op_del, op_write, op_commit }; @@ -923,27 +923,30 @@ dst.ol_quantity = src.ol_quantity(); } NewOrderOutput output; - g_tables->newOrder(no.warehouse_id(), no.district_id(), no.customer_id(), - items, no.now().c_str(), &output); - res->Clear(); - res->set_seqno(seqno); - NewOrderOutputMsg &outmsg = *res->mutable_new_order(); - outmsg.set_w_tax(output.w_tax); - outmsg.set_d_tax(output.d_tax); - outmsg.set_o_id(output.o_id); - outmsg.set_c_discount(output.c_discount); - outmsg.set_total(output.total); - foreach (const NewOrderOutput::ItemInfo &src, output.items) { - ItemInfoMsg &dst = *outmsg.add_item(); - dst.set_s_quantity(src.s_quantity); - dst.set_i_price(src.i_price); - dst.set_ol_amount(src.ol_amount); - dst.set_brand_generic(src.brand_generic); - dst.set_i_name(src.i_name); + g_tables->newOrder(no.warehouse_id(), no.district_id(), + no.customer_id(), items, no.now().c_str(), + &output); + if (res != nullptr) { + res->Clear(); + res->set_seqno(seqno); + NewOrderOutputMsg &outmsg = *res->mutable_new_order(); + 
outmsg.set_w_tax(output.w_tax); + outmsg.set_d_tax(output.d_tax); + outmsg.set_o_id(output.o_id); + outmsg.set_c_discount(output.c_discount); + outmsg.set_total(output.total); + foreach (const NewOrderOutput::ItemInfo &src, output.items) { + ItemInfoMsg &dst = *outmsg.add_item(); + dst.set_s_quantity(src.s_quantity); + dst.set_i_price(src.i_price); + dst.set_ol_amount(src.ol_amount); + dst.set_brand_generic(src.brand_generic); + dst.set_i_name(src.i_name); + } + outmsg.set_c_last(output.c_last); + outmsg.set_c_credit(output.c_credit); + outmsg.set_status(output.status); } - outmsg.set_c_last(output.c_last); - outmsg.set_c_credit(output.c_credit); - outmsg.set_status(output.status); } else { throw_not_implemented(); } @@ -963,14 +966,38 @@ // the seqno reported by the leader in the Init message, but it may have // issued more since the Init message). int first_seqno = -1; + char *marker = nullptr; + int first_seqno_in_chunk = -1; + TpccReq req; + TpccRes res; - function<void(anchored_stream_reader& reader)> overflow_fn = lambda(anchored_stream_reader &reader) { - sized_array<char> buf(new char[reader.buf().size()], reader.buf().size()); - memcpy(buf.get(), reader.start(), reader.unread()); - swap(buf, reader.buf()); - __ref(backlog).push(make_tuple(buf, reader.anchor(), reader.start())); - reader.reset_range(reader.buf().get(), reader.buf().get() + reader.unread()); - }; + function<void(anchored_stream_reader& reader)> overflow_fn = + lambda(anchored_stream_reader &reader) { + if (__ref(caught_up)) { + // Anchor should already be correctly set, so just shift down. + shift_reader(reader); + } else if (__ref(first_seqno_in_chunk) == __ref(seqno) + 1) { + // Has the replayer just caught up to the start of the chunk? + ASSERT(reader.buf().get() == reader.anchor()); + // Replay all messages up to but not included the current unprocessed + // message (which we may be in the middle of receiving, triggering this + // overflow). 
+ process_buf(reader.anchor(), __ref(marker), __ref(req), __ref(seqno)); + // Update the anchor to point to the unprocessed message, so that we + // shift the unprocessed message down. + reader.anchor() = __ref(marker); + shift_reader(reader); + } else { + // Push onto backlog and put in new buffer. + ASSERT(reader.buf().get() == reader.anchor()); + __ref(backlog).push(make_tuple(reader.buf(), reader.anchor(), __ref(marker))); + reader.anchor() = __ref(marker); + replace_reader(reader); + cout << "added to backlog, now has " << __ref(backlog).queue().size() + << " chunks" << endl; + } + __ref(marker) = reader.buf().get(); + }; sized_array<char> rbuf(new char[read_buf_size], read_buf_size); commons::array<char> wbuf(buf_size); @@ -996,11 +1023,16 @@ __ref(w).mark_and_flush(); }); - TpccReq req; - TpccRes res; - while (true) { + // Has replayer just caught up to the start of the chunk? + if (first_seqno_in_chunk == seqno + 1) { + process_buf(reader.anchor(), reader.start(), req, seqno); + reader.set_anchor(); + } + + marker = reader.start(); + { st_intr intr(stop_hub); readmsg(reader, req); @@ -1013,35 +1045,67 @@ // Prepare recovery msg. send_states.push(make_tpcc_recovery(mypos, nnodes, seqno)); } else { - // Backlog (auto-implicit) or process. - if (check_interval(req.seqno(), process_display)) - cout << "processing req " << req.seqno() << endl; - if (req.seqno() == seqno + 1) { - if (!caught_up) { + // Backlog (auto/implicit) or process. + if (!caught_up) { + // If we were at the start of a new buffer (our chunk was recently reset). + if (reader.buf().get() == marker) + first_seqno_in_chunk = req.seqno(); + // If we fully caught up. + if (req.seqno() == seqno + 1) { time_caught_up = current_time_millis(); seqno_caught_up = seqno; - showtput("process_txns caught up; backlogged", + showtput("process_tpccs caught up; backlogged", time_caught_up, start_time, seqno_caught_up, first_seqno == -1 ? 
init_seqno - 1 : first_seqno); caught_up = true; } + } + if (caught_up) { // Process. process_tpcc(req, seqno, &res); ser(w, res); reader.set_anchor(); } + + // Display/yield. + if (check_interval(req.seqno(), process_display)) + cout << (caught_up ? "processed req " : "backlogged req ") + << req.seqno() << endl; + if (check_interval(req.seqno(), yield_interval)) st_sleep(0); } } } -// XXX +void +process_buf(char *begin, char *end, TpccReq &req, int &seqno) +{ + ASSERT(begin < end); + raw_reader reader(begin); + while (reader.ptr() < end) { + uint32_t len = ntohl(reader.read<uint32_t>()); + ASSERT(len < 10000); + ASSERT(reinterpret_cast<char*>(reader.ptr()) + len <= end); + check(req.ParseFromArray(reader.readptr(len), len)); + process_tpcc(req, seqno, nullptr); + if (check_interval(req.seqno(), yield_interval)) st_sleep(0); + if (check_interval(req.seqno(), process_display)) { + cout << "caught up to req " << req.seqno() << endl; + } + } +} + recovery_t make_tpcc_recovery(int mypos, int nnodes, int seqno) { - mypos = nnodes = seqno; - throw_not_implemented(); - return recovery_t(); + long long start_time = current_time_millis(); + cout << "serializing recovery, db state is now at seqno " + << seqno << ":" << endl; + g_tables->show(); + recovery_t recovery = g_tables->ser(mypos, nnodes, seqno); + showdatarate("serialized recovery", recovery.size(), + current_time_millis() - start_time); + return recovery; } /** @@ -1799,9 +1863,17 @@ // Start streaming txns to joiner. 
cout << "start streaming txns to joiner" << endl; + function<void()> handle_responses_joiner_fn; + if (do_tpcc) + handle_responses_joiner_fn = + bind(handle_tpcc_responses, joiner, ref(seqno), rid++, + ref(recover_signals), false); + else + handle_responses_joiner_fn = + bind(handle_responses<RTypes>, joiner, ref(seqno), rid++, + ref(recover_signals), false); newreps.push(replicas.back()); - handlers.insert(my_spawn(bind(handle_responses<RTypes>, joiner, ref(seqno), rid++, - ref(recover_signals), false), + handlers.insert(my_spawn(handle_responses_joiner_fn, "handle_responses_joiner")); } catch (break_exception &ex) { } catch (std::exception &ex) { @@ -1858,6 +1930,7 @@ // Initialize database state. int seqno = -1; mii &map = g_map; + commons::array<char> recarr(0); if (do_tpcc) { TPCCTables *tables = new TPCCTables(); g_tables.reset(tables); @@ -1953,203 +2026,334 @@ try { // If there's anything to recover. if (init.txnseqno() > 0) { - if (rec_pwal) { - // Recover from physical log. - cout << "recovering from pwal" << endl; - long long start_time = current_time_millis(); - ifstream inf("pwal"); - binary_iarchive in(inf); - int rseqno = -1; - while (inf.peek() != ifstream::traits_type::eof()) { - int op; - in & op; - switch (op) { - case op_del: - { - int key; - in & key; - mii::iterator it = map.find(key); - map.erase(it); - break; + if (do_tpcc) { + + // + // TPCC txns + // + + g_tables.reset(new TPCCTables); + + if (rec_pwal) { + } else { + + // + // Build-up + // + + cout << "waiting for recovery message" << (multirecover ? "s" : "") + << endl; + long long before_recv = current_time_millis(); + + vector<st_thread_t> recovery_builders; + ASSERT(seqno == -1); + bool first = true; + for (int i = 0; i < (multirecover ? init.node_size() : 1); ++i) { + recovery_builders.push_back(my_spawn(lambda() { + // Read the recovery message length and header. 
+ tpcc_recovery_header hdr; + checkeqnneg(st_read_fully(__ref(replicas[i]), + &hdr, sizeof hdr, + ST_UTIME_NO_TIMEOUT), + ssize_t(sizeof hdr)); + check(hdr.seqno >= 0); + + cout << "receiving recovery of " << hdr.len << " bytes" << endl; + + long long start_time = current_time_millis(); + __ref(recarr).reset(new char[hdr.len], hdr.len); + checkeqnneg(st_read_fully(__ref(replicas[i]), + __ref(recarr).get(), hdr.len, + ST_UTIME_NO_TIMEOUT), + ssize_t(hdr.len)); + + long long before_deser = current_time_millis(); + showdatarate("received recovery message", size_t(hdr.len), before_deser - start_time); + + if (__ref(seqno) == -1) + __ref(seqno) = hdr.seqno; + else + checkeq(__ref(seqno), hdr.seqno); + + g_tables->deser(__ctx(i), __ref(init).node_size(), hdr, __ref(recarr)); + + long long end_time = current_time_millis(); + showdatarate("deserialized recovery message", size_t(hdr.len), end_time - before_deser); + cout << "receive & deserialize took " << end_time - __ref(before_recv) + << " ms total; now at seqno " << hdr.seqno << endl; + cout << "after deserialize, db state is now at seqno " + << hdr.seqno << ":" << endl; + g_tables->show(); + +#if 0 + // Resize the table if necessary. + + commons::array<entry> &table = __ref(map).get_table(); + if (!__ref(first)) { + checkeq(table.size(), hdr.total); + checkeq(__ref(map).size(), hdr.size); + } else { + __ref(first) = false; + if (table.size() != hdr.total) { + table.reset(new entry[hdr.total], hdr.total); + } } - case op_write: - { - int key, val; - in & key & val; - map[key] = val; - break; - } - case op_commit: - ++rseqno; - break; + + // Receive straight into the table. + pair<size_t, size_t> range = + recovery_range(table.size(), __ctx(i), __ref(init).node_size()); + // Check that we agree on the number of entries. + checkeq(range.second - range.first, hdr.count); + // Check that the count is a power of two. 
+ checkeq(hdr.count & (hdr.count - 1), size_t(0)); + size_t rangelen = sizeof(entry) * hdr.count; + // Read an extra char to ensure that we're at the EOF. + checkeqnneg(st_read_fully(__ref(replicas[i]), + table.begin() + range.first, rangelen + 1, + ST_UTIME_NO_TIMEOUT), + ssize_t(rangelen)); +#endif + }, "recovery_builder" + lexical_cast<string>(i))); } - if (check_interval(rseqno, yield_interval)) st_sleep(0); + foreach (st_thread_t t, recovery_builders) { + st_join(t); + } } - seqno = init.txnseqno() - 1; - showdatarate("recovered from pwal", inf.tellg(), current_time_millis() - start_time); - cout << "now at seqno " << rseqno << " (really: " << seqno << ")" << endl; + + // + // Catch-up + // + + long long mid_time = current_time_millis(); + int mid_seqno = seqno; + TpccReq req; + while (!backlog.empty()) { + chunk chunk = backlog.take(); + cout << "took from backlog, now has " << backlog.queue().size() + << " chunks" << endl; + sized_array<char> &buf = chunk.get<0>(); + char *begin = chunk.get<1>(), *end = chunk.get<2>(); + ASSERT(buf.get() <= begin && begin < buf.end()); + ASSERT(buf.get() < end && end < buf.end()); + process_buf(begin, end, req, seqno); + } + showtput("replayer caught up; from backlog replayed", + current_time_millis(), mid_time, seqno, mid_seqno); + } else { - cout << "waiting for recovery message" << (multirecover ? "s" : "") - << endl; - long long before_recv = current_time_millis(); - vector<st_thread_t> recovery_builders; - ASSERT(seqno == -1); - bool first = true; - for (int i = 0; i < (multirecover ? init.node_size() : 1); ++i) { - recovery_builders.push_back(my_spawn(lambda() { - // Read the recovery message length and header. - size_t len; - recovery_header hdr; - char buf[sizeof len + sizeof hdr]; - //try { - checkeqnneg(st_read_fully(__ref(replicas[i]), - buf, sizeof len + sizeof hdr, - ST_UTIME_NO_TIMEOUT), - ssize_t(sizeof len + sizeof hdr)); - //} catch (...) 
{ // TODO just catch "Connection reset by peer" - //return; - //} - raw_reader rdr(buf); - rdr.read(len); - rdr.read(hdr); - check(hdr.seqno >= 0); + // + // Simple txns + // - // Resize the table if necessary. - commons::array<entry> &table = __ref(map).get_table(); - if (!__ref(first)) { - checkeq(table.size(), hdr.total); - checkeq(__ref(map).size(), hdr.size); - } else { - __ref(map).set_size(hdr.size); - if (table.size() != hdr.total) { - table.reset(new entry[hdr.total], hdr.total); - } + if (rec_pwal) { + // Recover from physical log. + cout << "recovering from pwal" << endl; + long long start_time = current_time_millis(); + ifstream inf("pwal"); + binary_iarchive in(inf); + int rseqno = -1; + while (inf.peek() != ifstream::traits_type::eof()) { + int op; + in & op; + switch (op) { + case op_del: + { + int key; + in & key; + mii::iterator it = map.find(key); + map.erase(it); + break; + } + case op_write: + { + int key, val; + in & key & val; + map[key] = val; + break; + } + case op_commit: + ++rseqno; + break; } + if (check_interval(rseqno, yield_interval)) st_sleep(0); + } + seqno = init.txnseqno() - 1; + showdatarate("recovered from pwal", inf.tellg(), current_time_millis() - start_time); + cout << "now at seqno " << rseqno << " (really: " << seqno << ")" << endl; + } else { - // Receive straight into the table. - pair<size_t, size_t> range = - recovery_range(table.size(), __ctx(i), __ref(init).node_size()); - // Check that we agree on the number of entries. - checkeq(range.second - range.first, hdr.count); - // Check that the count is a power of two. - checkeq(hdr.count & (hdr.count - 1), size_t(0)); - size_t rangelen = sizeof(entry) * hdr.count; - // Read an extra char to ensure that we're at the EOF. 
- long long start_time = current_time_millis(); - checkeqnneg(st_read_fully(__ref(replicas[i]), - table.begin() + range.first, rangelen + 1, - ST_UTIME_NO_TIMEOUT), - ssize_t(rangelen)); - long long end_time = current_time_millis(); + // + // Build-up + // - if (__ref(seqno) != -1) - checkeq(__ref(seqno), hdr.seqno); - __ref(seqno) = hdr.seqno; - showdatarate("got recovery message", len, end_time - start_time); - cout << "receive took " << end_time - __ref(before_recv) - << " ms total; now at seqno " << hdr.seqno << endl; + cout << "waiting for recovery message" << (multirecover ? "s" : "") + << endl; + long long before_recv = current_time_millis(); + + vector<st_thread_t> recovery_builders; + ASSERT(seqno == -1); + bool first = true; + for (int i = 0; i < (multirecover ? init.node_size() : 1); ++i) { + recovery_builders.push_back(my_spawn(lambda() { + // Read the recovery message length and header. + size_t len; + recovery_header hdr; + char buf[sizeof len + sizeof hdr]; + //try { + checkeqnneg(st_read_fully(__ref(replicas[i]), + buf, sizeof len + sizeof hdr, + ST_UTIME_NO_TIMEOUT), + ssize_t(sizeof len + sizeof hdr)); + //} catch (...) { // TODO just catch "Connection reset by peer" + //return; + //} + raw_reader rdr(buf); + rdr.read(len); + rdr.read(hdr); + check(hdr.seqno >= 0); + + // Resize the table if necessary. + commons::array<entry> &table = __ref(map).get_table(); + if (!__ref(first)) { + checkeq(table.size(), hdr.total); + checkeq(__ref(map).size(), hdr.size); + } else { + __ref(first) = false; + __ref(map).set_size(hdr.size); + if (table.size() != hdr.total) { + table.reset(new entry[hdr.total], hdr.total); + } + } + + // Receive straight into the table. + pair<size_t, size_t> range = + recovery_range(table.size(), __ctx(i), __ref(init).node_size()); + // Check that we agree on the number of entries. + checkeq(range.second - range.first, hdr.count); + // Check that the count is a power of two. 
+ checkeq(hdr.count & (hdr.count - 1), size_t(0)); + size_t rangelen = sizeof(entry) * hdr.count; + // Read an extra char to ensure that we're at the EOF. + long long start_time = current_time_millis(); + checkeqnneg(st_read_fully(__ref(replicas[i]), + table.begin() + range.first, rangelen + 1, + ST_UTIME_NO_TIMEOUT), + ssize_t(rangelen)); + long long end_time = current_time_millis(); + + if (__ref(seqno) != -1) + checkeq(__ref(seqno), hdr.seqno); + __ref(seqno) = hdr.seqno; + showdatarate("got recovery message", len, end_time - start_time); + cout << "receive took " << end_time - __ref(before_recv) + << " ms total; now at seqno " << hdr.seqno << endl; #if 0 - Recovery recovery; - long long receive_start = 0, receive_end = 0; - size_t len = 0; - { - st_intr intr(stop_hub); - len = readmsg(__ref(replicas)[__ctx(i)], recovery, &receive_start, - &receive_end); - } - long long build_start = current_time_millis(); - cout << "got recovery message of " << len << " bytes in " - << build_start - __ref(before_recv) << " ms: xfer took " - << receive_end - receive_start << " ms, deserialization took " - << build_start - receive_end << " ms" << endl; - for (int i = 0; i < recovery.pair_size(); ++i) { - const Recovery_Pair &p = recovery.pair(i); - __ref(map)[p.key()] = p.value(); - if (i % yield_interval == 0) { - if (yield_during_build_up) st_sleep(0); + Recovery recovery; + long long receive_start = 0, receive_end = 0; + size_t len = 0; + { + st_intr intr(stop_hub); + len = readmsg(__ref(replicas)[__ctx(i)], recovery, &receive_start, + &receive_end); } - } - check(recovery.seqno() >= 0); - int seqno = __ref(seqno) = recovery.seqno(); - long long build_end = current_time_millis(); - cout << "receive and build-up took " - << build_end - __ref(before_recv) - << " ms; built up map of " << recovery.pair_size() - << " records in " << build_end - build_start - << " ms; now at seqno " << seqno << endl; + long long build_start = current_time_millis(); + cout << "got recovery message of 
" << len << " bytes in " + << build_start - __ref(before_recv) << " ms: xfer took " + << receive_end - receive_start << " ms, deserialization took " + << build_start - receive_end << " ms" << endl; + for (int i = 0; i < recovery.pair_size(); ++i) { + const Recovery_Pair &p = recovery.pair(i); + __ref(map)[p.key()] = p.value(); + if (i % yield_interval == 0) { + if (yield_during_build_up) st_sleep(0); + } + } + check(recovery.seqno() >= 0); + int seqno = __ref(seqno) = recovery.seqno(); + long long build_end = current_time_millis(); + cout << "receive and build-up took " + << build_end - __ref(before_recv) + << " ms; built up map of " << recovery.pair_size() + << " records in " << build_end - build_start + << " ms; now at seqno " << seqno << endl; #endif - }, "recovery_builder" + lexical_cast<string>(i))); + }, "recovery_builder" + lexical_cast<string>(i))); + } + foreach (st_thread_t t, recovery_builders) { + st_join(t); + } } - foreach (st_thread_t t, recovery_builders) { - st_join(t); - } - } - long long mid_time = current_time_millis(); - int mid_seqno = seqno; - // XXX - using msg::TxnBatch; - using msg::Txn; - commons::array<char> rbuf(0), wbuf(buf_size); - reader reader(nullptr, rbuf.get(), rbuf.size()); - writer writer(lambda(const void*, size_t) { - throw not_supported_exception("should not be writing responses during catch-up phase"); - }, wbuf.get(), wbuf.size()); - stream s(reader, writer); - TxnBatch batch(s); - while (!backlog.empty()) { - chunk chunk = backlog.take(); - sized_array<char> &buf = chunk.get<0>(); - ASSERT(buf.get() <= chunk.get<1>() && chunk.get<1>() < buf.end()); - ASSERT(buf.get() < chunk.get<2>() && chunk.get<2>() < buf.end()); - ASSERT(chunk.get<1>() < chunk.get<2>()); - swap(buf, reader.buf()); - reader.reset_range(chunk.get<1>(), chunk.get<2>()); - while (reader.start() < reader.end()) { - char *start = reader.start(); - uint32_t prefix = ntohl(reader.read<uint32_t>()); - ASSERT(prefix < 10000); - ASSERT(start + sizeof(uint32_t) + 
prefix <= reader.end()); - batch.Clear(); - for (int t = 0; t < batch.txn_size(); ++t) { - const Txn &txn = batch.txn(t); - if (rec_pwal) seqno = txn.seqno() - 1; - process_txn<rb_traits, rb_traits>(map, txn, seqno, nullptr); - if (fake_exec && !Types::is_pb()) { - reader.skip(txn.op_size() * Op_Size); - } + // + // Catch-up + // - if (check_interval(txn.seqno(), yield_interval)) st_sleep(0); - if (check_interval(txn.seqno(), process_display)) { - cout << "caught up txn " << txn.seqno() - << "; db size = " << map.size() - << "; seqno = " << seqno - << "; backlog.size = " << backlog.queue().size() << endl; + long long mid_time = current_time_millis(); + int mid_seqno = seqno; + // XXX + using msg::TxnBatch; + using msg::Txn; + commons::array<char> rbuf(0), wbuf(buf_size); + reader reader(nullptr, rbuf.get(), rbuf.size()); + writer writer(lambda(const void*, size_t) { + throw not_supported_exception("should not be writing responses during catch-up phase"); + }, wbuf.get(), wbuf.size()); + stream s(reader, writer); + TxnBatch batch(s); + while (!backlog.empty()) { + chunk chunk = backlog.take(); + sized_array<char> &buf = chunk.get<0>(); + ASSERT(buf.get() <= chunk.get<1>() && chunk.get<1>() < buf.end()); + ASSERT(buf.get() < chunk.get<2>() && chunk.get<2>() < buf.end()); + ASSERT(chunk.get<1>() < chunk.get<2>()); + swap(buf, reader.buf()); + reader.reset_range(chunk.get<1>(), chunk.get<2>()); + while (reader.start() < reader.end()) { + char *start = reader.start(); + uint32_t prefix = ntohl(reader.read<uint32_t>()); + ASSERT(prefix < 10000); + ASSERT(start + sizeof(uint32_t) + prefix <= reader.end()); + batch.Clear(); + for (int t = 0; t < batch.txn_size(); ++t) { + const Txn &txn = batch.txn(t); + if (rec_pwal) seqno = txn.seqno() - 1; + process_txn<rb_traits, rb_traits>(map, txn, seqno, nullptr); + if (fake_exec && !Types::is_pb()) { + reader.skip(txn.op_size() * Op_Size); + } + + if (check_interval(txn.seqno(), yield_interval)) st_sleep(0); + if 
(check_interval(txn.seqno(), process_display)) { + cout << "caught up txn " << txn.seqno() + << "; db size = " << map.size() + << "; seqno = " << seqno + << "; backlog.size = " << backlog.queue().size() << endl; + } } + ASSERT(start + sizeof(uint32_t) + prefix == reader.start()); } - ASSERT(start + sizeof(uint32_t) + prefix == reader.start()); } - } - g_caught_up = true; + g_caught_up = true; #if 0 - while (!backlog.empty()) { - using pb::Txn; - shared_ptr<Txn> p = backlog.take(); - process_txn<pb_traits, pb_traits>(map, *p, seqno, nullptr); - if (check_interval(p->seqno(), catch_up_display)) { - cout << "processed txn " << p->seqno() << " off the backlog; " - << "backlog.size = " << backlog.queue().size() << endl; + while (!backlog.empty()) { + using pb::Txn; + shared_ptr<Txn> p = backlog.take(); + process_txn<pb_traits, pb_traits>(map, *p, seqno, nullptr); + if (check_interval(p->seqno(), catch_up_display)) { + cout << "processed txn " << p->seqno() << " off the backlog; " + << "backlog.size = " << backlog.queue().size() << endl; + } + if (check_interval(p->seqno(), yield_interval)) { + // Explicitly yield. (Note that yielding does still effectively + // happen anyway because process_txn is a yield point.) + st_sleep(0); + } } - if (check_interval(p->seqno(), yield_interval)) { - // Explicitly yield. (Note that yielding does still effectively - // happen anyway because process_txn is a yield point.) 
- st_sleep(0); - } +#endif + showtput("replayer caught up; from backlog replayed", + current_time_millis(), mid_time, seqno, mid_seqno); } -#endif - showtput("replayer caught up; from backlog replayed", - current_time_millis(), mid_time, seqno, mid_seqno); } } catch (std::exception &ex) { cerr_thread_ex(ex) << endl; @@ -2566,7 +2770,7 @@ "size of the incoming (read) buffer in bytes") ("write-buf", po::value<size_t>(&buf_size)->default_value(1e5), "size of the outgoing (write) buffer in bytes") - ("yield_interval,y", po::value<int>(&yield_interval)->default_value(10000), + ("yield-interval,y", po::value<int>(&yield_interval)->default_value(1000), "number of txns before yielding") ("timelim,T", po::value<long long>(&timelim)->default_value(0), "general network IO time limit in milliseconds, or 0 for none") Modified: ydb/trunk/src/tpcc/Makefile =================================================================== --- ydb/trunk/src/tpcc/Makefile 2009-03-17 17:24:21 UTC (rev 1300) +++ ydb/trunk/src/tpcc/Makefile 2009-03-18 09:17:01 UTC (rev 1301) @@ -1,9 +1,9 @@ WARNINGS = -Werror -Wall -Wextra -Wconversion -Wpointer-arith -Wcast-qual -Wcast-align -Wwrite-strings -Woverloaded-virtual -Wno-sign-compare -Wno-unused-parameter # Debug flags -CXXFLAGS = -g -MD $(WARNINGS) -I.. +CXXFLAGS = -g -MD $(WARNINGS) -I.. 
-std=gnu++0x # Optimization flags -#CXXFLAGS = -g -O3 -DNDEBUG -MD $(WARNINGS) +#CXXFLAGS = -g -O3 -DNDEBUG -MD $(WARNINGS) -std=gnu++0x # Link withthe C++ standard library #LDFLAGS=-lstdc++ @@ -15,7 +15,12 @@ btree_test: btree_test.o +%.cc: %.cc.cog + cog.py $< > $@ + clean : rm -f *.o *.d $(BINARIES) -include *.d + +.SECONDARY: tpcctables.cc Modified: ydb/trunk/src/tpcc/btree.h =================================================================== --- ydb/trunk/src/tpcc/btree.h 2009-03-17 17:24:21 UTC (rev 1300) +++ ydb/trunk/src/tpcc/btree.h 2009-03-18 09:17:01 UTC (rev 1301) @@ -46,9 +46,13 @@ template <typename KEY, typename VALUE, unsigned N, unsigned M, unsigned INNER_NODE_PADDING= 0, unsigned LEAF_NODE_PADDING= 0, unsigned NODE_ALIGNMENT= 64> - class BPlusTree +class BPlusTree { public: + typedef pair<KEY, VALUE> entry_type; + typedef KEY key_type; + typedef VALUE data_type; + // N must be greater than two to make the split of // two inner nodes sensible. BOOST_STATIC_ASSERT(N>2); @@ -222,7 +226,8 @@ public: typedef typename BPlusTree::InnerNode inner_type; typedef typename BPlusTree::LeafNode leaf_type; - iterator(BPlusTree &tree) : tree_(tree) { + typedef typename BPlusTree::entry_type entry_type; + iterator(const BPlusTree &tree) : tree_(tree) { if (tree.size_ > 0) { stack_.push_back(make_pair(tree.root, 0)); // If depth = 0, that means we have only a root node. 
@@ -248,7 +253,7 @@ #endif return *this; } - pair<KEY, VALUE> operator*() { + entry_type operator*() { return make_pair(leaf().keys[pos()], leaf().values[pos()]); } bool end() { return stack_.empty(); } @@ -268,7 +273,7 @@ leaf().num_keys - 1 : inner().num_keys)) stack_.pop_back(); // back up } - BPlusTree &tree_; + const BPlusTree &tree_; // stack of (node, pos) deque< pair<void*, size_t> > stack_; }; Deleted: ydb/trunk/src/tpcc/tpcctables.cc =================================================================== --- ydb/trunk/src/tpcc/tpcctables.cc 2009-03-17 17:24:21 UTC (rev 1300) +++ ydb/trunk/src/tpcc/tpcctables.cc 2009-03-18 09:17:01 UTC (rev 1301) @@ -1,622 +0,0 @@ -#include "tpcctables.h" - -#include <algorithm> -#include <limits> -#include <vector> - -#include <commons/assert.h> -#include <iostream> -#include "stlutil.h" - -using std::vector; - -bool CustomerByNameOrdering::operator()(const Customer* a, const Customer* b) { - if (a->c_w_id < b->c_w_id) return true; - if (a->c_w_id > b->c_w_id) return false; - assert(a->c_w_id == b->c_w_id); - - if (a->c_d_id < b->c_d_id) return true; - if (a->c_d_id > b->c_d_id) return false; - assert(a->c_d_id == b->c_d_id); - - int diff = strcmp(a->c_last, b->c_last); - if (diff < 0) return true; - if (diff > 0) return false; - assert(diff == 0); - - // Finally delegate to c_first - return strcmp(a->c_first, b->c_first) < 0; -} - -template <typename KeyType, typename ValueType> -static void deleteBTreeValues(BPlusTree<KeyType, ValueType*, TPCCTables::KEYS_PER_INTERNAL, TPCCTables::KEYS_PER_LEAF>* btree) { - KeyType key = std::numeric_limits<KeyType>::max(); - ValueType* value = NULL; - while (btree->findLastLessThan(key, &value, &key)) { - assert(value != NULL); - delete value; - } -} - -TPCCTables::~TPCCTables() { - // Clean up the b-trees with this gross hack - deleteBTreeValues(&warehouses_); - deleteBTreeValues(&stock_); - deleteBTreeValues(&districts_); - deleteBTreeValues(&orders_); - deleteBTreeValues(&orderlines_); 
- - STLDeleteValues(&neworders_); - STLDeleteElements(&customers_by_name_); - STLDeleteElements(&history_); -} - -int TPCCTables::stockLevel(int32_t warehouse_id, int32_t district_id, int32_t threshold) { - /* EXEC SQL SELECT d_next_o_id INTO :o_id FROM district - WHERE d_w_id=:w_id AND d_id=:d_id; */ - //~ printf("stock level %d %d %d\n", warehouse_id, district_id, threshold); - District* d = findDistrict(warehouse_id, district_id); - int32_t o_id = d->d_next_o_id; - - /* EXEC SQL SELECT COUNT(DISTINCT (s_i_id)) INTO :stock_count FROM order_line, stock - WHERE ol_w_id=:w_id AND ol_d_id=:d_id AND ol_o_id<:o_id AND ol_o_id>=:o_id-20 - AND s_w_id=:w_id AND s_i_id=ol_i_id AND s_quantity < :threshold;*/ - - - // retrieve up to 300 tuples from order line, using ( [o_id-20, o_id), d_id, w_id, [1, 15]) - // and for each retrieved tuple, read the corresponding stock tuple using (ol_i_id, w_id) - // NOTE: This is a cheat because it hard codes the maximum number of orders. - // We really should use the ordered b-tree index to find (0, o_id-20, d_id, w_id) then iterate - // until the end. This will also do less work (wasted finds). Since this is only 4%, it probably - // doesn't matter much - - // TODO: Test the performance more carefully. I tried: std::set, std::hash_set, std::vector - // with linear search, and std::vector with binary search using std::lower_bound. The best - // seemed to be to simply save all the s_i_ids, then sort and eliminate duplicates at the end. - std::vector<int32_t> s_i_ids; - // Average size is more like ~30. - s_i_ids.reserve(300); - - // Iterate over [o_id-20, o_id) - for (int order_id = o_id - STOCK_LEVEL_ORDERS; order_id < o_id; ++order_id) { - // HACK: We shouldn't rely on MAX_OL_CNT. See comment above. 
- for (int line_number = 1; line_number <= Order::MAX_OL_CNT; ++line_number) { - OrderLine* line = findOrderLine(warehouse_id, district_id, order_id, line_number); - if (line == NULL) { - // We can break since we have reached the end of the lines for this order. - // TODO: A btree iterate in (w_id, d_id, o_id) order would be a clean way to do this -#ifndef NDEBUG - for (int test_line_number = line_number + 1; line_number < Order::MAX_OL_CNT; ++line_number) { - assert(findOrderLine(warehouse_id, district_id, order_id, test_line_number) == NULL); - } -#endif - break; - } - - // Check if s_quantity < threshold - Stock* stock = findStock(warehouse_id, line->ol_i_id); - if (stock->s_quantity < threshold) { - s_i_ids.push_back(line->ol_i_id); - } - } - } - - // Filter out duplicate s_i_id: multiple order lines can have the same item - std::sort(s_i_ids.begin(), s_i_ids.end()); - int num_distinct = 0; - int32_t last = -1; // NOTE: This relies on -1 being an invalid s_i_id - for (size_t i = 0; i < s_i_ids.size(); ++i) { - if (s_i_ids[i] != last) { - last = s_i_ids[i]; - num_distinct += 1; - } - } - - return num_distinct; -} - -void TPCCTables::orderStatus(int32_t warehouse_id, int32_t district_id, int32_t customer_id, OrderStatusOutput* output) { - //~ printf("order status %d %d %d\n", warehouse_id, district_id, customer_id); - internalOrderStatus(findCustomer(warehouse_id, district_id, customer_id), output); -} - -void TPCCTables::orderStatus(int32_t warehouse_id, int32_t district_id, const char* c_last, OrderStatusOutput* output) { - //~ printf("order status %d %d %s\n", warehouse_id, district_id, c_last); - Customer* customer = findCustomerByName(warehouse_id, district_id, c_last); - internalOrderStatus(customer, output); -} - -void TPCCTables::internalOrderStatus(Customer* customer, OrderStatusOutput* output) { - output->c_id = customer->c_id; - // retrieve from customer: balance, first, middle, last - output->c_balance = customer->c_balance; - strcpy(output->c_first, 
customer->c_first); - strcpy(output->c_middle, customer->c_middle); - strcpy(output->c_last, customer->c_last); - - // Find the row in the order table with largest o_id - Order* order = findLastOrderByCustomer(customer->c_w_id, customer->c_d_id, customer->c_id); - output->o_id = order->o_id; - output->o_carrier_id = order->o_carrier_id; - strcpy(output->o_entry_d, order->o_entry_d); - - output->lines.resize(order->o_ol_cnt); - for (int32_t line_number = 1; line_number <= order->o_ol_cnt; ++line_number) { - OrderLine* line = findOrderLine(customer->c_w_id, customer->c_d_id, order->o_id, line_number); - output->lines[line_number-1].ol_i_id = line->ol_i_id; - output->lines[line_number-1].ol_supply_w_id = line->ol_supply_w_id; - output->lines[line_number-1].ol_quantity = line->ol_quantity; - output->lines[line_number-1].ol_amount = line->ol_amount; - strcpy(output->lines[line_number-1].ol_delivery_d, line->ol_delivery_d); - } -#ifndef NDEBUG - // Verify that none of the other OrderLines exist. - for (int32_t line_number = order->o_ol_cnt+1; line_number <= Order::MAX_OL_CNT; ++line_number) { - assert(findOrderLine(customer->c_w_id, customer->c_d_id, order->o_id, line_number) == NULL); - } -#endif -} - -bool TPCCTables::newOrder(int32_t warehouse_id, int32_t district_id, int32_t customer_id, - const std::vector<NewOrderItem>& items, const char* now, NewOrderOutput* output) { - //~ printf("new order %d %d %d %d %s\n", warehouse_id, district_id, customer_id, items.size(), now); - // 2.4.3.4. 
requires that we display c_last, c_credit, and o_id for rolled back transactions: - // read those values first - District* d = findDistrict(warehouse_id, district_id); - output->d_tax = d->d_tax; - output->o_id = d->d_next_o_id; - assert(findOrder(warehouse_id, district_id, output->o_id) == NULL); - - Customer* c = findCustomer(warehouse_id, district_id, customer_id); - assert(sizeof(output->c_last) == sizeof(c->c_last)); - memcpy(output->c_last, c->c_last, sizeof(output->c_last)); - memcpy(output->c_credit, c->c_credit, sizeof(output->c_credit)); - output->c_discount = c->c_discount; - - // CHEAT: Validate all items to see if we will need to abort - vector<Item*> item_tuples(items.size()); - bool all_local = true; - for (int i = 0; i < items.size(); ++i) { - item_tuples[i] = findItem(items[i].i_id); - if (item_tuples[i] == NULL) { - strcpy(output->status, NewOrderOutput::INVALID_ITEM_STATUS); - return false; - } - all_local = all_local && items[i].ol_supply_w_id == warehouse_id; - } - - // We will not abort: update the status and the database state - output->status[0] = '\0'; - - // Modify the order id to assign it - d->d_next_o_id += 1; - - Warehouse* w = findWarehouse(warehouse_id); - output->w_tax = w->w_tax; - - Order order; - order.o_w_id = warehouse_id; - order.o_d_id = district_id; - order.o_id = output->o_id; - order.o_c_id = customer_id; - order.o_carrier_id = Order::NULL_CARRIER_ID; - order.o_ol_cnt = static_cast<int32_t>(items.size()); - order.o_all_local = all_local ? 
1 : 0; - strcpy(order.o_entry_d, now); - assert(strlen(order.o_entry_d) == DATETIME_SIZE); - insertOrder(order); - insertNewOrder(warehouse_id, district_id, output->o_id); - - OrderLine line; - line.ol_o_id = output->o_id; - line.ol_d_id = district_id; - line.ol_w_id = warehouse_id; - memset(line.ol_delivery_d, 0, DATETIME_SIZE+1); - - output->items.resize(items.size()); - output->total = 0; - for (int i = 0; i < items.size(); ++i) { - line.ol_number = i+1; - line.ol_i_id = items[i].i_id; - line.ol_supply_w_id = items[i].ol_supply_w_id; - line.ol_quantity = items[i].ol_quantity; - - // Read and update stock - Stock* stock = findStock(items[i].ol_supply_w_id, items[i].i_id); - if (stock->s_quantity >= items[i].ol_quantity + 10) { - stock->s_quantity -= items[i].ol_quantity; - } else { - stock->s_quantity = stock->s_quantity - items[i].ol_quantity + 91; - } - output->items[i].s_quantity = stock->s_quantity; - assert(sizeof(line.ol_dist_info) == sizeof(stock->s_dist[district_id])); - memcpy(line.ol_dist_info, stock->s_dist[district_id], sizeof(line.ol_dist_info)); - stock->s_ytd += items[i].ol_quantity; - stock->s_order_cnt += 1; - if (items[i].ol_supply_w_id != warehouse_id) { - // remote order - stock->s_remote_cnt += 1; - } - bool stock_is_original = (strstr(stock->s_data, "ORIGINAL") != NULL); - - assert(sizeof(output->items[i].i_name) == sizeof(item_tuples[i]->i_name)); - memcpy(output->items[i].i_name, item_tuples[i]->i_name, sizeof(output->items[i].i_name)); - output->items[i].i_price = item_tuples[i]->i_price; - output->items[i].ol_amount = - static_cast<float>(items[i].ol_quantity) * item_tuples[i]->i_price; - line.ol_amount = output->items[i].ol_amount; - output->total += output->items[i].ol_amount; - if (stock_is_original && strstr(item_tuples[i]->i_data, "ORIGINAL") != NULL) { - output->items[i].brand_generic = NewOrderOutput::ItemInfo::BRAND; - } else { - output->items[i].brand_generic = NewOrderOutput::ItemInfo::GENERIC; - } - insertOrderLine(line); - } 
- - return true; -} - -void TPCCTables::payment(int32_t warehouse_id, int32_t district_id, int32_t c_warehouse_id, - int32_t c_district_id, int32_t customer_id, float h_amount, const char* now, - PaymentOutput* output) { - //~ printf("payment %d %d %d %d %d %f %s\n", warehouse_id, district_id, c_warehouse_id, c_district_id, customer_id, h_amount, now); - Customer* customer = findCustomer(c_warehouse_id, c_district_id, customer_id); - internalPayment(warehouse_id, district_id, customer, h_amount, now, output); -} - - -void TPCCTables::payment(int32_t warehouse_id, int32_t district_id, int32_t c_warehouse_id, - int32_t c_district_id, const char* c_last, float h_amount, const char* now, - PaymentOutput* output) { - //~ printf("payment %d %d %d %d %s %f %s\n", warehouse_id, district_id, c_warehouse_id, c_district_id, c_last, h_amount, now); - Customer* customer = findCustomerByName(c_warehouse_id, c_district_id, c_last); - internalPayment(warehouse_id, district_id, customer, h_amount, now, output); -} - -void TPCCTables::internalPayment(int32_t warehouse_id, int32_t district_id, Customer* c, - float h_amount, const char* now, PaymentOutput* output) { - Warehouse* w = findWarehouse(warehouse_id); - w->w_ytd += h_amount; - output->warehouse = *w; - - District* d = findDistrict(warehouse_id, district_id); - d->d_ytd += h_amount; - output->district = *d; - - c->c_balance -= h_amount; - c->c_ytd_payment += h_amount; - c->c_payment_cnt += 1; - if (strcmp(c->c_credit, Customer::BAD_CREDIT) == 0) { - // Bad credit: insert history into c_data - static const int HISTORY_SIZE = Customer::MAX_DATA+1; - char history[HISTORY_SIZE]; - int characters = snprintf(history, HISTORY_SIZE, "(%d, %d, %d, %d, %d, %.2f)\n", - c->c_id, c->c_d_id, c->c_w_id, district_id, warehouse_id, h_amount); - assert(characters < HISTORY_SIZE); - - // Perform the insert with a move and copy - int current_keep = static_cast<int>(strlen(c->c_data)); - if (current_keep + characters > Customer::MAX_DATA) { - 
current_keep = Customer::MAX_DATA - characters; - } - assert(current_keep + characters <= Customer::MAX_DATA); - memmove(c->c_data+characters, c->c_data, current_keep); - memcpy(c->c_data, history, characters); - c->c_data[characters + current_keep] = '\0'; - assert(strlen(c->c_data) == characters + current_keep); - } - output->customer = *c; - - // Insert the line into the history table - History h; - h.h_w_id = warehouse_id; - h.h_d_id = district_id; - h.h_c_w_id = c->c_w_id; - h.h_c_d_id = c->c_d_id; - h.h_c_id = c->c_id; - h.h_amount = h_amount; - strcpy(h.h_date, now); - strcpy(h.h_data, w->w_name); - strcat(h.h_data, " "); - strcat(h.h_data, d->d_name); - insertHistory(h); -} - -// forward declaration for delivery -static int64_t makeNewOrderKey(int32_t w_id, int32_t d_id, int32_t o_id); - -void TPCCTables::delivery(int32_t warehouse_id, int32_t carrier_id, const char* now, - std::vector<DeliveryOrderInfo>* orders) { - //~ printf("delivery %d %d %s\n", warehouse_id, carrier_id, now); - orders->clear(); - for (int32_t d_id = 1; d_id <= District::NUM_PER_WAREHOUSE; ++d_id) { - // Find and remove the lowest numbered order for the district - int64_t key = makeNewOrderKey(warehouse_id, d_id, 1); - NewOrderMap::iterator iterator = neworders_.lower_bound(key); - NewOrder* neworder = NULL; - if (iterator != neworders_.end()) { - neworder = iterator->second; - assert(neworder != NULL); - } - if (neworder == NULL || neworder->no_d_id != d_id || neworder->no_w_id != warehouse_id) { - // No orders for this district - // TODO: 2.7.4.2: If this occurs in max(1%, 1) of transactions, report it (???) 
- continue; - } - assert(neworder->no_d_id == d_id && neworder->no_w_id == warehouse_id); - int32_t o_id = neworder->no_o_id; - neworders_.erase(iterator); - delete neworder; - - DeliveryOrderInfo order; - order.d_id = d_id; - order.o_id = o_id; - orders->push_back(order); - - Order* o = findOrder(warehouse_id, d_id, o_id); - assert(o->o_carrier_id == Order::NULL_CARRIER_ID); - o->o_carrier_id = carrier_id; - - float total = 0; - // TODO: Select based on (w_id, d_id, o_id) rather than using ol_number? - for (int32_t i = 1; i <= o->o_ol_cnt; ++i) { - OrderLine* line = findOrderLine(warehouse_id, d_id, o_id, i); - assert(0 == strlen(line->ol_delivery_d)); - strcpy(line->ol_delivery_d, now); - assert(strlen(line->ol_delivery_d) == DATETIME_SIZE); - total += line->ol_amount; - } - - Customer* c = findCustomer(warehouse_id, d_id, o->o_c_id); - c->c_balance += total; - c->c_delivery_cnt += 1; - } -} - -template <typename T> -static T* insert(BPlusTree<int32_t, T*, TPCCTables::KEYS_PER_INTERNAL, TPCCTables::KEYS_PER_LEAF>* tree, int32_t key, const T& item) { - assert(!tree->find(key)); - T* copy = new T(item); - tree->insert(key, copy); - return copy; -} - -template <typename T> -static T* find(const BPlusTree<int32_t, T*, TPCCTables::KEYS_PER_INTERNAL, TPCCTables::KEYS_PER_LEAF>& tree, int32_t key) { - T* output = NULL; - if (tree.find(key, &output)) { - return output; - } - return NULL; -} - -void TPCCTables::insertItem(const Item& item) { - assert(item.i_id == items_.size() + 1); - items_.push_back(item); -} -Item* TPCCTables::findItem(int32_t id) { - assert(1 <= id); - id -= 1; - if (id >= items_.size()) return NULL; - return &items_[id]; -} - -void TPCCTables::insertWarehouse(const Warehouse& w) { - insert(&warehouses_, w.w_id, w); -} -Warehouse* TPCCTables::findWarehouse(int32_t id) { - return find(warehouses_, id); -} - -static int32_t makeStockKey(int32_t w_id, int32_t s_id) { - assert(1 <= w_id && w_id <= Warehouse::MAX_WAREHOUSE_ID); - assert(1 <= s_id && s_id 
<= Stock::NUM_STOCK_PER_WAREHOUSE); - int32_t id = s_id + (w_id * Stock::NUM_STOCK_PER_WAREHOUSE); - assert(id >= 0); - return id; -} - -void TPCCTables::insertStock(const Stock& stock) { - insert(&stock_, makeStockKey(stock.s_w_id, stock.s_i_id), stock); -} -Stock* TPCCTables::findStock(int32_t w_id, int32_t s_id) { - return find(stock_, makeStockKey(w_id, s_id)); -} - -static int32_t makeDistrictKey(int32_t w_id, int32_t d_id) { - assert(1 <= w_id && w_id <= Warehouse::MAX_WAREHOUSE_ID); - assert(1 <= d_id && d_id <= District::NUM_PER_WAREHOUSE); - int32_t id = d_id + (w_id * District::NUM_PER_WAREHOUSE); - assert(id >= 0); - return id; -} - -void TPCCTables::insertDistrict(const District& district) { - insert(&districts_, makeDistrictKey(district.d_w_id, district.d_id), district); -} -District* TPCCTables::findDistrict(int32_t w_id, int32_t d_id) { - return find(districts_, makeDistrictKey(w_id, d_id)); -} - -static int32_t makeCustomerKey(int32_t w_id, int32_t d_id, int32_t c_id) { - assert(1 <= w_id && w_id <= Warehouse::MAX_WAREHOUSE_ID); - assert(1 <= d_id && d_id <= District::NUM_PER_WAREHOUSE); - assert(1 <= c_id && c_id <= Customer::NUM_PER_DISTRICT); - int32_t id = (w_id * District::NUM_PER_WAREHOUSE + d_id) - * Customer::NUM_PER_DISTRICT + c_id; - assert(id >= 0); - return id; -} - -void TPCCTables::insertCustomer(const Customer& customer) { - Customer* c = insert(&customers_, makeCustomerKey(customer.c_w_id, customer.c_d_id, customer.c_id), customer); - assert(customers_by_name_.find(c) == customers_by_name_.end()); - customers_by_name_.insert(c); -} -Customer* TPCCTables::findCustomer(int32_t w_id, int32_t d_id, int32_t c_id) { - return find(customers_, makeCustomerKey(w_id, d_id, c_id)); -} - -Customer* TPCCTables::findCustomerByName(int32_t w_id, int32_t d_id, const char* c_last) { - // select (w_id, d_id, *, c_last) order by c_first - Customer c; - c.c_w_id = w_id; - c.c_d_id = d_id; - strcpy(c.c_last, c_last); - c.c_first[0] = '\0'; - 
CustomerByNameSet::const_iterator it = customers_by_name_.lower_bound(&c); - assert(it != customers_by_name_.end()); - assert((*it)->c_w_id == w_id && (*it)->c_d_id == d_id && strcmp((*it)->c_last, c_last) == 0); - - // go to the "next" c_last - // TODO: This is a GROSS hack. Can we do better? - int length = static_cast<int>(strlen(c_last)); - if (length == Customer::MAX_LAST) { - c.c_last[length-1] = static_cast<char>(c.c_last[length-1] + 1); - } else { - c.c_last[length] = 'A'; - c.c_last[length+1] = '\0'; - } - CustomerByNameSet::const_iterator stop = customers_by_name_.lower_bound(&c); - - Customer* customer = NULL; - // Choose position n/2 rounded up (1 based addressing) = floor((n-1)/2) - if (it != stop) { - CustomerByNameSet::const_iterator middle = it; - ++it; - int i = 0; - while (it != stop) { - // Increment the middle iterator on every second iteration - if (i % 2 == 1) { - ++middle; - } - assert(strcmp((*it)->c_last, c_last) == 0); - ++it; - ++i; - } - // There were i+1 matching last names - customer = *middle; - } - - assert(customer->c_w_id == w_id && customer->c_d_id == d_id && - strcmp(customer->c_last, c_last) == 0); - return customer; -} - -static int32_t makeOrderKey(int32_t w_id, int32_t d_id, int32_t o_id) { - assert(1 <= w_id && w_id <= Warehouse::MAX_WAREHOUSE_ID); - assert(1 <= d_id && d_id <= District::NUM_PER_WAREHOUSE); - assert(1 <= o_id && o_id <= Order::MAX_ORDER_ID); - // TODO: This is bad for locality since o_id is in the most significant position. Larger keys? 
- int32_t id = (o_id * District::NUM_PER_WAREHOUSE + d_id) - * Warehouse::MAX_WAREHOUSE_ID + w_id; - assert(id >= 0); - return id; -} - -static int64_t makeOrderByCustomerKey(int32_t w_id, int32_t d_id, int32_t c_id, int32_t o_id) { - assert(1 <= w_id && w_id <= Warehouse::MAX_WAREHOUSE_ID); - assert(1 <= d_id && d_id <= District::NUM_PER_WAREHOUSE); - assert(1 <= c_id && c_id <= Customer::NUM_PER_DISTRICT); - assert(1 <= o_id && o_id <= Order::MAX_ORDER_ID); - int32_t top_id = (w_id * District::NUM_PER_WAREHOUSE + d_id) * Customer::NUM_PER_DISTRICT - + c_id; - assert(top_id >= 0); - int64_t id = (((int64_t) top_id) << 32) | o_id; - assert(id > 0); - return id; -} - -void TPCCTabl... [truncated message content] |