[Assorted-commits] SF.net SVN: assorted: [326] hash-join/trunk/src
Brought to you by:
yangzhang
From: <yan...@us...> - 2008-02-06 16:01:33
|
Revision: 326 http://assorted.svn.sourceforge.net/assorted/?rev=326&view=rev Author: yangzhang Date: 2008-02-06 08:01:30 -0800 (Wed, 06 Feb 2008) Log Message: ----------- things seem to be working ; performed first experiments on this version (on caneland) Modified Paths: -------------- hash-join/trunk/src/Makefile hash-join/trunk/src/hashjoin.cc Added Paths: ----------- hash-join/trunk/src/method_thread1.h Modified: hash-join/trunk/src/Makefile =================================================================== --- hash-join/trunk/src/Makefile 2008-02-06 08:41:56 UTC (rev 325) +++ hash-join/trunk/src/Makefile 2008-02-06 16:01:30 UTC (rev 326) @@ -1,7 +1,7 @@ all: hashjoin hashjoin: hashjoin.cc - g++ -g3 -Wall -o hashjoin hashjoin.cc -lprofiler + g++ -g3 -Wall -o hashjoin hashjoin.cc -lprofiler -lpthread clean: rm -f hashjoin Modified: hash-join/trunk/src/hashjoin.cc =================================================================== --- hash-join/trunk/src/hashjoin.cc 2008-02-06 08:41:56 UTC (rev 325) +++ hash-join/trunk/src/hashjoin.cc 2008-02-06 16:01:30 UTC (rev 326) @@ -13,6 +13,8 @@ #include <pthread.h> +#include "method_thread1.h" + // // c++ commons :: numa // TODO document @@ -61,13 +63,15 @@ class timer { public: - timer() : start(current_time_millis()), last(start) {} + timer(const string label) : + label(label), start(current_time_millis()), last(start) {} void print() { long long now = current_time_millis(); - cout << now - last << endl; + cout << label << now - last << endl; last = now; } private: + const string label; long long start, last; }; @@ -130,29 +134,46 @@ }; // TODO dependency injection -const unsigned int ncpus = 2; +unsigned int ncpus = 1; typedef hash_map<const char *, const void *, hash<const char *>, eqstr> hmap; -const hmap::size_type map_size = 1000000; +const hmap::size_type map_size = 10000000; +class bucket { +public: + vector<size_t> sz; + vector<char *> bufs; +}; + class db { public: db(const char *path) { load_file(path, buf, buflen, ncpus); } + const bucket **partition(); + virtual void partition1(unsigned int pid, bucket* bucket) = 0; + virtual ~db() {} + unsigned int push_bucket(char **heads, bucket *bs, const char *s, const char + *p, size_t nbytes); protected: // TODO smart pointer char *buf; size_t buflen; }; -class movdb : db { +class movdb : public db { public: movdb(const char *path) : db(path) {} - void build(hmap &h); + virtual ~movdb() {} + const hmap *build(const bucket **movbucs); + void build1(unsigned int pid, const bucket **movbucs, hmap *h); + void partition1(unsigned int pid, bucket* bucket); }; -class actdb : db { +class actdb : public db { public: actdb(const char *path) : db(path) {} - void probe(const hmap &h, bool show_progress); + virtual ~actdb() {} + void probe(const hmap *hs, const bucket **actbucs, bool show_progress); + void probe1(unsigned int pid, const hmap *hh, const bucket **actbucs); + void partition1(unsigned int pid, bucket* bucket); }; // template <typename T> @@ -183,107 +204,338 @@ int main(int argc, char *argv[]) { - timer t; + if (argc != 3) { + fprintf(stderr, "hashjoin <ncpus> <actresses>\n"); + exit(1); + } + ncpus = atoi(argv[1]); + const char *actresses = argv[2]; + + timer t("main time: "); + cout << "loading movies" << endl; - movdb movdb("../movie-data/movies.list"); + movdb mdb("../movie-data/movies.dat"); t.print(); cout << "loading actresses" << endl; - actdb actdb("../movie-data/actresses.list"); + actdb adb(actresses);//"../movie-data/mdactresses.dat"); t.print(); - hmap h(map_size); + cout << "hash-partitioning movies into per-core buckets" << endl; + const bucket **movbucs = mdb.partition(); + t.print(); + cout << "hash-partitioning actresses into per-core buckets" << endl; + const bucket **actbucs = adb.partition(); + t.print(); + cout << "building with movies" << endl; - movdb.build(h); + const hmap *hs = mdb.build(movbucs); t.print(); cout << "probing with actresses" << endl; - actdb.probe(h, true); + adb.probe(hs, actbucs, true); t.print(); cout << "done" << endl; } -void -movdb::build(hmap &h) +const bucket ** +db::partition() { - char *p = buf, *end = buf + buflen; - // Skip 20 lines. - p = strchrrep(p, '\n', 20); - while (p < end) { - // Search for end of title and null-terminate it. - char *title = strsep(&p, "\t"); - if (p == NULL) { - // The last line is ----------- so ignore it. - break; + // Create the buckets. + bucket **buckets = new bucket*[ncpus]; + for (unsigned int i = 0; i < ncpus; i++) { + buckets[i] = new bucket[ncpus]; + for (unsigned int j = 0; j < ncpus; j++) { + int bucket_size = max(1000000UL,buflen / ncpus * 3); + // Each bucket should be twice as large as it would be given uniform + // distribution. This is just an initial size; extending can happen. + buckets[i][j].bufs.push_back(new char[bucket_size]); + buckets[i][j].sz.push_back(0); + check(buckets[i][j].bufs[0]); } - // Search for start of release date. - char *release = p + strcspn(p, "\t"); - // Insert into hash map. - h[title] = release; - // Search for next line. - char *newline = strchr(release, '\n'); - check(newline != NULL); - p = newline + 1; } + + // Partition the data into the buckets using the hash function. Reason for + // buckets: poor man's message passing. All the data from CPU i to CPU j goes + // into bucket[i][j]. + pthread_t ts[ncpus]; + for (unsigned int i = 0; i < ncpus; i++) { + ts[i] = method_thread1(this, &db::partition1, i, buckets[i]); + } + for (unsigned int i = 0; i < ncpus; i++) { + void *value; + check(pthread_join(ts[i], &value) == 0); + } + + // // Now from the consumer + // for (int i = 0; i < ncpus; i++) { + // ts[i] = method_thread1( + // // XXX + // ); + // } + return const_cast<const bucket**>(buckets); // TODO why is this cast needed? } -void -join(const char *movie, const char *actress) +// XXX +//inline const char * +//unsafe_strstr(const char *p, const char *q, const char *lim) +//{ +// while (true) { +// if (lim > 0 && p >= lim) return NULL; +// p = strchr(p, '\0') + 1; +// if (lim > 0 && p >= lim) return NULL; +// if (*p == '\0') return p; +// } +//} + +inline char * +unsafe_strstr(char *p, char *q, char *lim) { - // cout << "JOINED: " << movie << " WITH " << actress << endl; + if (lim == 0) { + while (true) { + for (; !(*p == '\0' && *(p+1) == '\0'); p++); + return p; + } + } else { + while (true) { + for (; !(*p == '\0' && *(p+1) == '\0') && p < lim; p++); + if (p == lim) return NULL; + return p; + } + } } +// inline char * +// unsafe_strstr(char *p, char *q, char *lim) +// { +// return const_cast<char *>(unsafe_strstr(const_cast<const char*>(p), +// const_cast<const char*>(q), +// const_cast<const char*>(lim))); +// } + +inline char * +next_tuple(char *p) +{ + char *next = unsafe_strstr(p, "\0\0", 0); + return next == NULL ? p + strlen(p) : next + 2; +} + +/** + * \param s The string to hash. + * \param p The start of the tuple. + * \param nbytes The length of the tuple. + */ +unsigned int +db::push_bucket(char **heads, bucket *bs, const char *s, const char *p, size_t nbytes) +{ + size_t h = __stl_hash_string(s); + unsigned int bucket = h % (map_size * ncpus) / map_size; + //cout << s << " : " << bucket << endl; + //size_t bucket_size = max(1000000,buflen / ncpus * 2); //2 * buflen / ncpus; + int bucket_size = max(1000000UL,buflen / ncpus * 3); + if (heads[bucket] + nbytes < bs[bucket].bufs.back() + bucket_size) { + memcpy(heads[bucket], s, nbytes); + heads[bucket] += nbytes; + return -1; + } else { + //cout << s << endl; + // cout << (uintptr_t)heads[bucket] << " " << nbytes << " " << (uintptr_t)bs[bucket].buf << " " << bucket_size << endl; + bs[bucket].sz.back() = heads[bucket] - bs[bucket].bufs.back(); + bs[bucket].bufs.push_back(new char[bucket_size]); + check(bs[bucket].bufs.back()); + heads[bucket] = bs[bucket].bufs.back(); + return bucket; + } +} + void -actdb::probe(const hmap &h, bool show_progress) +movdb::partition1(unsigned int pid, bucket *bs) { - char *p = buf, *end = buf + buflen; - p = strchrrep(p, '\n', 240); - int counter = 0, hits = 0, misses = 0; + // Calculate where our initial partition starts and ends (approximately -- if + // we land in the middle of a tuple, we will advance to the next one first). + char *partstart = pid == 0 ? buf : next_tuple(buf + buflen * pid / ncpus), + *partend = pid == ncpus - 1 ? + buf + buflen - 1 : + next_tuple(buf + buflen * (pid + 1) / ncpus); + cout << "cpu " << pid << " partition " << + partstart - buf << ".." << partend - buf << endl; + // Position the writing heads at the head of each bucket. (TODO find better + // name than head) + char *heads[ncpus]; + for (unsigned int i = 0; i < ncpus; i++) { + heads[i] = bs[i].bufs[0]; + } + // Statistics (TODO dynamic allocation) + int counter = 0, mincount = INT_MAX; + char *p = partstart, *end = partend; while (p < end) { + // cout << "remaining: " << end - p << " " << (uintptr_t) p << " ; " << ((int) *(p-1)) << " ; " << ((int) *(p)) << " ; " << ((int) *(p+1)) << endl; + char *title = p; + char *release = strchr(p, '\0') + 1; + p = strchr(release, '\0') + 2; + // Copy this line into the correct local bucket. + if (-1 != push_bucket(heads, bs, title, title, p - title)) { + //cout << "FUCK " << heads[0] - bs[0].buf << " " << heads[1] - bs[1].buf << " " << p - title << endl; + //mincount = min(mincount, counter); + //if (mincount == counter) cout << "CRAP" << counter << endl; + //cout << "overflowed on: " << title << endl; + } counter++; - // Search for end of name and null-terminate it. - char *name = strsep(&p, "\t"); - if (p == NULL) { - // The last line is ------------ so ignore it. - break; - } - if (show_progress && counter % 100000 == 0) { - cout << hits << " " << misses << " " << name << endl; - } + } + // Record the written size of each bucket. + for (unsigned int i = 0; i < ncpus; i++) { + bs[i].sz.back() = heads[i] - bs[i].bufs.back(); + } + cout << "movie count " << counter << " vs " << bs[0].sz.back()<< endl; +} + +void +actdb::partition1(unsigned int pid, bucket *bs) +{ + // Calculate where our initial partition starts and ends (approximately -- if + // we land in the middle of a tuple, we will advance to the next one first). + char *partstart = pid == 0 ? buf : next_tuple(buf + buflen * pid / ncpus), + *partend = pid == ncpus - 1 ? + buf + buflen - 1 : + next_tuple(buf + buflen * (pid + 1) / ncpus); + cout << pid << " part " << partstart - buf << " " << partend - buf << endl; + + // Position the writing heads at the head of each bucket. (TODO find better + // name than head) + char *heads[ncpus]; + for (unsigned int i = 0; i < ncpus; i++) { + heads[i] = bs[i].bufs[0]; + } + + // Statistics (TODO dynamic allocation) + int counter = 0, mincount = INT_MAX; + char *p = partstart, *end = partend; + while (p < end - 999) { + char *name = p; + p = strchr(p, '\0') + 1; + char *tuple_end = unsafe_strstr(p, "\0\0", end) + 2; while (true) { - // Search for start of movie title. - char *title = p + strspn(p, "\t"); - // Search for end of movie title and null-terminate it. - char *title_end1 = strstr(title, " "); - char *title_end2 = strchr(title, '\n'); - check(title_end2); - char *title_end = title_end1 != NULL && title_end1 < title_end2 ? title_end1 : title_end2; - *title_end = '\0'; - p = title_end + 2; - // Emit any joined tuple. - hmap::const_iterator i = h.find(title); - if (i != h.end()) { - hits++; - join(i->first, name); - } else { - misses++; + char *title = p; + p = strchr(p, '\0') + 1; + + // Copy this line into the correct local bucket. + //cout << "hashing " << title << endl; + unsigned int bbb; + if (-1 != (bbb = push_bucket(heads, bs, title, name, tuple_end - name))) { + //size_t bucket_size = max(1000000,buflen / ncpus * 2); //2 * buflen / ncpus; + //int bucket_size = max(1000000UL,buflen / ncpus * 3); + //cout << "FUCK " << heads[0] - bs[0].buf << " " << bucket_size << " " << heads[1] - bs[1].buf << " " << p - title << endl; + ////mincount = min(mincount, counter); + ////if (mincount == counter) cout << "CRAP" << counter << endl; + //cout << "overflowed " << bbb << " on: " << name << endl; } - // Search for next line. - char *newline = strchr(p, '\n'); - check(newline); - p = newline + 1; - // Two consecutive newlines means new actress. - if (*p == '\n') { + counter++; + + // End of tuple? + if (*p == '\0') { p++; break; - } else { - check(*p == '\t'); } } } + + // Record the written size of each bucket. + for (unsigned int i = 0; i < ncpus; i++) { + bs[i].sz.back() = heads[i] - bs[i].bufs.back(); + } + cout << "actress count " << counter << " vs " << bs[0].sz.back()<< endl; } +const hmap * +movdb::build(const bucket **movbucs) +{ + pthread_t ts[ncpus]; + hmap *hs = new hmap[ncpus]; + for (unsigned int i = 0; i < ncpus; i++) { + ts[i] = method_thread1(this, &movdb::build1, i, movbucs, &hs[i]); + } + for (unsigned int i = 0; i < ncpus; i++) { + void *value; + check(pthread_join(ts[i], &value) == 0); + } + return hs; +} + +void +movdb::build1(unsigned int pid, const bucket **movbucs, hmap *hh) +{ + hmap &h = *hh; + // Visit each bucket that's destined to us (visit each source). + for (unsigned int i = 0; i < ncpus; i++) { + char *p = movbucs[i][pid].bufs[0], + *end = movbucs[i][pid].bufs[0] + movbucs[i][pid].sz[0]; + while (p < end) { + char *title = p; + char *release = strchr(p, '\0') + 1; + p = strchr(release, '\0') + 2; + // Insert into hash map. + h[title] = release; + } + //cout << "cpu " << pid << " src " << i << " cumulative h.size " << h.size() + //<< endl; + } +} + +void +join(const char *movie, const char *actress) +{ + // cout << "JOINED: " << movie << " WITH " << actress << endl; +} + +void +actdb::probe(const hmap *hs, const bucket **actbucs, bool show_progress) +{ + pthread_t ts[ncpus]; + for (unsigned int i = 0; i < ncpus; i++) { + ts[i] = method_thread1(this, &actdb::probe1, i, hs, actbucs); + } + for (unsigned int i = 0; i < ncpus; i++) { + void *value; + check(pthread_join(ts[i], &value) == 0); + } +} + +void +actdb::probe1(unsigned int pid, const hmap *hh, const bucket **actbucs) +{ + const hmap &h = *hh; + for (unsigned int i = 0; i < ncpus; i++) { + char *p = actbucs[i][pid].bufs[0], + *end = actbucs[i][pid].bufs[0] + actbucs[i][pid].sz[0]; + int hits = 0, misses = 0; + while (p < end) { + char *name = p; + p = strchr(p, '\0') + 1; + while (true) { + char *title = p; + p = strchr(p, '\0') + 1; + // cout << "name " << name << "title: " << title << p - title << endl; + // Emit any joined tuple. + if (h.find(title) != h.end()) { + //cout << " HIT" << endl; + hits++; + join(title, name); + } else { + misses++; + } + // End of tuple? + if (*p == '\0') { + p++; + break; + } + } + } + //cout << "cpu " << pid << " src " << i << " hits " << hits << " misses " << + //misses << endl; + } +} + // vim:et:sw=2:ts=2 Added: hash-join/trunk/src/method_thread1.h =================================================================== --- hash-join/trunk/src/method_thread1.h (rev 0) +++ hash-join/trunk/src/method_thread1.h 2008-02-06 16:01:30 UTC (rev 326) @@ -0,0 +1,132 @@ +#ifndef method_thread_h +#define method_thread_h + +#include <assert.h> +#include <pthread.h> + +// non-rpc-specific utility to start a thread that runs +// an object method. returns a pthread_t on success, and +// zero on error. +template <class C> pthread_t +method_thread1(C *o, void (C::*m)()) +{ + class XXX { + public: + C *o; + void (C::*m)(); + static void *yyy(void *vvv) { + XXX *x = (XXX*)vvv; + C *o = x->o; + void (C::*m)() = x->m; + delete x; + (o->*m)(); + return 0; + } + }; + XXX *x = new XXX; + x->o = o; + x->m = m; + pthread_t th; + if(pthread_create(&th, NULL, &XXX::yyy, (void *) x) == 0){ + return th; + } + return 0; +} + +template <class C, class A> pthread_t +method_thread1(C *o, void (C::*m)(A), A a) +{ + class XXX { + public: + C *o; + void (C::*m)(A a); + A a; + static void *yyy(void *vvv) { + XXX *x = (XXX*)vvv; + C *o = x->o; + void (C::*m)(A ) = x->m; + A a = x->a; + delete x; + (o->*m)(a); + return 0; + } + }; + XXX *x = new XXX; + x->o = o; + x->m = m; + x->a = a; + pthread_t th; + if(pthread_create(&th, NULL, &XXX::yyy, (void *) x) == 0){ + return th; + } + return 0; +} + +template <class C, class A1, class A2> pthread_t +method_thread1(C *o, void (C::*m)(A1 , A2 ), A1 a1, A2 a2) +{ + class XXX { + public: + C *o; + void (C::*m)(A1 a1, A2 a2); + A1 a1; + A2 a2; + static void *yyy(void *vvv) { + XXX *x = (XXX*)vvv; + C *o = x->o; + void (C::*m)(A1 , A2 ) = x->m; + A1 a1 = x->a1; + A2 a2 = x->a2; + delete x; + (o->*m)(a1, a2); + return 0; + } + }; + XXX *x = new XXX; + x->o = o; + x->m = m; + x->a1 = a1; + x->a2 = a2; + pthread_t th; + if(pthread_create(&th, NULL, &XXX::yyy, (void *) x) == 0){ + return th; + } + return 0; +} + +template <class C, class A1, class A2, class A3> pthread_t +method_thread1(C *o, void (C::*m)(A1 , A2, A3), A1 a1, A2 a2, A3 a3) +{ + class XXX { + public: + C *o; + void (C::*m)(A1 a1, A2 a2, A3 a3); + A1 a1; + A2 a2; + A3 a3; + static void *yyy(void *vvv) { + XXX *x = (XXX*)vvv; + C *o = x->o; + void (C::*m)(A1, A2, A3) = x->m; + A1 a1 = x->a1; + A2 a2 = x->a2; + A3 a3 = x->a3; + delete x; + (o->*m)(a1, a2, a3); + return 0; + } + }; + XXX *x = new XXX; + x->o = o; + x->m = m; + x->a1 = a1; + x->a2 = a2; + x->a3 = a3; + pthread_t th; + if(pthread_create(&th, NULL, &XXX::yyy, (void *) x) == 0){ + return th; + } + return 0; +} + +#endif This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |