[Assorted-commits] SF.net SVN: assorted:[1412] ydb/trunk/src
Brought to you by:
yangzhang
From: <yan...@us...> - 2009-05-13 17:30:22
|
Revision: 1412 http://assorted.svn.sourceforge.net/assorted/?rev=1412&view=rev Author: yangzhang Date: 2009-05-13 17:30:11 +0000 (Wed, 13 May 2009) Log Message: ----------- added aries WALing Modified Paths: -------------- ydb/trunk/src/main.clamp.lzz ydb/trunk/src/rectpcc.clamp.lzz ydb/trunk/src/replica.clamp.lzz ydb/trunk/src/tpcc.clamp.lzz Modified: ydb/trunk/src/main.clamp.lzz =================================================================== --- ydb/trunk/src/main.clamp.lzz 2009-05-13 16:55:10 UTC (rev 1411) +++ ydb/trunk/src/main.clamp.lzz 2009-05-13 17:30:11 UTC (rev 1412) @@ -1,6 +1,7 @@ #hdr #include "unsetprefs.h" #include <boost/tuple/tuple.hpp> +#include <commons/closing.h> #include <commons/st/intr.h> #include <commons/st/sync.h> #include <commons/st/channel.h> @@ -28,7 +29,7 @@ #include <commons/st/io.h> #include <commons/st/sockets.h> #include <iostream> -#include <unistd.h> // pipe, write, sync +#include <unistd.h> // pipe, write, fdatasync #include "tpcc/tpcctables.h" #include "msg.h" #include "setprefs.h" @@ -202,14 +203,15 @@ // TODO? class txn_wal { public: - txn_wal(const string &fname) : of(fname.c_str()) {} + txn_wal(const string &fname) : fd_(checknnegerr(creat(fname.c_str(), 0644))) {} void logbuf(const ser_t &s) { logbuf(s.data(), s.size()); } void logbuf(const void *buf, size_t len) { - of.write(reinterpret_cast<const char*>(buf), len); + checkeqnneg(write(fd_, reinterpret_cast<const char*>(buf), len), + ssize_t(len)); } - void flush() { of.flush(); } + void sync() { fdatasync(fd_); } private: - ofstream of; + closingfd fd_; }; // Globals @@ -257,8 +259,7 @@ cout << "got joiner's connection, sending log from seqnos " << r.start_seqno << " to " << r.end_seqno << endl; - g_twal->flush(); - sync(); + g_twal->sync(); ifstream inf("twal"); long long start_time = current_time_millis(); for (int seqno = 0; seqno < r.start_seqno; ++seqno) { Modified: ydb/trunk/src/rectpcc.clamp.lzz =================================================================== --- ydb/trunk/src/rectpcc.clamp.lzz 2009-05-13 16:55:10 UTC (rev 1411) +++ ydb/trunk/src/rectpcc.clamp.lzz 2009-05-13 17:30:11 UTC (rev 1412) @@ -64,8 +64,7 @@ int &seqno = __ref(seqno); cout << "recovering from twal" << endl; long long start_time = current_time_millis(); - g_twal->flush(); - sync(); + g_twal->sync(); ifstream inf("twal"); TpccReq req; while (inf.peek() != ifstream::traits_type::eof()) { Modified: ydb/trunk/src/replica.clamp.lzz =================================================================== --- ydb/trunk/src/replica.clamp.lzz 2009-05-13 16:55:10 UTC (rev 1411) +++ ydb/trunk/src/replica.clamp.lzz 2009-05-13 17:30:11 UTC (rev 1412) @@ -112,7 +112,7 @@ } // Initialize physical or txn log. - scoped_ptr<txn_wal> twal(new txn_wal(use_twal ? "twal" : "/dev/null")); + scoped_ptr<txn_wal> twal(new txn_wal(do_wal ? "twal" : "/dev/null")); g_twal = twal.get(); scoped_ptr<wal> pwal(new wal(use_pwal ? "pwal" : "/dev/null")); g_wal = pwal.get(); Modified: ydb/trunk/src/tpcc.clamp.lzz =================================================================== --- ydb/trunk/src/tpcc.clamp.lzz 2009-05-13 16:55:10 UTC (rev 1411) +++ ydb/trunk/src/tpcc.clamp.lzz 2009-05-13 17:30:11 UTC (rev 1412) @@ -536,12 +536,8 @@ commons::array<char> wbuf(buf_size); anchored_stream_reader reader(st_read_fn(leader), st_read_fully_fn(leader), overflow_fn, rbuf.get(), rbuf.size()); - // TODO - //commit_log cwal; writer w(lambda(const void *buf, size_t len) { - //if (do_wal) { - //__ref(cwal).flush(); - //} + if (do_wal) __ref(wal).sync(); st_write(__ref(leader), buf, len); }, wbuf.get(), wbuf.size()); @@ -613,7 +609,7 @@ assert(!depleting); - if (use_twal) wal.logbuf(marker, reader.start() - marker); + if (do_wal) wal.logbuf(marker, reader.start() - marker); // Backlog (auto/implicit) or process. if (!caught_up) { @@ -636,7 +632,7 @@ ser(w, res); reader.set_anchor(); - // Snapsphot. + // Snapshot. if (disk && check_interval(req.seqno(), snapshot_interval)) snapshot(seqno); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |