[Assorted-commits] SF.net SVN: assorted:[1413] ydb/trunk/src
Brought to you by:
yangzhang
From: <yan...@us...> - 2009-05-13 19:20:26
|
Revision: 1413
          http://assorted.svn.sourceforge.net/assorted/?rev=1413&view=rev
Author:   yangzhang
Date:     2009-05-13 19:20:13 +0000 (Wed, 13 May 2009)

Log Message:
-----------
added asynchronous (background) wal (somewhat cheats since no time shifting; actually want to overlap wal of current txns with execing of next txns)

Modified Paths:
--------------
    ydb/trunk/src/main.clamp.lzz
    ydb/trunk/src/replica.clamp.lzz
    ydb/trunk/src/tpcc/tpcctables.cc.cog
    ydb/trunk/src/tpcc.clamp.lzz

Modified: ydb/trunk/src/main.clamp.lzz
===================================================================
--- ydb/trunk/src/main.clamp.lzz	2009-05-13 17:30:11 UTC (rev 1412)
+++ ydb/trunk/src/main.clamp.lzz	2009-05-13 19:20:13 UTC (rev 1413)
@@ -38,6 +38,7 @@
 
 typedef tuple<sized_array<char>, char*, char*> chunk;
 typedef commons::array<char> recovery_t;
+typedef commons::array<char> wal_chunk;
 
 // Configuration.
 

Modified: ydb/trunk/src/replica.clamp.lzz
===================================================================
--- ydb/trunk/src/replica.clamp.lzz	2009-05-13 17:30:11 UTC (rev 1412)
+++ ydb/trunk/src/replica.clamp.lzz	2009-05-13 19:20:13 UTC (rev 1413)
@@ -31,6 +31,11 @@
     thread(bind(snapshot_writer));
   }
 
+  if (do_wal) {
+    cout << "starting the WAL writer" << endl;
+    thread(bind(wal_writer));
+  }
+
   // Initialize database state.
   int seqno = -1;
   mii &map = g_map;

Modified: ydb/trunk/src/tpcc/tpcctables.cc.cog
===================================================================
--- ydb/trunk/src/tpcc/tpcctables.cc.cog	2009-05-13 17:30:11 UTC (rev 1412)
+++ ydb/trunk/src/tpcc/tpcctables.cc.cog	2009-05-13 19:20:13 UTC (rev 1413)
@@ -1033,4 +1033,29 @@
   return a;
 }
 
+#if 0
+void compare_summaries(const commons::array<char> &a, const commons::array<char> &b)
+{
+  raw_reader r(a);
+  tpcc_recovery_header hdr;
+  r.read(hdr);
+  //[[[cog
+  // for name, struct in heappairs:
+  //   cog.outl(r'''
+  //     of << "%(name)s: " << hdr.n%(name)s << endl;
+  //     for (size_t i = 0; i < hdr.n%(name)s; ++i) {
+  //       of << r.read<size_t>() << ' ';
+  //       of << r.read<int>() << endl;
+  //     }
+  //   ''' % {'name': name, 'struct': struct})
+  //]]]
+  //[[[end]]]
+}
+
+TPCCTables::restore_partial(sizeof hdr) const
+{
+  restore_partial
+}
+#endif
+
 // vim:ft=cpp

Modified: ydb/trunk/src/tpcc.clamp.lzz
===================================================================
--- ydb/trunk/src/tpcc.clamp.lzz	2009-05-13 17:30:11 UTC (rev 1412)
+++ ydb/trunk/src/tpcc.clamp.lzz	2009-05-13 19:20:13 UTC (rev 1413)
@@ -498,7 +498,7 @@
   int first_seqno_in_chunk = -1;
   TpccReq req;
   TpccRes res;
-  txn_wal &wal = *g_twal;
+  //txn_wal &wal = *g_twal;
 
   function<void(anchored_stream_reader& reader)> overflow_fn =
     lambda(anchored_stream_reader &reader) {
@@ -536,6 +536,7 @@
   commons::array<char> wbuf(buf_size);
   anchored_stream_reader reader(st_read_fn(leader), st_read_fully_fn(leader),
                                 overflow_fn, rbuf.get(), rbuf.size());
+  realwal wal;
   writer w(lambda(const void *buf, size_t len) {
     if (do_wal) __ref(wal).sync();
     st_write(__ref(leader), buf, len);
@@ -706,9 +707,44 @@
 namespace {
 concurrent_queue<recovery_t> snapshots;
 atomic<bool> snapshot_writer_busy;
+
+concurrent_queue<wal_chunk> wal_chunks;
+atomic<bool> wal_writer_busy;
+
+class realwal {
+ public:
+  realwal() : data_(read_buf_size), head_(data_) {}
+  void logbuf(void *data, size_t len) {
+    assert(head_ + len < data_.end());
+    memcpy(head_, data, len);
+    head_ += len;
+  }
+  void sync() {
+    wal_chunk tmp_(buf_size);
+    swap(data_, tmp_);
+    wal_chunks.push_cond(move(tmp_), 0);
+    head_ = data_;
+  }
+ private:
+  wal_chunk data_;
+  char *head_;
+};
 }
 
 void
+wal_writer()
+{
+  cout << "wal writer starting" << endl;
+  closingfd fd(checknnegerr(creat("wal", 0644)));
+  while (true) {
+    wal_chunk chunk = wal_chunks.take();
+    if (chunk.get() == nullptr) break;
+    checkeqnneg(write(fd, chunk, chunk.size()), ssize_t(chunk.size()));
+    fdatasync(fd);
+  }
+}
+
+void
 snapshot_writer()
 {
   cout << "snapshot writer starting" << endl;

This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |