[Assorted-commits] SF.net SVN: assorted: [330] hash-join/trunk/src/hashjoin.cc
Brought to you by:
yangzhang
From: <yan...@us...> - 2008-02-06 23:01:54
Revision: 330 http://assorted.svn.sourceforge.net/assorted/?rev=330&view=rev Author: yangzhang Date: 2008-02-06 15:01:49 -0800 (Wed, 06 Feb 2008) Log Message: ----------- cleaned up Modified Paths: -------------- hash-join/trunk/src/hashjoin.cc Modified: hash-join/trunk/src/hashjoin.cc =================================================================== --- hash-join/trunk/src/hashjoin.cc 2008-02-06 17:29:57 UTC (rev 329) +++ hash-join/trunk/src/hashjoin.cc 2008-02-06 23:01:49 UTC (rev 330) @@ -1,3 +1,4 @@ +#include <memory> #include <cassert> #include <cstdio> #include <iostream> @@ -3,5 +4,4 @@ #include <exception> #include <vector> - #include <ext/hash_map> @@ -10,27 +10,38 @@ #include <sys/stat.h> #include <sys/time.h> #include <fcntl.h> - #include <pthread.h> #include "method_thread1.h" // -// c++ commons :: numa -// TODO document +// C++ Commons :: NUMA // using namespace std; -// TODO replace with a macro +// TODO: Figure out how to create an exception with a useful message. inline void -check(bool cond) +_check(bool cond, const char *msg, const char *file, int line) { if (!cond) { throw exception(); } } +#define check(cond) _check(cond, NULL, __FILE__, __LINE__) + +/** + * Similar to assert(), but is not conditionally compiled, so this is safe to + * use as a guard against expected failures (such as checking return codes). + */ +#define checkmsg(cond, msg) \ + bool b = cond; \ + if (!b) _check(b, (msg), __FILE__, __LINE__) + +/** + * Search in p for the nth instance of c and return the character past it. + */ inline const char * strchrrep(const char *p, char c, int n) { @@ -42,13 +53,23 @@ return p; } +/** + * Search in p for the nth instance of c and return the character past it. + * TODO figure out if there's a way to merge this and the above rather than + * maintaining two versions. (Related to Linus Torvalds' post on const?) 
+ */ inline char * strchrrep(char *p, char c, int n) { return const_cast<char *>(strchrrep(const_cast<const char *>(p), c, n)); } -inline long long current_time_millis() { +/** + * Get the current time in milliseconds. + */ +inline long long +current_time_millis() +{ long long t; struct timeval tv; @@ -60,12 +81,16 @@ return t; } +/** + * Convenience class for performing wall-clock benchmarking. + */ class timer { public: timer(const string label) : label(label), start(current_time_millis()), last(start) {} - void print() { + void print() + { long long now = current_time_millis(); cout << label << now - last << endl; last = now; @@ -75,11 +100,47 @@ long long start, last; }; -void -load_file(const char *path, char *&buf, size_t & len, unsigned int ncpus) { +/** + * A functor that checks for string equality. Mainly useful as a template + * parameter to the hash data structures in STL extensions. + */ +struct eqstr +{ + bool operator()(const char* s1, const char* s2) const + { + return strcmp(s1, s2) == 0; + } +}; + +/** + * Look for a substring, but without null-termination conventions. + */ +inline char * +unsafe_strstr(char *p, char *q, char *lim) +{ + if (lim == 0) { + while (true) { + for (; !(*p == '\0' && *(p+1) == '\0'); p++); + return p; + } + } else { + check(p < lim); + while (true) { + for (; !(*p == '\0' && *(p+1) == '\0') && p < lim; p++); + if (p == lim) return NULL; + return p; + } + } +} + +/** + * Load an entire file into buf and also give us the length of the buffer. + * TODO this probably isn't very safe, since we're demoting an off_t to a + * size_t. Is there a healthier approach? + */ +char * +load_file(const char *path, size_t & len, unsigned int ncpus) { struct stat sb; - // pthread_t tha[CPUS]; - // void *value; int fd; fd = open(path, 0); @@ -88,119 +149,122 @@ check(fstat(fd, &sb) == 0); check(sb.st_size <= 0xffffffff); - // TODO why don't i need (static) cast here? + // TODO Why don't we need (static) cast here? Isn't this a lossy cast? 
len = sb.st_size; - buf = new char[len + 1]; + char *buf = new char[len + 1]; check(buf); - // XXX use threads to pull data to the correct initial locations? -// #if CPUS > 1 + // TODO Use threads to pull data to the correct initial locations? size_t chunk_len = len / ncpus; for (unsigned int i = 0; i < ncpus; i++) { - // TODO review C++ cast rules int off = i *chunk_len; ssize_t status = pread(fd, buf + off, chunk_len, off); - // we read the whole chunk or hit the end + // We read the whole chunk or hit the end. size_t nread = static_cast<ssize_t>(status); check(status != -1 && (nread == chunk_len || off + nread == len)); -// tha[i] = method_thread1(this, &MapReduce::readin1, i *chunk_len, chunk_len); } -// for(i = 0; i < ncpus; i++) -// check(pthread_join(tha[i], &value) == 0); -// #else -// readin1(0, len); -// #endif - check(close(fd) == 0); buf[len] = '\0'; // don't let strcmp() run off the end + return buf; } // -// hashjoin +// Hash Join // using namespace std; using namespace __gnu_cxx; -struct eqstr -{ - bool operator()(const char* s1, const char* s2) const - { - return strcmp(s1, s2) == 0; - } -}; - -// TODO dependency injection +// TODO use dependency injection! unsigned int ncpus = 1; typedef hash_map<const char *, const void *, hash<const char *>, eqstr> hmap; const hmap::size_type map_size = 10000000; -class bucket { +/** + * Buckets are produced in the hash-partitioning phase. These are simple + * storage containers. + */ +class bucket +{ public: + ~bucket() + { + // XXX check this + for (size_t i = 0; i < bufs.size(); i++) { + delete [] bufs[i]; + } + } + /** + * The sizes of the bufs. Should always be the same length as bufs. + */ vector<size_t> sz; + /** + * The data that we hold. + */ vector<char *> bufs; }; -class db { +/** + * An abstract in-memory database that holds "tuples" in a contiguous buffer. + * The format/interpretation of the buffers is up to the subclasses. 
+ */ +class db +{ public: - db(const char *path) { load_file(path, buf, buflen, ncpus); } + db(const char *path) : buf(load_file(path, buflen, ncpus)) {} const bucket **partition(); + /** + * This routine runs on each processor to hash-partition the data into local + * buckets. + */ virtual void partition1(unsigned int pid, bucket* bucket) = 0; - virtual ~db() {} + virtual ~db() { delete [] buf; } unsigned int push_bucket(char **heads, bucket *bs, const char *s, const char *p, size_t nbytes); protected: - // TODO smart pointer char *buf; size_t buflen; }; -class movdb : public db { +/** + * This is something which we must free. + */ +class movdb : public db +{ public: movdb(const char *path) : db(path) {} virtual ~movdb() {} + /** + * Build the hash map in parallel. + */ const hmap *build(const bucket **movbucs); + /** + * Each node runs this routine to construct its local hash map. + */ void build1(unsigned int pid, const bucket **movbucs, hmap *h); void partition1(unsigned int pid, bucket* bucket); }; -class actdb : public db { +class actdb : public db +{ public: actdb(const char *path) : db(path) {} virtual ~actdb() {} + /** + * Probe the hash maps with tuples from the actor buckets. + */ void probe(const hmap *hs, const bucket **actbucs, bool show_progress); - void probe1(unsigned int pid, const hmap *hh, const bucket **actbucs); + /** + * Each node runs this routine to probe into its local hash map using tuples + * from actor buckets destined for that node. 
+ */ + void probe1(unsigned int pid, const hmap *ph, const bucket **actbucs); void partition1(unsigned int pid, bucket* bucket); }; -// template <typename T> -// class bucket { -// public: -// bucket(int count) { -// ts = new (T*)[count]; -// } -// private: -// T *ts; -// } -// -// typedef vector<const char *> bucket; -// -// class hmap { -// public: -// hmap(int nbuckets) : nbuckets(nbuckets), nentries(0) { -// buckets = new bucket[nbuckets]; -// check(buckets); -// } -// hmap() : nbuckets(nbuckets_default); -// private: -// bucket *buckets; -// int nbuckets; -// int nentries; -// }; - int main(int argc, char *argv[]) { @@ -215,14 +279,18 @@ timer t("main time: "); + // Load the data files. + cout << "loading movies" << endl; - movdb mdb(movies); // "../movie-data/movies.dat" + movdb mdb(movies); t.print(); cout << "loading actresses" << endl; - actdb adb(actresses); // "../movie-data/mdactresses.dat" + actdb adb(actresses); t.print(); + // Hash-partition the data among the nodes. + cout << "hash-partitioning movies into per-core buckets" << endl; const bucket **movbucs = mdb.partition(); t.print(); @@ -231,6 +299,8 @@ const bucket **actbucs = adb.partition(); t.print(); + // Perform the hash-join. + cout << "building with movies" << endl; const hmap *hs = mdb.build(movbucs); t.print(); @@ -271,53 +341,10 @@ check(pthread_join(ts[i], &value) == 0); } - // // Now from the consumer - // for (int i = 0; i < ncpus; i++) { - // ts[i] = method_thread1( - // // XXX - // ); - // } return const_cast<const bucket**>(buckets); // TODO why is this cast needed? 
} -// XXX -//inline const char * -//unsafe_strstr(const char *p, const char *q, const char *lim) -//{ -// while (true) { -// if (lim > 0 && p >= lim) return NULL; -// p = strchr(p, '\0') + 1; -// if (lim > 0 && p >= lim) return NULL; -// if (*p == '\0') return p; -// } -//} - inline char * -unsafe_strstr(char *p, char *q, char *lim) -{ - if (lim == 0) { - while (true) { - for (; !(*p == '\0' && *(p+1) == '\0'); p++); - return p; - } - } else { - while (true) { - for (; !(*p == '\0' && *(p+1) == '\0') && p < lim; p++); - if (p == lim) return NULL; - return p; - } - } -} - -// inline char * -// unsafe_strstr(char *p, char *q, char *lim) -// { -// return const_cast<char *>(unsafe_strstr(const_cast<const char*>(p), -// const_cast<const char*>(q), -// const_cast<const char*>(lim))); -// } - -inline char * next_tuple(char *p) { char *next = unsafe_strstr(p, "\0\0", 0); @@ -334,16 +361,12 @@ { size_t h = __stl_hash_string(s); unsigned int bucket = h % (map_size * ncpus) / map_size; - //cout << s << " : " << bucket << endl; - //size_t bucket_size = max(1000000,buflen / ncpus * 2); //2 * buflen / ncpus; int bucket_size = max(1000000UL,buflen / ncpus * 3); if (heads[bucket] + nbytes < bs[bucket].bufs.back() + bucket_size) { memcpy(heads[bucket], p, nbytes); heads[bucket] += nbytes; return -1; } else { - //cout << s << endl; - // cout << (uintptr_t)heads[bucket] << " " << nbytes << " " << (uintptr_t)bs[bucket].buf << " " << bucket_size << endl; bs[bucket].sz.back() = heads[bucket] - bs[bucket].bufs.back(); bs[bucket].bufs.push_back(new char[bucket_size]); check(bs[bucket].bufs.back()); @@ -369,22 +392,15 @@ for (unsigned int i = 0; i < ncpus; i++) { heads[i] = bs[i].bufs[0]; } - // Statistics (TODO dynamic allocation) - int counter = 0, mincount = INT_MAX; + int counter = 0; char *p = partstart, *end = partend; + // Iterate over the partitions. 
while (p < end) { - // cout << "remaining: " << end - p << " " << (uintptr_t) p << " ; " << ((int) *(p-1)) << " ; " << ((int) *(p)) << " ; " << ((int) *(p+1)) << endl; char *title = p; char *release = strchr(p, '\0') + 1; p = strchr(release, '\0') + 2; - // printf("%s (%d) / %s (%d) %d %d %d %d\n", title, strlen(title), release, strlen(release), *(p - 4), *(p - 3), *(p - 2), *(p - 1)); - // Copy this line into the correct local bucket. - if (-1 != push_bucket(heads, bs, title, title, p - title)) { - //cout << "FUCK " << heads[0] - bs[0].buf << " " << heads[1] - bs[1].buf << " " << p - title << endl; - //mincount = min(mincount, counter); - //if (mincount == counter) cout << "CRAP" << counter << endl; - //cout << "overflowed on: " << title << endl; - } + // Copy this tuple into the correct local bucket. + push_bucket(heads, bs, title, title, p - title); counter++; } // Record the written size of each bucket. @@ -411,37 +427,33 @@ for (unsigned int i = 0; i < ncpus; i++) { heads[i] = bs[i].bufs[0]; } + + // This is used for creating (name, title) tuples. (No tuple may exceed 1024 + // bytes.) char tmp[1024]; - // Statistics (TODO dynamic allocation) - int counter = 0, mincount = INT_MAX; + // Iterate over the partitions. char *p = partstart, *end = partend; + int counter = 0; while (p < end) { char *name = p; p = strchr(p, '\0') + 1; + // Fill in the first part of the tuple. strcpy(tmp, name); char *subtmp = tmp + strlen(name) + 1; - char *tuple_end = unsafe_strstr(p, "\0\0", end) + 2; while (true) { char *title = p; p = strchr(p, '\0') + 1; + // Fill in the second half of the tuple. strcpy(subtmp, title); size_t tmplen = subtmp + strlen(subtmp) + 2 - tmp; check(tmplen < 1024); tmp[tmplen-1] = '\0'; - // Copy this line into the correct local bucket. - //cout << "hashing " << title << endl; + // Copy the tuple into the correct local bucket. 
unsigned int bbb; - if (-1 != (bbb = push_bucket(heads, bs, title, tmp, tmplen))) { - //size_t bucket_size = max(1000000,buflen / ncpus * 2); //2 * buflen / ncpus; - //int bucket_size = max(1000000UL,buflen / ncpus * 3); - //cout << "FUCK " << heads[0] - bs[0].buf << " " << bucket_size << " " << heads[1] - bs[1].buf << " " << p - title << endl; - ////mincount = min(mincount, counter); - ////if (mincount == counter) cout << "CRAP" << counter << endl; - //cout << "overflowed " << bbb << " on: " << name << endl; - } + bbb = push_bucket(heads, bs, title, tmp, tmplen); counter++; // End of tuple? @@ -456,6 +468,8 @@ for (unsigned int i = 0; i < ncpus; i++) { bs[i].sz.back() = heads[i] - bs[i].bufs.back(); } + + // TODO fix this log msg to sum up the sz's rather than just showing the last cout << "actress count " << counter << " nbytes " << bs[0].sz.back()<< endl; } @@ -475,9 +489,9 @@ } void -movdb::build1(unsigned int pid, const bucket **movbucs, hmap *hh) +movdb::build1(unsigned int pid, const bucket **movbucs, hmap *ph) { - hmap &h = *hh; + hmap &h = *ph; // Visit each bucket that's destined to us (visit each source). for (unsigned int i = 0; i < ncpus; i++) { char *p = movbucs[i][pid].bufs[0], @@ -489,18 +503,10 @@ // Insert into hash map. h[title] = release; } - //cout << "cpu " << pid << " src " << i << " cumulative h.size " << h.size() - //<< endl; } } void -join(const char *movie, const char *actress) -{ - // cout << "JOINED: " << movie << " WITH " << actress << endl; -} - -void actdb::probe(const hmap *hs, const bucket **actbucs, bool show_progress) { pthread_t ts[ncpus]; @@ -513,31 +519,41 @@ } } +/** + * Dummy function that is called to represent emitting a joined tuple. 
+ */ +inline void +join(const char *movie, const char *actress) +{ + if (false) cout << "JOINED: " << movie << " WITH " << actress << endl; +} + void -actdb::probe1(unsigned int pid, const hmap *hh, const bucket **actbucs) +actdb::probe1(unsigned int pid, const hmap *ph, const bucket **actbucs) { - const hmap &h = *hh; + const hmap &h = *ph; int hits = 0, misses = 0; + // For each source bucket. for (unsigned int i = 0; i < ncpus; i++) { char *p = actbucs[i][pid].bufs[0], *end = actbucs[i][pid].bufs[0] + actbucs[i][pid].sz[0]; + // Iterate over the bucket. while (p < end) { char *name = p; p = strchr(p, '\0') + 1; while (true) { char *title = p; p = strchr(p, '\0') + 1; - //cout << "name " << name << " title: " << title << p - title << endl; - // Emit any joined tuple. + // Emit the joined tuple (if a join was possible). if (h.find(title) != h.end()) { - //cout << " HIT" << endl; hits++; join(title, name); } else { cout << " MISS " << title << endl; misses++; } - // End of tuple? + // End of a tuple? (Don't actually need this check, since the + // hash-partitioning "normalizes" the tuples from the actresses file.) if (*p == '\0') { p++; break; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |