[Assorted-commits] SF.net SVN: assorted:[1419] ydb/trunk/src
Brought to you by:
yangzhang
From: <yan...@us...> - 2009-05-15 07:01:23
|
Revision: 1419 http://assorted.svn.sourceforge.net/assorted/?rev=1419&view=rev Author: yangzhang Date: 2009-05-15 07:01:14 +0000 (Fri, 15 May 2009) Log Message: ----------- - updated rec_snapshot to use special disk-based reading - snapshots now only update whatever is necessary on disk (whatever changed) Modified Paths: -------------- ydb/trunk/src/main.clamp.lzz ydb/trunk/src/rectpcc.clamp.lzz ydb/trunk/src/tpcc.clamp.lzz Modified: ydb/trunk/src/main.clamp.lzz =================================================================== --- ydb/trunk/src/main.clamp.lzz 2009-05-15 06:58:54 UTC (rev 1418) +++ ydb/trunk/src/main.clamp.lzz 2009-05-15 07:01:14 UTC (rev 1419) @@ -38,7 +38,6 @@ typedef tuple<sized_array<char>, char*, char*> chunk; typedef commons::array<char> recovery_t; -typedef commons::array<char> wal_chunk; // Configuration. Modified: ydb/trunk/src/rectpcc.clamp.lzz =================================================================== --- ydb/trunk/src/rectpcc.clamp.lzz 2009-05-15 06:58:54 UTC (rev 1418) +++ ydb/trunk/src/rectpcc.clamp.lzz 2009-05-15 07:01:14 UTC (rev 1419) @@ -23,6 +23,14 @@ void rec_snapshot(int &seqno) { + long long before_read = current_time_millis(); + commons::array<char> arr = g_tables->read(snapshot_path); + size_t len = arr.size(); + char *rawbuf = arr.release(); + long long after_read = current_time_millis(); + showdatarate("read from disk", len, after_read - before_read); + +#if 0 // Prepare buffer. closingfd fd(checknnegerr(open(snapshot_path.c_str(), O_RDONLY))); size_t fsz = file_size(fd); @@ -31,15 +39,17 @@ // Read. long long before_read = current_time_millis(); - checkeqnneg(read(fd, rawbuf, fsz), static_cast<ssize_t>(fsz)); + checkeqnneg(read(fd, rawbuf, fsz), ssize_t(fsz)); long long after_read = current_time_millis(); showdatarate("read from disk", fsz, after_read - before_read); // Sanity. tpcc_recovery_header &hdr = *reinterpret_cast<tpcc_recovery_header*>(rawbuf); checkeq(hdr.len, fsz); +#endif // Deserialize. + tpcc_recovery_header &hdr = *reinterpret_cast<tpcc_recovery_header*>(rawbuf); long long before_deser = current_time_millis(); array<char> buf(rawbuf, hdr.len); g_tables->deser(0, 1, hdr, buf); Modified: ydb/trunk/src/tpcc.clamp.lzz =================================================================== --- ydb/trunk/src/tpcc.clamp.lzz 2009-05-15 06:58:54 UTC (rev 1418) +++ ydb/trunk/src/tpcc.clamp.lzz 2009-05-15 07:01:14 UTC (rev 1419) @@ -31,6 +31,8 @@ using namespace commons; using namespace std; +typedef commons::array<char> summary_t; + int snapshot_interval, pct_read_only; string snapshot_path; bool do_rec_snapshot, do_wal; @@ -481,7 +483,15 @@ st_channel<chunk> &backlog, int init_seqno, int mypos, int nnodes) { + // + // Snapshot prep. + // + snapshot_writer_busy.set(false); + tpcc_recovery_header *tmphdr = new tpcc_recovery_header; + bzero(tmphdr, sizeof *tmphdr); + last_summary.reset(reinterpret_cast<char*>(tmphdr), sizeof *tmphdr); + bool caught_up = init_seqno == 0; // Means we're currently ignoring the incoming txns until we see a fail-ack // from the leader. @@ -554,6 +564,7 @@ } __ref(send_states).push(recovery_t()); snapshots.push(recovery_t()); + __ref(wal).sync(); __ref(w).mark_and_flush(); }); @@ -696,18 +707,22 @@ cout << "serializing snapshot, db state is now at seqno " << seqno << ":" << endl; g_tables->show(); - recovery_t recovery = g_tables->ser(0, 1, seqno); - showdatarate("serialized snapshot", recovery.size(), - current_time_millis() - start_time); + recovery_t recovery = g_tables->ser_partial(last_summary, 0, 1, seqno); + last_summary = g_tables->summarize(); + size_t len = reinterpret_cast<tpcc_recovery_header*>(recovery.get())->len; + showdatarate("serialized snapshot", len, current_time_millis() - start_time); snapshots.push(move(recovery)); snapshot_writer_busy.set(true); } } namespace { +summary_t last_summary; + concurrent_queue<recovery_t> snapshots; atomic<bool> snapshot_writer_busy; +typedef pair<commons::array<char>, size_t> wal_chunk; concurrent_queue<wal_chunk> wal_chunks; atomic<bool> wal_writer_busy; @@ -720,13 +735,13 @@ head_ += len; } void sync() { - wal_chunk tmp_(buf_size); + commons::array<char> tmp_(buf_size); swap(data_, tmp_); - wal_chunks.push_cond(move(tmp_), 0); + wal_chunks.push_cond(make_pair(move(tmp_), head_ - data_), 0); head_ = data_; } private: - wal_chunk data_; + commons::array<char> data_; char *head_; }; } @@ -738,8 +753,8 @@ closingfd fd(checknnegerr(creat("wal", 0644))); while (true) { wal_chunk chunk = wal_chunks.take(); - if (chunk.get() == nullptr) break; - checkeqnneg(write(fd, chunk, chunk.size()), ssize_t(chunk.size())); + if (chunk.first.get() == nullptr) break; + checkeqnneg(write(fd, chunk.first, chunk.second), ssize_t(chunk.second)); fdatasync(fd); } } @@ -748,18 +763,18 @@ snapshot_writer() { cout << "snapshot writer starting" << endl; + g_tables->remove(snapshot_path); while (true) { recovery_t rec = snapshots.take(); cout << "took one" << endl; if (rec.get() == nullptr) break; long long start_time = current_time_millis(); - { - ofstream of((snapshot_path + ".tmp").c_str()); - of.write(rec.get(), rec.size()); - } + size_t len = reinterpret_cast<tpcc_recovery_header*>(rec.get())->len; + g_tables->write(snapshot_path, rec); +#if 0 check0x(rename((snapshot_path + ".tmp").c_str(), snapshot_path.c_str())); - showdatarate("wrote snapshot", rec.size(), - current_time_millis() - start_time); +#endif + showdatarate("wrote snapshot", len, current_time_millis() - start_time); snapshot_writer_busy.set(false); } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |