[Assorted-commits] SF.net SVN: assorted:[1294] ydb/trunk
From: <yan...@us...> - 2009-03-16 22:32:32
Revision: 1294
          http://assorted.svn.sourceforge.net/assorted/?rev=1294&view=rev
Author:   yangzhang
Date:     2009-03-16 22:32:07 +0000 (Mon, 16 Mar 2009)

Log Message:
-----------
started integrating tpcc txn processing into ydb; currently only handles
new order txns and no recovery

Modified Paths:
--------------
    ydb/trunk/README
    ydb/trunk/src/main.lzz.clamp
    ydb/trunk/src/tpcc/tpccclient.cc
    ydb/trunk/src/ydb.proto

Modified: ydb/trunk/README
===================================================================
--- ydb/trunk/README	2009-03-13 19:26:56 UTC (rev 1293)
+++ ydb/trunk/README	2009-03-16 22:32:07 UTC (rev 1294)
@@ -684,8 +684,10 @@
 - stock 100k 306
 - item 100k 82
 
-- TODO try integrating TPC-C
+- TODO try integrating TPC-C; just get txns working for now
+- TODO implement TPC-C recovery
+
 - TODO faster disk logging using separate threads
 - TODO show aries-write
 
Modified: ydb/trunk/src/main.lzz.clamp
===================================================================
--- ydb/trunk/src/main.lzz.clamp	2009-03-13 19:26:56 UTC (rev 1293)
+++ ydb/trunk/src/main.lzz.clamp	2009-03-16 22:32:07 UTC (rev 1294)
@@ -93,13 +93,14 @@
 bool yield_during_build_up, yield_during_catch_up, dump, show_updates,
      count_updates, stop_on_recovery, general_txns, profile_threads,
      debug_threads, multirecover, disk, debug_memory, use_pwal, use_twal,
-     use_pb, use_pb_res, g_caught_up, rec_pwal,
+     use_pb, use_pb_res, g_caught_up, rec_pwal, do_tpcc,
      suppress_txn_msgs, fake_bcast, force_ser, fake_exec;
 long long timelim, read_thresh, write_thresh;
 
 // Control.
 st_intr_bool stop_hub, kill_hub;
 st_bool do_pause;
+bool stopped_issuing;
 
 // Statistics.
 int updates;
@@ -326,6 +327,17 @@
 //typedef string ser_t;
 typedef ser_array ser_t;
 
+template<typename T>
+void
+ser(writer &w, const T &msg)
+{
+  uint32_t len = msg.ByteSize();
+  w.mark();
+  w.reserve(len);
+  check(msg.SerializeToArray(w.cur(), len));
+  w.skip(len);
+}
+
 /**
  * Serialization.
  *
@@ -564,6 +576,14 @@
   check(msg.ParseFromArray(src.read(len), len));
 }
 
+template<typename T>
+inline void
+readmsg(anchored_stream_reader &src, T &msg)
+{
+  uint32_t len = ntohl(src.read<uint32_t>());
+  check(msg.ParseFromArray(src.read(len), len));
+}
+
 enum { op_del, op_write, op_commit };
 
 /**
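The ser()/readmsg() pair added above defines the wire format used for all of the TPC-C traffic below: a length prefix followed by a serialized protobuf message, with readmsg() converting the prefix from network byte order via ntohl(). A standalone sketch of that convention (not part of the commit: the frame/unframe names are invented, the prefix is written explicitly with htonl() here, and the writer/anchored_stream_reader buffer machinery is omitted):

    #include <arpa/inet.h>  // htonl, ntohl
    #include <cstring>      // memcpy
    #include <stdint.h>
    #include "ydb.pb.h"     // assumed name of the header generated from ydb.proto

    // Write one frame: a 4-byte big-endian length prefix, then the payload.
    static size_t frame(const TpccReq &msg, char *buf) {
      const uint32_t len = msg.ByteSize();
      const uint32_t netlen = htonl(len);
      std::memcpy(buf, &netlen, sizeof netlen);
      msg.SerializeToArray(buf + sizeof netlen, len);
      return sizeof netlen + len;
    }

    // Read one frame produced by frame(); this mirrors readmsg() above.
    static bool unframe(const char *buf, TpccReq *msg) {
      uint32_t netlen;
      std::memcpy(&netlen, buf, sizeof netlen);
      return msg->ParseFromArray(buf + sizeof netlen, ntohl(netlen));
    }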
@@ -885,6 +905,145 @@
 }
 #end
 
+unique_ptr<TPCCTables> g_tables;
+
+void
+process_tpcc(const TpccReq &req, int &seqno, TpccRes *res)
+{
+  checkeq(req.seqno(), seqno + 1);
+  ++seqno;
+  if (req.has_new_order()) {
+    const NewOrderMsg &no = req.new_order();
+    vector<NewOrderItem> items(no.item_size());
+    for (int i = 0; i < no.item_size(); ++i) {
+      NewOrderItem &dst = items[i];
+      const NewOrderItemMsg &src = no.item(i);
+      dst.i_id = src.i_id();
+      dst.ol_supply_w_id = src.ol_supply_w_id();
+      dst.ol_quantity = src.ol_quantity();
+    }
+    NewOrderOutput output;
+    g_tables->newOrder(no.warehouse_id(), no.district_id(), no.customer_id(),
+                       items, no.now().c_str(), &output);
+    res->Clear();
+    res->set_seqno(seqno);
+    NewOrderOutputMsg &outmsg = *res->mutable_new_order();
+    outmsg.set_w_tax(output.w_tax);
+    outmsg.set_d_tax(output.d_tax);
+    outmsg.set_o_id(output.o_id);
+    outmsg.set_c_discount(output.c_discount);
+    outmsg.set_total(output.total);
+    foreach (const NewOrderOutput::ItemInfo &src, output.items) {
+      ItemInfoMsg &dst = *outmsg.add_item();
+      dst.set_s_quantity(src.s_quantity);
+      dst.set_i_price(src.i_price);
+      dst.set_ol_amount(src.ol_amount);
+      dst.set_brand_generic(src.brand_generic);
+      dst.set_i_name(src.i_name);
+    }
+    outmsg.set_c_last(output.c_last);
+    outmsg.set_c_credit(output.c_credit);
+    outmsg.set_status(output.status);
+  } else {
+    throw_not_implemented();
+  }
+}
+
+void
+process_tpccs(st_netfd_t leader, int &seqno,
+              st_channel<recovery_t> &send_states,
+              st_channel<chunk> &backlog, int init_seqno,
+              int mypos, int nnodes)
+{
+  bool caught_up = init_seqno == 0;
+  long long start_time = current_time_millis(),
+            time_caught_up = caught_up ? start_time : -1;
+  int seqno_caught_up = caught_up ? seqno : -1;
+  // Used by joiner only to tell where we actually started (init_seqno is just
+  // the seqno reported by the leader in the Init message, but it may have
+  // issued more since the Init message).
+  int first_seqno = -1;
+
+  function<void(anchored_stream_reader& reader)> overflow_fn = lambda(anchored_stream_reader &reader) {
+    sized_array<char> buf(new char[reader.buf().size()], reader.buf().size());
+    memcpy(buf.get(), reader.start(), reader.unread());
+    swap(buf, reader.buf());
+    __ref(backlog).push(make_tuple(buf, reader.anchor(), reader.start()));
+    reader.reset_range(reader.buf().get(), reader.buf().get() + reader.unread());
+  };
+
+  sized_array<char> rbuf(new char[read_buf_size], read_buf_size);
+  commons::array<char> wbuf(buf_size);
+  anchored_stream_reader reader(st_read_fn(leader), st_read_fully_fn(leader),
+                                overflow_fn, rbuf.get(), rbuf.size());
+  writer w(lambda(const void *buf, size_t len) {
+    checkeqnneg(st_write(__ref(leader), buf, len, ST_UTIME_NO_TIMEOUT),
+                static_cast<ssize_t>(len));
+  }, wbuf.get(), wbuf.size());
+
+  finally f(lambda () {
+    long long now = current_time_millis();
+    stopped_issuing = true;
+    showtput("processed", now, __ref(start_time), __ref(seqno),
+             __ref(init_seqno));
+    if (!__ref(caught_up)) {
+      cout << "live-processing: never entered this phase (never caught up)" << endl;
+    } else {
+      showtput("live-processed", now, __ref(time_caught_up), __ref(seqno),
+               __ref(seqno_caught_up));
+    }
+    __ref(send_states).push(recovery_t());
+    __ref(w).mark_and_flush();
+  });
+
+  TpccReq req;
+  TpccRes res;
+
+  while (true)
+  {
+    {
+      st_intr intr(stop_hub);
+      readmsg(reader, req);
+    }
+
+    if (req.seqno() == -1) {
+      // End of stream.
+      break;
+    } else if (req.seqno() == -2) {
+      // Prepare recovery msg.
+      send_states.push(make_tpcc_recovery(mypos, nnodes, seqno));
+    } else {
+      // Backlog (auto-implicit) or process.
+      if (check_interval(req.seqno(), process_display))
+        cout << "processing req " << req.seqno() << endl;
+      if (req.seqno() == seqno + 1) {
+        if (!caught_up) {
+          time_caught_up = current_time_millis();
+          seqno_caught_up = seqno;
+          showtput("process_txns caught up; backlogged",
+                   time_caught_up, start_time, seqno_caught_up,
+                   first_seqno == -1 ? init_seqno - 1 : first_seqno);
+          caught_up = true;
+        }
+        // Process.
+        process_tpcc(req, seqno, &res);
+        ser(w, res);
+        reader.set_anchor();
+      }
+    }
+  }
+
+}
+
+// XXX
+recovery_t
+make_tpcc_recovery(int mypos, int nnodes, int seqno)
+{
+  mypos = nnodes = seqno;
+  throw_not_implemented();
+  return recovery_t();
+}
+
 /**
  * Actually do the work of executing a transaction and sending back the reply.
 *
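process_tpccs() above overloads TpccReq.seqno as a small control channel, and st_tpcc::sendStop()/sendRec() further down emit the same two sentinels. Restated with invented names (the commit itself uses the bare literals):

    // Values carried in TpccReq.seqno:
    enum TpccControl {
      kTpccEndOfStream     = -1,  // leader is done issuing; exit the loop
      kTpccPrepareRecovery = -2,  // snapshot state for a joining replica
    };
    // Any other value is an ordinary txn seqno; a request is executed only
    // when it is exactly one past the last applied seqno, otherwise the
    // bytes stay anchored in the reader and are backlogged for later.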
@@ -936,7 +1095,6 @@
   sized_array<char> rbuf(new char[read_buf_size], read_buf_size);
   commons::array<char> wbuf(buf_size);
   st_reader reader(leader, rbuf.get(), rbuf.size());
-  vector<st_netfd_t> leader_v(1, leader);
   writer w(lambda(const void *buf, size_t len) {
     checkeqnneg(st_write(__ref(leader), buf, len, ST_UTIME_NO_TIMEOUT),
                 static_cast<ssize_t>(len));
@@ -998,7 +1156,7 @@
       }
     } else {
       st_intr intr(stop_hub);
-      prefix = reader.read<uint32_t>();
+      prefix = ntohl(reader.read<uint32_t>());
       check(prefix < 10000);
       batch.Clear();
     }
@@ -1103,7 +1261,7 @@
 #if 0
 template<typename mii>
 unique_ptr<Recovery>
-make_recovery(const mii &map, int mypos, int nnodes, int &seqno)
+make_recovery(const mii &map, int mypos, int nnodes, int seqno)
 {
   // TODO make this faster
   cout << "generating recovery..." << endl;
@@ -1134,7 +1292,7 @@
 
 template<typename mii>
 recovery_t
-make_recovery(const mii &map, int mypos, int nnodes, int &seqno)
+make_recovery(const mii &map, int mypos, int nnodes, int seqno)
 {
   return recovery_t();
 }
@@ -1156,7 +1314,7 @@
 
 template<>
 recovery_t
-make_recovery(const snap_map<int, int> &map, int mypos, int nnodes, int &seqno)
+make_recovery(const snap_map<int, int> &map, int mypos, int nnodes, int seqno)
 {
   const commons::array<entry> &src = map.get_table();
   pair<size_t, size_t> range = recovery_range(src.size(), mypos, nnodes);
@@ -1233,7 +1391,7 @@
     try {
       st_intr intr(stop_hub);
       if (Types::is_pb()) readmsg(reader, batch);
-      else { prefix = reader.read<uint32_t>(); batch.Clear(); }
+      else { prefix = ntohl(reader.read<uint32_t>()); batch.Clear(); }
     } catch (...) { // TODO: only catch interruptions
       // This check on seqnos is OK for termination since the seqno will
       // never grow again if stop_hub is set.
@@ -1251,7 +1409,7 @@
       // to get all the acks back).
       st_intr intr(kill_hub);
       if (Types::is_pb()) readmsg(reader, batch);
-      else { prefix = reader.read<uint32_t>(); batch.Clear(); }
+      else { prefix = ntohl(reader.read<uint32_t>()); batch.Clear(); }
     }
 
     for (int i = 0; i < batch.res_size(); ++i) {
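The recurring prefix = ntohl(reader.read<uint32_t>()) change in the hunks above and below treats the batch length prefix as network (big-endian) byte order on the wire. A standalone illustration of why the conversion matters on a little-endian host (not from the commit):

    #include <arpa/inet.h>
    #include <cassert>
    #include <stdint.h>

    int main() {
      const uint32_t len  = 300;         // example payload length
      const uint32_t wire = htonl(len);  // big-endian, as sent on the socket
      assert(ntohl(wire) == 300);        // portable read on any host
      // Reading `wire` raw on little-endian x86 would yield 0x2C010000
      // (738263040), tripping sanity checks like check(prefix < 10000).
      return 0;
    }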
@@ -1353,6 +1511,142 @@
   h.run<Types>();
 }
 
+class tpcc_response_handler
+{
+public:
+  tpcc_response_handler(st_netfd_t replica, const int &seqno, int rid,
+                        st_multichannel<long long> &recover_signals, bool caught_up)
+    :
+      replica(replica),
+      seqno(seqno),
+      rid(rid),
+      recover_signals(recover_signals),
+      caught_up(caught_up),
+      sub(recover_signals.subscribe()),
+      start_time(current_time_millis()),
+      recovery_start_time(caught_up ? -1 : start_time),
+      recovery_end_time(-1),
+      start_seqno(seqno),
+      recovery_start_seqno(caught_up ? -1 : seqno),
+      recovery_end_seqno(-1),
+      last_seqno(-1)
+  {}
+
+  void run() {
+    finally f(bind(&tpcc_response_handler::cleanup, this));
+
+    commons::array<char> rbuf(read_buf_size), wbuf(buf_size);
+    st_reader reader(replica, rbuf.get(), rbuf.size());
+    writer w(lambda(const void*, size_t) {
+      throw not_supported_exception("response handler should not be writing");
+    }, wbuf.get(), wbuf.size());
+    stream s(reader,w);
+
+    long long last_display_time = current_time_millis();
+
+    function<void()> loop_cleanup =
+      bind(&tpcc_response_handler::loop_cleanup, this);
+
+    TpccRes res;
+
+    while (true) {
+      finally f(loop_cleanup);
+
+      // Read the message, but correctly respond to interrupts so that we can
+      // cleanly exit (slightly tricky).
+      if (stopped_issuing) {
+        st_intr intr(stop_hub);
+        readmsg(reader, res);
+      } else {
+        st_intr intr(kill_hub);
+        readmsg(reader, res);
+      }
+
+      if (res.seqno() < last_seqno)
+        throw msg_exception(string("response seqno decreased from ") +
+                            lexical_cast<string>(last_seqno) + " to " +
+                            lexical_cast<string>(res.seqno()));
+
+      if (!caught_up) {
+        long long now = current_time_millis(), time_diff = now - start_time;
+        caught_up = true;
+        recover_signals.push(now);
+        cout << rid << ": " << "recovering node caught up; took "
+             << time_diff << " ms" << endl;
+        // This will cause the program to exit eventually, but cleanly, such that
+        // the recovery time will be set first, before the eventual exit (which
+        // may not even happen in the current iteration).
+        if (stop_on_recovery) {
+          cout << "stopping on recovery" << endl;
+          stop_hub.set();
+        }
+      }
+
+      if (check_interval(res.seqno(), handle_responses_display)) {
+        cout << rid << ": " << "got response " << res.seqno() << " from "
+             << replica << "; ";
+        long long display_time = current_time_millis();
+        showtput("handling", display_time, last_display_time, res.seqno(),
+                 res.seqno() - handle_responses_display);
+        last_display_time = display_time;
+      }
+      if (check_interval(res.seqno(), yield_interval)) {
+        st_sleep(0);
+      }
+      last_seqno = res.seqno();
+    }
+  }
+
+private:
+  void loop_cleanup() {
+    // The first timestamp that comes down the subscription pipeline is the
+    // recovery start time, issued by the main thread.  The second one is the
+    // recovery end time, issued by the response handler associated with the
+    // joiner.
+    if (recovery_start_time == -1 && !sub.empty()) {
+      recovery_start_time = sub.take();
+      recovery_start_seqno = last_seqno;
+      cout << rid << ": ";
+      showtput("before recovery, finished", recovery_start_time, start_time,
+               recovery_start_seqno, 0);
+    } else if (recovery_end_time == -1 && !sub.empty()) {
+      recovery_end_time = sub.take();
+      recovery_end_seqno = last_seqno;
+      cout << rid << ": ";
+      showtput("during recovery, finished roughly", recovery_end_time,
+               recovery_start_time, recovery_end_seqno, recovery_start_seqno);
+    }
+  }
+
+  void cleanup() {
+    long long end_time = current_time_millis();
+    cout << rid << ": ";
+    showtput("handled roughly", end_time, start_time, seqno, start_seqno);
+    if (recovery_end_time > -1) {
+      cout << rid << ": ";
+      showtput("after recovery, finished", end_time, recovery_end_time,
+               seqno, recovery_end_seqno);
+    }
+  }
+
+  st_netfd_t replica;
+  const int &seqno;
+  int rid;
+  st_multichannel<long long> &recover_signals;
+  bool caught_up;
+  st_channel<long long> &sub;
+  long long start_time, recovery_start_time, recovery_end_time;
+  int start_seqno, recovery_start_seqno, recovery_end_seqno, last_seqno;
+};
+
+void
+handle_tpcc_responses(st_netfd_t replica, const int &seqno, int rid,
+                      st_multichannel<long long> &recover_signals, bool caught_up)
+{
+  tpcc_response_handler h(replica, seqno, rid, recover_signals, caught_up);
+  h.run();
+}
+
 /**
  * Help the recovering node.
  *
@@ -1458,11 +1752,11 @@
   st_bool accept_joiner;
   int seqno = 0;
   st_channel<replica_info> newreps;
-  const function<void()> f =
+  foreach (const replica_info &r, replicas) newreps.push(r);
+  const function<void()> f = do_tpcc ?
+    bind(issue_tpcc, ref(newreps), ref(seqno), ref(accept_joiner)) :
     bind(issue_txns<Types>, ref(newreps), ref(seqno), ref(accept_joiner));
-  st_thread_t issue_txns_thread = my_spawn(f, "issue_txns");
-  foreach (const replica_info &r, replicas) newreps.push(r);
-  st_joining join_issue_txns(issue_txns_thread);
+  st_joining join_issue_txns(my_spawn(f, "issue_txns"));
 
   finally fin(lambda () {
     cout << "LEADER SUMMARY" << endl;
@@ -1485,9 +1779,14 @@
   st_thread_group handlers;
   int rid = 0;
   foreach (replica_info r, replicas) {
-    handlers.insert(my_spawn(bind(handle_responses<RTypes>, r.fd(), ref(seqno), rid++,
-                                  ref(recover_signals), true),
-                             "handle_responses"));
+    function<void()> fn;
+    if (do_tpcc)
+      fn = bind(handle_tpcc_responses, r.fd(), ref(seqno), rid++,
+                ref(recover_signals), true);
+    else
+      fn = bind(handle_responses<RTypes>, r.fd(), ref(seqno), rid++,
+                ref(recover_signals), true);
+    handlers.insert(my_spawn(fn, "handle_responses"));
   }
 
   // Accept the recovering node, and tell it about the online replicas.
@@ -1560,6 +1859,33 @@
   });
 
   st_channel<recovery_t> send_states;
+  if (do_tpcc) {
+    TPCCTables *tables = new TPCCTables();
+    g_tables.reset(tables);
+    SystemClock* clock = new SystemClock();
+
+    // Create a generator for filling the database.
+    RealRandomGenerator* random = new RealRandomGenerator();
+    NURandC cLoad = NURandC::makeRandom(random);
+    random->setC(cLoad);
+
+    // Generate the data
+    cout << "loading " << nwarehouses << " warehouses" << endl;
+    char now[Clock::DATETIME_SIZE+1];
+    clock->getDateTimestamp(now);
+    TPCCGenerator generator(random, now, Item::NUM_ITEMS,
+                            District::NUM_PER_WAREHOUSE,
+                            Customer::NUM_PER_DISTRICT,
+                            NewOrder::INITIAL_NUM_PER_DISTRICT);
+    long long start_time = current_time_millis();
+    generator.makeItemsTable(tables);
+    for (int i = 0; i < nwarehouses; ++i) {
+      generator.makeWarehouse(tables, i+1);
+    }
+    cout << "loaded " << nwarehouses << " warehouses in "
+         << current_time_millis() - start_time << " ms" << endl;
+  }
+
   cout << "starting as replica on port " << listen_port << endl;
 
   // Listen for connections from other replicas.
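One subtlety in this load block: the NURand C constants drawn at load time (cLoad) must be related to the ones used when generating transactions (TPC-C 2.1.6.1), which is why issue_tpcc below derives its run-time constants from cLoad instead of drawing fresh ones. Side by side (same calls as in the commit; surrounding declarations assumed):

    RealRandomGenerator* random = new RealRandomGenerator();
    NURandC cLoad = NURandC::makeRandom(random);
    random->setC(cLoad);                                     // load time (this hunk)
    // ... generate items and warehouses ...
    random->setC(NURandC::makeRandomForRun(random, cLoad));  // run time (issue_tpcc)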
@@ -1608,8 +1934,12 @@
 
   // Process txns.
   st_channel<chunk> backlog;
-  const function<void()> process_fn =
-    bind(process_txns<Types, RTypes>, leader, ref(map), ref(seqno),
+  function<void()> process_fn;
+  if (do_tpcc)
+    process_fn = bind(process_tpccs, leader, ref(seqno), ref(send_states),
+                      ref(backlog), init.txnseqno(), mypos, init.node_size());
+  else
+    process_fn = bind(process_txns<Types, RTypes>, leader, ref(map), ref(seqno),
                       ref(send_states), ref(backlog), init.txnseqno(), mypos,
                       init.node_size());
   st_joining join_proc(my_spawn(process_fn, "process_txns"));
@@ -1775,7 +2105,7 @@
     reader.reset_range(chunk.get<1>(), chunk.get<2>());
     while (reader.start() < reader.end()) {
       char *start = reader.start();
-      uint32_t prefix = reader.read<uint32_t>();
+      uint32_t prefix = ntohl(reader.read<uint32_t>());
       ASSERT(prefix < 10000);
       ASSERT(start + sizeof(uint32_t) + prefix <= reader.end());
       batch.Clear();
@@ -1883,9 +2213,200 @@
   }
 }
 
+class st_tpcc : public TPCCDB
+{
+ private:
+  commons::array<char> a_;
+  writer writer_;
+  TpccReq req_;
+  int seqno_;
+
+ public:
+  st_tpcc(const vector<st_netfd_t> &fds) :
+    a_(buf_size),
+    writer_(lambda(const void *buf, size_t len) {
+      foreach (st_netfd_t dst, __ref(fds))
+        st_timed_write(dst, buf, len);
+    }, a_, buf_size) {}
+
+  void flush() { writer_.mark_and_flush(); }
+  void set_seqno(int seqno) { seqno_ = seqno; }
+
+  void sendRec() {
+    req_.Clear();
+    req_.set_seqno(-2);
+    ser(writer_, req_);
+    writer_.mark_and_flush();
+  }
+
+  void sendStop() {
+    req_.Clear();
+    req_.set_seqno(-1);
+    ser(writer_, req_);
+    writer_.mark_and_flush();
+  }
+
+  // Executes the TPC-C "slev" transaction.  From the last 20 orders, returns
+  // the number of rows in the STOCK table that have S_QUANTITY < threshold.
+  // See TPC-C 2.8 (page 43).
+  int stockLevel(int32_t warehouse_id, int32_t district_id, int32_t threshold) {
+    warehouse_id = district_id = threshold = 0;
+    throw_not_implemented();
+  }
+
+  // Executes the TPC-C order status transaction.  Find the customer's last
+  // order and check the delivery date of each item on the order.
+  // See TPC-C 2.6 (page 36).
+  void orderStatus(int32_t warehouse_id, int32_t district_id, int32_t customer_id,
+                   OrderStatusOutput* output) {
+    warehouse_id = district_id = customer_id = 0; output = 0;
+    throw_not_implemented();
+  }
+
+  // Executes the TPC-C order status transaction.  Find the customer's last
+  // order and check the delivery date of each item on the order.
+  // See TPC-C 2.6 (page 36).
+  void orderStatus(int32_t warehouse_id, int32_t district_id, const char* c_last,
+                   OrderStatusOutput* output) {
+    warehouse_id = district_id = 0; c_last = 0; output = 0;
+    throw_not_implemented();
+  }
+
+  // Executes the TPC-C new order transaction.  Enter the new order for
+  // customer_id into the database.  See TPC-C 2.4 (page 27).  Returns true if
+  // the transaction commits.
+  bool newOrder(int32_t warehouse_id, int32_t district_id, int32_t customer_id,
+                const vector<NewOrderItem>& items, const char* now,
+                NewOrderOutput* output) {
+    req_.Clear();
+    req_.set_seqno(seqno_);
+
+    NewOrderMsg &new_order = *req_.mutable_new_order();
+    new_order.set_warehouse_id(warehouse_id);
+    new_order.set_district_id(district_id);
+    new_order.set_customer_id(customer_id);
+    foreach (const NewOrderItem &item, items) {
+      NewOrderItemMsg &msg = *new_order.add_item();
+      msg.set_i_id(item.i_id);
+      msg.set_ol_supply_w_id(item.ol_supply_w_id);
+      msg.set_ol_quantity(item.ol_quantity);
+    }
+    new_order.set_now(now);
+
+    ser(writer_, req_);
+    output = nullptr;
+    return true;
+  }
+
+  // Executes the TPC-C payment transaction.  Add h_amount to the customer's
+  // account.  See TPC-C 2.5 (page 32).
+  void payment(int32_t warehouse_id, int32_t district_id, int32_t c_warehouse_id,
+               int32_t c_district_id, int32_t customer_id, float h_amount,
+               const char* now, PaymentOutput* output) {
+    warehouse_id = district_id = c_district_id = c_warehouse_id = customer_id = 0;
+    h_amount = 0; now = 0; output = 0;
+    throw_not_implemented();
+  }
+
+  // Executes the TPC-C payment transaction.  Add h_amount to the customer's
+  // account.  See TPC-C 2.5 (page 32).
+  void payment(int32_t warehouse_id, int32_t district_id, int32_t c_warehouse_id,
+               int32_t c_district_id, const char* c_last, float h_amount,
+               const char* now, PaymentOutput* output) {
+    warehouse_id = district_id = c_warehouse_id = c_district_id = 0;
+    h_amount = 0; c_last = now = 0; output = 0;
+    throw_not_implemented();
+  }
+
+  // Executes the TPC-C delivery transaction.  Delivers the oldest undelivered
+  // transaction in each district in warehouse_id.  See TPC-C 2.7 (page 39).
+  void delivery(int32_t warehouse_id, int32_t carrier_id, const char* now,
+                vector<DeliveryOrderInfo>* orders) {
+    warehouse_id = carrier_id = 0; now = 0; orders = 0;
+    throw_not_implemented();
+  }
+};
+
 void
-run_tpcc()
+issue_tpcc(st_channel<replica_info> &newreps, int &seqno,
+           st_bool &accept_joiner)
 {
+  vector<st_netfd_t> fds;
+  long long start_time = current_time_millis();
+
+  finally f(lambda () {
+    showtput("issued", current_time_millis(), __ref(start_time), __ref(seqno),
+             0);
+  });
+
+  SystemClock* clock = new SystemClock();
+
+  // Change the constants for run
+  RealRandomGenerator* random = new RealRandomGenerator();
+  NURandC cLoad = NURandC::makeRandom(random);
+  random->setC(NURandC::makeRandomForRun(random, cLoad));
+
+  // Client owns all the parameters
+  st_tpcc &tables = *new st_tpcc(fds);
+  TPCCClient client(clock, random, &tables, Item::NUM_ITEMS, nwarehouses,
+                    District::NUM_PER_WAREHOUSE, Customer::NUM_PER_DISTRICT);
+  while (!stop_hub) {
+    // Did we get a new member?  If so, notify an arbitrary member (the first
+    // one) to prepare to send recovery information (by sending an
+    // empty/default Txn).
+    // XXX rec_pwal
+    if (!newreps.empty() && seqno > 0 && !rec_pwal) {
+      tables.sendRec();
+    }
+
+    // Bring in any new members.
+    // TODO more efficient: copy/extend/append
+    while (!newreps.empty()) {
+      fds.push_back(newreps.take().fd());
+    }
+
+    tables.set_seqno(seqno);
+    client.doOne();
+
+    // Checkpoint.
+    if (check_interval(seqno, yield_interval)) st_sleep(0);
+    if (check_interval(seqno, issue_display)) {
+      cout << "issued txn " << seqno << endl;
+      if (timelim > 0 && current_time_millis() - start_time > timelim) {
+        cout << "time's up; issued " << seqno << " txns in " << timelim
+             << " ms" << endl;
+        stop_hub.set();
+      }
+    }
+
+    // For debugging purposes.
+    if (issuing_interval > 0) {
+      st_sleep(issuing_interval);
+      tables.flush();
+    }
+
+    // Are we to accept a new joiner?
+    if (seqno == accept_joiner_seqno) {
+      accept_joiner.set();
+    }
+
+    // Set the stopping seqno.
+    if (seqno == stop_on_seqno) {
+      cout << "stopping on issue of seqno " << seqno << endl;
+      stop_hub.set();
+    }
+
+    ++seqno;
+
+    // Pause?
+    if (do_pause)
+      do_pause.waitreset();
+  }
+
+  if (!fds.empty()) {
+    tables.sendStop();
+  }
+}
+
+void
+run_tpcc_demo()
+{
   TPCCTables* tables = new TPCCTables();
   SystemClock* clock = new SystemClock();
@@ -1996,6 +2517,8 @@
      "exit after the joiner fully recovers (for leader)")
     ("batch-size,b", po::value<int>(&batch_size)->default_value(100),
      "number of txns to batch up in each msg (for leader)")
+    ("tpcc", po::bool_switch(&do_tpcc),
+     "run the TPCC workload")
     ("exit-on-seqno,X",po::value<int>(&stop_on_seqno)->default_value(-1),
      "exit after txn seqno is issued (for leader)")
     ("accept-joiner-size,s",
 
Modified: ydb/trunk/src/tpcc/tpccclient.cc
===================================================================
--- ydb/trunk/src/tpcc/tpccclient.cc	2009-03-13 19:26:56 UTC (rev 1293)
+++ ydb/trunk/src/tpcc/tpccclient.cc	2009-03-16 22:32:07 UTC (rev 1294)
@@ -144,6 +144,8 @@
     // This is not strictly accurate: The requirement is for certain *minimum* percentages to be
     // maintained.  This is close to the right thing, but not precisely correct.
     // See TPC-C 5.2.4 (page 68).
+    doNewOrder();
+#if 0
     int x = generator_->number(1, 100);
     if (x <= 4) { // 4%
         doStockLevel();
@@ -157,6 +159,7 @@
         ASSERT(x > 100-45);
         doNewOrder();
     }
+#endif
 }
 
 int32_t TPCCClient::generateWarehouse() {
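The #if 0 above temporarily forces every transaction to be a new order, which is all process_tpcc() currently handles. The disabled ladder implements the standard TPC-C 5.2.4 mix; restated as a sketch (function name invented, and the cut-offs between the two hunks are inferred from the 4% comment and the ASSERT(x > 100-45)):

    enum Txn { STOCK_LEVEL, DELIVERY, ORDER_STATUS, PAYMENT, NEW_ORDER };
    Txn pick(int x) {                    // x uniform in [1, 100]
      if (x <= 4)  return STOCK_LEVEL;   //  4%
      if (x <= 8)  return DELIVERY;      //  4%
      if (x <= 12) return ORDER_STATUS;  //  4%
      if (x <= 55) return PAYMENT;       // 43%
      return NEW_ORDER;                  // 45%
    }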
Modified: ydb/trunk/src/ydb.proto
===================================================================
--- ydb/trunk/src/ydb.proto	2009-03-13 19:26:56 UTC (rev 1293)
+++ ydb/trunk/src/ydb.proto	2009-03-16 22:32:07 UTC (rev 1294)
@@ -85,3 +85,63 @@
 message ResponseBatch {
   repeated Response res = 1;
 }
+
+//
+// TPCC messages
+//
+
+message NewOrderItemMsg {
+  required int32 i_id = 1;
+  required int32 ol_supply_w_id = 2;
+  required int32 ol_quantity = 3;
+}
+
+message NewOrderMsg {
+  required int32 warehouse_id = 1;
+  required int32 district_id = 2;
+  required int32 customer_id = 3;
+  repeated NewOrderItemMsg item = 4;
+  required string now = 5;
+}
+
+message ItemInfoMsg {
+  required int32 s_quantity = 1;
+  required float i_price = 2;
+  // TODO: Client can compute this from other values.
+  required float ol_amount = 3;
+  // char
+  required int32 brand_generic = 4;
+  required string i_name = 5;
+}
+
+message NewOrderOutputMsg {
+  required float w_tax = 1;
+  required float d_tax = 2;
+
+  // From district d_next_o_id
+  required int32 o_id = 3;
+
+  required float c_discount = 4;
+
+  // TODO: Client can compute this from other values.
+  required float total = 5;
+
+  repeated ItemInfoMsg item = 6;
+  required string c_last = 7;
+  required string c_credit = 8;
+
+  //static const int MAX_STATUS = 25;
+  //static const char INVALID_ITEM_STATUS[];
+  //char status[MAX_STATUS+1];
+  required string status = 9;
+}
+
+message TpccReq {
+  required int32 seqno = 1;
+  optional NewOrderMsg new_order = 2;
+}
+
+message TpccRes {
+  required int32 seqno = 1;
+  optional NewOrderOutputMsg new_order = 2;
+}
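For reference, a standalone sketch (not part of the commit) of how these messages are used with the standard proto2-generated C++ API; this is the shape of what st_tpcc::newOrder() builds on the leader and process_tpcc() parses on a replica:

    #include <string>
    #include "ydb.pb.h"  // assumed name of the generated header

    int main() {
      TpccReq req;
      req.set_seqno(1);
      NewOrderMsg *no = req.mutable_new_order();
      no->set_warehouse_id(1);
      no->set_district_id(3);
      no->set_customer_id(42);
      no->set_now("2009-03-16 22:32:07");
      NewOrderItemMsg *item = no->add_item();
      item->set_i_id(7);
      item->set_ol_supply_w_id(1);
      item->set_ol_quantity(5);

      std::string wire;
      req.SerializeToString(&wire);  // ser() frames this with a length prefix

      TpccReq parsed;
      return parsed.ParseFromString(wire) && parsed.has_new_order() ? 0 : 1;
    }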