[Assorted-commits] SF.net SVN: assorted:[1418] ydb/trunk/src/tpcc
Brought to you by:
yangzhang
From: <yan...@us...> - 2009-05-15 06:59:07
|
Revision: 1418 http://assorted.svn.sourceforge.net/assorted/?rev=1418&view=rev Author: yangzhang Date: 2009-05-15 06:58:54 +0000 (Fri, 15 May 2009) Log Message: ----------- - added ser_partial, deser_partial, and {remove, write, read} for file ser/deser to tpcctables Modified Paths: -------------- ydb/trunk/src/tpcc/tpcctables.cc.cog ydb/trunk/src/tpcc/tpcctables.h Modified: ydb/trunk/src/tpcc/tpcctables.cc.cog =================================================================== --- ydb/trunk/src/tpcc/tpcctables.cc.cog 2009-05-15 06:57:32 UTC (rev 1417) +++ ydb/trunk/src/tpcc/tpcctables.cc.cog 2009-05-15 06:58:54 UTC (rev 1418) @@ -1,6 +1,5 @@ //[[[cog // allfields = ''' -// items // warehouses // stock // districts @@ -9,6 +8,7 @@ // orderlines // neworders // history +// items // '''.split() // treepairs = [ p.split('/') for p in ''' // warehouses/Warehouse @@ -37,17 +37,21 @@ #include "tpcctables.h" #include <algorithm> -#include <limits> -#include <vector> - +#include <boost/foreach.hpp> #include <commons/assert.h> #include <commons/check.h> +#include <commons/closing.h> +#include <commons/files.h> #include <commons/memory.h> #include <commons/versioned_heap.h> +#include <fcntl.h> #include <iostream> +#include <limits> +#include <sys/stat.h> +#include <sys/types.h> +#include <unistd.h> +#include <vector> -#include <boost/foreach.hpp> - #ifdef DO_DUMP #include <fstream> // XXX #include <boost/lexical_cast.hpp> // XXX @@ -1036,9 +1040,10 @@ #if 0 void compare_summaries(const commons::array<char> &a, const commons::array<char> &b) { - raw_reader r(a); - tpcc_recovery_header hdr; - r.read(hdr); + raw_reader r(a), s(b); + tpcc_recovery_header ahdr, bhdr; + r.read(ahdr); + s.read(bhdr); //[[[cog // for name, struct in heappairs: // cog.outl(r''' @@ -1051,11 +1056,245 @@ //]]] //[[[end]]] } +#endif -TPCCTables::restore_partial(sizeof hdr) const +/** + * \param[in] summary Another heap's summary. 
+ */ +commons::array<char> +TPCCTables::ser_partial(const commons::array<char> &summary, + int mypos, int nnodes, int seqno) const { - restore_partial + mypos = nnodes; // TODO use + using namespace std; + + // + // Deserialize summary header. + // + + raw_reader sreader(summary); + tpcc_recovery_header shdr; + sreader.read(shdr); + + // + // Serialize header. + // + + tpcc_recovery_header hdr; + bzero(&hdr, sizeof hdr); + hdr.seqno = seqno; + size_t metalen = 0, datalen = 0; + //[[[cog + // for name in allfields: + // cog.outl(r'hdr.n%s = uint32_t(%s_.size());' % (name, name)) + // for name, struct in heappairs: + // cog.outl(r''' + // versioned_heap<%(struct)s> &heap_%(name)s = heaps<%(struct)s>::heap; + // metalen += heap_%(name)s.metasize(); + // datalen += datasize(heap_%(name)s); + // ''' % {'name': name, 'struct': struct}) + //]]] + //[[[end]]] + hdr.headlen = uint32_t(whole_units(sizeof hdr + metalen, pgsz()) * pgsz()); + hdr.len = uint32_t(hdr.headlen + datalen + sizeof(Item) * hdr.nitems); + void *raw_arr; + check0x(posix_memalign(&raw_arr, pgsz(), hdr.len)); + + // + // Serialize metas and datas. + // + + commons::array<char> arr(reinterpret_cast<char*>(raw_arr), hdr.len); + raw_writer writer(arr), dwriter(arr + hdr.headlen); + writer.write(hdr); + datalen = 0; + //[[[cog + // for name, struct in heappairs: + // cog.outl(r''' + // cout << "partially serializing %(name)s..." 
<< flush; + // heap_%(name)s.sermeta(writer.ptr()); + // writer.skip(heap_%(name)s.metasize()); + // hdr.n%(name)s = uint32_t(serdata_partial(heap_%(name)s, dwriter.ptr(), sreader, shdr.n%(name)s)); + // dwriter.skip(hdr.n%(name)s * pgsz()); + // cout << "serialized " << hdr.n%(name)s << " of " << heap_%(name)s.pages().size() << endl; + // datalen += hdr.n%(name)s * pgsz(); + // ''' % {'name': name, 'struct': struct}) + //]]] + //[[[end]]] + hdr.len = uint32_t(hdr.headlen + datalen + sizeof(Item) * hdr.nitems); + memput(arr, hdr); + + cout << "partially serializing items" << endl; + foreach (const Item &item, items_) { + dwriter.write(item); + } + + assert(arr.end() >= dwriter.ptr()); + + dump(neworders_, "ser", seqno); + + return arr; } -#endif +void TPCCTables::deser_partial(int mypos, int nnodes, + const tpcc_recovery_header &hdr, + const commons::array<char> &arr) +{ + mypos = nnodes; // TODO use + using namespace std; + typedef typeof(warehouses_) tree_warehouses; + + // This needs to be cleared because it's just a simple log to which things + // are appended. + history_.clear(); + history_.reserve(hdr.nhistory); + // This needs to be cleared because things are erased from the map. + neworders_.clear(); + // This needs to be cleared because the hash is on the pointer. 
+ customers_by_name_.clear(); + + char *meta = arr + sizeof hdr, *data = arr + hdr.headlen; + + // XXX determine which objects are actually live + + //[[[cog + // typedefs() + // for name, struct in heappairs: + // cbn = ( r'customers_by_name_.insert(val);' + // if name == 'customers' else '' ) + // obc = ( r'orders_by_customer_.insert(obc_keyof(*val), val);' + // if name == 'orders' else '' ) + // if name == 'neworders': + // insertion = r'%(name)s_.insert(make_pair(keyof(*val), val));' + // elif name == 'history': + // insertion = r'%(name)s_.push_back(val);' + // else: + // insertion = r'%(name)s_.insert(keyof(*val), val);' + // + // cbn = cbn % {'name': name, 'struct': struct} + // obc = obc % {'name': name, 'struct': struct} + // insertion = insertion % {'name': name, 'struct': struct} + // + // cog.outl(r''' + // versioned_heap<%(struct)s> &heap_%(name)s = heaps<%(struct)s>::heap; + // heap_%(name)s.deser_partial(meta, data, hdr.n%(name)s); + // + // // TODO scan over only the serialized portion? + // for (versioned_heap<%(struct)s>::iterator iter_%(name)s = heap_%(name)s.begin(); + // iter_%(name)s.cur() != nullptr; iter_%(name)s.next()) { + // %(struct)s *val = iter_%(name)s.cur(); + // %(insertion)s + // %(cbn)s%(obc)s + // } + // + // meta += heap_%(name)s.metasize(); + // data += datasize(heap_%(name)s); + // ''' % {'name': name, 'struct': struct, 'cbn': cbn, 'obc': obc, + // 'insertion': insertion}) + //]]] + //[[[end]]] + + raw_reader reader(data); + // This needs to be cleared because it's just a simple vector of randomly + // generated items (the set is static). 
+ items_.clear(); + items_.reserve(hdr.nitems); + for (uint32_t i = 0; i < hdr.nitems; ++i) { + items_.push_back(reader.read<Item>()); + } + + serbuf_.reset(arr.get(), arr.size()); + + dump(neworders_, "deser", hdr.seqno); +} + +void +TPCCTables::remove(const string &basepath) const +{ + try_rm(basepath.c_str()); + //[[[cog + // for name, struct in heappairs: + // cog.outl(r''' + // try_rm((basepath + "_%(name)s").c_str()); + // ''' % {'name':name,'struct':struct}) + //]]] + //[[[end]]] + try_rm((basepath+"_items").c_str()); +} + +/** + * Updates the files starting at basepath. If the files already exist their + * contents must consist of older versions of these tables! + */ +void +TPCCTables::write(const string &basepath, const commons::array<char> &data) const +{ + tpcc_recovery_header hdr; + memget(data, hdr); + char *p = data + hdr.headlen; + closingfd fd(checknnegerr(creat(basepath.c_str(), 0644))); + write_file(fd, data, hdr.headlen); + + //[[[cog + // for name, struct in heappairs: + // cog.outl(''' + // versioned_heap<%(struct)s> &heap_%(name)s = heaps<%(struct)s>::heap; + // { + // closingfd fd(checknnegerr(open((basepath + "_%(name)s").c_str(), O_CREAT | O_WRONLY, 0644))); + // for (size_t i = 0; i < hdr.n%(name)s; ++i) { + // off_t pos = heap_%(name)s.hdrof(p).index * pgsz(); + // checkeq(lseek(fd, pos, 0), pos); + // write_file(fd, p, pgsz()); + // p += pgsz(); + // } + // } + // ''' % {'name': name, 'struct': struct}) + //]]] + //[[[end]]] + + { + closingfd fd(checknnegerr(open((basepath + "_items").c_str(), O_CREAT | O_WRONLY, 0644))); + foreach (const Item &item, items_) { + write_file(fd, &item, sizeof(Item)); + } + } +} + +commons::array<char> +TPCCTables::read(const string &basepath) +{ + // Prepare buffer. 
+ closingfd fd(checknnegerr(open(basepath.c_str(), O_RDONLY))); + size_t fsz = file_size(fd), tot_fsz = fsz; + //[[[cog + // for name in allfields: + // cog.outl(r''' + // closingfd fd_%(name)s(checknnegerr(open((basepath + "_%(name)s").c_str(), O_RDONLY))); + // size_t fsz_%(name)s = file_size(fd_%(name)s); + // tot_fsz += fsz_%(name)s; + // ''' % {'name': name}) + //]]] + //[[[end]]] + char *rawbuf; + check0x(posix_memalign(reinterpret_cast<void**>(&rawbuf), pgsz(), tot_fsz)); + + // Read. + char *p = rawbuf; + checkeqnneg(::read(fd, p, fsz), ssize_t(fsz)); + p += fsz; + //[[[cog + // for name in allfields: + // cog.outl(r''' + // { + // checkeqnneg( ::read(fd_%(name)s, p, fsz_%(name)s), + // ssize_t(fsz_%(name)s) ); + // p += fsz_%(name)s; + // } + // ''' % {'name': name}) + //]]] + //[[[end]]] + + return commons::array<char>(rawbuf, tot_fsz); +} + // vim:ft=cpp Modified: ydb/trunk/src/tpcc/tpcctables.h =================================================================== --- ydb/trunk/src/tpcc/tpcctables.h 2009-05-15 06:57:32 UTC (rev 1417) +++ ydb/trunk/src/tpcc/tpcctables.h 2009-05-15 06:58:54 UTC (rev 1418) @@ -92,8 +92,16 @@ commons::array<char> ser(int mypos, int nnodes, int seqno) const; void deser(int mypos, int nnodes, const tpcc_recovery_header &hdr, const commons::array<char> &arr); + commons::array<char> ser_partial(const commons::array<char> &summary, + int mypos, int nnodes, int seqno) const; + void deser_partial(int mypos, int nnodes, const tpcc_recovery_header &hdr, + const commons::array<char> &arr); commons::array<char> summarize() const; + void remove(const string &basepath) const; + void write(const string &basepath, const commons::array<char> &data) const; + commons::array<char> read(const string &basepath); + static const int KEYS_PER_INTERNAL = 8; static const int KEYS_PER_LEAF = 8; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |