assorted-commits Mailing List for Assorted projects (Page 22)
Brought to you by:
yangzhang
You can subscribe to this list here.
2007 |
Jan
|
Feb
|
Mar
|
Apr
|
May
|
Jun
|
Jul
|
Aug
|
Sep
|
Oct
|
Nov
(9) |
Dec
(12) |
---|---|---|---|---|---|---|---|---|---|---|---|---|
2008 |
Jan
(86) |
Feb
(265) |
Mar
(96) |
Apr
(47) |
May
(136) |
Jun
(28) |
Jul
(57) |
Aug
(42) |
Sep
(20) |
Oct
(67) |
Nov
(37) |
Dec
(34) |
2009 |
Jan
(39) |
Feb
(85) |
Mar
(96) |
Apr
(24) |
May
(82) |
Jun
(13) |
Jul
(10) |
Aug
(8) |
Sep
(2) |
Oct
(20) |
Nov
(31) |
Dec
(17) |
2010 |
Jan
(16) |
Feb
(11) |
Mar
(17) |
Apr
(53) |
May
(31) |
Jun
(13) |
Jul
(3) |
Aug
(6) |
Sep
(11) |
Oct
(4) |
Nov
(17) |
Dec
(17) |
2011 |
Jan
(3) |
Feb
(19) |
Mar
(5) |
Apr
(17) |
May
(3) |
Jun
(4) |
Jul
(14) |
Aug
(3) |
Sep
(2) |
Oct
(1) |
Nov
(3) |
Dec
(2) |
2012 |
Jan
(3) |
Feb
(7) |
Mar
(1) |
Apr
|
May
(1) |
Jun
|
Jul
(4) |
Aug
(5) |
Sep
(2) |
Oct
(3) |
Nov
|
Dec
|
2013 |
Jan
|
Feb
|
Mar
(9) |
Apr
(5) |
May
|
Jun
(2) |
Jul
(1) |
Aug
(10) |
Sep
(1) |
Oct
(2) |
Nov
|
Dec
|
2014 |
Jan
(1) |
Feb
(3) |
Mar
(3) |
Apr
(1) |
May
(4) |
Jun
|
Jul
|
Aug
|
Sep
(2) |
Oct
|
Nov
|
Dec
|
2015 |
Jan
|
Feb
|
Mar
|
Apr
|
May
|
Jun
|
Jul
|
Aug
|
Sep
|
Oct
(1) |
Nov
|
Dec
|
2016 |
Jan
(1) |
Feb
|
Mar
(2) |
Apr
|
May
|
Jun
|
Jul
|
Aug
|
Sep
(1) |
Oct
|
Nov
|
Dec
|
2017 |
Jan
|
Feb
|
Mar
(1) |
Apr
|
May
(5) |
Jun
(1) |
Jul
|
Aug
|
Sep
|
Oct
|
Nov
|
Dec
(2) |
2018 |
Jan
|
Feb
|
Mar
|
Apr
|
May
|
Jun
(1) |
Jul
|
Aug
|
Sep
|
Oct
|
Nov
|
Dec
|
From: <yan...@us...> - 2009-05-07 05:36:21
|
Revision: 1377 http://assorted.svn.sourceforge.net/assorted/?rev=1377&view=rev Author: yangzhang Date: 2009-05-07 05:36:15 +0000 (Thu, 07 May 2009) Log Message: ----------- - tweaked warnings Modified Paths: -------------- shell-tools/trunk/src/bash-commons/bashrc.bash Modified: shell-tools/trunk/src/bash-commons/bashrc.bash =================================================================== --- shell-tools/trunk/src/bash-commons/bashrc.bash 2009-05-07 05:35:47 UTC (rev 1376) +++ shell-tools/trunk/src/bash-commons/bashrc.bash 2009-05-07 05:36:15 UTC (rev 1377) @@ -652,7 +652,7 @@ -Wpointer-arith -Wcast-qual -Wcast-align -Wwrite-strings \ -Winit-self -Wno-unused-parameter \ -Wparentheses -Wmissing-format-attribute -Wfloat-equal \ - -Winline "$@" + "$@" } zccw() { @@ -669,14 +669,13 @@ -Werror \ -Wextra \ -Wstrict-null-sentinel \ - -Wno-old-style-cast \ + -Wold-style-cast \ -Woverloaded-virtual \ -Wsign-promo \ -Wformat=2 \ -Winit-self \ -Wswitch-enum \ -Wunused \ - -Wstrict-overflow \ -Wfloat-equal \ -Wundef \ -Wunsafe-loop-optimizations \ @@ -692,7 +691,6 @@ -Wmissing-format-attribute \ -Wpacked \ -Wredundant-decls \ - -Winline \ -Winvalid-pch \ -Wlong-long \ -Wvolatile-register-var \ This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-05-07 05:35:48
|
Revision: 1376 http://assorted.svn.sourceforge.net/assorted/?rev=1376&view=rev Author: yangzhang Date: 2009-05-07 05:35:47 +0000 (Thu, 07 May 2009) Log Message: ----------- - fixed patterns Modified Paths: -------------- shell-tools/trunk/src/gcc-config.bash Modified: shell-tools/trunk/src/gcc-config.bash =================================================================== --- shell-tools/trunk/src/gcc-config.bash 2009-05-07 05:34:05 UTC (rev 1375) +++ shell-tools/trunk/src/gcc-config.bash 2009-05-07 05:35:47 UTC (rev 1376) @@ -2,12 +2,14 @@ # Specify an argument to just print the value of that argument, e.g. "march". # Specify no argument to print the whole cc1 line. -# Specify CXXFLAGS (eg empty string) to control flags. +# Specify CFLAGS (eg empty string) to control flags. +set -o errexit -o nounset + echo | -gcc -c ${CXXFLAGS--march=native} -v -o /dev/null -x c - 2>&1 | -fgrep -- cc1 | +gcc -c ${CFLAGS--march=native} -v -o /dev/null -x c - 2>&1 | +fgrep -- '/cc1 ' | # Don't just search for 'cc1'; tmp filename may have that. if (( $# > 0 )) -then sed 's/.* -'"$1"'=\([^ ]*\) .*/\1/' +then sed 's/.* -'"$1"'=\([^ ]\+\).*/\1/' else cat fi This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-05-07 05:34:06
|
Revision: 1375 http://assorted.svn.sourceforge.net/assorted/?rev=1375&view=rev Author: yangzhang Date: 2009-05-07 05:34:05 +0000 (Thu, 07 May 2009) Log Message: ----------- - added usage, --copy, --remove - handle arg error cases - fixed a few bugs Modified Paths: -------------- shell-tools/trunk/src/clobber-if-diff.py Modified: shell-tools/trunk/src/clobber-if-diff.py =================================================================== --- shell-tools/trunk/src/clobber-if-diff.py 2009-05-07 03:01:26 UTC (rev 1374) +++ shell-tools/trunk/src/clobber-if-diff.py 2009-05-07 05:34:05 UTC (rev 1375) @@ -4,43 +4,68 @@ # TODO: use SpooledTemporaryFile, new in Python 2.6 from __future__ import with_statement -import sys, optparse, subprocess, shutil, commons.startup, os.path +import sys, optparse, subprocess, shutil, commons.startup, os, os.path -def foo(inpath, outpath, opts): +def handle_input_file(inpath, outpath, opts): + '''This is called when the inpath is a normal existing file.''' if os.path.isfile(outpath): if 0 == subprocess.call(['cmp', '-s', inpath, outpath]): return opts.exitcode - shutil.move(inpath, outpath) + if opts.remove: os.remove(outpath) + if opts.copy: shutil.copy(inpath, outpath) + else: shutil.move(inpath, outpath) return 0 def main(argv): - parser = optparse.OptionParser() + + # + # Command-line options. + # + + parser = optparse.OptionParser( + usage = '%prog [OPTIONS] [INPATH] OUTPATH') + parser.add_option('-b', '--buffer', type = 'int', default = 1<<20, help = 'maximum buffer size in bytes, if reading from stdin') + parser.add_option('-c', '--copy', action = 'store_true', + help = "if reading a file, leave orig in place (don't move)") parser.add_option('-t', '--tempfile', type = 'string', default = None, - help = '''path to temporary file where the candidate is to be written, if - reading from stdin; if unspecified, then generate tmp file name''') + help = '''path to temporary file where the candidate is to be written, + if reading from stdin; if unspecified, then generate tmp file name''') parser.add_option('-x', '--exit-code', dest = 'exitcode', type = 'int', default = 0, help = 'exit status if clobbering did not happen') - #parser.add_option('-d', '--debug', action = 'store_true', - #help = 'debug output') - opts, [cmd, inpath, outpath] = parser.parse_args(argv) + parser.add_option('-r', '--remove', action = 'store_true', + help = 'if reading stdin, rm the old file when clobbering') - if os.path.isfile(inpath): - return foo(inpath, outpath, opts) + opts, args = parser.parse_args(argv) + + if len(args) == 3: [cmd, inpath, outpath] = args + elif len(args) == 2: [cmd, outpath] = args; inpath = '-' + else: parser.error('bad number of args; specify OUTPATH or INPATH OUTPATH') + + # + # Perform the operation. + # + + if inpath != '-': + return handle_input_file(inpath, outpath, opts) else: new = sys.stdin.read(opts.buffer) if len(new) == opts.buffer: - f = tempfile.TemporaryFile() if opts.tempfile is None else file(opts.tempfile, 'w') + f = ( tempfile.TemporaryFile() if opts.tempfile is None else + file(opts.tempfile, 'w') ) with f: f.write(new) while len(buf) == opts.buffer: buf = sys.stdin.read(opts.buffer) f.write(buf) - return foo(f.name if opts.tempfile is None else opts.tempfile, outpath, opts) + return handle_input_file(f.name if opts.tempfile is None else + opts.tempfile, outpath, opts) else: - with file(outpath) as f: old = f.read(len(new) + 1) - if old == new: return opts.exitcode + if os.path.isfile(outpath): + with file(outpath) as f: old = f.read(len(new) + 1) + if old == new: return opts.exitcode + if opts.remove: os.remove(outpath) with file(outpath, 'w') as f: f.write(new) commons.startup.run_main() This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-05-07 03:01:39
|
Revision: 1374 http://assorted.svn.sourceforge.net/assorted/?rev=1374&view=rev Author: yangzhang Date: 2009-05-07 03:01:26 +0000 (Thu, 07 May 2009) Log Message: ----------- updated requirements, added notes Modified Paths: -------------- ydb/trunk/README Modified: ydb/trunk/README =================================================================== --- ydb/trunk/README 2009-05-07 02:43:19 UTC (rev 1373) +++ ydb/trunk/README 2009-05-07 03:01:26 UTC (rev 1374) @@ -37,14 +37,14 @@ - [State Threads] 1.8 [boost]: http://www.boost.org/ -[C++ Commons]: http://assorted.sourceforge.net/cpp-commons/ +[C++ Commons]: http://assorted.sf.net/cpp-commons/ [clamp]: http://assorted.sf.net/clamp/ [GCC]: http://gcc.gnu.org/ [google-sparsehash]: http://code.google.com/p/google-sparsehash/ [googletest]: http://code.google.com/p/googletest/ [Lazy C++]: http://www.lazycplusplus.com/ [Protocol Buffers]: http://code.google.com/p/protobuf/ -[State Threads]: http://state-threads.sourceforge.net/ +[State Threads]: http://state-threads.sf.net/ Requirements for tools (building, deployment, analysis): @@ -813,9 +813,99 @@ - should we not be looking at txn based methods at all? - phys logging requires physical ops to be carried out in the same order at all replicas? +- how do other replicated/partitioned dbms's handle (1) recovery and (2) txn + processing? -Period 3/26/09-3/ +Period 4/14/09-4/16/09 +- scaling: 1-3 30K +- netrec: + - ser 274ms (467MB/s) + - xfer 1.8s (73MB/s) + - deser 319ms (402MB/s) + - catch 1.9s (66Ktps) + +Meeting 5/5/09 + +- heap shows that distribution of updates is large, even as we reduce the page + sizes down to very small sizees (16K) + + 131072 + + $ /tmp/ho /tmp/sum-30346-1241641476 /tmp/sum-30346-1241641480 |fgrep % + 157 diff out of 157 and 157 aka 1.0 % + 1 diff out of 1 and 1 aka 1.0 % + 70 diff out of 216 and 338 aka 0.324074074074 % + 1 diff out of 1 and 2 aka 1.0 % + 1 diff out of 1 and 1 aka 1.0 % + 1 diff out of 19 and 30 aka 0.0526315789474 % + 14 diff out of 14 and 21 aka 1.0 % + 251 diff out of 251 and 251 aka 1.0 % + total: + + $ /tmp/ho /tmp/sum-30346-1241641483 /tmp/sum-30346-1241641486 |fgrep % + 157 diff out of 157 and 157 aka 1.0 % + 1 diff out of 1 and 1 aka 1.0 % + 77 diff out of 388 and 461 aka 0.198453608247 % + 2 diff out of 2 and 2 aka 1.0 % + 1 diff out of 1 and 1 aka 1.0 % + 1 diff out of 34 and 40 aka 0.0294117647059 % + 5 diff out of 24 and 29 aka 0.208333333333 % + 251 diff out of 251 and 251 aka 1.0 % + + total: 991 1518 65.2832674572% + + 16384 + + $ /tmp/ho /tmp/sum-31127-124164249[34] |fgrep % + 1241 diff out of 1250 and 1250 aka 99.28 % + 1 diff out of 1 and 1 aka 100.0 % + 173 diff out of 1528 and 1730 aka 11.3219895288 % + 8 diff out of 8 and 8 aka 100.0 % + 1 diff out of 1 and 1 aka 100.0 % + 1 diff out of 135 and 151 aka 0.740740740741 % + 21 diff out of 94 and 106 aka 22.3404255319 % + 1924 diff out of 2041 and 2041 aka 94.2675159236 % + + $ /tmp/ho /tmp/sum-31127-124164249[59] |fgrep % + 1242 diff out of 1250 and 1250 aka 99.36 % + 1 diff out of 1 and 1 aka 100.0 % + 497 diff out of 1926 and 2716 aka 25.8047767394 % + 9 diff out of 9 and 10 aka 100.0 % + 1 diff out of 1 and 1 aka 100.0 % + 1 diff out of 168 and 235 aka 0.595238095238 % + 31 diff out of 118 and 166 aka 26.2711864407 % + 2022 diff out of 2041 and 2041 aka 99.0690837825 % + + total: 7174 10572 67.8584941355% + + 4096 + + $ /tmp/ho /tmp/sum-31356-124164271? |fgrep % + 3949 diff out of 6000 and 6000 aka 65.8166666667 % + 1 diff out of 1 and 1 aka 100.0 % + 661 diff out of 6166 and 6981 aka 10.7200778463 % + 22 diff out of 30 and 32 aka 73.3333333333 % + 1 diff out of 1 and 1 aka 100.0 % + 1 diff out of 544 and 611 aka 0.183823529412 % + 51 diff out of 375 and 425 aka 13.6 % + 5780 diff out of 8334 and 8334 aka 69.3544516439 % + + $ /tmp/ho /tmp/sum-31356-124164272[59] |fgrep % + 4498 diff out of 6000 and 6000 aka 74.9666666667 % + 1 diff out of 1 and 1 aka 100.0 % + 2370 diff out of 12575 and 14948 aka 18.8469184891 % + 40 diff out of 40 and 44 aka 100.0 % + 1 diff out of 1 and 1 aka 100.0 % + 1 diff out of 1084 and 1290 aka 0.0922509225092 % + 149 diff out of 766 and 911 aka 19.4516971279 % + 7083 diff out of 8334 and 8334 aka 84.9892008639 % + + total: 24609 50252 48.9711852265% + + +Future + - TODO faster disk logging using separate threads - TODO show aries-write This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-05-07 02:43:21
|
Revision: 1373 http://assorted.svn.sourceforge.net/assorted/?rev=1373&view=rev Author: yangzhang Date: 2009-05-07 02:43:19 +0000 (Thu, 07 May 2009) Log Message: ----------- - silenced the build - updated line-counts Modified Paths: -------------- ydb/trunk/tools/test.bash Modified: ydb/trunk/tools/test.bash =================================================================== --- ydb/trunk/tools/test.bash 2009-05-07 02:36:29 UTC (rev 1372) +++ ydb/trunk/tools/test.bash 2009-05-07 02:43:19 UTC (rev 1373) @@ -228,8 +228,8 @@ ./setup.bash -d -p ~/.local/pkg/cpp-commons refresh-local cd ~/ydb/src - make clean - PPROF=1 OPT=1 make -j16 ydb WTF= DISTCC= + make -s clean + PPROF=1 OPT=1 make -sj16 ydb WTF= DISTCC= } node-setup-cog() { @@ -326,13 +326,13 @@ setup-ydb() { parssh mkdir -p ydb/ ccom/ rm -rf /tmp/{ydb,ccom}-export/ - svn export ~/work/assorted/ydb/trunk/ /tmp/ydb-export/ - svn export ~/work/assorted/cpp-commons/trunk/ /tmp/ccom-export/ + svn export -q ~/work/assorted/ydb/trunk/ /tmp/ydb-export/ + svn export -q ~/work/assorted/cpp-commons/trunk/ /tmp/ccom-export/ parscp -r /tmp/ydb-export/* ^:ydb/ parscp -r /tmp/ccom-export/* ^:ccom/ local head="${hosts%% *}" tail="${hosts#* }" remote $head node-setup-ydb - scp $head:ydb/src/ydb /tmp/ + scp -q $head:ydb/src/ydb /tmp/ hosts="$tail" parscp /tmp/ydb ^:ydb/src/ } @@ -494,8 +494,9 @@ # ydb scalability test. scaling-helper() { local leader=$1 + : ${stopseqno:=100000} shift - tagssh $leader "CPUPROFILE=ydb.prof ydb/src/ydb -q -l -n $# -X 100000 --tpcc ${extraargs:-}" & + tagssh $leader "CPUPROFILE=ydb.prof ydb/src/ydb -q -l -n $# -X $stopseqno --tpcc ${extraargs:-}" & sleep .1 for rep in "$@" do tagssh $rep "CPUPROFILE=ydb.prof ydb/src/ydb -q -n $# -H $leader --tpcc ${extraargs:-}" & @@ -635,7 +636,7 @@ } line-counts() { - wc -l "$(dirname "$0")/../src/"{main.lzz.clamp,ser.{h,cc},p2.cc,ydb.proto,Makefile,serperf.cc,tpcc/{Makefile,*.{h,cc,cog}}} \ + wc -l "$(dirname "$0")/../src/"{*.clamp.lzz,{p2,ser,serperf}.cc,ydb.proto,Makefile,tpcc/{Makefile,*.{h,cc,cog}}} \ ~/ccom/src/{commons/{,st/}*.h,test/{*.*,Makefile}} } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-05-07 02:36:38
|
Revision: 1372 http://assorted.svn.sourceforge.net/assorted/?rev=1372&view=rev Author: yangzhang Date: 2009-05-07 02:36:29 +0000 (Thu, 07 May 2009) Log Message: ----------- - removed nfd2name - added snapshot_writer thread (self-throttling) - added --rec-snap, --snap-path, --snapshot-int - using fixed seed - using versioned_heaps in tpcctables, updating serialization and adding summarization - added st.valgrind - added rec_snapshot for recovering from a snapshot - updated network recovery to use versioned_heap - using replica_info more pervasively - restructured control flow in process_tpccs - using clobber-if-diff in Makefile - including tpcc/*.d in Makefile Modified Paths: -------------- ydb/trunk/src/Makefile ydb/trunk/src/leader.clamp.lzz ydb/trunk/src/main.clamp.lzz ydb/trunk/src/rectpcc.clamp.lzz ydb/trunk/src/replica.clamp.lzz ydb/trunk/src/stxn.clamp.lzz ydb/trunk/src/tpcc/tpcctables.cc.cog ydb/trunk/src/tpcc/tpcctables.h ydb/trunk/src/tpcc.clamp.lzz ydb/trunk/src/util.clamp.lzz ydb/trunk/src/ydb.clamp.lzz Added Paths: ----------- ydb/trunk/src/st.valgrind Modified: ydb/trunk/src/Makefile =================================================================== --- ydb/trunk/src/Makefile 2009-05-07 01:36:53 UTC (rev 1371) +++ ydb/trunk/src/Makefile 2009-05-07 02:36:29 UTC (rev 1372) @@ -129,13 +129,14 @@ %.pb.o: %.pb.cc %.pb.h %.cc: %.cc.cog - cog.py $< > $@ + cog.py $< | clobber-if-diff -r $@ + chmod -w $@ %.pb.h %.pb.cc: %.proto protoc --cpp_out=. $< %.clamp: %.clamp.lzz - cp $< $@ + clobber-if-diff -rc $< $@ chmod -w $@ %.cc.clamp %.hh.clamp: %.clamp @@ -158,9 +159,10 @@ pch.h: svn ls -rHEAD -R $(SVNURL) | \ - egrep -v '/$$|Makefile' | \ - xargs sed 's/.*\binclude\b *<\(.*\)>.*/\#include <\1>/; t succ; d; :succ /commons/ d' | \ - sort -u > $@ + egrep -v '/$$|Makefile' | \ + xargs sed 's/.*\binclude\b *<\(.*\)>.*/\#include <\1>/; t succ; d; :succ /commons/ d' | \ + clobber-if-diff -r $@ + chmod -w $@ %.h.gch: CXXFLAGS = $(CXXFLAGS0) %.h.gch: %.h @@ -204,4 +206,4 @@ serperf: ydb.pb.o ser: ydb.pb.o --include *.d +-include *.d tpcc/*.d Modified: ydb/trunk/src/leader.clamp.lzz =================================================================== --- ydb/trunk/src/leader.clamp.lzz 2009-05-07 01:36:53 UTC (rev 1371) +++ ydb/trunk/src/leader.clamp.lzz 2009-05-07 02:36:29 UTC (rev 1372) @@ -64,8 +64,7 @@ // Start dispatching queries. st_bool accept_joiner; int seqno = 0; - st_channel<replica_info> newreps; - st_channel<st_netfd_t> delreps; + st_channel<replica_info> newreps, delreps; foreach (const replica_info &r, replicas) newreps.push(r); function<void()> f; if (do_tpcc) @@ -83,8 +82,8 @@ foreach (replica_info r, replicas) { function<void()> fn; if (do_tpcc) - fn = bind(handle_tpcc_responses, r.fd(), ref(seqno), rid++, - ref(recover_signals), ref(delreps), true); + fn = bind(handle_tpcc_responses, r, ref(seqno), rid++, + ref(recover_signals), ref(newreps), ref(delreps), true); else fn = bind(handle_responses, r.fd(), ref(seqno), rid++, ref(recover_signals), true); @@ -119,8 +118,8 @@ function<void()> handle_responses_joiner_fn; if (do_tpcc) handle_responses_joiner_fn = - bind(handle_tpcc_responses, joiner, ref(seqno), rid++, - ref(recover_signals), ref(delreps), false); + bind(handle_tpcc_responses, replica_info(joiner, 0), ref(seqno), + rid++, ref(recover_signals), ref(newreps), ref(delreps), false); else handle_responses_joiner_fn = bind(handle_responses, joiner, ref(seqno), rid++, Modified: ydb/trunk/src/main.clamp.lzz =================================================================== --- ydb/trunk/src/main.clamp.lzz 2009-05-07 01:36:53 UTC (rev 1371) +++ ydb/trunk/src/main.clamp.lzz 2009-05-07 02:36:29 UTC (rev 1372) @@ -49,7 +49,7 @@ bool yield_during_build_up, yield_during_catch_up, dump, show_updates, count_updates, stop_on_recovery, general_txns, disk, debug_memory, use_pwal, use_twal, - use_pb, use_pb_res, g_caught_up, rec_pwal, rec_twal, do_tpcc, + use_pb, use_pb_res, g_caught_up, do_rec_pwal, do_rec_twal, do_tpcc, suppress_txn_msgs, force_ser, fake_exec, ship_log; long long timelim, read_thresh; @@ -238,10 +238,10 @@ recover_joiner(st_netfd_t listener, st_channel<recovery_t> &send_states) { - cout << "waiting for joiner" << endl; recovery_t recovery; st_netfd_t joiner; if (ship_log) { + cout << "waiting for joiner" << endl; { st_intr intr(stop_hub); joiner = checkerr(st_accept(listener, nullptr, nullptr, @@ -283,12 +283,12 @@ } else { { st_intr intr(stop_hub); - // Wait for the snapshot. + cout << "waiting for the snapshot" << endl; recovery = send_states.take(); if (recovery == nullptr) { return; } - // Wait for the new joiner. + cout << "waiting for the joiner" << endl; joiner = checkerr(st_accept(listener, nullptr, nullptr, ST_UTIME_NO_TIMEOUT)); } @@ -302,12 +302,3 @@ showdatarate("sent recovery", recovery.size(), diff); } } - -void -threadfunc() -{ - while (true) { - sleep(3); - cout << "AAAAAAAAAAAAAAAAAAAAAA" << endl; - } -} Modified: ydb/trunk/src/rectpcc.clamp.lzz =================================================================== --- ydb/trunk/src/rectpcc.clamp.lzz 2009-05-07 01:36:53 UTC (rev 1371) +++ ydb/trunk/src/rectpcc.clamp.lzz 2009-05-07 02:36:29 UTC (rev 1372) @@ -6,23 +6,61 @@ #src #include "unsetprefs.h" +#include <commons/files.h> +#include <commons/memory.h> #include <commons/time.h> #include <commons/st/io.h> #include <commons/st/threads.h> #include <commons/st/reader.h> +#include <sys/mman.h> #include "tpcc/tpcctables.h" #include "ydb.pb.h" #include "setprefs.h" #end +const size_t pgsz = 4096; + void +rec_snapshot(int &seqno) +{ + // Prepare buffer. + closingfd fd(checknnegerr(open(snapshot_path.c_str(), O_RDONLY))); + size_t fsz = file_size(fd); + char *rawbuf; + check0x(posix_memalign(reinterpret_cast<void**>(&rawbuf), pgsz, fsz)); + + // Read. + long long before_read = current_time_millis(); + checkeqnneg(read(fd, rawbuf, fsz), static_cast<ssize_t>(fsz)); + long long after_read = current_time_millis(); + showdatarate("read from disk", fsz, after_read - before_read); + + // Sanity. + tpcc_recovery_header &hdr = *reinterpret_cast<tpcc_recovery_header*>(rawbuf); + checkeq(hdr.len, fsz); + + // Deserialize. + long long before_deser = current_time_millis(); + array<char> buf(rawbuf, hdr.len); + g_tables->deser(0, 1, hdr, buf); + buf.release(); + long long after_deser = current_time_millis(); + showdatarate("deserialized snapshot", + size_t(hdr.len), after_deser - before_deser); + + // Final steps. + seqno = hdr.seqno; + cout << "after deserialize, db state is now at seqno " + << hdr.seqno << ":" << endl; + g_tables->show(); +} + +void rec_tpcc(int &seqno, int mypos, const Init &init, const vector<st_netfd_t> &replicas, recovery_t &orig, st_channel<chunk> &backlog) { - commons::array<char> recarr(0); - - function<void()> rec_twal_fn = lambda() { + function<void()> rec_twal = lambda() { int &seqno = __ref(seqno); cout << "recovering from twal" << endl; long long start_time = current_time_millis(); @@ -41,7 +79,7 @@ cout << "now at seqno " << seqno << endl; }; - function<void()> recv_log_fn = lambda() { + function<void()> recv_log = lambda() { st_netfd_t src = __ref(replicas[0]); int &seqno = __ref(seqno); ASSERT(fail_seqno == seqno); @@ -64,40 +102,46 @@ } }; - if (rec_twal) { + if (fail_seqno > 0) { failed.waitset(); g_tables.reset(new TPCCTables); + } + + if (do_rec_snapshot) { + rec_snapshot(seqno); + } else if (do_rec_twal) { tpcc_recovery_header &hdr = *reinterpret_cast<tpcc_recovery_header*>(orig.begin()); commons::array<char> body(orig.begin() + sizeof(tpcc_recovery_header), orig.size() - sizeof(tpcc_recovery_header)); g_tables->deser(mypos, init.node_size(), hdr, body); body.release(); - rec_twal_fn(); + rec_twal(); + } + + if (fail_seqno > 0) { failed.reset(); - recv_log_fn(); } #if 0 - st_thread_t rec_twal_thread = my_spawn(rec_twal_fn, "rec_twal"); - st_thread_t recv_log_thread = my_spawn(recv_log_fn, "recv_log"); + st_thread_t rec_twal_thread = my_spawn(rec_twal, "rec_twal"); + st_thread_t recv_log_thread = my_spawn(recv_log, "recv_log"); st_join(rec_twal_thread); st_join(recv_log_thread); #endif - if (rec_pwal) { + if (do_rec_pwal) { // Recover from phy log. - } else if (rec_twal) { + } else if (do_rec_twal) { // Recover from txn log. } else { - g_tables.reset(new TPCCTables); - // // Build-up // if (ship_log) { + recv_log(); } else { // XXX indent @@ -106,7 +150,8 @@ long long before_recv = current_time_millis(); vector<st_thread_t> recovery_builders; - ASSERT(seqno == -1); + ASSERT(do_rec_snapshot || seqno == -1); + bool first = true; for (int i = 0; i < (multirecover ? init.node_size() : 1); ++i) { recovery_builders.push_back(my_spawn(lambda() { // Read the recovery message length and header. @@ -120,21 +165,28 @@ cout << "receiving recovery of " << hdr.len << " bytes" << endl; long long start_time = current_time_millis(); - __ref(recarr).reset(new char[hdr.len], hdr.len); + void *rawbuf; + check0x(posix_memalign(&rawbuf, pgsz, hdr.len)); + commons::array<char> recarr(reinterpret_cast<char*>(rawbuf), hdr.len); + memput(recarr, hdr); checkeqnneg(st_read_fully(__ref(replicas[i]), - __ref(recarr).get(), hdr.len, + recarr + sizeof hdr, + hdr.len - sizeof hdr, ST_UTIME_NO_TIMEOUT), - ssize_t(hdr.len)); + ssize_t(hdr.len - sizeof hdr)); long long before_deser = current_time_millis(); showdatarate("received recovery message", size_t(hdr.len), before_deser - start_time); - if (__ref(seqno) == -1) + if (__ref(first)) { __ref(seqno) = hdr.seqno; - else + __ref(first) = false; + } else { checkeq(__ref(seqno), hdr.seqno); + } - g_tables->deser(__ctx(i), __ref(init).node_size(), hdr, __ref(recarr)); + g_tables->deser(__ctx(i), __ref(init).node_size(), hdr, recarr); + recarr.release(); long long end_time = current_time_millis(); showdatarate("deserialized recovery message", size_t(hdr.len), end_time - before_deser); Modified: ydb/trunk/src/replica.clamp.lzz =================================================================== --- ydb/trunk/src/replica.clamp.lzz 2009-05-07 01:36:53 UTC (rev 1371) +++ ydb/trunk/src/replica.clamp.lzz 2009-05-07 02:36:29 UTC (rev 1372) @@ -8,6 +8,7 @@ #include <boost/archive/binary_iarchive.hpp> #include <commons/st/sockets.h> #include <commons/st/threads.h> +#include <boost/thread.hpp> #include "tpcc/clock.h" #include "tpcc/randomgenerator.h" #include "tpcc/tpccclient.h" @@ -26,15 +27,14 @@ run_replica(std::string leader_host, uint16_t leader_port, uint16_t listen_port) { if (disk) { - // Disk IO threads. - for (int i = 0; i < 5; ++i) { - //thread somethread(threadfunc); - } + cout << "starting the snapshot writer" << endl; + thread(bind(snapshot_writer)); } // Initialize database state. int seqno = -1; mii &map = g_map; + if (do_tpcc) { TPCCTables *tables = new TPCCTables(); g_tables.reset(tables); @@ -43,6 +43,7 @@ // Create a generator for filling the database. RealRandomGenerator* random = new RealRandomGenerator(); NURandC cLoad = NURandC::makeRandom(random); + random->seed(0); random->setC(cLoad); // Generate the data @@ -61,8 +62,9 @@ cout << "loaded " << nwarehouses << " warehouses in " << current_time_millis() - start_time << " ms" << endl; tables->show(); + g_tables->ser(0, 0, seqno); } - recovery_t orig = rec_twal ? g_tables->ser(0, 0, seqno) : recovery_t(); + recovery_t orig = do_rec_twal ? g_tables->ser(0, 0, seqno) : recovery_t(); finally f(bind(summarize, "REPLICA", ref(seqno))); st_channel<recovery_t> send_states; @@ -102,7 +104,7 @@ INET_ADDRSTRLEN)) << ':' << sa.port() << (is_self ? " (self)" : "") << endl; if (is_self) mypos = i; - if (!is_self && (init.txnseqno() > 0 || rec_twal)) { + if (!is_self && (init.txnseqno() > 0 || fail_seqno > 0)) { replicas.push_back(st_tcp_connect(host, static_cast<uint16_t>(sa.port()), timeout)); @@ -144,7 +146,7 @@ // Simple txns // - if (rec_pwal) { + if (do_rec_pwal) { // Recover from physical log. cout << "recovering from pwal" << endl; long long start_time = current_time_millis(); @@ -315,7 +317,7 @@ batch.Clear(); for (int t = 0; t < batch.txn_size(); ++t) { const Txn &txn = batch.txn(t); - if (rec_pwal) seqno = txn.seqno() - 1; + if (do_rec_pwal) seqno = txn.seqno() - 1; process_txn(map, txn, seqno); if (fake_exec && !use_pb) { reader.skip(txn.op_size() * Op_Size); Added: ydb/trunk/src/st.valgrind =================================================================== --- ydb/trunk/src/st.valgrind (rev 0) +++ ydb/trunk/src/st.valgrind 2009-05-07 02:36:29 UTC (rev 1372) @@ -0,0 +1,48 @@ +## Pth specific (basically everything :() +{ + libpth-Addr4-internal + Memcheck:Addr4 + obj:/opt/gcc3/lib/libpth.so.20.0.20 +} +{ + libpth-Value4-internal + Memcheck:Value4 + obj:/opt/gcc3/lib/libpth.so.20.0.20 +} +{ + libpth-Cond-internal + Memcheck:Cond + obj:/opt/gcc3/lib/libpth.so.20.0.20 +} + +## Pth system call related +{ + swapcontext-Value4 + Memcheck:Value4 + fun:swapcontext +} +{ + swapcontext-Addr4 + Memcheck:Addr4 + fun:swapcontext +} +{ + swapcontext-Cond + Memcheck:Cond + fun:swapcontext +} +{ + sigismember-Value4 + Memcheck:Value4 + fun:sigismember +} +{ + sigismember-Addr4 + Memcheck:Addr4 + fun:sigismember +} +{ + sigismember-Cond + Memcheck:Cond + fun:sigismember +} Modified: ydb/trunk/src/stxn.clamp.lzz =================================================================== --- ydb/trunk/src/stxn.clamp.lzz 2009-05-07 01:36:53 UTC (rev 1371) +++ ydb/trunk/src/stxn.clamp.lzz 2009-05-07 02:36:29 UTC (rev 1372) @@ -138,8 +138,8 @@ // Did we get a new member? If so, notify an arbitrary member (the first // one) to prepare to send recovery information (by sending an // empty/default Txn). - // XXX rec_pwal - if (!newreps.empty() && seqno > 0 && !rec_pwal) { + // XXX do_rec_pwal + if (!newreps.empty() && seqno > 0 && !do_rec_pwal) { start_txn(batch); fin_txn(batch); // TODO: verify that this made the catch-up stream more efficient, Modified: ydb/trunk/src/tpcc/tpcctables.cc.cog =================================================================== --- ydb/trunk/src/tpcc/tpcctables.cc.cog 2009-05-07 01:36:53 UTC (rev 1371) +++ ydb/trunk/src/tpcc/tpcctables.cc.cog 2009-05-07 02:36:29 UTC (rev 1372) @@ -1,13 +1,39 @@ //[[[cog -// allfields = 'items warehouses stock districts customers orders orderlines neworders history'.split() -// treepairs = 'warehouses/Warehouse stock/Stock districts/District customers/Customer orders/Order orderlines/OrderLine'.split() -// allpairs = treepairs + ['neworders/NewOrder'] +// allfields = ''' +// items +// warehouses +// stock +// districts +// customers +// orders +// orderlines +// neworders +// history +// '''.split() +// treepairs = [ p.split('/') for p in ''' +// warehouses/Warehouse +// stock/Stock +// districts/District +// customers/Customer +// orders/Order +// orderlines/OrderLine +// '''.split() ] +// allpairs = treepairs + [ ('neworders', 'NewOrder') ] +// heappairs = allpairs + [ ('history', 'History') ] +// // def typedefs(): // for name in allfields: // cog.outl(r'typedef typeof(%s_) type_%s;' % (name, name)) //]]] //[[[end]]] +#ifdef DO_DUMP_SER +#define DO_DUMP +#endif +#ifdef DO_DUMP_SUMMARY +#define DO_DUMP +#endif + #include "tpcctables.h" #include <algorithm> @@ -15,15 +41,100 @@ #include <vector> #include <commons/assert.h> +#include <commons/check.h> #include <commons/memory.h> +#include <commons/versioned_heap.h> #include <iostream> #include <boost/foreach.hpp> +#ifdef DO_DUMP +#include <fstream> // XXX +#include <boost/lexical_cast.hpp> // XXX +#endif + #define foreach BOOST_FOREACH using std::vector; +// Start of heap utilities. + +template<typename T> +struct heaps { + static versioned_heap<T> heap; +}; +template<typename T> versioned_heap<T> heaps<T>::heap(4096); + +template<typename T> +versioned_heap<T> &heapof(const T &x) { + return heaps<T>::heap; +} + +template<typename T> +typename versioned_heap<T>::hdr &hdrof(const T &x) { + return heapof(x).hdrof(&x); +} + +template<typename T> +T *construct(const T &x) { + return &heaps<T>::heap.construct(x); +} + +template<typename T> +T *construct() { + return &heaps<T>::heap.construct(); +} + +template<typename T> +void destroy(T *x) { + heapof(*x).destroy(*x); +} + +template<typename T> +void update(T *x) { + heapof(*x).touch(*x); +} + +namespace { +template<typename T> +void dump(const T &map, const char *label, int seqno) { +#ifdef DO_DUMP_SER + ofstream of((string("/tmp/dmp-") + + lexical_cast<string>(getpid()) + "-" + + lexical_cast<string>(time(nullptr))).c_str()); + of << label << ' ' << seqno << endl; + foreach (const typename T::value_type &pair, map) { + of << pair.first << ' ' << pair.second << endl; + } +#endif +} + +void dump_summary(const array<char> &a) +{ +#ifdef DO_DUMP_SUMMARY + ofstream of((string("/tmp/sum-") + + lexical_cast<string>(getpid()) + "-" + + lexical_cast<string>(time(nullptr))).c_str()); + raw_reader r(a); + tpcc_recovery_header hdr; + r.read(hdr); + //[[[cog + // for name, struct in heappairs: + // cog.outl(r''' + // of << "%(name)s: " << hdr.n%(name)s << endl; + // for (size_t i = 0; i < hdr.n%(name)s; ++i) { + // of << r.read<size_t>() << ' '; + // of << r.read<int>() << endl; + // } + // ''' % {'name': name, 'struct': struct}) + //]]] + //[[[end]]] +#endif +} +} + +// End of heap utilities. + bool CustomerByNameOrdering::operator()(const Customer* a, const Customer* b) { if (a->c_w_id < b->c_w_id) return true; if (a->c_w_id > b->c_w_id) return false; @@ -194,8 +305,7 @@ // Modify the order id to assign it d->d_next_o_id += 1; - //XXX pwal.write(d); - //XXX pwal.write(d->d_next_o_id); + update(d); Warehouse* w = findWarehouse(warehouse_id); output->w_tax = w->w_tax; @@ -234,6 +344,7 @@ } else { stock->s_quantity = stock->s_quantity - items[i].ol_quantity + 91; } + update(stock); output->items[i].s_quantity = stock->s_quantity; assert(sizeof(line.ol_dist_info) == sizeof(stock->s_dist[district_id])); memcpy(line.ol_dist_info, stock->s_dist[district_id], sizeof(line.ol_dist_info)); @@ -284,12 +395,15 @@ float h_amount, const char* now, PaymentOutput* output) { Warehouse* w = findWarehouse(warehouse_id); w->w_ytd += h_amount; + update(w); output->warehouse = *w; District* d = findDistrict(warehouse_id, district_id); d->d_ytd += h_amount; + update(w); output->district = *d; + update(c); c->c_balance -= h_amount; c->c_ytd_payment += h_amount; c->c_payment_cnt += 1; @@ -353,7 +467,7 @@ assert(neworder->no_d_id == d_id && neworder->no_w_id == warehouse_id); int32_t o_id = neworder->no_o_id; neworders_.erase(iterator); - if (!within(serbuf_, neworder)) delete neworder; + if (!within(serbuf_, neworder)) destroy(neworder); DeliveryOrderInfo order; order.d_id = d_id; @@ -363,6 +477,7 @@ Order* o = findOrder(warehouse_id, d_id, o_id); assert(o->o_carrier_id == Order::NULL_CARRIER_ID); o->o_carrier_id = carrier_id; + update(o); float total = 0; // TODO: Select based on (w_id, d_id, o_id) rather than using ol_number? @@ -370,6 +485,7 @@ OrderLine* line = findOrderLine(warehouse_id, d_id, o_id, i); assert(0 == strlen(line->ol_delivery_d)); strcpy(line->ol_delivery_d, now); + update(line); assert(strlen(line->ol_delivery_d) == DATETIME_SIZE); total += line->ol_amount; } @@ -377,13 +493,14 @@ Customer* c = findCustomer(warehouse_id, d_id, o->o_c_id); c->c_balance += total; c->c_delivery_cnt += 1; + update(c); } } template <typename T> static T* insert(BPlusTree<int32_t, T*, TPCCTables::KEYS_PER_INTERNAL, TPCCTables::KEYS_PER_LEAF>* tree, int32_t key, const T& item) { assert(!tree->find(key)); - T* copy = new T(item); + T* copy = construct(item); tree->insert(key, copy); return copy; } @@ -592,7 +709,7 @@ } void TPCCTables::insertNewOrder(int32_t w_id, int32_t d_id, int32_t o_id) { - NewOrder* neworder = new NewOrder(); + NewOrder* neworder = construct<NewOrder>(); neworder->no_w_id = w_id; neworder->no_d_id = d_id; neworder->no_o_id = o_id; @@ -609,7 +726,7 @@ } void TPCCTables::insertHistory(const History& history) { - History* h = new History(history); + History* h = construct(history); history_.push_back(h); } @@ -624,7 +741,7 @@ << endl; } -commons::array<char> TPCCTables::ser(int mypos, int nnodes, int seqno) const { +commons::array<char> TPCCTables::old_ser(int mypos, int nnodes, int seqno) const { mypos = nnodes; // TODO use using namespace std; @@ -639,8 +756,7 @@ // typedefs() // for name in allfields: // cog.outl(r'hdr.n%s = uint32_t(%s_.size());' % (name, name)) - // for pair in allpairs: - // name, struct = pair.split('/') + // for name, struct in allpairs: // cog.outl(r'hdr.len += uint32_t(hdr.n%s * (sizeof(type_%s::key_type) + sizeof(%s)));' % (name, name, struct)) //]]] //[[[end]]] @@ -661,8 +777,7 @@ } //[[[cog - // for pair in treepairs: - // name, struct = pair.split('/') + // for name, struct in treepairs: // cog.outl(r''' // { // size_t count = 0; @@ -693,7 +808,68 @@ return arr; } -void TPCCTables::deser(int mypos, int nnodes, const tpcc_recovery_header &hdr, +size_t pgsz() { return heaps<Warehouse>::heap.pgsz(); } + +commons::array<char> TPCCTables::ser(int mypos, int nnodes, int seqno) const { + mypos = nnodes; // TODO use + using namespace std; + + // + // Serialize header. + // + + tpcc_recovery_header hdr; + bzero(&hdr, sizeof hdr); + hdr.seqno = seqno; + size_t metalen = 0, datalen = 0; + //[[[cog + // for name in allfields: + // cog.outl(r'hdr.n%s = uint32_t(%s_.size());' % (name, name)) + // for name, struct in heappairs: + // cog.outl(r''' + // versioned_heap<%(struct)s> &heap_%(name)s = heaps<%(struct)s>::heap; + // metalen += heap_%(name)s.metasize(); + // datalen += datasize(heap_%(name)s); + // ''' % {'name': name, 'struct': struct}) + //]]] + //[[[end]]] + hdr.headlen = uint32_t(whole_units(sizeof hdr + metalen, pgsz()) * pgsz()); + hdr.len = uint32_t(hdr.headlen + datalen + sizeof(Item) * hdr.nitems); + void *raw_arr; + check0x(posix_memalign(&raw_arr, pgsz(), hdr.len)); + + // + // Serialize metas and datas. + // + + commons::array<char> arr(reinterpret_cast<char*>(raw_arr), hdr.len); + raw_writer writer(arr), dwriter(arr + hdr.headlen); + writer.write(hdr); + //[[[cog + // for name, struct in heappairs: + // cog.outl(r''' + // cout << "serializing %(name)s" << endl; + // heap_%(name)s.sermeta(writer.ptr()); + // writer.skip(heap_%(name)s.metasize()); + // serdata(heap_%(name)s, dwriter.ptr()); + // dwriter.skip(datasize(heap_%(name)s)); + // ''' % {'name': name, 'struct': struct}) + //]]] + //[[[end]]] + + cout << "serializing items" << endl; + foreach (const Item &item, items_) { + dwriter.write(item); + } + + assert(arr.end() == dwriter.ptr()); + + dump(neworders_, "ser", seqno); + + return arr; +} + +void TPCCTables::old_deser(int mypos, int nnodes, const tpcc_recovery_header &hdr, const commons::array<char> &arr) { mypos = nnodes; // TODO use using namespace std; @@ -708,8 +884,7 @@ //[[[cog // typedefs() - // for pair in treepairs: - // name, struct = pair.split('/') + // for name, struct in treepairs: // # Generate customers_by_name_ // cbn = r'customers_by_name_.insert(reinterpret_cast<Customer*>(reader.ptr()));' if name == 'customers' else '' // obc = r'orders_by_customer_.insert(makeOrderByCustomerKey(val->o_w_id, val->o_d_id, val->o_c_id, val->o_id), reinterpret_cast<Order*>(reader.ptr()));' if name == 'orders' else '' @@ -741,4 +916,121 @@ serbuf_.reset(arr.get(), arr.size()); } +inline int32_t keyof(const Warehouse &w) +{ return w.w_id; } +inline int32_t keyof(const Stock &s) +{ return makeStockKey(s.s_w_id, s.s_i_id); } +inline int32_t keyof(const District &d) +{ return makeDistrictKey(d.d_w_id, d.d_id); } +inline int32_t keyof(const Customer &c) +{ return makeCustomerKey(c.c_w_id, c.c_d_id, c.c_id); } +inline int32_t keyof(const Order &o) +{ return makeOrderKey(o.o_w_id, o.o_d_id, o.o_id); } +inline int32_t keyof(const OrderLine &o) +{ return makeOrderLineKey(o.ol_w_id, o.ol_d_id, o.ol_o_id, o.ol_number); } +inline int64_t keyof(const NewOrder &n) +{ return makeNewOrderKey(n.no_w_id, n.no_d_id, n.no_o_id); } +inline int64_t obc_keyof(const Order &o) +{ return makeOrderByCustomerKey(o.o_w_id, o.o_d_id, o.o_c_id, o.o_id); } + +void TPCCTables::deser(int mypos, int nnodes, const tpcc_recovery_header &hdr, + const commons::array<char> &arr) { + mypos = nnodes; // TODO use + using namespace std; + typedef typeof(warehouses_) tree_warehouses; + + // This needs to be cleared because it's just a simple log to which things + // are appended. + history_.clear(); + history_.reserve(hdr.nhistory); + // This need to be cleared because things are erased from the map. + neworders_.clear(); + // This needs to be cleared because the hash is on the pointer. + customers_by_name_.clear(); + + char *meta = arr + sizeof hdr, *data = arr + hdr.headlen; + + // XXX determine which objects are actually live + + //[[[cog + // typedefs() + // for name, struct in heappairs: + // cbn = ( r'customers_by_name_.insert(val);' + // if name == 'customers' else '' ) + // obc = ( r'orders_by_customer_.insert(obc_keyof(*val), val);' + // if name == 'orders' else '' ) + // if name == 'neworders': + // insertion = r'%(name)s_.insert(make_pair(keyof(*val), val));' + // elif name == 'history': + // insertion = r'%(name)s_.push_back(val);' + // else: + // insertion = r'%(name)s_.insert(keyof(*val), val);' + // + // cbn = cbn % {'name': name, 'struct': struct} + // obc = obc % {'name': name, 'struct': struct} + // insertion = insertion % {'name': name, 'struct': struct} + // + // cog.outl(r''' + // versioned_heap<%(struct)s> &heap_%(name)s = heaps<%(struct)s>::heap; + // heap_%(name)s.deser(meta, data); + // + // for (versioned_heap<%(struct)s>::iterator iter_%(name)s = heap_%(name)s.begin(); + // iter_%(name)s.cur() != nullptr; iter_%(name)s.next()) { + // %(struct)s *val = iter_%(name)s.cur(); + // %(insertion)s + // %(cbn)s%(obc)s + // } + // + // meta += heap_%(name)s.metasize(); + // data += datasize(heap_%(name)s); + // ''' % {'name': name, 'struct': struct, 'cbn': cbn, 'obc': obc, + // 'insertion': insertion}) + //]]] + //[[[end]]] + + raw_reader reader(data); + // This needs to be cleared because it's just a simple vector of randomly + // generated items (the set is static). + items_.clear(); + items_.reserve(hdr.nitems); + for (uint32_t i = 0; i < hdr.nitems; ++i) { + items_.push_back(reader.read<Item>()); + } + + serbuf_.reset(arr.get(), arr.size()); + + dump(neworders_, "deser", hdr.seqno); +} + +commons::array<char> TPCCTables::summarize() const +{ + tpcc_recovery_header hdr; + bzero(&hdr, sizeof hdr); + //[[[cog + // for name, struct in heappairs: + // cog.outl(r''' + // versioned_heap<%(struct)s> &heap_%(name)s = heaps<%(struct)s>::heap; + // hdr.n%(name)s = uint32_t(heap_%(name)s.pages().size()); + // hdr.len += uint32_t(heap_%(name)s.pages().size()); + // ''' % {'name': name, 'struct': struct}) + //]]] + //[[[end]]] + + array<char> a(sizeof hdr + hdr.len * (sizeof(size_t) + sizeof(int))); + raw_writer w(a); + w.write(hdr); + + //[[[cog + // for name, struct in heappairs: + // cog.outl(r''' + // foreach (const char *page, heap_%(name)s.pages()) { + // w.write(heap_%(name)s.hdrof(page).index); + // w.write(heap_%(name)s.hdrof(page).version); + // }''' % {'name': name, 'struct': struct}) + //]]] + //[[[end]]] + + return a; +} + // vim:ft=cpp Modified: ydb/trunk/src/tpcc/tpcctables.h =================================================================== --- ydb/trunk/src/tpcc/tpcctables.h 2009-05-07 01:36:53 UTC (rev 1371) +++ ydb/trunk/src/tpcc/tpcctables.h 2009-05-07 02:36:29 UTC (rev 1372) @@ -18,6 +18,7 @@ struct tpcc_recovery_header { uint32_t len; + uint32_t headlen; // both this header and heap metadata and padding for alignment uint32_t nitems; uint32_t nwarehouses; uint32_t nstock; @@ -85,40 +86,44 @@ void insertHistory(const History& history); void show() const; + commons::array<char> old_ser(int mypos, int nnodes, int seqno) const; + void old_deser(int mypos, int nnodes, const tpcc_recovery_header &hdr, + const commons::array<char> &arr); commons::array<char> ser(int mypos, int nnodes, int seqno) const; void deser(int mypos, int nnodes, const tpcc_recovery_header &hdr, const commons::array<char> &arr); + commons::array<char> summarize() const; static const int KEYS_PER_INTERNAL = 8; static const int KEYS_PER_LEAF = 8; template <typename KeyType, typename ValueType> void deleteBTreeValues(BPlusTree<KeyType, ValueType*, KEYS_PER_INTERNAL, KEYS_PER_LEAF>* btree) { - KeyType key = std::numeric_limits<KeyType>::max(); - ValueType* value = NULL; - while (btree->findLastLessThan(key, &value, &key)) { - assert(value != NULL); - if (!commons::within(serbuf_, value)) delete value; - } + //KeyType key = std::numeric_limits<KeyType>::max(); + //ValueType* value = NULL; + //while (btree->findLastLessThan(key, &value, &key)) { + // assert(value != NULL); + // if (!commons::within(serbuf_, value)) delete *value; + //} } // Deletes all elements in STL container. template <typename T> void STLDeleteElements(T* container) { - const typename T::iterator end = container->end(); - for (typename T::iterator i = container->begin(); i != end; ++i) { - if (!commons::within(serbuf_, *i)) delete *i; - } - container->clear(); + //const typename T::iterator end = container->end(); + //for (typename T::iterator i = container->begin(); i != end; ++i) { + // if (!commons::within(serbuf_, *i)) delete *i; + //} + //container->clear(); }; template <typename T> void STLDeleteValues(T* container) { - const typename T::iterator end = container->end(); - for (typename T::iterator i = container->begin(); i != end; ++i) { - if (!commons::within(serbuf_, i->second)) delete i->second; - } - container->clear(); + //const typename T::iterator end = container->end(); + //for (typename T::iterator i = container->begin(); i != end; ++i) { + // if (!commons::within(serbuf_, i->second)) delete i->second; + //} + //container->clear(); }; Modified: ydb/trunk/src/tpcc.clamp.lzz =================================================================== --- ydb/trunk/src/tpcc.clamp.lzz 2009-05-07 01:36:53 UTC (rev 1371) +++ ydb/trunk/src/tpcc.clamp.lzz 2009-05-07 02:36:29 UTC (rev 1372) @@ -10,7 +10,10 @@ #end #src #include "unsetprefs.h" +#include <commons/atomic.h> +#include <commons/files.h> #include <commons/memory.h> +#include <commons/squeue.h> #include <commons/st/io.h> #include <commons/time.h> #include <string> @@ -28,6 +31,10 @@ using namespace commons; using namespace std; +int snapshot_interval; +string snapshot_path; +bool do_rec_snapshot; + class OrderStatusOutput; class PaymentOutput; @@ -216,7 +223,7 @@ void issue_tpcc(st_channel<replica_info> &newreps, - st_channel<st_netfd_t> &delreps, + st_channel<replica_info> &delreps, int &seqno, st_bool &accept_joiner) { @@ -234,6 +241,7 @@ // Change the constants for run RealRandomGenerator* random = new RealRandomGenerator(); NURandC cLoad = NURandC::makeRandom(random); + random->seed(0); random->setC(NURandC::makeRandomForRun(random, cLoad)); // Client owns all the parameters @@ -244,8 +252,8 @@ // Did we get a new member? If so, notify an arbitrary member (the first // one) to prepare to send recovery information (by sending an // empty/default Txn). - // XXX rec_pwal - if (!newreps.empty() && seqno > 0 && !rec_pwal && !rec_twal) { + // XXX do_rec_pwal + if (!newreps.empty() && seqno > 0 && !do_rec_pwal && !do_rec_twal) { tables.sendRec(); } @@ -255,8 +263,9 @@ fds.push_back(newreps.take().fd()); } while (!delreps.empty()) { + cout << "broadcasting fail-ack" << endl; tables.sendFailAck(); - fds.erase( find(fds.begin(), fds.end(), delreps.take()) ); + fds.erase( find(fds.begin(), fds.end(), delreps.take().fd()) ); } tables.set_seqno(seqno); @@ -305,14 +314,16 @@ class tpcc_response_handler { public: - tpcc_response_handler(st_netfd_t replica, const int &seqno, int rid, + tpcc_response_handler(replica_info replica, const int &seqno, int rid, st_multichannel<long long> &recover_signals, - st_channel<st_netfd_t> &delreps, bool caught_up) + st_channel<replica_info> &newreps, + st_channel<replica_info> &delreps, bool caught_up) : replica(replica), seqno(seqno), rid(rid), recover_signals(recover_signals), + newreps(newreps), delreps(delreps), caught_up(caught_up), sub(recover_signals.subscribe()), @@ -329,7 +340,7 @@ finally f(bind(&tpcc_response_handler::cleanup, this)); commons::array<char> rbuf(read_buf_size), wbuf(buf_size); - st_reader reader(replica, rbuf.get(), rbuf.size()); + st_reader reader(replica.fd(), rbuf.get(), rbuf.size()); writer w(lambda(const void*, size_t) { throw not_supported_exception("response handler should not be writing"); }, wbuf.get(), wbuf.size()); @@ -356,10 +367,17 @@ if (res.seqno() == -1) { st_intr intr(stop_hub); - cout << "got a failed node" << endl; + cout << "node " << show_sockaddr(replica.fd()) << " failed" << endl; delreps.push(replica); - readmsg(reader, res); + { + st_intr intr(kill_hub); + readmsg(reader, res); + } + checkeq(res.seqno(), -1); + cout << "failed node " << show_sockaddr(replica.fd()) + << " resumed; resuming at " << seqno - 1 << endl; last_seqno = seqno - 1; + newreps.push(replica); } else { if (res.seqno() < last_seqno) @@ -384,7 +402,7 @@ if (check_interval(res.seqno(), handle_responses_display)) { cout << rid << ": " << "got response " << res.seqno() << " from " - << replica << "; "; + << show_sockaddr(replica.fd()) << "; "; long long display_time = current_time_millis(); showtput("handling", display_time, last_display_time, res.seqno(), res.seqno() - handle_responses_display); @@ -431,11 +449,11 @@ } } - st_netfd_t replica; + replica_info replica; const int &seqno; int rid; st_multichannel<long long> &recover_signals; - st_channel<st_netfd_t> &delreps; + st_channel<replica_info> &newreps, &delreps; bool caught_up; st_channel<long long> ⊂ long long start_time, recovery_start_time, recovery_end_time; @@ -443,12 +461,13 @@ }; void -handle_tpcc_responses(st_netfd_t replica, const int &seqno, int rid, +handle_tpcc_responses(replica_info replica, const int &seqno, int rid, st_multichannel<long long> &recover_signals, - st_channel<st_netfd_t> &delreps, bool caught_up) + st_channel<replica_info> &newreps, + st_channel<replica_info> &delreps, bool caught_up) { - tpcc_response_handler h(replica, seqno, rid, recover_signals, delreps, - caught_up); + tpcc_response_handler h(replica, seqno, rid, recover_signals, newreps, + delreps, caught_up); h.run(); } @@ -458,6 +477,7 @@ st_channel<chunk> &backlog, int init_seqno, int mypos, int nnodes) { + snapshot_writer_busy.set(false); bool caught_up = init_seqno == 0; // Means we're currently ignoring the incoming txns until we see a fail-ack // from the leader. @@ -478,16 +498,20 @@ function<void(anchored_stream_reader& reader)> overflow_fn = lambda(anchored_stream_reader &reader) { - if (__ref(caught_up)) { + if (__ref(caught_up) || __ref(depleting)) { // Anchor should already be correctly set, so just shift down. shift_reader(reader); } else if (__ref(first_seqno_in_chunk) == __ref(seqno) + 1) { // Has the replayer just caught up to the start of the chunk? ASSERT(reader.buf().get() == reader.anchor()); - // Replay all messages up to but not included the current unprocessed + long long time0 = current_time_millis(); + int seqno0 = __ref(seqno); + // Replay all messages up to but not including the current unprocessed // message (which we may be in the middle of receiving, triggering this // overflow). process_buf(reader.anchor(), __ref(marker), __ref(req), __ref(seqno)); + showtput("final buffer replayer caught up", + current_time_millis(), time0, __ref(seqno), seqno0); // Update the anchor to point to the unprocessed message, so that we // shift the unprocessed message down. reader.anchor() = __ref(marker); @@ -523,6 +547,7 @@ __ref(seqno_caught_up)); } __ref(send_states).push(recovery_t()); + snapshots.push(recovery_t()); __ref(w).mark_and_flush(); }); @@ -535,6 +560,19 @@ w.mark_and_flush(); }; + function<void()> takemsg = lambda() { + __ref(marker) = __ref(reader).start(); + { + st_intr intr(stop_hub); + readmsg(__ref(reader), __ref(req)); + } + if (__ref(req).seqno() == -1) { + // End of stream. + cout << "got stop command" << endl; + throw break_exception(); + } + }; + while (true) { marker = reader.start(); @@ -546,82 +584,97 @@ if (req.seqno() == -1) { // End of stream. + cout << "here" << endl; break; } else if (req.seqno() == -2) { // Prepare recovery msg. send_states.push(make_tpcc_recovery(mypos, nnodes, seqno)); + } else if (req.seqno() == -3) { + // Ignore fail-ack otherwise. } else { if (depleting) { - if (req.seqno() == -3) { - // Fail-ack. Should not be receiving anything until we resume. - failed.waitreset(); - send_failure_msg(); - // Note that we don't reset depleting; we want the next iteration to - // fall through to the next case in this if-else chain.... + // This is the first txn after resuming. Tell the recoverer task + // that this is the seqno to build up to (from another replica's + // log). + resume.push(req.seqno()); + depleting = false; + cout << "resumed at seqno " << seqno + << " while leader issues seqno " << req.seqno() << endl; + } - // Adjust reader so that the next xact (the first one after failure) - // will go to the start of the buffer; this is necessary for - // backlogging. - reader.set_anchor(); - shift_reader(reader); - } else if (!failed) { - // This is the first txn after resuming. Tell the recoverer task - // that this is the seqno to build up to (from another replica's - // log). - resume.push(req.seqno()); - depleting = false; + assert(!depleting); + + if (use_twal) wal.logbuf(marker, reader.start() - marker); + + // Backlog (auto/implicit) or process. + if (!caught_up) { + // If we were at the start of a new buffer (our chunk was recently reset). + if (reader.buf().get() == marker) + first_seqno_in_chunk = req.seqno(); + // If we fully caught up. + if (req.seqno() == seqno + 1) { + time_caught_up = current_time_millis(); + seqno_caught_up = seqno; + showtput("process_tpccs caught up; backlogged", + time_caught_up, start_time, seqno_caught_up, + first_seqno == -1 ? init_seqno - 1 : first_seqno); + caught_up = true; } - // Ignore all other messages. } + if (caught_up) { + // Process. + process_tpcc(req, seqno, &res); + ser(w, res); + reader.set_anchor(); - if (!depleting) { - if (req.seqno() == -3) { - // Ignore the fail-ack. - } else { - if (use_twal) wal.logbuf(marker, reader.start() - marker); + // Snapsphot. + if (disk && check_interval(req.seqno(), snapshot_interval)) + snapshot(seqno); + } - // Backlog (auto/implicit) or process. - if (!caught_up) { - // If we were at the start of a new buffer (our chunk was recently reset). - if (reader.buf().get() == marker) - first_seqno_in_chunk = req.seqno(); - // If we fully caught up. - if (req.seqno() == seqno + 1) { - time_caught_up = current_time_millis(); - seqno_caught_up = seqno; - showtput("process_tpccs caught up; backlogged", - time_caught_up, start_time, seqno_caught_up, - first_seqno == -1 ? init_seqno - 1 : first_seqno); - caught_up = true; - } - } - if (caught_up) { - // Process. - process_tpcc(req, seqno, &res); - ser(w, res); - reader.set_anchor(); - } + // Display/yield. + if (check_interval(req.seqno(), process_display)) + cout << (caught_up ? "processed req " : "backlogged req ") + << req.seqno() << endl; + if (check_interval(req.seqno(), yield_interval)) st_sleep(0); - // Display/yield. - if (check_interval(req.seqno(), process_display)) - cout << (caught_up ? "processed req " : "backlogged req ") - << req.seqno() << endl; - if (check_interval(req.seqno(), yield_interval)) st_sleep(0); + // Die. + if (fail_seqno > 0 && req.seqno() == fail_seqno) { + cout << "process_tpccs failing on seqno " << fail_seqno; + time_failed = current_time_millis(); + showtput("; live-processed ", time_failed, start_time, seqno, 0); + ASSERT(init_seqno == 0); + caught_up = false; + depleting = true; + seqno = -1; - // Die. - if (fail_seqno > 0 && req.seqno() == fail_seqno) { - cout << "process_tpccs failing on seqno " << fail_seqno; - time_failed = current_time_millis(); - showtput("; live-processed ", time_failed, start_time, seqno, 0); - ASSERT(init_seqno == 0); - caught_up = false; - depleting = true; - seqno = -1; + failed.set(); + send_failure_msg(); - failed.set(); - send_failure_msg(); + try { + while (true) { + takemsg(); + if (req.seqno() == -3) { + // Fail-ack. Should not be receiving anything until we resume. + cout << "got fail-ack" << endl; + failed.waitreset(); + cout << "resuming" << endl; + send_failure_msg(); + // Note that we don't reset depleting; we want the next iteration to + // fall through to the next case in this if-else chain.... + + // Adjust reader so that the next xact (the first one after failure) + // will go to the start of the buffer; this is necessary for + // backlogging. + reader.set_anchor(); + shift_reader(reader); + break; + } + // Ignore all other types of messages. } + } catch (break_exception &ex) { + break; } } @@ -631,6 +684,47 @@ } void +snapshot(int seqno) +{ + if (!snapshot_writer_busy.get()) { + long long start_time = current_time_millis(); + cout << "serializing snapshot, db state is now at seqno " + << seqno << ":" << endl; + g_tables->show(); + recovery_t recovery = g_tables->ser(0, 1, seqno); + showdatarate("serialized snapshot", recovery.size(), + current_time_millis() - start_time); + snapshots.push(move(recovery)); + snapshot_writer_busy.set(true); + } +} + +namespace { +concurrent_queue<recovery_t> snapshots; +atomic<bool> snapshot_writer_busy; +} + +void +snapshot_writer() +{ + cout << "snapshot writer starting" << endl; + while (true) { + recovery_t rec = snapshots.take(); + cout << "took one" << endl; + if (rec.get() == nullptr) break; + long long start_time = current_time_millis(); + { + ofstream of((snapshot_path + ".tmp").c_str()); + of.write(rec.get(), rec.size()); + } + check0x(rename((snapshot_path + ".tmp").c_str(), snapshot_path.c_str())); + showdatarate("wrote snapshot", rec.size(), + current_time_millis() - start_time); + snapshot_writer_busy.set(false); + } +} + +void process_buf(char *begin, char *end, TpccReq &req, int &seqno) { ASSERT(begin < end); Modified: ydb/trunk/src/util.clamp.lzz =================================================================== --- ydb/trunk/src/util.clamp.lzz 2009-05-07 01:36:53 UTC (rev 1371) +++ ydb/trunk/src/util.clamp.lzz 2009-05-07 02:36:29 UTC (rev 1372) @@ -182,14 +182,6 @@ return inet_ntoa(sa.sin_addr); } -inline const string& -nfd2name(st_netfd_t fd) -{ - return nfdnames[fd]; -} - -map<st_netfd_t, string> nfdnames; - // // ST Threads // Modified: ydb/trunk/src/ydb.clamp.lzz =================================================================== --- ydb/trunk/src/ydb.clamp.lzz 2009-05-07 01:36:53 UTC (rev 1371) +++ ydb/trunk/src/ydb.clamp.lzz 2009-05-07 02:36:29 UTC (rev 1372) @@ -1,6 +1,9 @@ #src #include "unsetprefs.h" #include <boost/program_options.hpp> +#include <boost/thread/thread.hpp> +#include <commons/threads.h> +#include <pthread.h> #include <commons/st/sockets.h> #include <commons/st/threads.h> #include <csignal> // sigaction, etc. @@ -115,9 +118,11 @@ "yield periodically during catch-up phase of recovery (for recoverer)") ("multirecover,m", po::bool_switch(&multirecover), "recover from multiple hosts, instead of just one (specified via leader)") - ("rec-twal", po::bool_switch(&rec_twal), + ("rec-snap", po::bool_switch(&do_rec_snapshot), + "recover from snapshot") + ("rec-twal", po::bool_switch(&do_rec_twal), "recover from twal") - ("rec-pwal", po::bool_switch(&rec_pwal), + ("rec-pwal", po::bool_switch(&do_rec_pwal), "recover from pwal") ("disk,k", po::bool_switch(&disk), "use disk-based recovery") @@ -152,6 +157,10 @@ "run the leader (run replica by default)") ("exit-on-recovery,x", po::bool_switch(&stop_on_recovery), "exit after the joiner fully recovers (for leader)") + ("snap-path", po::value<string>(&snapshot_path)->default_value(string("snapshots")), + "path to the snapshot file (for worker)") + ("snapshot-int", po::value<int>(&snapshot_interval)->default_value(100000), + "number of txns to process between snapshots (for worker)") ("batch-size,b", po::value<int>(&batch_size)->default_value(100), "number of txns to batch up in each msg (for leader)") ("tpcc", po::bool_switch(&do_tpcc), This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-05-07 02:22:23
|
Revision: 1371 http://assorted.svn.sourceforge.net/assorted/?rev=1371&view=rev Author: yangzhang Date: 2009-05-07 01:36:53 +0000 (Thu, 07 May 2009) Log Message: ----------- updated docs Modified Paths: -------------- cpp-commons/trunk/README Modified: cpp-commons/trunk/README =================================================================== --- cpp-commons/trunk/README 2009-05-07 01:27:52 UTC (rev 1370) +++ cpp-commons/trunk/README 2009-05-07 01:36:53 UTC (rev 1371) @@ -42,8 +42,13 @@ - utilities for streams - utilities for [tamer] - x86 architecture-specific tools -- object memory pool with coarse-grained versioned pages +- serializable object memory pool with coarse-grained versioned pages - C++ abstractions for `mmap` +- concurrent queue +- stream formatters (e.g. for arrays) +- bit manipulation +- simple `atomic<T>` +- convenience functions for [libcrypto] Third-party code: @@ -60,6 +65,7 @@ [C++0x `unique_ptr`]: svn://gcc.gnu.org/svn/gcc/trunk/libstdc++-v3/include/bits/unique_ptr.h [C++03-emulated TR1 `unique_ptr.hpp`]: http://home.roadrunner.com/~hinnant/unique_ptr03.html [Yonat's STL extensions]: http://ootips.org/yonat/4dev/ +[libcrypto]: http://www.openssl.org/ Setup ----- This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-05-07 01:28:02
|
Revision: 1370 http://assorted.svn.sourceforge.net/assorted/?rev=1370&view=rev Author: yangzhang Date: 2009-05-07 01:27:52 +0000 (Thu, 07 May 2009) Log Message: ----------- - tweaked Makefile - added tests for bit manip functions - added more tests for versioned_heap - added test for deque Modified Paths: -------------- cpp-commons/trunk/src/test/Makefile cpp-commons/trunk/src/test/memory.cc cpp-commons/trunk/src/test/versioned_heap.cc Added Paths: ----------- cpp-commons/trunk/src/test/deque.cc Modified: cpp-commons/trunk/src/test/Makefile =================================================================== --- cpp-commons/trunk/src/test/Makefile 2009-05-07 01:25:59 UTC (rev 1369) +++ cpp-commons/trunk/src/test/Makefile 2009-05-07 01:27:52 UTC (rev 1370) @@ -1,6 +1,5 @@ CXXFLAGS = \ - -MD \ - -g3 \ + -MD -g3 -pipe \ -Wall \ -Werror \ -Wextra \ Added: cpp-commons/trunk/src/test/deque.cc =================================================================== --- cpp-commons/trunk/src/test/deque.cc (rev 0) +++ cpp-commons/trunk/src/test/deque.cc 2009-05-07 01:27:52 UTC (rev 1370) @@ -0,0 +1,16 @@ +#include <commons/deque.h> +#include <boost/foreach.hpp> +#include "test.h" + +TEST(deque, basics) { + deque<int> d; + for (int i = 0; i < 10; ++i) { + d.push_back(i); + } + int i = 0; + foreach (int x, d) { + EXPECT_EQ(i, x); + ++i; + } + EXPECT_EQ(10, i); +} Modified: cpp-commons/trunk/src/test/memory.cc =================================================================== --- cpp-commons/trunk/src/test/memory.cc 2009-05-07 01:25:59 UTC (rev 1369) +++ cpp-commons/trunk/src/test/memory.cc 2009-05-07 01:27:52 UTC (rev 1370) @@ -11,3 +11,8 @@ EXPECT_EQ(2, whole_units(101, 100)); EXPECT_EQ(2, whole_units(102, 100)); } + +TEST(memory, bits) { + EXPECT_EQ(0xfff0UL, clear_bits_below(0xffffUL, 4)); + EXPECT_EQ(0x0fffUL, clear_bits_above(0xffffUL, 12)); +} Modified: cpp-commons/trunk/src/test/versioned_heap.cc =================================================================== --- cpp-commons/trunk/src/test/versioned_heap.cc 2009-05-07 01:25:59 UTC (rev 1369) +++ cpp-commons/trunk/src/test/versioned_heap.cc 2009-05-07 01:27:52 UTC (rev 1370) @@ -1,6 +1,8 @@ #include <commons/check.h> #include <commons/unique_ptr.h> #include <commons/versioned_heap.h> +#include <algorithm> +#include <vector> #include "test.h" struct s { char xs[40000]; }; @@ -9,6 +11,8 @@ typedef versioned_heap<s> heap; heap h; + vector<s*> ss; + s &a = h.construct(); EXPECT_EQ(0, h.hdrof(&a).version); s &b = h.construct(); @@ -17,6 +21,9 @@ EXPECT_EQ(2, h.hdrof(&c).version); EXPECT_EQ(&a + 1, &b); EXPECT_EQ(&b + 1, &c); + ss.push_back(&a); + ss.push_back(&b); + ss.push_back(&c); s &d = h.construct(); EXPECT_EQ(0, h.hdrof(&d).version); @@ -26,13 +33,24 @@ EXPECT_EQ(2, h.hdrof(&f).version); EXPECT_EQ(&d + 1, &e); EXPECT_EQ(&e + 1, &f); + ss.push_back(&d); + ss.push_back(&e); + ss.push_back(&f); h.touch(d); EXPECT_EQ(h.hdrof(&d).version, 3); h.destroy(d); EXPECT_EQ(h.hdrof(&d).version, 4); + ss.erase(find(ss.begin(), ss.end(), &d)); + EXPECT_EQ(5, ss.size()); + size_t i = 0; + for (versioned_heap<s>::iterator it = h.begin(); + it.cur() != nullptr; it.next()) { + EXPECT_EQ(ss[i++], it.cur()); + } + h.touch(a, 4); foreach (char *page, h.pages()) { EXPECT_EQ(h.hdrof(page).version, 4); @@ -40,7 +58,15 @@ s &g = h.construct(); EXPECT_EQ(&d, &g); + ss.insert(find(ss.begin(), ss.end(), &e), &g); + EXPECT_EQ(6, ss.size()); + i = 0; + for (versioned_heap<s>::iterator it = h.begin(); + it.cur() != nullptr; it.next()) { + EXPECT_EQ(ss[i++], it.cur()); + } + void *meta = new char[h.metasize()], *data; check0x(posix_memalign(&data, h.pgsz(), datasize(h))); unique_ptr<char[]> umeta(reinterpret_cast<char*>(meta)), @@ -59,3 +85,33 @@ } } } + +TEST(versioned_heap, simple_bitset) { + uint32_t buffer[20]; + simple_bitset &xs = *(new (buffer) simple_bitset(20, 0)); + xs.set(77); // word 2 + xs.set(188); // word 5 + xs.clr(77); + EXPECT_EQ(size_t(-1), xs.find_first(3)); + EXPECT_EQ(188, xs.find_first(20)); +} + +TEST(versioned_heap, realloc) { + using namespace std; + versioned_heap<s> h; + vector<s*> ss; + for (int i = 0; i < 10; ++i) { + ss.push_back(&h.construct()); + } + EXPECT_EQ(4, h.pages().size()); + + h.free(ss[2]); // last + h.free(ss[4]); // mid + h.free(ss[6]); // first + + for (int i = 0; i < 3; ++i) { + ss[2 + 2 * i] = &h.construct(); + } + + EXPECT_EQ(4, h.pages().size()); +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-05-07 01:26:04
|
Revision: 1369 http://assorted.svn.sourceforge.net/assorted/?rev=1369&view=rev Author: yangzhang Date: 2009-05-07 01:25:59 +0000 (Thu, 07 May 2009) Log Message: ----------- - major change to versioned_heap to place free bits inline in the pages - added formatting (format_array), crypto, atomic - extended concurrent_queue - fixed const-ness of array, sized_array - added write_file and made functions static/unused - enhanced the deque, but currently in a broken state - added alignof, clr_bit, set_bit_mut, clr_bit_mut, clear_bits_above, clear_bits_below Modified Paths: -------------- cpp-commons/trunk/src/commons/array.h cpp-commons/trunk/src/commons/deque.h cpp-commons/trunk/src/commons/files.h cpp-commons/trunk/src/commons/memory.h cpp-commons/trunk/src/commons/squeue.h cpp-commons/trunk/src/commons/versioned_heap.h Added Paths: ----------- cpp-commons/trunk/src/commons/atomic.h cpp-commons/trunk/src/commons/crypto.h cpp-commons/trunk/src/commons/formatting.h Modified: cpp-commons/trunk/src/commons/array.h =================================================================== --- cpp-commons/trunk/src/commons/array.h 2009-05-05 21:24:40 UTC (rev 1368) +++ cpp-commons/trunk/src/commons/array.h 2009-05-07 01:25:59 UTC (rev 1369) @@ -32,10 +32,8 @@ T *get() const { return p_; } T *begin() const { return p_; } T *end() const { return p_ + n_; } - const T &operator[](size_t i) const { return p_[i]; } - T &operator[](size_t i) { return p_[i]; } - const T &at(size_t i) const { ASSERT(i < n_); return p_[i]; } - T &at(size_t i) { ASSERT(i < n_); return p_[i]; } + T &operator[](size_t i) const { return p_[i]; } + T &at(size_t i) const { ASSERT(i < n_); return p_[i]; } void reset(T *p, size_t n) { p_ = p; n_ = n; } private: T *p_; @@ -69,17 +67,14 @@ } return *this; } - operator const T*() const { return p_.get(); } - operator T*() { return p_.get(); } + operator T*() const { return p_.get(); } size_t size() const { return n_; } T *get() const { return p_.get(); } T *release() { return p_.release(); } T *begin() const { return p_.get(); } T *end() const { return this->get() + n_; } - const T &at(size_t i) const { ASSERT(i < n_); return p_[i]; } - T &at(size_t i) { ASSERT(i < n_); return p_[i]; } - const T &operator[](size_t i) const { return p_[i]; } - T &operator[](size_t i) { return p_[i]; } + T &at(size_t i) const { ASSERT(i < n_); return p_[i]; } + T &operator[](size_t i) const { return p_[i]; } void reset(T *p, size_t n) { p_.reset(p); n_ = n; } private: unique_ptr<T[]> p_; Added: cpp-commons/trunk/src/commons/atomic.h =================================================================== --- cpp-commons/trunk/src/commons/atomic.h (rev 0) +++ cpp-commons/trunk/src/commons/atomic.h 2009-05-07 01:25:59 UTC (rev 1369) @@ -0,0 +1,30 @@ +#ifndef COMMONS_ATOMIC_H +#define COMMONS_ATOMIC_H + +#include <boost/thread/mutex.hpp> + +namespace commons +{ + using namespace boost; + + template<typename T> + class atomic + { + private: + T x_; + mutex m_; + public: + atomic() {} + template<typename U> atomic(const U &x) : x_(x) {} + T get() { + mutex::scoped_lock lock(m_); + return x_; + } + void set(const T &x) { + mutex::scoped_lock lock(m_); + x_ = x; + } + }; +} + +#endif Added: cpp-commons/trunk/src/commons/crypto.h =================================================================== --- cpp-commons/trunk/src/commons/crypto.h (rev 0) +++ cpp-commons/trunk/src/commons/crypto.h 2009-05-07 01:25:59 UTC (rev 1369) @@ -0,0 +1,25 @@ +#ifndef COMMONS_CRYPTO_H +#define COMMONS_CRYPTO_H + +#include <openssl/sha.h> + +namespace commons { + + struct sha1md { + unsigned char md[SHA_DIGEST_LENGTH]; + }; + + UNUSED static sha1md + sha1(const char *data, size_t len) + { + sha1md md; + SHA_CTX c; + SHA1_Init(&c); + SHA1_Update(&c, data, len); + SHA1_Final(md.md, &c); + return md; + } + +} + +#endif Modified: cpp-commons/trunk/src/commons/deque.h =================================================================== --- cpp-commons/trunk/src/commons/deque.h 2009-05-05 21:24:40 UTC (rev 1368) +++ cpp-commons/trunk/src/commons/deque.h 2009-05-07 01:25:59 UTC (rev 1369) @@ -20,27 +20,70 @@ class deque { private: + class chunk { public: - chunk() : xs(node_size) {} - private: + chunk(size_t node_size) { xs.reserve(node_size); } vector<T> xs; }; list<chunk> chunks; size_t node_size; + public: + + class iterator + { + private: + typename list<chunk>::iterator c_; + size_t pos_; + public: + typedef forward_iterator_tag iterator_category; + typedef T value_type; + typedef ptrdiff_t difference_type; + typedef value_type *pointer; + typedef value_type &reference; + iterator(const typename list<chunk>::iterator &c, size_t pos) : + c_(c), pos_(pos) {} + T &operator*() { return c_->xs[pos_]; } + T &operator->() { return c_->xs[pos_]; } + T &operator++() { + if (pos_ == c_->xs.size()) { + pos_ = 0; + ++c_; + } + return c_->xs[++pos_]; + } + T &operator++(int) { + T &x = *(*this); + ++(*this); + return x; + } + + bool operator==(const iterator &that) const { + return c_ == that.c_ && pos_ == that.pos_; + } + bool operator!=(const iterator &that) const { + return !(*this == that); + } + }; + + typedef iterator const_iterator; + deque(size_t node_size = 8192) : node_size(node_size) {} void push_back(const T& x) { - chunk& last = chunks.back(); - if (last.xs.size() == last.xs.capacity()) { - chunks.push_back(chunk()); + if (chunks.empty() || + chunks.back().xs.size() == chunks.back().xs.capacity()) { + chunks.push_back(chunk(node_size)); } - last.push_back(x); + chunks.back().xs.push_back(x); } + + iterator begin() { return iterator(chunks.begin(), 0); } + iterator end() { return iterator(chunks.end(), 0); } }; } Modified: cpp-commons/trunk/src/commons/files.h =================================================================== --- cpp-commons/trunk/src/commons/files.h 2009-05-05 21:24:40 UTC (rev 1368) +++ cpp-commons/trunk/src/commons/files.h 2009-05-07 01:25:59 UTC (rev 1369) @@ -32,7 +32,7 @@ /** * Get the size of a file in bytes. */ - off_t file_size(const char *path) + UNUSED static off_t file_size(const char *path) { struct stat s; check0x(stat(path, &s)); @@ -42,7 +42,7 @@ /** * Get the size of a file in bytes. */ - off_t file_size(int fd) + UNUSED static off_t file_size(int fd) { struct stat s; check0x(fstat(fd, &s)); @@ -52,7 +52,7 @@ /** * Read in a whole file as a string. */ - void read_file_as_string ( const string & name, string & out ) { + UNUSED static void read_file_as_string ( const string & name, string & out ) { ifstream in ( name.c_str() ); if (in.fail()) throw file_not_found_exception( name ); out = string ( istreambuf_iterator<char> ( in ), istreambuf_iterator<char>() ); @@ -61,7 +61,7 @@ /** * Read in a whole file as a vector of chars. */ - void read_file_as_vector ( const string & name, vector<char> & out ) { + UNUSED static void read_file_as_vector ( const string & name, vector<char> & out ) { ifstream in ( name.c_str() ); if ( in.fail() ) throw file_not_found_exception( name ); out = vector<char> ( istreambuf_iterator<char> ( in ), istreambuf_iterator<char>() ); @@ -70,21 +70,38 @@ /** * Read in a whole file as an array. */ - array<char> read_file_as_array(const char *path) + UNUSED static array<char> read_file_as_array(const char *path) { closingfd fd(checknnegerr(open(path, O_RDONLY))); array<char> buf(file_size(fd)); - checkeqnneg(read(fd, buf, buf.size()), static_cast<ssize_t>(buf.size())); + checkeqnneg(read(fd, buf, buf.size()), ssize_t(buf.size())); return buf; } /** + * Write a whole array to disk. + */ + UNUSED static void write_file(const char *path, const char *buf, size_t len) + { + closingfd fd(checknnegerr(creat(path, 0644))); + checkeqnneg(write(fd, buf, len), ssize_t(len)); + } + + /** + * Write a whole array to disk. + */ + UNUSED static void write_file(const char *path, const array<char> &buf) + { + write_file(path, buf, buf.size()); + } + + /** * Load an entire file directly into buf and also give us the length of the * buffer (size of the file). * TODO this probably isn't very safe, since we're demoting an off_t to a * size_t. Is there a healthier approach? */ - char * + UNUSED static char * load_file(const char *path, size_t & len, unsigned int ncpus) { struct stat sb; Added: cpp-commons/trunk/src/commons/formatting.h =================================================================== --- cpp-commons/trunk/src/commons/formatting.h (rev 0) +++ cpp-commons/trunk/src/commons/formatting.h 2009-05-07 01:25:59 UTC (rev 1369) @@ -0,0 +1,40 @@ +#ifndef COMMONS_FORMATTING_H +#define COMMONS_FORMATTING_H + +#include <string> +#include <ostream> +#include <iomanip> +#include <commons/array.h> + +namespace commons { + + using namespace std; + + // TODO: change to standard ostream<< + + UNUSED static ostream & + format_array(ostream &out, const char *buf, size_t len) + { + for (size_t i = 0; i < len; ++i) { + out << hex << setfill('0') << setw(2) << buf[i] << len; + } + return out; + } + + UNUSED static ostream & + format_array(ostream &out, const array<char> arr) + { + return format_array(out, arr, arr.size()); + } + + UNUSED static string + format_array_as_string(const char *buf, size_t len) + { + stringstream ss; + format_array(ss, buf, len); + return ss.str(); + } + +} + +#endif Modified: cpp-commons/trunk/src/commons/memory.h =================================================================== --- cpp-commons/trunk/src/commons/memory.h 2009-05-05 21:24:40 UTC (rev 1368) +++ cpp-commons/trunk/src/commons/memory.h 2009-05-07 01:25:59 UTC (rev 1369) @@ -2,11 +2,16 @@ #define COMMONS_MEMORY_H #include <cstring> // for size_t +#include <limits> #include <commons/utility.h> // for UNUSED +#define alignof __alignof__ // gcc + namespace commons { + using namespace std; + /** * Copy a datum directly to a memory location. Useful for serializing small * data (e.g. ints). Faster than memcpy. @@ -40,7 +45,8 @@ template<typename T> void write(const T &x) { memput(p_, x); p_ += sizeof(T); } /** Get the current value of the pointer. */ - void *ptr() const { return p_; } + char *ptr() const { return p_; } + char *&ptr() { return p_; } /** Skip n bytes. */ void skip(size_t n) { p_ += n; } }; @@ -57,13 +63,16 @@ T &read() { void *p = p_; p_ += sizeof(T); return memget<T>(p); } template<typename T> T *readptr() { return reinterpret_cast<T*>(readptr(sizeof(T))); } - void *readptr(size_t n) { void *p = p_; p_ += n; return p; } - void *ptr() const { return p_; } + char *readptr(size_t n) { char *p = p_; p_ += n; return p; } + char *ptr() const { return p_; } + char *&ptr() { return p_; } void skip(size_t n) { p_ += n; } }; - /** Round `value` up to the nearest multiple of `unit` and return this - * factor. */ + /** + * Return `ceil(value / factor)`. Round `value` up to the nearest multiple + * of `unit` and return this factor. + */ template<typename T> T whole_units(T value, T unit) { return (value + unit - 1) / unit; @@ -85,12 +94,31 @@ return x | (1U << bit); } - /** Note: there exists a setbit macro in /usr/include/sys/param.h */ template<typename T> - T set_bit(T x, size_t bit, T val) { - return x | (val << bit); + T clr_bit(T x, size_t bit) { + return x & ~(1U << bit); } + template<typename T> + void set_bit_mut(T &x, size_t bit) { + x |= 1U << bit; + } + + template<typename T> + void clr_bit_mut(T &x, size_t bit) { + x &= ~(1U << bit); + } + + template<typename T> + T clear_bits_below(T x, size_t nbits) { + return x & (~T(0) << nbits); + } + + template<typename T> + T clear_bits_above(T x, size_t nbits) { + return x & (~T(0) >> (numeric_limits<T>::digits - nbits)); + } + } #endif Modified: cpp-commons/trunk/src/commons/squeue.h =================================================================== --- cpp-commons/trunk/src/commons/squeue.h 2009-05-05 21:24:40 UTC (rev 1368) +++ cpp-commons/trunk/src/commons/squeue.h 2009-05-07 01:25:59 UTC (rev 1369) @@ -19,6 +19,18 @@ condition_variable c_; public: + /** + * Push x onto queue iff queue.size() <= n. + */ + template<typename U> void push_cond(U &&x, size_t n) + { + mutex::scoped_lock lock(m_); + if (q_.size() <= n) + q_.push(forward<U>(x)); + lock.unlock(); + c_.notify_one(); + } + template<typename U> void push(U &&x) { mutex::scoped_lock lock(m_); @@ -27,6 +39,12 @@ c_.notify_one(); } + size_t size() const + { + mutex::scoped_lock lock(m_); + return q_.size(); + } + bool empty() const { mutex::scoped_lock lock(m_); Modified: cpp-commons/trunk/src/commons/versioned_heap.h =================================================================== --- cpp-commons/trunk/src/commons/versioned_heap.h 2009-05-05 21:24:40 UTC (rev 1368) +++ cpp-commons/trunk/src/commons/versioned_heap.h 2009-05-07 01:25:59 UTC (rev 1369) @@ -5,7 +5,10 @@ #include <cstdlib> #include <vector> #include <boost/foreach.hpp> +#include <deque> #include <commons/memory.h> +#include <commons/nullptr.h> +#include <boost/pending/lowest_bit.hpp> #define foreach BOOST_FOREACH namespace commons @@ -13,67 +16,241 @@ using namespace std; + class simple_bitset + { + private: + typedef uint32_t Block; + Block words[0]; + static const size_t bits_per_block = numeric_limits<Block>::digits; + public: + /** Initialize all bits. */ + simple_bitset(size_t nwords, Block value = ~Block(0)) { + for (size_t word = 0; word < nwords; ++word) + words[word] = value; + } + /** Find the first bit. */ + size_t find_first(size_t nwords) { + for (size_t word = 0; word < nwords; ++word) + if (words[word] != 0) + return word * bits_per_block + lowest_bit(words[word]); + return size_t(-1); + } + /** Which word contains bit `i`? */ + static size_t bit2word(size_t i) { return i / bits_per_block; } + /** Which bit within the word is bit `i`? */ + static size_t bit2bit(size_t i) { return i & (bits_per_block - 1); } + /** Set the i-th bit. */ + void set(size_t i) { set_bit_mut(words[bit2word(i)], bit2bit(i)); } + /** Clear the i-th bit. */ + void clr(size_t i) { clr_bit_mut(words[bit2word(i)], bit2bit(i)); }; + /** Get the i-th bit. */ + bool get(size_t i) { return (words[bit2word(i)] >> bit2bit(i)) & 1; } + }; + + template<typename T, typename V = int> class versioned_heap; + /** + * Calculates the number of elements we can pack into a page based on the + * freemap size and the size and alignment of the value type. + */ + template<typename T> + void compute_layout(size_t pgsz, size_t &pgcnt, + size_t &values_offset, size_t &freemap_words) + { + typedef typename commons::versioned_heap<T>::hdr hdr; + // TODO The following is correct. + // + // Memory layout diagram: + // + // [header][freemap ] [value 0]... + // [word 0][word 1][word 2][word 3] + // [align 0][align 1][align 2][align 3][align 4]... + // + // TODO The following is incorrect. + // + // Memory layout diagram: + // + // [header][freemap ] [value 0]... + // [word 0][word 1][word 2] + // [align 0][align 1][align 2][align 3]... + // [freemap bytes by align ] + // + // Calculate the maximum possible amount of space we can reserve for the + // freemap based on the alignof(T). This is also the + // + // Let's call each unit of alignof(T) bytes an _align. + // + // Then the formula is: + // + // naligns * alignment + naligns * 8 * alignment * sizeof(T) <= pgsz + // + // Solve for naligns: + // + // naligns = pgsz / (alignment + 8 * alignment * sizeof(T)) + UNUSED size_t a = alignof(T), b = sizeof(T); + + // TODO: use simple_bitset instead and address the following: + // warning: invalid access to non-static data member `CNeoIOBlock::fPB' of NULL object + // warning: (perhaps the `offsetof' macro was used incorrectly) + struct foo { hdr h; uint32_t fm; }; + size_t freemap_offset = offsetof(foo, fm); + size_t freemap_aligns_by_alignment = + (pgsz - freemap_offset) / (alignof(T) + 8 * alignof(T) * sizeof(T)); + + // Try to increase the space by 1 alignment if possible, since the last + // alignment might have sent the space required for actual objects over the + // limit. + if (freemap_offset + + (freemap_aligns_by_alignment + 1) * alignof(T) + + freemap_aligns_by_alignment * 8 * alignof(T) * sizeof(T) < pgsz) + ++freemap_aligns_by_alignment; + + size_t freemap_bytes_by_alignment = + max(sizeof(uint32_t), freemap_aligns_by_alignment * alignof(T)); + values_offset = freemap_offset + freemap_bytes_by_alignment; + + // Now see how many uint32_t's we can fit into this space. Our freemap + // implementation only uses whole uint32_t's for efficiency. This will + // yield the exact length of the freemap. + + freemap_words = freemap_bytes_by_alignment / sizeof(uint32_t); + size_t freemap_bytes = freemap_words * sizeof(uint32_t); + + // Now we know how much space remains for the number of elements. + + pgcnt = ( pgsz - freemap_offset - freemap_bytes ) / sizeof(T); + } + + /** * A simple object memory pool that places things into pages that are tagged * with a version number. */ - template<typename T, typename VersionType = int> + template<typename T, typename VersionType> class versioned_heap { public: typedef VersionType version_type; typedef T value_type; struct hdr { + // Version number of this page. version_type version; - uint32_t index; + // Index. Defines ordering in pages_. Used for + // serialization. + size_t index; + // Slightly tricky semantics. next_free == nullptr if not in free list + // *or* if it's the last node in the free list. + hdr *next_free; + // Whether this page is memory-managed by the heap, or if it's + // externally managed. Deserialized pages are externally managed, in + // order to reduce the amount of memory copies when deserializing. bool managed; }; + + class iterator + { + private: + versioned_heap &h_; + typename deque<char*>::iterator page_; + size_t pos_; + + /** + * Advance to next valid entry and return it. + */ + T *advance() { + do { + if (++pos_ == h_.pgcnt()) { + pos_ = 0; + ++page_; + if (page_ == h_.pages().end()) + return nullptr; + } + } while (h_.freemapof(*page_).get(pos_)); + return h_.firstof(*page_) + pos_; + } + + public: + iterator(versioned_heap &h) : + h_(h), page_(h.pages_.begin()), pos_(0) + { + // If the first slot is invalid, advance to first valid slot. + if (h_.freemapof(*page_).get(pos_)) + advance(); + } + + /** Return current. */ + T *cur() { + return page_ == h_.pages().end() ? + nullptr : h_.firstof(*page_) + pos_; + } + + /** Return current and advance to next. */ + T *next() { + T *ret = cur(); + if (ret != nullptr) advance(); + return ret; + } + }; + versioned_heap(size_t pgsz = 131072) : - last_i_(0), pgsz_(pgsz), pgcnt_((pgsz - sizeof(hdr)) / sizeof(T)) {} + first_free_(nullptr), + last_free_(nullptr), + pgsz_(pgsz) + { + compute_layout<T>(pgsz, pgcnt_, values_offset_, freemap_words_); + } /** Frees all pages, without destroying the objects. */ ~versioned_heap() { foreach (char *page, pages_) if (hdrof(page).managed) ::free(page); } - char *alloc() { - for (size_t counter = 1; counter <= freemap_.size(); ++counter) { - size_t i = (last_i_ + counter) % freemap_.size(); - if (freemap_[i]) { - freemap_[i] = false; - char *pos = pages_[i / pgcnt_] + sizeof(hdr) + - (i % pgcnt_) * sizeof(T); - ++hdrof(pos).version; - last_i_ = i; + + T *alloc() { + while (first_free_ != nullptr) { + // Find the first free slot. + simple_bitset &freemap = freemapof(first_free_); + size_t i = freemap.find_first(freemap_words_); + if (i < pgcnt_) { + freemap.clr(i); + T *pos = firstof(first_free_) + i; + // Touch the page. + ++first_free_->version; return pos; } + // Pop this page off the front of the free list. + hdr *old = first_free_; + first_free_ = first_free_->next_free; + old->next_free = nullptr; } + char *page; posix_memalign(reinterpret_cast<void**>(&page), pgsz_, pgsz_); hdr &h = *reinterpret_cast<hdr*>(page); h.version = 0; - h.index = uint32_t(pages_.size()); + h.index = pages_.size(); + h.next_free = nullptr; h.managed = true; + first_free_ = last_free_ = &h; pages_.push_back(page); - freemap_.resize(freemap_.size() + pgcnt_, true); - freemap_[freemap_.size() - pgcnt_] = false; - return page + sizeof(hdr); + (new (&freemapof(page)) simple_bitset(freemap_words_))->clr(0); + return firstof(page); } + T &construct() { - char *buf = alloc(); + T *buf = alloc(); try { return *new (buf) T; } catch (...) { free(buf); throw; } } // TODO: replace with code gen template<typename A0> T &construct(const A0 &a0) { - char *buf = alloc(); + T *buf = alloc(); try { return *new (buf) T(a0); } catch (...) { free(buf); throw; } } template<typename A0, typename A1> T &construct(const A0 &a0, const A1 &a1) { - char *buf = alloc(); + T *buf = alloc(); try { return *new (buf) T(a0, a1); } catch (...) { free(buf); throw; } } @@ -81,9 +258,19 @@ hdr &h = hdrof(p); if (version == 0) ++h.version; else h.version = version; - char *px = reinterpret_cast<char*>(p); - char *p0 = reinterpret_cast<char*>(&h + 1); - freemap_[h.index * pgcnt_ + (px - p0) / sizeof(T)] = true; + T *px = reinterpret_cast<T*>(p); + T *p0 = firstof(&h); + freemapof(&h).set(px - p0); + + // If this page is not in the free list, then add it to the free list. + if (h.next_free == nullptr && last_free_ != &h) { + h.next_free = first_free_; + // If the free list is empty, initialize it. + if (first_free_ == nullptr) { + last_free_ = &h; + } + first_free_ = &h; + } } void destroy(T &x, version_type version = 0) { x.~T(); @@ -93,23 +280,38 @@ void touch(T &p, version_type version) { hdrof(&p).version = version; } size_t pgcnt() const { return pgcnt_; } size_t pgsz() const { return pgsz_; } - const vector<char*> &pages() const { return pages_; } + const deque<char*> &pages() const { return pages_; } + + // + // Page structure accessors. + // + /** Pronounced "header of". */ - const hdr &hdrof(void *p) const { + const hdr &hdrof(const void *p) const { return *reinterpret_cast<hdr*>(uintptr_t(p) & ~(pgsz_ - 1)); } hdr &hdrof(void *p) { return *reinterpret_cast<hdr*>(uintptr_t(p) & ~(pgsz_ - 1)); } + // TODO: use simple_bitset instead and address the following: + // warning: invalid access to non-static data member `CNeoIOBlock::fPB' of NULL object + // warning: (perhaps the `offsetof' macro was used incorrectly) + struct foo { hdr h; uint32_t fm; }; + simple_bitset &freemapof(void *page) { + return *reinterpret_cast<simple_bitset*>(reinterpret_cast<char*>(page) + offsetof(foo, fm)); + } + T *firstof(void *p) { + return reinterpret_cast<T*>(reinterpret_cast<char*>(p) + + values_offset_); + } /** * Calculate the required space for serializing the metadata. */ size_t metasize() const { - return sizeof last_i_ + sizeof pgsz_ + sizeof pgcnt_ + - sizeof freemap_.size() + - nbits2nints(freemap_.size()) * sizeof(uint32_t) + - sizeof datasize(*this); + return sizeof pgsz_ + + sizeof(size_t) + sizeof(size_t) + // first_free_, last_free_ + sizeof datasize(*this); } /** @@ -120,71 +322,79 @@ } /** - * Serializes metadata and data to out buffer. + * Serializes metadata to buffer. */ - void ser(void *meta, void *data) const { + void sermeta(void *meta) const { raw_writer w(meta); - - // Serialize metadata. - w.write(last_i_); + w.write(ptr2ind(first_free_)); + w.write(ptr2ind(last_free_)); w.write(pgsz_); - w.write(pgcnt_); + w.write(datasize(*this)); + } - // Serialize freemap. - w.write(freemap_.size()); - size_t freesize = nbits2nints(freemap_.size()); - for (size_t i = 0; i < freesize; ++i) { - uint32_t val = 0; - for (size_t j = 0; j < 32 && 32 * i + j < freemap_.size(); ++j) { - val = set_bit(val, j, uint32_t(freemap_[32 * i + j])); - } - w.write(val); - } - - // Serialize data. - w.write(datasize(*this)); + /** + * Serializes metadata and data to out buffer. + */ + void ser(void *meta, void *data) const { + sermeta(meta); serdata(*this, data); } /** - * Deserializes a heap in-place. + * Deserialize a heap in place. */ void deser(void *meta, void *data) { + size_t datasize = desermeta(meta, data); + deserdata(datasize, data); + } + + /** + * Iterator. + */ + iterator begin() { return iterator(*this); } + + private: + hdr *first_free_; + hdr *last_free_; + size_t pgsz_; + size_t pgcnt_; + size_t values_offset_; + size_t freemap_words_; + deque<char*> pages_; + + /** + * Deserializes a heap in-place, returning the length of the serialized + * data. + */ + size_t desermeta(void *meta, void *data) { raw_reader r(meta); // Deserialize metadata. - r.read(last_i_); + r.read(reinterpret_cast<size_t&>(first_free_)); + r.read(reinterpret_cast<size_t&>(last_free_)); r.read(pgsz_); - r.read(pgcnt_); + char *p0 = reinterpret_cast<char*>(data); + ind2ptr_mut(first_free_, p0, pgsz_); + ind2ptr_mut(last_free_, p0, pgsz_); + compute_layout<T>(pgsz_, pgcnt_, values_offset_, freemap_words_); - // Deserialize freemap. - size_t nbits; - r.read(nbits); - size_t nints = nbits2nints(nbits); - for (size_t i = 0; i < nints; ++i) { - uint32_t val; - r.read(val); - for (size_t j = 0; j < nbits && 32 * i + j < nbits; ++j) { - freemap_.push_back(get_bit(val, j)); - } - } + // Deserialize data. + return r.read<size_t>(); + } - // Deserialize data. - size_t datasize; - r.read(datasize); + /** + * Deserialize a heap in place. + */ + void deserdata(size_t len, void *data) { + pages_.clear(); char *p0 = reinterpret_cast<char*>(data); - for (char *p = p0; p < p0 + datasize; p += pgsz_) { + for (char *p = p0; p < p0 + len; p += pgsz_) { hdrof(p).managed = false; pages_.push_back(p); + ind2ptr_mut(hdrof(p).next_free, p0, pgsz_); } } - private: - size_t last_i_; - size_t pgsz_; - size_t pgcnt_; - vector<bool> freemap_; - vector<char*> pages_; }; /** @@ -201,15 +411,41 @@ * (see datasize). */ template<typename T> - void serdata(const versioned_heap<T> &h, void *out) + void serdata(const versioned_heap<T> &heap, void *out) { char *p = reinterpret_cast<char*>(out); - foreach (void *page, h.pages()) { - memcpy(p, page, h.pgsz()); - p += h.pgsz(); + size_t i = 0; + foreach (void *page, heap.pages()) { + memcpy(p, page, heap.pgsz()); + typedef typename versioned_heap<T>::hdr hdr; + hdr &h = *reinterpret_cast<hdr*>(p); + assert(i == h.index); + ptr2ind_mut(h.next_free); + p += heap.pgsz(); + ++i; } } + template<typename T> + size_t ptr2ind(T *p) { + return p == nullptr ? size_t(-1) : p->index; + } + + template<typename T> + void ptr2ind_mut(T *&p) { + p = reinterpret_cast<T*>(ptr2ind(p)); + } + + template<typename T> + T *ind2ptr(size_t i, char *pages, size_t pgsz) { + return reinterpret_cast<T*>(i == size_t(-1) ? nullptr : pages + pgsz * i); + } + + template<typename T> + void ind2ptr_mut(T *&p, char *pages, size_t pgsz) { + p = ind2ptr<T>(reinterpret_cast<size_t>(p), pages, pgsz); + } + } #undef foreach This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-05-05 21:24:48
|
Revision: 1368 http://assorted.svn.sourceforge.net/assorted/?rev=1368&view=rev Author: yangzhang Date: 2009-05-05 21:24:40 +0000 (Tue, 05 May 2009) Log Message: ----------- added clobber-if-diff Added Paths: ----------- shell-tools/trunk/src/clobber-if-diff.py Added: shell-tools/trunk/src/clobber-if-diff.py =================================================================== --- shell-tools/trunk/src/clobber-if-diff.py (rev 0) +++ shell-tools/trunk/src/clobber-if-diff.py 2009-05-05 21:24:40 UTC (rev 1368) @@ -0,0 +1,46 @@ +#!/usr/bin/env python + +# TODO: test more thoroughly! +# TODO: use SpooledTemporaryFile, new in Python 2.6 + +from __future__ import with_statement +import sys, optparse, subprocess, shutil, commons.startup, os.path + +def foo(inpath, outpath, opts): + if os.path.isfile(outpath): + if 0 == subprocess.call(['cmp', '-s', inpath, outpath]): + return opts.exitcode + shutil.move(inpath, outpath) + return 0 + +def main(argv): + parser = optparse.OptionParser() + parser.add_option('-b', '--buffer', type = 'int', default = 1<<20, + help = 'maximum buffer size in bytes, if reading from stdin') + parser.add_option('-t', '--tempfile', type = 'string', default = None, + help = '''path to temporary file where the candidate is to be written, if + reading from stdin; if unspecified, then generate tmp file name''') + parser.add_option('-x', '--exit-code', dest = 'exitcode', type = 'int', + default = 0, help = 'exit status if clobbering did not happen') + #parser.add_option('-d', '--debug', action = 'store_true', + #help = 'debug output') + opts, [cmd, inpath, outpath] = parser.parse_args(argv) + + if os.path.isfile(inpath): + return foo(inpath, outpath, opts) + else: + new = sys.stdin.read(opts.buffer) + if len(new) == opts.buffer: + f = tempfile.TemporaryFile() if opts.tempfile is None else file(opts.tempfile, 'w') + with f: + f.write(new) + while len(buf) == opts.buffer: + buf = sys.stdin.read(opts.buffer) + f.write(buf) + return foo(f.name if opts.tempfile is None else opts.tempfile, outpath, opts) + else: + with file(outpath) as f: old = f.read(len(new) + 1) + if old == new: return opts.exitcode + with file(outpath, 'w') as f: f.write(new) + +commons.startup.run_main() Property changes on: shell-tools/trunk/src/clobber-if-diff.py ___________________________________________________________________ Added: svn:executable + * This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-05-05 20:11:24
|
Revision: 1367 http://assorted.svn.sourceforge.net/assorted/?rev=1367&view=rev Author: yangzhang Date: 2009-05-05 20:11:12 +0000 (Tue, 05 May 2009) Log Message: ----------- added demo of stringstream tediousness Added Paths: ----------- sandbox/trunk/src/cc/stringstream.cc Added: sandbox/trunk/src/cc/stringstream.cc =================================================================== --- sandbox/trunk/src/cc/stringstream.cc (rev 0) +++ sandbox/trunk/src/cc/stringstream.cc 2009-05-05 20:11:12 UTC (rev 1367) @@ -0,0 +1,18 @@ +// Demo that stringstreams are tedious to use. + +#include <sstream> +#include <iostream> +using namespace std; + +int main() { + // Doesn't work: + // struct basic_ostream<char, char_traits<char> >’ has no member named ‘str’ + // cout << (stringstream() << 3).str() << endl; + + // Proper way: + stringstream ss; + ss << 3; + cout << ss.str() << endl; + + return 0; +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-05-05 16:11:17
|
Revision: 1366 http://assorted.svn.sourceforge.net/assorted/?rev=1366&view=rev Author: yangzhang Date: 2009-05-05 16:11:09 +0000 (Tue, 05 May 2009) Log Message: ----------- added setup-root.bash and tweaked setup-yang.bash comments Modified Paths: -------------- configs/trunk/setup-yang.bash Added Paths: ----------- configs/trunk/setup-root.bash Copied: configs/trunk/setup-root.bash (from rev 1342, configs/trunk/setup-yang.bash) =================================================================== --- configs/trunk/setup-root.bash (rev 0) +++ configs/trunk/setup-root.bash 2009-05-05 16:11:09 UTC (rev 1366) @@ -0,0 +1,12 @@ +#!/usr/bin/env bash + +# Installs my personal configuration files that need to be installed as root +# (e.g. because they go in /etc/). + +. common.bash || exit 1 + +pkg=configs +. simple-setup.bash + +cd src +install etc/blockcontrol/ blockcontrol/asdfasdf Property changes on: configs/trunk/setup-root.bash ___________________________________________________________________ Added: svn:executable + * Added: svn:mergeinfo + Modified: configs/trunk/setup-yang.bash =================================================================== --- configs/trunk/setup-yang.bash 2009-05-05 16:10:44 UTC (rev 1365) +++ configs/trunk/setup-yang.bash 2009-05-05 16:11:09 UTC (rev 1366) @@ -1,6 +1,6 @@ #!/usr/bin/env bash -# Installs files that I personally use (most likely not useful to others). +# Installs my personal configuration files that go into the home directory. . common.bash || exit 1 This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-05-05 16:11:00
|
Revision: 1365 http://assorted.svn.sourceforge.net/assorted/?rev=1365&view=rev Author: yangzhang Date: 2009-05-05 16:10:44 +0000 (Tue, 05 May 2009) Log Message: ----------- added blockcontrol Added Paths: ----------- configs/trunk/src/blockcontrol/ configs/trunk/src/blockcontrol/blocklists.list Added: configs/trunk/src/blockcontrol/blocklists.list =================================================================== --- configs/trunk/src/blockcontrol/blocklists.list (rev 0) +++ configs/trunk/src/blockcontrol/blocklists.list 2009-05-05 16:10:44 UTC (rev 1365) @@ -0,0 +1,81 @@ +# blocklists.list - lists the blocklists used by blockcontrol + +# Place one URL per line for every blocklist. Any line which starts with a # +# (hash) is a comment and is ignored. + +# All lists have to be in the same blocklist format. This format has to be +# specified in blockcontrol.conf. + +# For local blocklists start the line with "locallist". + +# Have a look at /usr/share/doc/blockcontrol/README.blocklists.gz for detailed +# information about some available blocklists. + +# Do a "blockcontrol reload" (or "restart" or "update") when you have edited +# this file. + + +# TBG Primary Threats +http://list.iblocklist.com/?list=ijfqtofzixtwayqovmxn + +# TBG General Corporate Ranges +http://list.iblocklist.com/?list=ecqbsykllnadihkdirsh + +# TBG Business ISPs +http://list.iblocklist.com/?list=jcjfaxgyyshvdbceroxf + +# TBG Educational Institutions +#http://list.iblocklist.com/?list=lljggjrpmefcwqknpalp + +# TBG Search Engines +http://list.iblocklist.com/?list=pfefqteoxlfzopecdtyw + +# TBG Hijacked +http://list.iblocklist.com/?list=tbnuqfclfkemqivekikv + +# TBG Bogon +http://list.iblocklist.com/?list=ewqglwibdgjttwttrinl + +# Bluetack level1 (P2P) +http://list.iblocklist.com/?list=bt_level1 + +# Bluetack level2 +http://list.iblocklist.com/?list=bt_level2 + +# Bluetack level3 +http://list.iblocklist.com/?list=bt_level3 + +# Bluetack edu +#http://list.iblocklist.com/?list=bt_edu + +# Bluetack ads +#http://list.iblocklist.com/?list=bt_ads + +# Bluetack bogon +http://list.iblocklist.com/?list=bt_bogon + +# Bluetack spyware +#http://list.iblocklist.com/?list=bt_spyware + +# Bluetack spider +#http://list.iblocklist.com/?list=bt_spider + +# Bluetack Microsoft +#http://list.iblocklist.com/?list=bt_microsoft + +# Bluetack proxy +http://list.iblocklist.com/?list=bt_proxy + +# Bluetack hijacked +http://list.iblocklist.com/?list=bt_hijacked + +# Bluetack badpeers (previously known as templist) +#http://list.iblocklist.com/?list=bt_templist + +# Bluetack rangetest +#http://list.iblocklist.com/?list=bt_rangetest + +# Bluetack dshield +http://list.iblocklist.com/?list=bt_dshield + +#locallist /etc/blockcontrol/custom-blocklist.p2p This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-05-05 16:10:45
|
Revision: 1364 http://assorted.svn.sourceforge.net/assorted/?rev=1364&view=rev Author: yangzhang Date: 2009-05-05 16:10:32 +0000 (Tue, 05 May 2009) Log Message: ----------- added more file sets to backup script Modified Paths: -------------- configs/trunk/src/cron/backup.bash Modified: configs/trunk/src/cron/backup.bash =================================================================== --- configs/trunk/src/cron/backup.bash 2009-05-05 16:09:58 UTC (rev 1363) +++ configs/trunk/src/cron/backup.bash 2009-05-05 16:10:32 UTC (rev 1364) @@ -14,6 +14,17 @@ eval `keychain --eval --nogui id_dsa 2> /dev/null` export PASSPHRASE="$( cat ~/.backup.auth )" -duplicity $args ~/personal/ scp://hv//export/home/yang/backup-zs.ath.cx -duplicity $args ~/.purple/ scp://hv//export/home/yang/purple-zs.ath.cx -duplicity $args ~/.xchat2/ scp://hv//export/home/yang/xchat-zs.ath.cx + +run-duplicity() { + duplicity $args "$1" "scp://hv//export/home/yang/backup-zs.ath.cx/$2" +} + +run-duplicity ~/personal/ backup +run-duplicity ~/.purple/ purple +run-duplicity ~/.xchat2/ xchat +run-duplicity ~/.mozilla-thunderbird/r9d4e7vh.default/Mail/'Local Folders'/ \ + thunderbird +run-duplicity ~/.zdb/ zdb +run-duplicity ~/.quodlibet/ quodlibet +run-duplicity ~/.gnupg/ gpg +run-duplicity ~/.supybot/ supybot This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-05-05 16:10:08
|
Revision: 1363 http://assorted.svn.sourceforge.net/assorted/?rev=1363&view=rev Author: yangzhang Date: 2009-05-05 16:09:58 +0000 (Tue, 05 May 2009) Log Message: ----------- added nr Modified Paths: -------------- configs/trunk/src/ssh/config Modified: configs/trunk/src/ssh/config =================================================================== --- configs/trunk/src/ssh/config 2009-05-05 06:06:16 UTC (rev 1362) +++ configs/trunk/src/ssh/config 2009-05-05 16:09:58 UTC (rev 1363) @@ -5,6 +5,10 @@ # StrictHostKeyChecking no +Host nr + HostName nr.ath.cx + User yang + Host farm1 HostName farm1.csail.mit.edu User yang This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-05-05 06:06:34
|
Revision: 1362 http://assorted.svn.sourceforge.net/assorted/?rev=1362&view=rev Author: yangzhang Date: 2009-05-05 06:06:16 +0000 (Tue, 05 May 2009) Log Message: ----------- added demo of static array sizes Added Paths: ----------- sandbox/trunk/src/c/static_array_sizes.c Added: sandbox/trunk/src/c/static_array_sizes.c =================================================================== --- sandbox/trunk/src/c/static_array_sizes.c (rev 0) +++ sandbox/trunk/src/c/static_array_sizes.c 2009-05-05 06:06:16 UTC (rev 1362) @@ -0,0 +1,11 @@ +// Neither of the following works +#if 0 +typedef int[3] blah; blah foo() { int xs[3] = {0,1,2}; return xs; } +int[3] foo() { int xs[3] = {0,1,2}; return xs; } +int main() { int xs[3] = foo(); return 0; } +#endif + +// However, the following works +struct c { int xs[3]; }; +struct c foo() { struct c x = {{0,1,2}}; return x; } +int main() { struct c x = foo(); return x.xs[0]; } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-05-05 05:43:55
|
Revision: 1361 http://assorted.svn.sourceforge.net/assorted/?rev=1361&view=rev Author: yangzhang Date: 2009-05-05 05:43:42 +0000 (Tue, 05 May 2009) Log Message: ----------- added demo of sha1 hashing using openssl Added Paths: ----------- sandbox/trunk/src/c/sha1.c Added: sandbox/trunk/src/c/sha1.c =================================================================== --- sandbox/trunk/src/c/sha1.c (rev 0) +++ sandbox/trunk/src/c/sha1.c 2009-05-05 05:43:42 UTC (rev 1361) @@ -0,0 +1,20 @@ +#include <openssl/sha.h> +#include <stdio.h> +void run(const char *data, size_t len) { + SHA_CTX c; + unsigned char md[SHA_DIGEST_LENGTH]; + SHA1_Init(&c); + SHA1_Update(&c, data, len); + SHA1_Final(md, &c); + int i; + for (i = 0; i < SHA_DIGEST_LENGTH; ++i) + printf("%02x ", md[i]); + printf("\n"); +} +int main() { + run("hello", 5); + run("hello", 6); + run("hello\n", 6); + run("hello\n", 7); + return 0; +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-05-05 05:02:38
|
Revision: 1360 http://assorted.svn.sourceforge.net/assorted/?rev=1360&view=rev Author: yangzhang Date: 2009-05-05 05:02:25 +0000 (Tue, 05 May 2009) Log Message: ----------- added demo of files Added Paths: ----------- sandbox/trunk/src/c/files.c Added: sandbox/trunk/src/c/files.c =================================================================== --- sandbox/trunk/src/c/files.c (rev 0) +++ sandbox/trunk/src/c/files.c 2009-05-05 05:02:25 UTC (rev 1360) @@ -0,0 +1,13 @@ +// Demo of flags/modes. + +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <unistd.h> +int main() { + // Equiv: int fd = open("/tmp/ASDF", O_CREAT|O_TRUNC|O_WRONLY, 0644); + int fd = creat("/tmp/ASDF", 0644); + write(fd, "hello\n", 6); + close(fd); + return 0; +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-04-30 20:39:21
|
Revision: 1359 http://assorted.svn.sourceforge.net/assorted/?rev=1359&view=rev Author: yangzhang Date: 2009-04-30 20:39:12 +0000 (Thu, 30 Apr 2009) Log Message: ----------- renamed getbit, setbit to get_bit, set_bit; added whole_units Modified Paths: -------------- cpp-commons/trunk/src/commons/memory.h cpp-commons/trunk/src/commons/versioned_heap.h Added Paths: ----------- cpp-commons/trunk/src/test/memory.cc Modified: cpp-commons/trunk/src/commons/memory.h =================================================================== --- cpp-commons/trunk/src/commons/memory.h 2009-04-30 15:56:53 UTC (rev 1358) +++ cpp-commons/trunk/src/commons/memory.h 2009-04-30 20:39:12 UTC (rev 1359) @@ -2,6 +2,7 @@ #define COMMONS_MEMORY_H #include <cstring> // for size_t +#include <commons/utility.h> // for UNUSED namespace commons { @@ -61,22 +62,32 @@ void skip(size_t n) { p_ += n; } }; - size_t nbits2nints(size_t nbits) { + /** Round `value` up to the nearest multiple of `unit` and return this + * factor. */ + template<typename T> + T whole_units(T value, T unit) { + return (value + unit - 1) / unit; + } + + /** Number of 32-bit ints to hold the given number of bits. */ + UNUSED static size_t nbits2nints(size_t nbits) { return (nbits + 31) / 32; } template<typename T> - bool getbit(T x, size_t bit) { + bool get_bit(T x, size_t bit) { return (x >> bit) & 1; } + /** Note: there exists a setbit macro in /usr/include/sys/param.h */ template<typename T> - T setbit(T x, size_t bit) { + T set_bit(T x, size_t bit) { return x | (1U << bit); } + /** Note: there exists a setbit macro in /usr/include/sys/param.h */ template<typename T> - T setbit(T x, size_t bit, T val) { + T set_bit(T x, size_t bit, T val) { return x | (val << bit); } Modified: cpp-commons/trunk/src/commons/versioned_heap.h =================================================================== --- cpp-commons/trunk/src/commons/versioned_heap.h 2009-04-30 15:56:53 UTC (rev 1358) +++ cpp-commons/trunk/src/commons/versioned_heap.h 2009-04-30 20:39:12 UTC (rev 1359) @@ -1,5 +1,5 @@ -#ifndef VERSIONED_HEAP_H -#define VERSIONED_HEAP_H +#ifndef COMMONS_VERSIONED_HEAP_H +#define COMMONS_VERSIONED_HEAP_H #define _POSIX_C_SOURCE 200112L #include <cstdlib> @@ -136,7 +136,7 @@ for (size_t i = 0; i < freesize; ++i) { uint32_t val = 0; for (size_t j = 0; j < 32 && 32 * i + j < freemap_.size(); ++j) { - val = setbit(val, j, uint32_t(freemap_[32 * i + j])); + val = set_bit(val, j, uint32_t(freemap_[32 * i + j])); } w.write(val); } @@ -165,7 +165,7 @@ uint32_t val; r.read(val); for (size_t j = 0; j < nbits && 32 * i + j < nbits; ++j) { - freemap_.push_back(getbit(val, j)); + freemap_.push_back(get_bit(val, j)); } } Added: cpp-commons/trunk/src/test/memory.cc =================================================================== --- cpp-commons/trunk/src/test/memory.cc (rev 0) +++ cpp-commons/trunk/src/test/memory.cc 2009-04-30 20:39:12 UTC (rev 1359) @@ -0,0 +1,13 @@ +#include <commons/memory.h> +#include "test.h" + +TEST(memory, whole_units) { + EXPECT_EQ(0, whole_units(0, 100)); + EXPECT_EQ(1, whole_units(1, 100)); + EXPECT_EQ(1, whole_units(2, 100)); + EXPECT_EQ(1, whole_units(98, 100)); + EXPECT_EQ(1, whole_units(99, 100)); + EXPECT_EQ(1, whole_units(100, 100)); + EXPECT_EQ(2, whole_units(101, 100)); + EXPECT_EQ(2, whole_units(102, 100)); +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-04-30 15:57:03
|
Revision: 1358 http://assorted.svn.sourceforge.net/assorted/?rev=1358&view=rev Author: yangzhang Date: 2009-04-30 15:56:53 +0000 (Thu, 30 Apr 2009) Log Message: ----------- added serialization/deserialization to versioned_heap; added nbits2nints, getbit, setbit Modified Paths: -------------- cpp-commons/trunk/src/commons/memory.h cpp-commons/trunk/src/commons/versioned_heap.h cpp-commons/trunk/src/test/Makefile cpp-commons/trunk/src/test/versioned_heap.cc Modified: cpp-commons/trunk/src/commons/memory.h =================================================================== --- cpp-commons/trunk/src/commons/memory.h 2009-04-29 20:45:50 UTC (rev 1357) +++ cpp-commons/trunk/src/commons/memory.h 2009-04-30 15:56:53 UTC (rev 1358) @@ -61,6 +61,25 @@ void skip(size_t n) { p_ += n; } }; + size_t nbits2nints(size_t nbits) { + return (nbits + 31) / 32; + } + + template<typename T> + bool getbit(T x, size_t bit) { + return (x >> bit) & 1; + } + + template<typename T> + T setbit(T x, size_t bit) { + return x | (1U << bit); + } + + template<typename T> + T setbit(T x, size_t bit, T val) { + return x | (val << bit); + } + } #endif Modified: cpp-commons/trunk/src/commons/versioned_heap.h =================================================================== --- cpp-commons/trunk/src/commons/versioned_heap.h 2009-04-29 20:45:50 UTC (rev 1357) +++ cpp-commons/trunk/src/commons/versioned_heap.h 2009-04-30 15:56:53 UTC (rev 1358) @@ -5,6 +5,7 @@ #include <cstdlib> #include <vector> #include <boost/foreach.hpp> +#include <commons/memory.h> #define foreach BOOST_FOREACH namespace commons @@ -25,42 +26,69 @@ struct hdr { version_type version; uint32_t index; + bool managed; }; versioned_heap(size_t pgsz = 131072) : - pgsz_(pgsz), pgcnt_((pgsz - sizeof(hdr)) / sizeof(T)) {} + last_i_(0), pgsz_(pgsz), pgcnt_((pgsz - sizeof(hdr)) / sizeof(T)) {} + /** Frees all pages, without destroying the objects. */ ~versioned_heap() { - foreach (char *page, pages_) ::free(page); + foreach (char *page, pages_) + if (hdrof(page).managed) + ::free(page); } - T &construct(version_type version = 0) { - for (size_t i = 0; i < freemap_.size(); ++i) { + char *alloc() { + for (size_t counter = 1; counter <= freemap_.size(); ++counter) { + size_t i = (last_i_ + counter) % freemap_.size(); if (freemap_[i]) { freemap_[i] = false; char *pos = pages_[i / pgcnt_] + sizeof(hdr) + (i % pgcnt_) * sizeof(T); - if (version == 0) ++hdrof(pos).version; - else hdrof(pos).version = version; - return *new (pos) T; + ++hdrof(pos).version; + last_i_ = i; + return pos; } } char *page; posix_memalign(reinterpret_cast<void**>(&page), pgsz_, pgsz_); hdr &h = *reinterpret_cast<hdr*>(page); - h.version = version; + h.version = 0; h.index = uint32_t(pages_.size()); + h.managed = true; pages_.push_back(page); freemap_.resize(freemap_.size() + pgcnt_, true); freemap_[freemap_.size() - pgcnt_] = false; - return *new (page + sizeof(hdr)) T; + return page + sizeof(hdr); } - void free(T &x, version_type version = 0) { - x.~T(); - hdr &h = hdrof(&x); + T &construct() { + char *buf = alloc(); + try { return *new (buf) T; } + catch (...) { free(buf); throw; } + } + // TODO: replace with code gen + template<typename A0> + T &construct(const A0 &a0) { + char *buf = alloc(); + try { return *new (buf) T(a0); } + catch (...) { free(buf); throw; } + } + template<typename A0, typename A1> + T &construct(const A0 &a0, const A1 &a1) { + char *buf = alloc(); + try { return *new (buf) T(a0, a1); } + catch (...) { free(buf); throw; } + } + void free(void *p, version_type version = 0) { + hdr &h = hdrof(p); if (version == 0) ++h.version; else h.version = version; - char *px = reinterpret_cast<char*>(&x); + char *px = reinterpret_cast<char*>(p); char *p0 = reinterpret_cast<char*>(&h + 1); freemap_[h.index * pgcnt_ + (px - p0) / sizeof(T)] = true; } + void destroy(T &x, version_type version = 0) { + x.~T(); + free(&x, version); + } void touch(T &p) { ++hdrof(&p).version; } void touch(T &p, version_type version) { hdrof(&p).version = version; } size_t pgcnt() const { return pgcnt_; } @@ -73,13 +101,115 @@ hdr &hdrof(void *p) { return *reinterpret_cast<hdr*>(uintptr_t(p) & ~(pgsz_ - 1)); } + + /** + * Calculate the required space for serializing the metadata. + */ + size_t metasize() const { + return sizeof last_i_ + sizeof pgsz_ + sizeof pgcnt_ + + sizeof freemap_.size() + + nbits2nints(freemap_.size()) * sizeof(uint32_t) + + sizeof datasize(*this); + } + + /** + * Calculate the required space for serializing the metadata and data. + */ + size_t sersize() const { + return metasize() + datasize(*this); + } + + /** + * Serializes metadata and data to out buffer. + */ + void ser(void *meta, void *data) const { + raw_writer w(meta); + + // Serialize metadata. + w.write(last_i_); + w.write(pgsz_); + w.write(pgcnt_); + + // Serialize freemap. + w.write(freemap_.size()); + size_t freesize = nbits2nints(freemap_.size()); + for (size_t i = 0; i < freesize; ++i) { + uint32_t val = 0; + for (size_t j = 0; j < 32 && 32 * i + j < freemap_.size(); ++j) { + val = setbit(val, j, uint32_t(freemap_[32 * i + j])); + } + w.write(val); + } + + // Serialize data. + w.write(datasize(*this)); + serdata(*this, data); + } + + /** + * Deserializes a heap in-place. + */ + void deser(void *meta, void *data) { + raw_reader r(meta); + + // Deserialize metadata. + r.read(last_i_); + r.read(pgsz_); + r.read(pgcnt_); + + // Deserialize freemap. + size_t nbits; + r.read(nbits); + size_t nints = nbits2nints(nbits); + for (size_t i = 0; i < nints; ++i) { + uint32_t val; + r.read(val); + for (size_t j = 0; j < nbits && 32 * i + j < nbits; ++j) { + freemap_.push_back(getbit(val, j)); + } + } + + // Deserialize data. + size_t datasize; + r.read(datasize); + char *p0 = reinterpret_cast<char*>(data); + for (char *p = p0; p < p0 + datasize; p += pgsz_) { + hdrof(p).managed = false; + pages_.push_back(p); + } + } + private: + size_t last_i_; size_t pgsz_; size_t pgcnt_; vector<bool> freemap_; vector<char*> pages_; }; + /** + * Calculate the data size of a heap. + */ + template<typename T> + size_t datasize(const versioned_heap<T> &h) + { + return h.pages().size() * h.pgsz(); + } + + /** + * Copies pages from the heap into the out buffer. Assumes out is big enough + * (see datasize). + */ + template<typename T> + void serdata(const versioned_heap<T> &h, void *out) + { + char *p = reinterpret_cast<char*>(out); + foreach (void *page, h.pages()) { + memcpy(p, page, h.pgsz()); + p += h.pgsz(); + } + } + } #undef foreach Modified: cpp-commons/trunk/src/test/Makefile =================================================================== --- cpp-commons/trunk/src/test/Makefile 2009-04-29 20:45:50 UTC (rev 1357) +++ cpp-commons/trunk/src/test/Makefile 2009-04-30 15:56:53 UTC (rev 1358) @@ -1,4 +1,5 @@ CXXFLAGS = \ + -MD \ -g3 \ -Wall \ -Werror \ @@ -46,3 +47,5 @@ squeue: LDLIBS += -lboost_thread-gcc43-mt .PHONY: all build clean + +-include *.d Modified: cpp-commons/trunk/src/test/versioned_heap.cc =================================================================== --- cpp-commons/trunk/src/test/versioned_heap.cc 2009-04-29 20:45:50 UTC (rev 1357) +++ cpp-commons/trunk/src/test/versioned_heap.cc 2009-04-30 15:56:53 UTC (rev 1358) @@ -1,3 +1,5 @@ +#include <commons/check.h> +#include <commons/unique_ptr.h> #include <commons/versioned_heap.h> #include "test.h" @@ -4,30 +6,31 @@ struct s { char xs[40000]; }; TEST(versioned_heap, basics) { - versioned_heap<s> h; + typedef versioned_heap<s> heap; + heap h; s &a = h.construct(); - EXPECT_EQ(h.hdrof(&a).version, 0); + EXPECT_EQ(0, h.hdrof(&a).version); s &b = h.construct(); - EXPECT_EQ(h.hdrof(&b).version, 1); + EXPECT_EQ(1, h.hdrof(&b).version); s &c = h.construct(); - EXPECT_EQ(h.hdrof(&c).version, 2); + EXPECT_EQ(2, h.hdrof(&c).version); EXPECT_EQ(&a + 1, &b); EXPECT_EQ(&b + 1, &c); s &d = h.construct(); - EXPECT_EQ(h.hdrof(&d).version, 0); + EXPECT_EQ(0, h.hdrof(&d).version); s &e = h.construct(); - EXPECT_EQ(h.hdrof(&e).version, 1); + EXPECT_EQ(1, h.hdrof(&e).version); s &f = h.construct(); - EXPECT_EQ(h.hdrof(&f).version, 2); + EXPECT_EQ(2, h.hdrof(&f).version); EXPECT_EQ(&d + 1, &e); EXPECT_EQ(&e + 1, &f); h.touch(d); EXPECT_EQ(h.hdrof(&d).version, 3); - h.free(d); + h.destroy(d); EXPECT_EQ(h.hdrof(&d).version, 4); h.touch(a, 4); @@ -37,4 +40,22 @@ s &g = h.construct(); EXPECT_EQ(&d, &g); + + void *meta = new char[h.metasize()], *data; + check0x(posix_memalign(&data, h.pgsz(), datasize(h))); + unique_ptr<char[]> umeta(reinterpret_cast<char*>(meta)), + udata(reinterpret_cast<char*>(data)); + h.ser(meta, data); + + heap h2; + h2.deser(meta, data); + + EXPECT_EQ(h.pages().size(), h2.pages().size()); + for (size_t i = 0; i < h.pages().size(); ++i) { + s *p = reinterpret_cast<s*>(h .pages()[i] + sizeof(heap::hdr)); + s *q = reinterpret_cast<s*>(h2.pages()[i] + sizeof(heap::hdr)); + for (size_t j = 0; j < h.pgcnt(); ++j) { + EXPECT_EQ(0, memcmp(p, q, sizeof(s))); + } + } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-04-29 20:45:54
|
Revision: 1357 http://assorted.svn.sourceforge.net/assorted/?rev=1357&view=rev Author: yangzhang Date: 2009-04-29 20:45:50 +0000 (Wed, 29 Apr 2009) Log Message: ----------- added demo of default equality operator (or lack thereof) Added Paths: ----------- sandbox/trunk/src/cc/default_equality.cc Added: sandbox/trunk/src/cc/default_equality.cc =================================================================== --- sandbox/trunk/src/cc/default_equality.cc (rev 0) +++ sandbox/trunk/src/cc/default_equality.cc 2009-04-29 20:45:50 UTC (rev 1357) @@ -0,0 +1,18 @@ +// Is there a default comparator? No! + +class s { public: char xs[40]; }; +class t { public: char xs[40]; bool operator==(const t &) { return true; } }; + +int main() { + { + s a __attribute__((unused)); + s b __attribute__((unused)); + // Doesn't work: + // return a == b; + } + { + t a; + t b; + return a == b; + } +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-04-29 05:26:48
|
Revision: 1356 http://assorted.svn.sourceforge.net/assorted/?rev=1356&view=rev Author: yangzhang Date: 2009-04-29 05:26:32 +0000 (Wed, 29 Apr 2009) Log Message: ----------- updated documentation Modified Paths: -------------- cpp-commons/trunk/README Modified: cpp-commons/trunk/README =================================================================== --- cpp-commons/trunk/README 2009-04-29 05:25:41 UTC (rev 1355) +++ cpp-commons/trunk/README 2009-04-29 05:26:32 UTC (rev 1356) @@ -24,7 +24,8 @@ - check macros (like assertions but never removed from compilation) - `deque`: simpler deque implementation that uses coarse-grained allocation - error handling, such as `die()`, which leverages `strerror` -- file I/O utilities, such as reading complete files +- file I/O utilities, such as reading complete files or common operations like + finding file sizes - function delegates (for use with C functions that take `(void*)(void*)`) - generic binary stream readers and writers that are more efficient than std::streambuf @@ -41,6 +42,8 @@ - utilities for streams - utilities for [tamer] - x86 architecture-specific tools +- object memory pool with coarse-grained versioned pages +- C++ abstractions for `mmap` Third-party code: This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-04-29 05:25:50
|
Revision: 1355 http://assorted.svn.sourceforge.net/assorted/?rev=1355&view=rev Author: yangzhang Date: 2009-04-29 05:25:41 +0000 (Wed, 29 Apr 2009) Log Message: ----------- added mmap abstractions Added Paths: ----------- cpp-commons/trunk/src/commons/mmap.h Added: cpp-commons/trunk/src/commons/mmap.h =================================================================== --- cpp-commons/trunk/src/commons/mmap.h (rev 0) +++ cpp-commons/trunk/src/commons/mmap.h 2009-04-29 05:25:41 UTC (rev 1355) @@ -0,0 +1,44 @@ +#ifndef COMMONS_MMAP_H +#define COMMONS_MMAP_H + +#include <commons/check.h> +#include <commons/closing.h> +#include <commons/nullptr.h> +#include <commons/files.h> +#include <sys/mman.h> +#include <sys/stat.h> +#include <sys/types.h> +#include <fcntl.h> + +namespace commons { + +enum { prot_read = PROT_READ }; +enum { map_private = MAP_PRIVATE }; +const void *map_failed = reinterpret_cast<void*>(-1); + +class mmapping +{ +public: + explicit mmapping(const char *path) + : fd_(checknnegerr(open(path, O_RDONLY))) + { + len_ = file_size(fd_); + map_ = checkpass(mmap(nullptr, len_, prot_read, map_private, fd_, 0)); + checkneq(map_, map_failed); + } + + ~mmapping() { if (map_ != nullptr) check0x(munmap(map_, len_)); } + int fd() const { return fd_; } + void *get() const { return map_; } + size_t len() const { return len_; } + void release() { map_ = nullptr; fd_.release(); } + +private: + closingfd fd_; + size_t len_; + void *map_; +}; + +} + +#endif This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-04-29 05:24:34
|
Revision: 1354 http://assorted.svn.sourceforge.net/assorted/?rev=1354&view=rev Author: yangzhang Date: 2009-04-29 05:24:26 +0000 (Wed, 29 Apr 2009) Log Message: ----------- added tests for versioned_heap and squeue Added Paths: ----------- cpp-commons/trunk/src/test/squeue.cc cpp-commons/trunk/src/test/versioned_heap.cc Added: cpp-commons/trunk/src/test/squeue.cc =================================================================== --- cpp-commons/trunk/src/test/squeue.cc (rev 0) +++ cpp-commons/trunk/src/test/squeue.cc 2009-04-29 05:24:26 UTC (rev 1354) @@ -0,0 +1,14 @@ +#include <commons/squeue.h> +#include <commons/unique_ptr.h> +#include "test.h" + +TEST(squeue, move) { + concurrent_queue<unique_ptr<int> > q; + q.push(unique_ptr<int>(new int(0))); + q.push(unique_ptr<int>(new int(1))); + unique_ptr<int> p1; + q.pop(p1); + EXPECT_EQ(0, *p1); + unique_ptr<int> p2 = q.take(); + EXPECT_EQ(1, *p2); +} Added: cpp-commons/trunk/src/test/versioned_heap.cc =================================================================== --- cpp-commons/trunk/src/test/versioned_heap.cc (rev 0) +++ cpp-commons/trunk/src/test/versioned_heap.cc 2009-04-29 05:24:26 UTC (rev 1354) @@ -0,0 +1,40 @@ +#include <commons/versioned_heap.h> +#include "test.h" + +struct s { char xs[40000]; }; + +TEST(versioned_heap, basics) { + versioned_heap<s> h; + + s &a = h.construct(); + EXPECT_EQ(h.hdrof(&a).version, 0); + s &b = h.construct(); + EXPECT_EQ(h.hdrof(&b).version, 1); + s &c = h.construct(); + EXPECT_EQ(h.hdrof(&c).version, 2); + EXPECT_EQ(&a + 1, &b); + EXPECT_EQ(&b + 1, &c); + + s &d = h.construct(); + EXPECT_EQ(h.hdrof(&d).version, 0); + s &e = h.construct(); + EXPECT_EQ(h.hdrof(&e).version, 1); + s &f = h.construct(); + EXPECT_EQ(h.hdrof(&f).version, 2); + EXPECT_EQ(&d + 1, &e); + EXPECT_EQ(&e + 1, &f); + + h.touch(d); + EXPECT_EQ(h.hdrof(&d).version, 3); + + h.free(d); + EXPECT_EQ(h.hdrof(&d).version, 4); + + h.touch(a, 4); + foreach (char *page, h.pages()) { + EXPECT_EQ(h.hdrof(page).version, 4); + } + + s &g = h.construct(); + EXPECT_EQ(&d, &g); +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2009-04-29 05:24:17
|
Revision: 1353 http://assorted.svn.sourceforge.net/assorted/?rev=1353&view=rev Author: yangzhang Date: 2009-04-29 05:24:10 +0000 (Wed, 29 Apr 2009) Log Message: ----------- added channel test to st tests Modified Paths: -------------- cpp-commons/trunk/src/test/st.cc Modified: cpp-commons/trunk/src/test/st.cc =================================================================== --- cpp-commons/trunk/src/test/st.cc 2009-04-29 04:28:27 UTC (rev 1352) +++ cpp-commons/trunk/src/test/st.cc 2009-04-29 05:24:10 UTC (rev 1353) @@ -7,3 +7,23 @@ ch.push(unique_ptr<int>(new int(0))); unique_ptr<int> p = ch.take(); } + +struct a { + a() {} + a(const a &) { ADD_FAILURE() << "shouldn't be copying"; } + a(a &&) {} + void operator=(const a &) { ADD_FAILURE() << "shouldn't be copying"; } + void operator=(a &&) {} + void f() {} +}; + +TEST(st, channel2) { + st_channel<a> ch; + ch.push(a()); + a s; + ch.push(move(s)); + + a x = ch.take(); + x.f(); + ch.take(); +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |