Thread: [Assorted-commits] SF.net SVN: assorted: [327] hash-join/trunk/src/hashjoin.cc
Brought to you by:
yangzhang
From: <yan...@us...> - 2008-02-06 16:19:47
|
Revision: 327 http://assorted.svn.sourceforge.net/assorted/?rev=327&view=rev Author: yangzhang Date: 2008-02-06 08:19:42 -0800 (Wed, 06 Feb 2008) Log Message: ----------- fixed the obscene replication Modified Paths: -------------- hash-join/trunk/src/hashjoin.cc Modified: hash-join/trunk/src/hashjoin.cc =================================================================== --- hash-join/trunk/src/hashjoin.cc 2008-02-06 16:01:30 UTC (rev 326) +++ hash-join/trunk/src/hashjoin.cc 2008-02-06 16:19:42 UTC (rev 327) @@ -204,22 +204,23 @@ int main(int argc, char *argv[]) { - if (argc != 3) { - fprintf(stderr, "hashjoin <ncpus> <actresses>\n"); + if (argc != 4) { + fprintf(stderr, "hashjoin <ncpus> <movies> <actresses>\n"); exit(1); } ncpus = atoi(argv[1]); - const char *actresses = argv[2]; + const char *movies = argv[2]; + const char *actresses = argv[3]; timer t("main time: "); cout << "loading movies" << endl; - movdb mdb("../movie-data/movies.dat"); + movdb mdb(movies); // "../movie-data/movies.dat" t.print(); cout << "loading actresses" << endl; - actdb adb(actresses);//"../movie-data/mdactresses.dat"); + actdb adb(actresses); // "../movie-data/mdactresses.dat" t.print(); cout << "hash-partitioning movies into per-core buckets" << endl; @@ -409,6 +410,7 @@ for (unsigned int i = 0; i < ncpus; i++) { heads[i] = bs[i].bufs[0]; } + char tmp[1024]; // Statistics (TODO dynamic allocation) int counter = 0, mincount = INT_MAX; @@ -416,15 +418,20 @@ while (p < end - 999) { char *name = p; p = strchr(p, '\0') + 1; + strcpy(tmp, name); + char *subtmp = tmp + strlen(name) + 1; char *tuple_end = unsafe_strstr(p, "\0\0", end) + 2; while (true) { char *title = p; p = strchr(p, '\0') + 1; + strcpy(subtmp, title); + size_t strl = strlen(subtmp); + // Copy this line into the correct local bucket. 
//cout << "hashing " << title << endl; unsigned int bbb; - if (-1 != (bbb = push_bucket(heads, bs, title, name, tuple_end - name))) { + if (-1 != (bbb = push_bucket(heads, bs, title, tmp, subtmp + strl + 1 - tmp))) { //size_t bucket_size = max(1000000,buflen / ncpus * 2); //2 * buflen / ncpus; //int bucket_size = max(1000000UL,buflen / ncpus * 3); //cout << "FUCK " << heads[0] - bs[0].buf << " " << bucket_size << " " << heads[1] - bs[1].buf << " " << p - title << endl; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-02-06 17:09:49
|
Revision: 328 http://assorted.svn.sourceforge.net/assorted/?rev=328&view=rev Author: yangzhang Date: 2008-02-06 09:09:53 -0800 (Wed, 06 Feb 2008) Log Message: ----------- fixed some bugs Modified Paths: -------------- hash-join/trunk/src/hashjoin.cc Modified: hash-join/trunk/src/hashjoin.cc =================================================================== --- hash-join/trunk/src/hashjoin.cc 2008-02-06 16:19:42 UTC (rev 327) +++ hash-join/trunk/src/hashjoin.cc 2008-02-06 17:09:53 UTC (rev 328) @@ -338,7 +338,7 @@ //size_t bucket_size = max(1000000,buflen / ncpus * 2); //2 * buflen / ncpus; int bucket_size = max(1000000UL,buflen / ncpus * 3); if (heads[bucket] + nbytes < bs[bucket].bufs.back() + bucket_size) { - memcpy(heads[bucket], s, nbytes); + memcpy(heads[bucket], p, nbytes); heads[bucket] += nbytes; return -1; } else { @@ -390,7 +390,7 @@ for (unsigned int i = 0; i < ncpus; i++) { bs[i].sz.back() = heads[i] - bs[i].bufs.back(); } - cout << "movie count " << counter << " vs " << bs[0].sz.back()<< endl; + cout << "movie count " << counter << " nbytes " << bs[0].sz.back()<< endl; } void @@ -415,7 +415,7 @@ // Statistics (TODO dynamic allocation) int counter = 0, mincount = INT_MAX; char *p = partstart, *end = partend; - while (p < end - 999) { + while (p < end) { char *name = p; p = strchr(p, '\0') + 1; strcpy(tmp, name); @@ -426,12 +426,14 @@ p = strchr(p, '\0') + 1; strcpy(subtmp, title); - size_t strl = strlen(subtmp); + size_t tmplen = subtmp + strlen(subtmp) + 2 - tmp; + check(tmplen < 1024); + tmp[tmplen-1] = '\0'; // Copy this line into the correct local bucket. 
//cout << "hashing " << title << endl; unsigned int bbb; - if (-1 != (bbb = push_bucket(heads, bs, title, tmp, subtmp + strl + 1 - tmp))) { + if (-1 != (bbb = push_bucket(heads, bs, title, tmp, tmplen))) { //size_t bucket_size = max(1000000,buflen / ncpus * 2); //2 * buflen / ncpus; //int bucket_size = max(1000000UL,buflen / ncpus * 3); //cout << "FUCK " << heads[0] - bs[0].buf << " " << bucket_size << " " << heads[1] - bs[1].buf << " " << p - title << endl; @@ -453,7 +455,7 @@ for (unsigned int i = 0; i < ncpus; i++) { bs[i].sz.back() = heads[i] - bs[i].bufs.back(); } - cout << "actress count " << counter << " vs " << bs[0].sz.back()<< endl; + cout << "actress count " << counter << " nbytes " << bs[0].sz.back()<< endl; } const hmap * @@ -514,23 +516,24 @@ actdb::probe1(unsigned int pid, const hmap *hh, const bucket **actbucs) { const hmap &h = *hh; + int hits = 0, misses = 0; for (unsigned int i = 0; i < ncpus; i++) { char *p = actbucs[i][pid].bufs[0], *end = actbucs[i][pid].bufs[0] + actbucs[i][pid].sz[0]; - int hits = 0, misses = 0; while (p < end) { char *name = p; p = strchr(p, '\0') + 1; while (true) { char *title = p; p = strchr(p, '\0') + 1; - // cout << "name " << name << "title: " << title << p - title << endl; + //cout << "name " << name << " title: " << title << p - title << endl; // Emit any joined tuple. if (h.find(title) != h.end()) { //cout << " HIT" << endl; hits++; join(title, name); } else { + //cout << " MISS" << endl; misses++; } // End of tuple? @@ -540,9 +543,8 @@ } } } - //cout << "cpu " << pid << " src " << i << " hits " << hits << " misses " << - //misses << endl; } + cout << "cpu " << pid << " hits " << hits << " misses " << misses << endl; } // vim:et:sw=2:ts=2 This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-02-06 17:29:57
|
Revision: 329
http://assorted.svn.sourceforge.net/assorted/?rev=329&view=rev
Author: yangzhang
Date: 2008-02-06 09:29:57 -0800 (Wed, 06 Feb 2008)

