[Assorted-commits] SF.net SVN: assorted:[1282] ydb/trunk/src/main.lzz.clamp
Brought to you by:
yangzhang
From: <yan...@us...> - 2009-03-12 02:13:38
|
Revision: 1282
          http://assorted.svn.sourceforge.net/assorted/?rev=1282&view=rev
Author:   yangzhang
Date:     2009-03-12 02:13:21 +0000 (Thu, 12 Mar 2009)

Log Message:
-----------
- added pwal recovery (--rec-pwal)
- added show_sockaddr for more useful debug outputs in timed writes
- changed behavior of logging; by default does not clobber the log file, but writes instead to /dev/null
- using timed writes in process_txns
- added showdatarate
- fixed recovery_start_seqno/recovery_end_seqno setting in response_handler

Modified Paths:
--------------
    ydb/trunk/src/main.lzz.clamp

Modified: ydb/trunk/src/main.lzz.clamp
===================================================================
--- ydb/trunk/src/main.lzz.clamp 2009-03-12 02:08:04 UTC (rev 1281)
+++ ydb/trunk/src/main.lzz.clamp 2009-03-12 02:13:21 UTC (rev 1282)
@@ -86,7 +86,7 @@ bool yield_during_build_up, yield_during_catch_up, dump, show_updates, count_updates, stop_on_recovery, general_txns, profile_threads, debug_threads, multirecover, disk, debug_memory, use_pwal, use_twal, - use_pb, use_pb_res, g_caught_up, + use_pb, use_pb_res, g_caught_up, rec_pwal, suppress_txn_msgs, fake_bcast, force_ser, fake_exec; long long timelim, read_thresh, write_thresh; @@ -219,12 +219,32 @@ return t; } +char * +show_sockaddr(st_netfd_t fd) +{ + sockaddr_in sa; + socklen_t salen = sizeof sa; + check0x(getpeername(st_netfd_fileno(fd), + reinterpret_cast<sockaddr*>(&sa), + &salen)); + return inet_ntoa(sa.sin_addr); +} + +map<st_netfd_t, string> nfdnames; + +inline const string& +nfd2name(st_netfd_t fd) +{ + return nfdnames[fd]; +} + /** * Used by the leader to bookkeep information about replicas. */ class replica_info { public: + /** port is the replica's listen port, not the port bound to the fd socket. */ replica_info(st_netfd_t fd, uint16_t port) : fd_(fd), port_(port) {} st_netfd_t fd() const { return fd_; } /** The port on which the replica is listening. 
*/ @@ -404,7 +424,8 @@ long long write_time = current_time_millis() - before_write; if (write_time > write_thresh) { cout << "thread " << threadname() << " write of " << len - << " bytes took " << write_time << " ms" << endl; + << " bytes to dst " << show_sockaddr(dst) << " blocked for " + << write_time << " ms" << endl; } } } @@ -536,13 +557,15 @@ check(msg.ParseFromArray(src.read(len), len)); } +enum { op_del, op_write, op_commit }; + /** * ARIES write-ahead log. No undo logging necessary (no steal). */ class wal { public: - wal() : of("wal"), out(of) {} + wal(const string &fname) : of(fname.c_str()), out(of) {} template <typename T> void log(const T &msg) { ser(of, msg); } void logbuf(const ser_t &s) { logbuf(s.data(), s.size()); } @@ -562,7 +585,6 @@ out & op; } private: - enum { op_del, op_write, op_commit }; ofstream of; binary_oarchive out; }; @@ -600,8 +622,7 @@ else fn = lambda(const void *buf, size_t len) { foreach (st_netfd_t dst, __ref(fds)) - checkeqnneg(st_write(dst, buf, len, ST_UTIME_NO_TIMEOUT), - static_cast<ssize_t>(len)); + st_timed_write(dst, buf, len); }; char *real_wbuf = newreps.empty() ? rbuf.get() : wbuf.get(); @@ -622,7 +643,8 @@ // Did we get a new member? If so, notify an arbitrary member (the first // one) to prepare to send recovery information (by sending an // empty/default Txn). 
- if (!newreps.empty() && seqno > 0) { + // XXX rec_pwal + if (!newreps.empty() && seqno > 0 && !rec_pwal) { start_txn(batch); fin_txn(batch); // TODO: verify that this made the catch-up stream more efficient, @@ -796,7 +818,21 @@ } void -showtput(const string &action, long long stop_time, long long start_time, +showdatarate(const char *action, streamoff len, long long time) +{ + cout << action << " of " << len << " bytes in " << time << " ms (" + << len / time / 1000 << " MB/s)" << endl; +} + +void +showdatarate(const char *action, size_t len, long long time) +{ + cout << action << " of " << len << " bytes in " << time << " ms (" + << len / time / 1000 << " MB/s)" << endl; +} + +void +showtput(const char *action, long long stop_time, long long start_time, int stop_count, int start_count) { long long time_diff = stop_time - start_time; @@ -918,7 +954,7 @@ char *start = reader.start(); // Will overflow on next few reads ("header")? - if (reader.unread() + reader.rem() < headerlen) { + if (!caught_up && reader.unread() + reader.rem() < headerlen) { sized_array<char> buf(new char[read_buf_size], read_buf_size); memcpy(buf.get(), reader.start(), reader.unread()); swap(buf, reader.buf()); @@ -1108,11 +1144,14 @@ << hdr.count << " slots (" << bodylen << " bytes); range is [" << begin << ".." 
<< end << "]; seqno is " << hdr.seqno << endl; + long long start_time = current_time_millis(); commons::array<char> recovery(sizeof(size_t) + sizeof hdr + bodylen); raw_writer ser(recovery.begin()); ser.write(recovery.size()); ser.write(hdr); memcpy(ser.ptr(), src.begin() + begin, bodylen); + showdatarate("serialized recovery", recovery.size(), + current_time_millis() - start_time); return recovery; } @@ -1206,12 +1245,12 @@ cout << rseqno << last_seqno << res.result_size() << " " << r << " " << res.result(r) << endl; } if (!caught_up && rcaught_up) { - long long now = current_time_millis(), timediff = now - start_time; + long long now = current_time_millis(), time_diff = now - start_time; caught_up = true; recover_signals.push(now); cout << rid << ": "; cout << "recovering node caught up; took " - << timediff << " ms" << endl; + << time_diff << " ms" << endl; // This will cause the program to exit eventually, but cleanly, such that // the recovery time will be set first, before the eventual exit (which // may not even happen in the current iteration). @@ -1244,13 +1283,13 @@ // joiner. 
if (recovery_start_time == -1 && !sub.empty()) { recovery_start_time = sub.take(); - recovery_start_seqno = seqno; + recovery_start_seqno = last_seqno; cout << rid << ": "; showtput("before recovery, finished", recovery_start_time, start_time, recovery_start_seqno, 0); } else if (recovery_end_time == -1 && !sub.empty()) { recovery_end_time = sub.take(); - recovery_end_seqno = seqno; + recovery_end_seqno = last_seqno; cout << rid << ": "; showtput("during recovery, finished roughly", recovery_end_time, recovery_start_time, recovery_end_seqno, recovery_start_seqno); @@ -1332,7 +1371,7 @@ #if 0 sendmsg(joiner, *recovery); #endif - cout << "sent recovery in " << diff << " ms" << endl; + showdatarate("sent recovery", recovery.size(), diff); } void @@ -1354,8 +1393,8 @@ cout << "starting as leader" << endl; st_multichannel<long long> recover_signals; - scoped_ptr<wal> pwal(new wal); - g_wal = pwal.get(); + scoped_ptr<wal> twal(new wal(use_twal ? "twal" : "/dev/null")); + g_wal = twal.get(); // Wait until all replicas have joined. st_netfd_t listener = st_tcp_listen(leader_port); @@ -1539,8 +1578,11 @@ } } + // Initialize physical log. + scoped_ptr<wal> pwal(new wal(use_pwal ? "pwal" : "/dev/null")); + g_wal = pwal.get(); + // Process txns. - // XXX st_channel<shared_ptr<pb::Txn> > backlog; st_channel<chunk> backlog; const function<void()> process_fn = bind(process_txns<Types, RTypes>, leader, ref(map), ref(seqno), @@ -1554,103 +1596,140 @@ try { // If there's anything to recover. if (init.txnseqno() > 0) { - cout << "waiting for recovery message" << (multirecover ? "s" : "") - << endl; - long long before_recv = current_time_millis(); + if (rec_pwal) { + // Recover from physical log. 
+ cout << "recovering from pwal" << endl; + long long start_time = current_time_millis(); + ifstream inf("pwal"); + binary_iarchive in(inf); + int rseqno = -1; + while (inf.peek() != ifstream::traits_type::eof()) { + int op; + in & op; + switch (op) { + case op_del: + { + int key; + in & key; + mii::iterator it = map.find(key); + map.erase(it); + break; + } + case op_write: + { + int key, val; + in & key & val; + map[key] = val; + break; + } + case op_commit: + ++rseqno; + break; + } + if (check_interval(rseqno, yield_interval)) st_sleep(0); + } + seqno = init.txnseqno() - 1; + showdatarate("recovered from pwal", inf.tellg(), current_time_millis() - start_time); + cout << "now at seqno " << rseqno << " (really: " << seqno << ")" << endl; + } else { + cout << "waiting for recovery message" << (multirecover ? "s" : "") + << endl; + long long before_recv = current_time_millis(); - vector<st_thread_t> recovery_builders; - assert(seqno == -1); - bool first = true; - for (int i = 0; i < (multirecover ? init.node_size() : 1); ++i) { - recovery_builders.push_back(my_spawn(lambda() { - // Read the recovery message length and header. - size_t len; - recovery_header hdr; - char buf[sizeof len + sizeof hdr]; - //try { - checkeqnneg(st_read_fully(__ref(replicas[i]), - buf, sizeof len + sizeof hdr, - ST_UTIME_NO_TIMEOUT), - ssize_t(sizeof len + sizeof hdr)); - //} catch (...) { // TODO just catch "Connection reset by peer" - //return; - //} - raw_reader rdr(buf); - rdr.read(len); - rdr.read(hdr); - check(hdr.seqno >= 0); + vector<st_thread_t> recovery_builders; + assert(seqno == -1); + bool first = true; + for (int i = 0; i < (multirecover ? init.node_size() : 1); ++i) { + recovery_builders.push_back(my_spawn(lambda() { + // Read the recovery message length and header. 
+ size_t len; + recovery_header hdr; + char buf[sizeof len + sizeof hdr]; + //try { + checkeqnneg(st_read_fully(__ref(replicas[i]), + buf, sizeof len + sizeof hdr, + ST_UTIME_NO_TIMEOUT), + ssize_t(sizeof len + sizeof hdr)); + //} catch (...) { // TODO just catch "Connection reset by peer" + //return; + //} + raw_reader rdr(buf); + rdr.read(len); + rdr.read(hdr); + check(hdr.seqno >= 0); - // Resize the table if necessary. - commons::array<entry> &table = __ref(map).get_table(); - if (!__ref(first)) { - checkeq(table.size(), hdr.total); - checkeq(__ref(map).size(), hdr.size); - } else { - __ref(map).set_size(hdr.size); - if (table.size() != hdr.total) { - table.reset(new entry[hdr.total], hdr.total); + // Resize the table if necessary. + commons::array<entry> &table = __ref(map).get_table(); + if (!__ref(first)) { + checkeq(table.size(), hdr.total); + checkeq(__ref(map).size(), hdr.size); + } else { + __ref(map).set_size(hdr.size); + if (table.size() != hdr.total) { + table.reset(new entry[hdr.total], hdr.total); + } } - } - // Receive straight into the table. - pair<size_t, size_t> range = - recovery_range(table.size(), __ctx(i), __ref(init).node_size()); - // Check that we agree on the number of entries. - checkeq(range.second - range.first, hdr.count); - // Check that the count is a power of two. - checkeq(hdr.count & (hdr.count - 1), size_t(0)); - size_t rangelen = sizeof(entry) * hdr.count; - // Read an extra char to ensure that we're at the EOF. - checkeqnneg(st_read_fully(__ref(replicas[i]), - table.begin() + range.first, rangelen + 1, - ST_UTIME_NO_TIMEOUT), - ssize_t(rangelen)); + // Receive straight into the table. + pair<size_t, size_t> range = + recovery_range(table.size(), __ctx(i), __ref(init).node_size()); + // Check that we agree on the number of entries. + checkeq(range.second - range.first, hdr.count); + // Check that the count is a power of two. 
+ checkeq(hdr.count & (hdr.count - 1), size_t(0)); + size_t rangelen = sizeof(entry) * hdr.count; + // Read an extra char to ensure that we're at the EOF. + long long start_time = current_time_millis(); + checkeqnneg(st_read_fully(__ref(replicas[i]), + table.begin() + range.first, rangelen + 1, + ST_UTIME_NO_TIMEOUT), + ssize_t(rangelen)); + long long end_time = current_time_millis(); - long long tm = current_time_millis(); - if (__ref(seqno) != -1) - checkeq(__ref(seqno), hdr.seqno); - __ref(seqno) = hdr.seqno; - cout << "got recovery message of " << len << " bytes (" - << hdr.size << " records in " << hdr.count << " slots) in " - << tm - __ref(before_recv) << " ms; now at seqno " - << hdr.seqno << endl; + if (__ref(seqno) != -1) + checkeq(__ref(seqno), hdr.seqno); + __ref(seqno) = hdr.seqno; + showdatarate("got recovery message", len, end_time - start_time); + cout << "receive took " << end_time - __ref(before_recv) + << " ms total; now at seqno " << hdr.seqno << endl; #if 0 - Recovery recovery; - long long receive_start = 0, receive_end = 0; - size_t len = 0; - { - st_intr intr(stop_hub); - len = readmsg(__ref(replicas)[__ctx(i)], recovery, &receive_start, - &receive_end); - } - long long build_start = current_time_millis(); - cout << "got recovery message of " << len << " bytes in " - << build_start - __ref(before_recv) << " ms: xfer took " - << receive_end - receive_start << " ms, deserialization took " - << build_start - receive_end << " ms" << endl; - for (int i = 0; i < recovery.pair_size(); ++i) { - const Recovery_Pair &p = recovery.pair(i); - __ref(map)[p.key()] = p.value(); - if (i % yield_interval == 0) { - if (yield_during_build_up) st_sleep(0); + Recovery recovery; + long long receive_start = 0, receive_end = 0; + size_t len = 0; + { + st_intr intr(stop_hub); + len = readmsg(__ref(replicas)[__ctx(i)], recovery, &receive_start, + &receive_end); } - } - check(recovery.seqno() >= 0); - int seqno = __ref(seqno) = recovery.seqno(); - long long build_end = 
current_time_millis(); - cout << "receive and build-up took " - << build_end - __ref(before_recv) - << " ms; built up map of " << recovery.pair_size() - << " records in " << build_end - build_start - << " ms; now at seqno " << seqno << endl; + long long build_start = current_time_millis(); + cout << "got recovery message of " << len << " bytes in " + << build_start - __ref(before_recv) << " ms: xfer took " + << receive_end - receive_start << " ms, deserialization took " + << build_start - receive_end << " ms" << endl; + for (int i = 0; i < recovery.pair_size(); ++i) { + const Recovery_Pair &p = recovery.pair(i); + __ref(map)[p.key()] = p.value(); + if (i % yield_interval == 0) { + if (yield_during_build_up) st_sleep(0); + } + } + check(recovery.seqno() >= 0); + int seqno = __ref(seqno) = recovery.seqno(); + long long build_end = current_time_millis(); + cout << "receive and build-up took " + << build_end - __ref(before_recv) + << " ms; built up map of " << recovery.pair_size() + << " records in " << build_end - build_start + << " ms; now at seqno " << seqno << endl; #endif - }, "recovery_builder" + lexical_cast<string>(i))); + }, "recovery_builder" + lexical_cast<string>(i))); + } + foreach (st_thread_t t, recovery_builders) { + st_join(t); + } } - foreach (st_thread_t t, recovery_builders) { - st_join(t); - } + long long mid_time = current_time_millis(); - int mid_seqno = seqno; // XXX using msg::TxnBatch; @@ -1678,6 +1757,7 @@ batch.Clear(); for (int t = 0; t < batch.txn_size(); ++t) { const Txn &txn = batch.txn(t); + if (rec_pwal) seqno = txn.seqno() - 1; process_txn<rb_traits, rb_traits>(map, txn, seqno, nullptr); if (fake_exec && !Types::is_pb()) { reader.skip(txn.op_size() * Op_Size); @@ -1812,6 +1892,8 @@ "yield periodically during catch-up phase of recovery (for recoverer)") ("multirecover,m", po::bool_switch(&multirecover), "recover from multiple hosts, instead of just one (specified via leader)") + ("rec-pwal", po::bool_switch(&rec_pwal), + "recover from 
pwal") ("disk,k", po::bool_switch(&disk), "use disk-based recovery") ("dump,D", po::bool_switch(&dump), @@ -2026,9 +2108,3 @@ return 1; } } - -/* - * Compile-time options: - * - * - map, unordered_map, dense_hash_map - */

This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|