Log Message:
-----------
fixed more bugs

Modified Paths:
--------------
hash-join/trunk/src/hashjoin.cc

Modified: hash-join/trunk/src/hashjoin.cc
===================================================================
--- hash-join/trunk/src/hashjoin.cc 2008-02-06 17:09:53 UTC (rev 328)
+++ hash-join/trunk/src/hashjoin.cc 2008-02-06 17:29:57 UTC (rev 329)
@@ -377,6 +377,7 @@
     char *title = p;
     char *release = strchr(p, '\0') + 1;
     p = strchr(release, '\0') + 2;
+    // printf("%s (%d) / %s (%d) %d %d %d %d\n", title, strlen(title), release, strlen(release), *(p - 4), *(p - 3), *(p - 2), *(p - 1));
     // Copy this line into the correct local bucket.
     if (-1 != push_bucket(heads, bs, title, title, p - title)) {
       //cout << "FUCK " << heads[0] - bs[0].buf << " " << heads[1] - bs[1].buf << " " << p - title << endl;
@@ -504,7 +505,7 @@
 {
   pthread_t ts[ncpus];
   for (unsigned int i = 0; i < ncpus; i++) {
-    ts[i] = method_thread1(this, &actdb::probe1, i, hs, actbucs);
+    ts[i] = method_thread1(this, &actdb::probe1, i, &hs[i], actbucs);
   }
   for (unsigned int i = 0; i < ncpus; i++) {
     void *value;
@@ -533,7 +534,7 @@
           hits++;
           join(title, name);
         } else {
-          //cout << " MISS" << endl;
+          cout << " MISS " << title << endl;
           misses++;
         }
         // End of tuple?

This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
From: <yan...@us...> - 2008-02-06 23:01:54
|
Revision: 330 http://assorted.svn.sourceforge.net/assorted/?rev=330&view=rev Author: yangzhang Date: 2008-02-06 15:01:49 -0800 (Wed, 06 Feb 2008) Log Message: ----------- cleaned up Modified Paths: -------------- hash-join/trunk/src/hashjoin.cc Modified: hash-join/trunk/src/hashjoin.cc =================================================================== --- hash-join/trunk/src/hashjoin.cc 2008-02-06 17:29:57 UTC (rev 329) +++ hash-join/trunk/src/hashjoin.cc 2008-02-06 23:01:49 UTC (rev 330) @@ -1,3 +1,4 @@ +#include <memory> #include <cassert> #include <cstdio> #include <iostream> @@ -3,5 +4,4 @@ #include <exception> #include <vector> - #include <ext/hash_map> @@ -10,27 +10,38 @@ #include <sys/stat.h> #include <sys/time.h> #include <fcntl.h> - #include <pthread.h> #include "method_thread1.h" // -// c++ commons :: numa -// TODO document +// C++ Commons :: NUMA // using namespace std; -// TODO replace with a macro +// TODO: Figure out how to create an exception with a useful message. inline void -check(bool cond) +_check(bool cond, const char *msg, const char *file, int line) { if (!cond) { throw exception(); } } +#define check(cond) _check(cond, NULL, __FILE__, __LINE__) + +/** + * Similar to assert(), but is not conditionally compiled, so this is safe to + * use as a guard against expected failures (such as checking return codes). + */ +#define checkmsg(cond, msg) \ + bool b = cond; \ + if (!b) _check(b, (msg), __FILE__, __LINE__) + +/** + * Search in p for the nth instance of c and return the character past it. + */ inline const char * strchrrep(const char *p, char c, int n) { @@ -42,13 +53,23 @@ return p; } +/** + * Search in p for the nth instance of c and return the character past it. + * TODO figure out if there's a way to merge this and the above rather than + * maintaining two versions. (Related to Linus Torvalds' post on const?) 
+ */ inline char * strchrrep(char *p, char c, int n) { return const_cast<char *>(strchrrep(const_cast<const char *>(p), c, n)); } -inline long long current_time_millis() { +/** + * Get the current time in milliseconds. + */ +inline long long +current_time_millis() +{ long long t; struct timeval tv; @@ -60,12 +81,16 @@ return t; } +/** + * Convenience class for performing wall-clock benchmarking. + */ class timer { public: timer(const string label) : label(label), start(current_time_millis()), last(start) {} - void print() { + void print() + { long long now = current_time_millis(); cout << label << now - last << endl; last = now; @@ -75,11 +100,47 @@ long long start, last; }; -void -load_file(const char *path, char *&buf, size_t & len, unsigned int ncpus) { +/** + * A functor that checks for string equality. Mainly useful as a template + * parameter to the hash data structures in STL extensions. + */ +struct eqstr +{ + bool operator()(const char* s1, const char* s2) const + { + return strcmp(s1, s2) == 0; + } +}; + +/** + * Look for a substring, but without null-termination conventions. + */ +inline char * +unsafe_strstr(char *p, char *q, char *lim) +{ + if (lim == 0) { + while (true) { + for (; !(*p == '\0' && *(p+1) == '\0'); p++); + return p; + } + } else { + check(p < lim); + while (true) { + for (; !(*p == '\0' && *(p+1) == '\0') && p < lim; p++); + if (p == lim) return NULL; + return p; + } + } +} + +/** + * Load an entire file into buf and also give us the length of the buffer. + * TODO this probably isn't very safe, since we're demoting an off_t to a + * size_t. Is there a healthier approach? + */ +char * +load_file(const char *path, size_t & len, unsigned int ncpus) { struct stat sb; - // pthread_t tha[CPUS]; - // void *value; int fd; fd = open(path, 0); @@ -88,119 +149,122 @@ check(fstat(fd, &sb) == 0); check(sb.st_size <= 0xffffffff); - // TODO why don't i need (static) cast here? + // TODO Why don't we need (static) cast here? Isn't this a lossy cast? 
len = sb.st_size; - buf = new char[len + 1]; + char *buf = new char[len + 1]; check(buf); - // XXX use threads to pull data to the correct initial locations? -// #if CPUS > 1 + // TODO Use threads to pull data to the correct initial locations? size_t chunk_len = len / ncpus; for (unsigned int i = 0; i < ncpus; i++) { - // TODO review C++ cast rules int off = i *chunk_len; ssize_t status = pread(fd, buf + off, chunk_len, off); - // we read the whole chunk or hit the end + // We read the whole chunk or hit the end. size_t nread = static_cast<ssize_t>(status); check(status != -1 && (nread == chunk_len || off + nread == len)); -// tha[i] = method_thread1(this, &MapReduce::readin1, i *chunk_len, chunk_len); } -// for(i = 0; i < ncpus; i++) -// check(pthread_join(tha[i], &value) == 0); -// #else -// readin1(0, len); -// #endif - check(close(fd) == 0); buf[len] = '\0'; // don't let strcmp() run off the end + return buf; } // -// hashjoin +// Hash Join // using namespace std; using namespace __gnu_cxx; -struct eqstr -{ - bool operator()(const char* s1, const char* s2) const - { - return strcmp(s1, s2) == 0; - } -}; - -// TODO dependency injection +// TODO use dependency injection! unsigned int ncpus = 1; typedef hash_map<const char *, const void *, hash<const char *>, eqstr> hmap; const hmap::size_type map_size = 10000000; -class bucket { +/** + * Buckets are produced in the hash-partitioning phase. These are simple + * storage containers. + */ +class bucket +{ public: + ~bucket() + { + // XXX check this + for (size_t i = 0; i < bufs.size(); i++) { + delete [] bufs[i]; + } + } + /** + * The sizes of the bufs. Should always be the same length as bufs. + */ vector<size_t> sz; + /** + * The data that we hold. + */ vector<char *> bufs; }; -class db { +/** + * An abstract in-memory database that holds "tuples" in a contiguous buffer. + * The format/interpretation of the buffers is up to the subclasses. 
+ */ +class db +{ public: - db(const char *path) { load_file(path, buf, buflen, ncpus); } + db(const char *path) : buf(load_file(path, buflen, ncpus)) {} const bucket **partition(); + /** + * This routine runs on each processor to hash-partition the data into local + * buckets. + */ virtual void partition1(unsigned int pid, bucket* bucket) = 0; - virtual ~db() {} + virtual ~db() { delete [] buf; } unsigned int push_bucket(char **heads, bucket *bs, const char *s, const char *p, size_t nbytes); protected: - // TODO smart pointer char *buf; size_t buflen; }; -class movdb : public db { +/** + * This is something which we must free. + */ +class movdb : public db +{ public: movdb(const char *path) : db(path) {} virtual ~movdb() {} + /** + * Build the hash map in parallel. + */ const hmap *build(const bucket **movbucs); + /** + * Each node runs this routine to construct its local hash map. + */ void build1(unsigned int pid, const bucket **movbucs, hmap *h); void partition1(unsigned int pid, bucket* bucket); }; -class actdb : public db { +class actdb : public db +{ public: actdb(const char *path) : db(path) {} virtual ~actdb() {} + /** + * Probe the hash maps with tuples from the actor buckets. + */ void probe(const hmap *hs, const bucket **actbucs, bool show_progress); - void probe1(unsigned int pid, const hmap *hh, const bucket **actbucs); + /** + * Each node runs this routine to probe into its local hash map using tuples + * from actor buckets destined for that node. 
+ */ + void probe1(unsigned int pid, const hmap *ph, const bucket **actbucs); void partition1(unsigned int pid, bucket* bucket); }; -// template <typename T> -// class bucket { -// public: -// bucket(int count) { -// ts = new (T*)[count]; -// } -// private: -// T *ts; -// } -// -// typedef vector<const char *> bucket; -// -// class hmap { -// public: -// hmap(int nbuckets) : nbuckets(nbuckets), nentries(0) { -// buckets = new bucket[nbuckets]; -// check(buckets); -// } -// hmap() : nbuckets(nbuckets_default); -// private: -// bucket *buckets; -// int nbuckets; -// int nentries; -// }; - int main(int argc, char *argv[]) { @@ -215,14 +279,18 @@ timer t("main time: "); + // Load the data files. + cout << "loading movies" << endl; - movdb mdb(movies); // "../movie-data/movies.dat" + movdb mdb(movies); t.print(); cout << "loading actresses" << endl; - actdb adb(actresses); // "../movie-data/mdactresses.dat" + actdb adb(actresses); t.print(); + // Hash-partition the data among the nodes. + cout << "hash-partitioning movies into per-core buckets" << endl; const bucket **movbucs = mdb.partition(); t.print(); @@ -231,6 +299,8 @@ const bucket **actbucs = adb.partition(); t.print(); + // Perform the hash-join. + cout << "building with movies" << endl; const hmap *hs = mdb.build(movbucs); t.print(); @@ -271,53 +341,10 @@ check(pthread_join(ts[i], &value) == 0); } - // // Now from the consumer - // for (int i = 0; i < ncpus; i++) { - // ts[i] = method_thread1( - // // XXX - // ); - // } return const_cast<const bucket**>(buckets); // TODO why is this cast needed? 
} -// XXX -//inline const char * -//unsafe_strstr(const char *p, const char *q, const char *lim) -//{ -// while (true) { -// if (lim > 0 && p >= lim) return NULL; -// p = strchr(p, '\0') + 1; -// if (lim > 0 && p >= lim) return NULL; -// if (*p == '\0') return p; -// } -//} - inline char * -unsafe_strstr(char *p, char *q, char *lim) -{ - if (lim == 0) { - while (true) { - for (; !(*p == '\0' && *(p+1) == '\0'); p++); - return p; - } - } else { - while (true) { - for (; !(*p == '\0' && *(p+1) == '\0') && p < lim; p++); - if (p == lim) return NULL; - return p; - } - } -} - -// inline char * -// unsafe_strstr(char *p, char *q, char *lim) -// { -// return const_cast<char *>(unsafe_strstr(const_cast<const char*>(p), -// const_cast<const char*>(q), -// const_cast<const char*>(lim))); -// } - -inline char * next_tuple(char *p) { char *next = unsafe_strstr(p, "\0\0", 0); @@ -334,16 +361,12 @@ { size_t h = __stl_hash_string(s); unsigned int bucket = h % (map_size * ncpus) / map_size; - //cout << s << " : " << bucket << endl; - //size_t bucket_size = max(1000000,buflen / ncpus * 2); //2 * buflen / ncpus; int bucket_size = max(1000000UL,buflen / ncpus * 3); if (heads[bucket] + nbytes < bs[bucket].bufs.back() + bucket_size) { memcpy(heads[bucket], p, nbytes); heads[bucket] += nbytes; return -1; } else { - //cout << s << endl; - // cout << (uintptr_t)heads[bucket] << " " << nbytes << " " << (uintptr_t)bs[bucket].buf << " " << bucket_size << endl; bs[bucket].sz.back() = heads[bucket] - bs[bucket].bufs.back(); bs[bucket].bufs.push_back(new char[bucket_size]); check(bs[bucket].bufs.back()); @@ -369,22 +392,15 @@ for (unsigned int i = 0; i < ncpus; i++) { heads[i] = bs[i].bufs[0]; } - // Statistics (TODO dynamic allocation) - int counter = 0, mincount = INT_MAX; + int counter = 0; char *p = partstart, *end = partend; + // Iterate over the partitions. 
while (p < end) { - // cout << "remaining: " << end - p << " " << (uintptr_t) p << " ; " << ((int) *(p-1)) << " ; " << ((int) *(p)) << " ; " << ((int) *(p+1)) << endl; char *title = p; char *release = strchr(p, '\0') + 1; p = strchr(release, '\0') + 2; - // printf("%s (%d) / %s (%d) %d %d %d %d\n", title, strlen(title), release, strlen(release), *(p - 4), *(p - 3), *(p - 2), *(p - 1)); - // Copy this line into the correct local bucket. - if (-1 != push_bucket(heads, bs, title, title, p - title)) { - //cout << "FUCK " << heads[0] - bs[0].buf << " " << heads[1] - bs[1].buf << " " << p - title << endl; - //mincount = min(mincount, counter); - //if (mincount == counter) cout << "CRAP" << counter << endl; - //cout << "overflowed on: " << title << endl; - } + // Copy this tuple into the correct local bucket. + push_bucket(heads, bs, title, title, p - title); counter++; } // Record the written size of each bucket. @@ -411,37 +427,33 @@ for (unsigned int i = 0; i < ncpus; i++) { heads[i] = bs[i].bufs[0]; } + + // This is used for creating (name, title) tuples. (No tuple may exceed 1024 + // bytes.) char tmp[1024]; - // Statistics (TODO dynamic allocation) - int counter = 0, mincount = INT_MAX; + // Iterate over the partitions. char *p = partstart, *end = partend; + int counter = 0; while (p < end) { char *name = p; p = strchr(p, '\0') + 1; + // Fill in the first part of the tuple. strcpy(tmp, name); char *subtmp = tmp + strlen(name) + 1; - char *tuple_end = unsafe_strstr(p, "\0\0", end) + 2; while (true) { char *title = p; p = strchr(p, '\0') + 1; + // Fill in the second half of the tuple. strcpy(subtmp, title); size_t tmplen = subtmp + strlen(subtmp) + 2 - tmp; check(tmplen < 1024); tmp[tmplen-1] = '\0'; - // Copy this line into the correct local bucket. - //cout << "hashing " << title << endl; + // Copy the tuple into the correct local bucket. 
unsigned int bbb; - if (-1 != (bbb = push_bucket(heads, bs, title, tmp, tmplen))) { - //size_t bucket_size = max(1000000,buflen / ncpus * 2); //2 * buflen / ncpus; - //int bucket_size = max(1000000UL,buflen / ncpus * 3); - //cout << "FUCK " << heads[0] - bs[0].buf << " " << bucket_size << " " << heads[1] - bs[1].buf << " " << p - title << endl; - ////mincount = min(mincount, counter); - ////if (mincount == counter) cout << "CRAP" << counter << endl; - //cout << "overflowed " << bbb << " on: " << name << endl; - } + bbb = push_bucket(heads, bs, title, tmp, tmplen); counter++; // End of tuple? @@ -456,6 +468,8 @@ for (unsigned int i = 0; i < ncpus; i++) { bs[i].sz.back() = heads[i] - bs[i].bufs.back(); } + + // TODO fix this log msg to sum up the sz's rather than just showing the last cout << "actress count " << counter << " nbytes " << bs[0].sz.back()<< endl; } @@ -475,9 +489,9 @@ } void -movdb::build1(unsigned int pid, const bucket **movbucs, hmap *hh) +movdb::build1(unsigned int pid, const bucket **movbucs, hmap *ph) { - hmap &h = *hh; + hmap &h = *ph; // Visit each bucket that's destined to us (visit each source). for (unsigned int i = 0; i < ncpus; i++) { char *p = movbucs[i][pid].bufs[0], @@ -489,18 +503,10 @@ // Insert into hash map. h[title] = release; } - //cout << "cpu " << pid << " src " << i << " cumulative h.size " << h.size() - //<< endl; } } void -join(const char *movie, const char *actress) -{ - // cout << "JOINED: " << movie << " WITH " << actress << endl; -} - -void actdb::probe(const hmap *hs, const bucket **actbucs, bool show_progress) { pthread_t ts[ncpus]; @@ -513,31 +519,41 @@ } } +/** + * Dummy function that is called to represent emitting a joined tuple. 
+ */ +inline void +join(const char *movie, const char *actress) +{ + if (false) cout << "JOINED: " << movie << " WITH " << actress << endl; +} + void -actdb::probe1(unsigned int pid, const hmap *hh, const bucket **actbucs) +actdb::probe1(unsigned int pid, const hmap *ph, const bucket **actbucs) { - const hmap &h = *hh; + const hmap &h = *ph; int hits = 0, misses = 0; + // For each source bucket. for (unsigned int i = 0; i < ncpus; i++) { char *p = actbucs[i][pid].bufs[0], *end = actbucs[i][pid].bufs[0] + actbucs[i][pid].sz[0]; + // Iterate over the bucket. while (p < end) { char *name = p; p = strchr(p, '\0') + 1; while (true) { char *title = p; p = strchr(p, '\0') + 1; - //cout << "name " << name << " title: " << title << p - title << endl; - // Emit any joined tuple. + // Emit the joined tuple (if a join was possible). if (h.find(title) != h.end()) { - //cout << " HIT" << endl; hits++; join(title, name); } else { cout << " MISS " << title << endl; misses++; } - // End of tuple? + // End of a tuple? (Don't actually need this check, since the + // hash-partitioning "normalizes" the tuples from the actresses file.) if (*p == '\0') { p++; break; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-02-06 23:44:17
|
Revision: 331
http://assorted.svn.sourceforge.net/assorted/?rev=331&view=rev
Author: yangzhang
Date: 2008-02-06 15:44:14 -0800 (Wed, 06 Feb 2008)

Log Message:
-----------
added initial ncpu logging

Modified Paths:
--------------
hash-join/trunk/src/hashjoin.cc

Modified: hash-join/trunk/src/hashjoin.cc
===================================================================
--- hash-join/trunk/src/hashjoin.cc 2008-02-06 23:01:49 UTC (rev 330)
+++ hash-join/trunk/src/hashjoin.cc 2008-02-06 23:44:14 UTC (rev 331)
@@ -277,6 +277,8 @@
   const char *movies = argv[2];
   const char *actresses = argv[3];
 
+  cout << "using " << ncpus << " cpus" << endl;
+
   timer t("main time: ");
 
   // Load the data files.

This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
From: <yan...@us...> - 2008-02-07 00:51:20
|
Revision: 333
http://assorted.svn.sourceforge.net/assorted/?rev=333&view=rev
Author: yangzhang
Date: 2008-02-06 16:50:59 -0800 (Wed, 06 Feb 2008)

Log Message:
-----------
checks out

Modified Paths:
--------------
hash-join/trunk/src/hashjoin.cc

Modified: hash-join/trunk/src/hashjoin.cc
===================================================================
--- hash-join/trunk/src/hashjoin.cc 2008-02-07 00:46:50 UTC (rev 332)
+++ hash-join/trunk/src/hashjoin.cc 2008-02-07 00:50:59 UTC (rev 333)
@@ -192,7 +192,6 @@
 public:
   ~bucket()
   {
-    // XXX check this
     for (size_t i = 0; i < bufs.size(); i++) {
       delete [] bufs[i];
     }

This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
From: <yan...@us...> - 2008-02-09 05:11:51
|
Revision: 340
http://assorted.svn.sourceforge.net/assorted/?rev=340&view=rev
Author: yangzhang
Date: 2008-02-08 21:11:38 -0800 (Fri, 08 Feb 2008)

Log Message:
-----------
fixed some gcc-4.2 issues

Modified Paths:
--------------
hash-join/trunk/src/hashjoin.cc

Modified: hash-join/trunk/src/hashjoin.cc
===================================================================
--- hash-join/trunk/src/hashjoin.cc 2008-02-07 18:43:54 UTC (rev 339)
+++ hash-join/trunk/src/hashjoin.cc 2008-02-09 05:11:38 UTC (rev 340)
@@ -116,7 +116,7 @@
  * Look for a substring, but without null-termination conventions.
  */
 inline char *
-unsafe_strstr(char *p, char *q, char *lim)
+unsafe_strstr(char *p, const char *q, const char *lim)
 {
   if (lim == 0) {
     while (true) {
@@ -134,6 +134,15 @@
 }
 
 /**
+ * Look for a substring, but without null-termination conventions.
+ */
+inline const char*
+unsafe_strstr(const char *p, const char *q, const char *lim)
+{
+  return unsafe_strstr((char*) p, q, lim);
+}
+
+/**
  * Load an entire file into buf and also give us the length of the buffer.
  * TODO this probably isn't very safe, since we're demoting an off_t to a
  * size_t. Is there a healthier approach?

This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
From: <yan...@us...> - 2008-02-09 19:37:20
|
Revision: 344 http://assorted.svn.sourceforge.net/assorted/?rev=344&view=rev Author: yangzhang Date: 2008-02-09 11:37:26 -0800 (Sat, 09 Feb 2008) Log Message: ----------- fixed some protection/docs Modified Paths: -------------- hash-join/trunk/src/hashjoin.cc Modified: hash-join/trunk/src/hashjoin.cc =================================================================== --- hash-join/trunk/src/hashjoin.cc 2008-02-09 19:37:17 UTC (rev 343) +++ hash-join/trunk/src/hashjoin.cc 2008-02-09 19:37:26 UTC (rev 344) @@ -55,8 +55,6 @@ /** * Search in p for the nth instance of c and return the character past it. - * TODO figure out if there's a way to merge this and the above rather than - * maintaining two versions. (Related to Linus Torvalds' post on const?) */ inline char * strchrrep(char *p, char c, int n) @@ -223,12 +221,10 @@ { public: db(const char *path) : buf(load_file(path, buflen, ncpus)) {} - const bucket **partition(); /** - * This routine runs on each processor to hash-partition the data into local - * buckets. + * Run hash-partitioning phase on all processors. */ - virtual void partition1(unsigned int pid, bucket* bucket) = 0; + const bucket **partition(); virtual ~db() { delete [] buf; } /** * Push a tuple into one of the buckets. Which bucket is determined by the @@ -243,6 +239,11 @@ unsigned int push_bucket(char **heads, bucket *bs, const char *s, const char *p, size_t nbytes); protected: + /** + * This routine runs on each processor to hash-partition the data into local + * buckets. + */ + virtual void partition1(unsigned int pid, bucket* bucket) = 0; char *buf; size_t buflen; }; @@ -259,6 +260,7 @@ * Build the hash map in parallel. */ const hmap *build(const bucket **movbucs); +protected: /** * Each node runs this routine to construct its local hash map. */ @@ -278,6 +280,7 @@ * Probe the hash maps with tuples from the actor buckets. 
*/ void probe(const hmap *hs, const bucket **actbucs, bool show_progress); +protected: /** * Each node runs this routine to probe into its local hash map using tuples * from actor buckets destined for that node. This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-02-09 19:39:45
|
Revision: 345
http://assorted.svn.sourceforge.net/assorted/?rev=345&view=rev
Author: yangzhang
Date: 2008-02-09 11:39:48 -0800 (Sat, 09 Feb 2008)

Log Message:
-----------
removed miss msgs

Modified Paths:
--------------
hash-join/trunk/src/hashjoin.cc

Modified: hash-join/trunk/src/hashjoin.cc
===================================================================
--- hash-join/trunk/src/hashjoin.cc 2008-02-09 19:37:26 UTC (rev 344)
+++ hash-join/trunk/src/hashjoin.cc 2008-02-09 19:39:48 UTC (rev 345)
@@ -571,7 +571,6 @@
           hits++;
           join(title, name);
         } else {
-          cout << " MISS " << title << endl;
           misses++;
         }
         // End of a tuple? (Don't actually need this check, since the

This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
From: <yan...@us...> - 2008-02-11 04:52:39
|
Revision: 358 http://assorted.svn.sourceforge.net/assorted/?rev=358&view=rev Author: yangzhang Date: 2008-02-10 20:52:44 -0800 (Sun, 10 Feb 2008) Log Message: ----------- fixed bug omitting subsequent buckets in build/probe Modified Paths: -------------- hash-join/trunk/src/hashjoin.cc Modified: hash-join/trunk/src/hashjoin.cc =================================================================== --- hash-join/trunk/src/hashjoin.cc 2008-02-11 04:52:20 UTC (rev 357) +++ hash-join/trunk/src/hashjoin.cc 2008-02-11 04:52:44 UTC (rev 358) @@ -13,11 +13,15 @@ #include <pthread.h> #include <commons/check.h> +#include <commons/deque.h> #include <commons/files.h> +#include <commons/hash.h> #include <commons/strings.h> #include <commons/threads.h> #include <commons/time.h> +// TODO: #include <boost/array.hpp> + // // Hash Join // @@ -25,6 +29,7 @@ using namespace std; using namespace __gnu_cxx; using namespace commons; +// TODO: using namespace boost; // TODO use dependency injection! unsigned int ncpus = 1; @@ -51,9 +56,11 @@ /** * The data that we hold. */ - vector<char *> bufs; + vector<char*> bufs; }; +// TODO: typedef list< array<char, bucket_size> > bucket; + /** * An abstract in-memory database that holds "tuples" in a contiguous buffer. * The format/interpretation of the buffers is up to the subclasses. @@ -222,7 +229,7 @@ unsigned int db::push_bucket(char **heads, bucket *bs, const char *s, const char *p, size_t nbytes) { - size_t h = __stl_hash_string(s); + size_t h = hash_djb2(s); unsigned int bucket = h % (map_size * ncpus) / map_size; size_t bucket_size = max(1000000UL,buflen / ncpus * 3); if (heads[bucket] + nbytes < bs[bucket].bufs.back() + bucket_size) { @@ -357,14 +364,19 @@ hmap &h = *ph; // Visit each bucket that's destined to us (visit each source). 
for (unsigned int i = 0; i < ncpus; i++) { - char *p = movbucs[i][pid].bufs[0], - *end = movbucs[i][pid].bufs[0] + movbucs[i][pid].sz[0]; - while (p < end) { - char *title = p; - char *release = strchr(p, '\0') + 1; - p = strchr(release, '\0') + 2; - // Insert into hash map. - h[title] = release; + const vector<char*>& bufs = movbucs[i][pid].bufs; + const vector<size_t>& sz = movbucs[i][pid].sz; + // Iterate over the bucket. + for (unsigned int j = 0; j < bufs.size(); j++) { + char *p = bufs[j], *end = bufs[j] + sz[j]; + // Iterate over the chunk. + while (p < end) { + char *title = p; + char *release = strchr(p, '\0') + 1; + p = strchr(release, '\0') + 2; + // Insert into hash map. + h[title] = release; + } } } } @@ -398,28 +410,32 @@ int hits = 0, misses = 0; // For each source bucket. for (unsigned int i = 0; i < ncpus; i++) { - char *p = actbucs[i][pid].bufs[0], - *end = actbucs[i][pid].bufs[0] + actbucs[i][pid].sz[0]; + const vector<char*>& bufs = actbucs[i][pid].bufs; + const vector<size_t>& sz = actbucs[i][pid].sz; // Iterate over the bucket. - while (p < end) { - char *name = p; - p = strchr(p, '\0') + 1; - while (true) { - char *title = p; + for (unsigned int j = 0; j < bufs.size(); j++) { + char *p = bufs[j], *end = bufs[j] + sz[j]; + // Iterate over the chunk. + while (p < end) { + char *name = p; p = strchr(p, '\0') + 1; - // Emit the joined tuple (if a join was possible). - if (h.find(title) != h.end()) { - hits++; - join(title, name); - } else { - misses++; + while (true) { + char *title = p; + p = strchr(p, '\0') + 1; + // Emit the joined tuple (if a join was possible). + if (h.find(title) != h.end()) { + hits++; + join(title, name); + } else { + misses++; + } + // End of a tuple? (Don't actually need this check, since the + // hash-partitioning "normalizes" the tuples from the actresses file.) + if (*p == '\0') { + p++; + break; + } } - // End of a tuple? 
(Don't actually need this check, since the - // hash-partitioning "normalizes" the tuples from the actresses file.) - if (*p == '\0') { - p++; - break; - } } } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-02-11 15:53:14
|
Revision: 374 http://assorted.svn.sourceforge.net/assorted/?rev=374&view=rev Author: yangzhang Date: 2008-02-11 07:53:17 -0800 (Mon, 11 Feb 2008) Log Message: ----------- added debug output Modified Paths: -------------- hash-join/trunk/src/hashjoin.cc Modified: hash-join/trunk/src/hashjoin.cc =================================================================== --- hash-join/trunk/src/hashjoin.cc 2008-02-11 15:45:31 UTC (rev 373) +++ hash-join/trunk/src/hashjoin.cc 2008-02-11 15:53:17 UTC (rev 374) @@ -427,6 +427,7 @@ hits++; join(title, name); } else { + if (misses == 0) cerr << "MISS: '" << title << '\'' << endl; misses++; } // End of a tuple? (Don't actually need this check, since the This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-02-11 17:02:51
|
Revision: 375 http://assorted.svn.sourceforge.net/assorted/?rev=375&view=rev Author: yangzhang Date: 2008-02-11 09:02:55 -0800 (Mon, 11 Feb 2008) Log Message: ----------- moved to new output for more friendly log processing Modified Paths: -------------- hash-join/trunk/src/hashjoin.cc Modified: hash-join/trunk/src/hashjoin.cc =================================================================== --- hash-join/trunk/src/hashjoin.cc 2008-02-11 15:53:17 UTC (rev 374) +++ hash-join/trunk/src/hashjoin.cc 2008-02-11 17:02:55 UTC (rev 375) @@ -151,38 +151,45 @@ cout << "using " << ncpus << " cpus" << endl; - timer t("main time: "); - // Load the data files. cout << "loading movies" << endl; + timer tldmov("loading movies time: "); movdb mdb(movies); - t.print(); + tldmov.print(); cout << "loading actresses" << endl; + timer tldact("loading actresses time: "); actdb adb(actresses); - t.print(); + tldact.print(); + timer ttotal("total time: "); + // Hash-partition the data among the nodes. cout << "hash-partitioning movies into per-core buckets" << endl; + timer thpmov("hash-partitioning movies time: "); const bucket **movbucs = mdb.partition(); - t.print(); + thpmov.print(); cout << "hash-partitioning actresses into per-core buckets" << endl; + timer thpact("hash-partitioning actresses time: "); const bucket **actbucs = adb.partition(); - t.print(); + thpact.print(); // Perform the hash-join. cout << "building with movies" << endl; + timer tbuild("building with movies time: "); const hmap *hs = mdb.build(movbucs); - t.print(); + tbuild.print(); cout << "probing with actresses" << endl; + timer tprobe("probing with actresses time: "); adb.probe(hs, actbucs, true); - t.print(); + tprobe.print(); + ttotal.print(); cout << "done" << endl; } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-02-11 23:23:00
|
Revision: 380 http://assorted.svn.sourceforge.net/assorted/?rev=380&view=rev Author: yangzhang Date: 2008-02-11 15:23:00 -0800 (Mon, 11 Feb 2008) Log Message: ----------- added resize notices; fixed push_bucket Modified Paths: -------------- hash-join/trunk/src/hashjoin.cc Modified: hash-join/trunk/src/hashjoin.cc =================================================================== --- hash-join/trunk/src/hashjoin.cc 2008-02-11 23:19:42 UTC (rev 379) +++ hash-join/trunk/src/hashjoin.cc 2008-02-11 23:23:00 UTC (rev 380) @@ -31,9 +31,20 @@ using namespace commons; // TODO: using namespace boost; +typedef hash_map<const char*, const void*, hash<const char*>, eqstr> my_hash_map; +class hmap : public my_hash_map +{ +public: + void + resize(size_type hint) + { + cout << "resizing " << this << " to " << hint << endl; + my_hash_map::resize(hint); + } +}; + // TODO use dependency injection! unsigned int ncpus = 1; -typedef hash_map<const char *, const void *, hash<const char *>, eqstr> hmap; const hmap::size_type map_size = 10000000; /** @@ -68,7 +79,10 @@ class db { public: - db(const char *path) : buf(load_file(path, buflen, ncpus)) {} + db(const char *path) : buf(load_file(path, buflen, ncpus)) + { + scan(buf, buflen); + } /** * Run hash-partitioning phase on all processors. */ @@ -237,8 +251,8 @@ db::push_bucket(char **heads, bucket *bs, const char *s, const char *p, size_t nbytes) { size_t h = hash_djb2(s); - unsigned int bucket = h % (map_size * ncpus) / map_size; - size_t bucket_size = max(1000000UL,buflen / ncpus * 3); + unsigned int bucket = h % ncpus; + size_t bucket_size = max(1000000UL, buflen * 3 / ncpus); if (heads[bucket] + nbytes < bs[bucket].bufs.back() + bucket_size) { memcpy(heads[bucket], p, nbytes); heads[bucket] += nbytes; @@ -369,6 +383,7 @@ movdb::build1(unsigned int pid, const bucket **movbucs, hmap *ph) { hmap &h = *ph; + h.resize(map_size); // Visit each bucket that's destined to us (visit each source). 
for (unsigned int i = 0; i < ncpus; i++) { const vector<char*>& bufs = movbucs[i][pid].bufs; @@ -434,7 +449,7 @@ hits++; join(title, name); } else { - if (misses == 0) cerr << "MISS: '" << title << '\'' << endl; + if (misses == 0) cerr << "MISS: '" << title << '\'' << endl; misses++; } // End of a tuple? (Don't actually need this check, since the This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-02-11 23:31:11
|
Revision: 384 http://assorted.svn.sourceforge.net/assorted/?rev=384&view=rev Author: yangzhang Date: 2008-02-11 15:31:10 -0800 (Mon, 11 Feb 2008) Log Message: ----------- fixed issues on 32-bit system Modified Paths: -------------- hash-join/trunk/src/hashjoin.cc Modified: hash-join/trunk/src/hashjoin.cc =================================================================== --- hash-join/trunk/src/hashjoin.cc 2008-02-11 23:25:12 UTC (rev 383) +++ hash-join/trunk/src/hashjoin.cc 2008-02-11 23:31:10 UTC (rev 384) @@ -216,7 +216,7 @@ buckets[i] = new bucket[ncpus]; for (unsigned int j = 0; j < ncpus; j++) { // TODO dependency injection - size_t bucket_size = max(1000000UL,buflen / ncpus * 3); + size_t bucket_size = max(1000000U,buflen / ncpus * 3); // Each bucket should be twice as large as it would be given uniform // distribution. This is just an initial size; extending can happen. buckets[i][j].bufs.push_back(new char[bucket_size]); @@ -252,7 +252,7 @@ { size_t h = hash_djb2(s); unsigned int bucket = h % ncpus; - size_t bucket_size = max(1000000UL, buflen * 3 / ncpus); + size_t bucket_size = max(1000000U, buflen * 3 / ncpus); if (heads[bucket] + nbytes < bs[bucket].bufs.back() + bucket_size) { memcpy(heads[bucket], p, nbytes); heads[bucket] += nbytes; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-02-12 17:23:41
|
Revision: 387 http://assorted.svn.sourceforge.net/assorted/?rev=387&view=rev Author: yangzhang Date: 2008-02-12 08:33:50 -0800 (Tue, 12 Feb 2008) Log Message: ----------- trying new allocator; fixed bucket_size type issues Modified Paths: -------------- hash-join/trunk/src/hashjoin.cc Modified: hash-join/trunk/src/hashjoin.cc =================================================================== --- hash-join/trunk/src/hashjoin.cc 2008-02-12 01:47:59 UTC (rev 386) +++ hash-join/trunk/src/hashjoin.cc 2008-02-12 16:33:50 UTC (rev 387) @@ -16,6 +16,7 @@ #include <commons/deque.h> #include <commons/files.h> #include <commons/hash.h> +#include <commons/region.h> #include <commons/strings.h> #include <commons/threads.h> #include <commons/time.h> @@ -31,7 +32,14 @@ using namespace commons; // TODO: using namespace boost; -typedef hash_map<const char*, const void*, hash<const char*>, eqstr> my_hash_map; +typedef hash_map< + const char*, + const void*, + hash<const char*>, + eqstr, + region_alloc<int> +> my_hash_map; + class hmap : public my_hash_map { public: @@ -216,7 +224,7 @@ buckets[i] = new bucket[ncpus]; for (unsigned int j = 0; j < ncpus; j++) { // TODO dependency injection - size_t bucket_size = max(1000000U,buflen / ncpus * 3); + size_t bucket_size = max((size_t) 1000000,buflen / ncpus * 3); // Each bucket should be twice as large as it would be given uniform // distribution. This is just an initial size; extending can happen. buckets[i][j].bufs.push_back(new char[bucket_size]); @@ -252,7 +260,7 @@ { size_t h = hash_djb2(s); unsigned int bucket = h % ncpus; - size_t bucket_size = max(1000000U, buflen * 3 / ncpus); + size_t bucket_size = max((size_t) 1000000, buflen * 3 / ncpus); if (heads[bucket] + nbytes < bs[bucket].bufs.back() + bucket_size) { memcpy(heads[bucket], p, nbytes); heads[bucket] += nbytes; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-02-12 21:58:47
|
Revision: 395 http://assorted.svn.sourceforge.net/assorted/?rev=395&view=rev Author: yangzhang Date: 2008-02-12 13:58:51 -0800 (Tue, 12 Feb 2008) Log Message: ----------- tweaks Modified Paths: -------------- hash-join/trunk/src/hashjoin.cc Modified: hash-join/trunk/src/hashjoin.cc =================================================================== --- hash-join/trunk/src/hashjoin.cc 2008-02-12 19:16:18 UTC (rev 394) +++ hash-join/trunk/src/hashjoin.cc 2008-02-12 21:58:51 UTC (rev 395) @@ -1,3 +1,7 @@ +// +// Hash Join +// + #include <memory> #include <cassert> #include <cstdio> @@ -23,10 +27,6 @@ // TODO: #include <boost/array.hpp> -// -// Hash Join -// - using namespace std; using namespace __gnu_cxx; using namespace commons; @@ -46,14 +46,15 @@ void resize(size_type hint) { - cout << "resizing " << this << " to " << hint << endl; + // cout << "resizing " << this << " to " << hint << endl; my_hash_map::resize(hint); } }; // TODO use dependency injection! unsigned int ncpus = 1; -const hmap::size_type map_size = 10000000; +const hmap::size_type map_size = 1000000; +const int min_bucket_size = 1000000; /** * Buckets are produced in the hash-partitioning phase. These are simple @@ -224,7 +225,7 @@ buckets[i] = new bucket[ncpus]; for (unsigned int j = 0; j < ncpus; j++) { // TODO dependency injection - size_t bucket_size = max((size_t) 1000000,buflen / ncpus * 3); + size_t bucket_size = max((size_t) min_bucket_size,buflen / ncpus * 3); // Each bucket should be twice as large as it would be given uniform // distribution. This is just an initial size; extending can happen. 
buckets[i][j].bufs.push_back(new char[bucket_size]); @@ -260,7 +261,7 @@ { size_t h = hash_djb2(s); unsigned int bucket = h % ncpus; - size_t bucket_size = max((size_t) 1000000, buflen * 3 / ncpus); + size_t bucket_size = max((size_t) min_bucket_size, buflen * 3 / ncpus); if (heads[bucket] + nbytes < bs[bucket].bufs.back() + bucket_size) { memcpy(heads[bucket], p, nbytes); heads[bucket] += nbytes; @@ -470,7 +471,7 @@ } } } - cout << "cpu " << pid << " hits " << hits << " misses " << misses << endl; + // cout << "cpu " << pid << " hits " << hits << " misses " << misses << endl; } // vim:et:sw=2:ts=2 This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-02-12 22:15:12
|
Revision: 397 http://assorted.svn.sourceforge.net/assorted/?rev=397&view=rev Author: yangzhang Date: 2008-02-12 14:15:13 -0800 (Tue, 12 Feb 2008) Log Message: ----------- readded hit/miss debug Modified Paths: -------------- hash-join/trunk/src/hashjoin.cc Modified: hash-join/trunk/src/hashjoin.cc =================================================================== --- hash-join/trunk/src/hashjoin.cc 2008-02-12 22:13:04 UTC (rev 396) +++ hash-join/trunk/src/hashjoin.cc 2008-02-12 22:15:13 UTC (rev 397) @@ -471,7 +471,7 @@ } } } - // cout << "cpu " << pid << " hits " << hits << " misses " << misses << endl; + cout << "cpu " << pid << " hits " << hits << " misses " << misses << endl; } // vim:et:sw=2:ts=2 This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-02-15 03:57:22
|
Revision: 442 http://assorted.svn.sourceforge.net/assorted/?rev=442&view=rev Author: yangzhang Date: 2008-02-14 19:57:22 -0800 (Thu, 14 Feb 2008) Log Message: ----------- tweak Modified Paths: -------------- hash-join/trunk/src/hashjoin.cc Modified: hash-join/trunk/src/hashjoin.cc =================================================================== --- hash-join/trunk/src/hashjoin.cc 2008-02-15 02:46:25 UTC (rev 441) +++ hash-join/trunk/src/hashjoin.cc 2008-02-15 03:57:22 UTC (rev 442) @@ -410,6 +410,7 @@ } } } + cout << "h.size " << h.size() << endl; } void This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-03-23 22:36:22
|
Revision: 640 http://assorted.svn.sourceforge.net/assorted/?rev=640&view=rev Author: yangzhang Date: 2008-03-23 15:36:23 -0700 (Sun, 23 Mar 2008) Log Message: ----------- - added support for stlport hash_map - fixed non-std c++ - 100K min bucket size - refactored bucket_size calculations - using vectors instead of raw arrays (in certain places) Modified Paths: -------------- hash-join/trunk/src/hashjoin.cc Modified: hash-join/trunk/src/hashjoin.cc =================================================================== --- hash-join/trunk/src/hashjoin.cc 2008-03-19 16:57:53 UTC (rev 639) +++ hash-join/trunk/src/hashjoin.cc 2008-03-23 22:36:23 UTC (rev 640) @@ -8,7 +8,11 @@ #include <iostream> #include <exception> #include <vector> +#ifdef STLPORT +#include <hash_map> +#else #include <ext/hash_map> +#endif #include <sys/types.h> #include <sys/stat.h> @@ -25,12 +29,14 @@ #include <commons/threads.h> #include <commons/time.h> -// TODO: #include <boost/array.hpp> +#include <boost/scoped_array.hpp> using namespace std; +using namespace boost; +#ifndef STLPORT using namespace __gnu_cxx; +#endif using namespace commons; -// TODO: using namespace boost; typedef hash_map< const char*, @@ -52,9 +58,9 @@ }; // TODO use dependency injection! -unsigned int ncpus = 1; -const hmap::size_type map_size = 1000000; -const int min_bucket_size = 1000000; +unsigned int ncpus = 1; +const hmap::size_type map_size = 1000000; // 1MB +const size_t min_bucket_size = 100000; // 100KB /** * Buckets are produced in the hash-partitioning phase. These are simple @@ -88,7 +94,10 @@ class db { public: - db(const char *path) : buf(load_file(path, buflen, ncpus)) + db(const char *path) : + buf(load_file(path, buflen, ncpus)), + // TODO dependency injection + bucket_size(max((size_t) min_bucket_size, buflen / ncpus / ncpus * 3)) { scan(buf, buflen); } @@ -107,8 +116,8 @@ * \param p The start of the tuple. * \param nbytes The length of the tuple. 
*/ - unsigned int push_bucket(char **heads, bucket *bs, const char *s, const char - *p, size_t nbytes); + unsigned int push_bucket(vector<char *> & heads, bucket *bs, const char *s, + const char *p, size_t nbytes); protected: /** * This routine runs on each processor to hash-partition the data into local @@ -117,6 +126,7 @@ virtual void partition1(unsigned int pid, bucket* bucket) = 0; char *buf; size_t buflen; + size_t bucket_size; }; /** @@ -223,9 +233,8 @@ bucket **buckets = new bucket*[ncpus]; for (unsigned int i = 0; i < ncpus; i++) { buckets[i] = new bucket[ncpus]; + check(buckets[i]); for (unsigned int j = 0; j < ncpus; j++) { - // TODO dependency injection - size_t bucket_size = max((size_t) min_bucket_size,buflen / ncpus * 3); // Each bucket should be twice as large as it would be given uniform // distribution. This is just an initial size; extending can happen. buckets[i][j].bufs.push_back(new char[bucket_size]); @@ -237,13 +246,12 @@ // Partition the data into the buckets using the hash function. Reason for // buckets: poor man's message passing. All the data from CPU i to CPU j goes // into bucket[i][j]. - pthread_t ts[ncpus]; + vector<pthread_t> ts(ncpus); for (unsigned int i = 0; i < ncpus; i++) { ts[i] = method_thread(this, &db::partition1, i, buckets[i]); } for (unsigned int i = 0; i < ncpus; i++) { - void *value; - check(pthread_join(ts[i], &value) == 0); + check(pthread_join(ts[i], NULL) == 0); } return const_cast<const bucket**>(buckets); // TODO why is this cast needed? 
@@ -257,11 +265,10 @@ } unsigned int -db::push_bucket(char **heads, bucket *bs, const char *s, const char *p, size_t nbytes) +db::push_bucket(vector<char*> & heads, bucket *bs, const char *s, const char *p, size_t nbytes) { size_t h = hash_djb2(s); unsigned int bucket = h % ncpus; - size_t bucket_size = max((size_t) min_bucket_size, buflen * 3 / ncpus); if (heads[bucket] + nbytes < bs[bucket].bufs.back() + bucket_size) { memcpy(heads[bucket], p, nbytes); heads[bucket] += nbytes; @@ -288,7 +295,7 @@ partstart - buf << ".." << partend - buf << endl; // Position the writing heads at the head of each bucket. (TODO find better // name than head) - char *heads[ncpus]; + vector<char*> heads(ncpus); for (unsigned int i = 0; i < ncpus; i++) { heads[i] = bs[i].bufs[0]; } @@ -323,7 +330,7 @@ // Position the writing heads at the head of each bucket. (TODO find better // name than head) - char *heads[ncpus]; + vector<char *> heads(ncpus); for (unsigned int i = 0; i < ncpus; i++) { heads[i] = bs[i].bufs[0]; } @@ -352,8 +359,7 @@ tmp[tmplen-1] = '\0'; // Copy the tuple into the correct local bucket. - unsigned int bbb; - bbb = push_bucket(heads, bs, title, tmp, tmplen); + push_bucket(heads, bs, title, tmp, tmplen); counter++; // End of tuple? @@ -376,7 +382,7 @@ const hmap * movdb::build(const bucket **movbucs) { - pthread_t ts[ncpus]; + vector<pthread_t> ts(ncpus); hmap *hs = new hmap[ncpus]; for (unsigned int i = 0; i < ncpus; i++) { ts[i] = method_thread(this, &movdb::build1, i, movbucs, &hs[i]); @@ -416,7 +422,7 @@ void actdb::probe(const hmap *hs, const bucket **actbucs, bool show_progress) { - pthread_t ts[ncpus]; + vector<pthread_t> ts(ncpus); for (unsigned int i = 0; i < ncpus; i++) { ts[i] = method_thread(this, &actdb::probe1, i, &hs[i], actbucs); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-03-24 15:40:43
|
Revision: 643 http://assorted.svn.sourceforge.net/assorted/?rev=643&view=rev Author: yangzhang Date: 2008-03-24 08:40:30 -0700 (Mon, 24 Mar 2008) Log Message: ----------- added libnuma, but it's unused Modified Paths: -------------- hash-join/trunk/src/hashjoin.cc Modified: hash-join/trunk/src/hashjoin.cc =================================================================== --- hash-join/trunk/src/hashjoin.cc 2008-03-24 03:37:25 UTC (rev 642) +++ hash-join/trunk/src/hashjoin.cc 2008-03-24 15:40:30 UTC (rev 643) @@ -31,6 +31,14 @@ #include <boost/scoped_array.hpp> +#ifdef NUMA +#ifdef TILE64 +#include <linux/numa.h> +#else +#include <numa.h> +#endif +#endif + using namespace std; using namespace boost; #ifndef STLPORT This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |