[Assorted-commits] SF.net SVN: assorted: [640] hash-join/trunk/src/hashjoin.cc
Brought to you by:
yangzhang
From: <yan...@us...> - 2008-03-23 22:36:22
|
Revision: 640 http://assorted.svn.sourceforge.net/assorted/?rev=640&view=rev Author: yangzhang Date: 2008-03-23 15:36:23 -0700 (Sun, 23 Mar 2008) Log Message: ----------- - added support for stlport hash_map - fixed non-std c++ - 100K min bucket size - refactored bucket_size calculations - using vectors instead of raw arrays (in certain places) Modified Paths: -------------- hash-join/trunk/src/hashjoin.cc Modified: hash-join/trunk/src/hashjoin.cc =================================================================== --- hash-join/trunk/src/hashjoin.cc 2008-03-19 16:57:53 UTC (rev 639) +++ hash-join/trunk/src/hashjoin.cc 2008-03-23 22:36:23 UTC (rev 640) @@ -8,7 +8,11 @@ #include <iostream> #include <exception> #include <vector> +#ifdef STLPORT +#include <hash_map> +#else #include <ext/hash_map> +#endif #include <sys/types.h> #include <sys/stat.h> @@ -25,12 +29,14 @@ #include <commons/threads.h> #include <commons/time.h> -// TODO: #include <boost/array.hpp> +#include <boost/scoped_array.hpp> using namespace std; +using namespace boost; +#ifndef STLPORT using namespace __gnu_cxx; +#endif using namespace commons; -// TODO: using namespace boost; typedef hash_map< const char*, @@ -52,9 +58,9 @@ }; // TODO use dependency injection! -unsigned int ncpus = 1; -const hmap::size_type map_size = 1000000; -const int min_bucket_size = 1000000; +unsigned int ncpus = 1; +const hmap::size_type map_size = 1000000; // 1MB +const size_t min_bucket_size = 100000; // 100KB /** * Buckets are produced in the hash-partitioning phase. These are simple @@ -88,7 +94,10 @@ class db { public: - db(const char *path) : buf(load_file(path, buflen, ncpus)) + db(const char *path) : + buf(load_file(path, buflen, ncpus)), + // TODO dependency injection + bucket_size(max((size_t) min_bucket_size, buflen / ncpus / ncpus * 3)) { scan(buf, buflen); } @@ -107,8 +116,8 @@ * \param p The start of the tuple. * \param nbytes The length of the tuple. */ - unsigned int push_bucket(char **heads, bucket *bs, const char *s, const char - *p, size_t nbytes); + unsigned int push_bucket(vector<char *> & heads, bucket *bs, const char *s, + const char *p, size_t nbytes); protected: /** * This routine runs on each processor to hash-partition the data into local @@ -117,6 +126,7 @@ virtual void partition1(unsigned int pid, bucket* bucket) = 0; char *buf; size_t buflen; + size_t bucket_size; }; /** @@ -223,9 +233,8 @@ bucket **buckets = new bucket*[ncpus]; for (unsigned int i = 0; i < ncpus; i++) { buckets[i] = new bucket[ncpus]; + check(buckets[i]); for (unsigned int j = 0; j < ncpus; j++) { - // TODO dependency injection - size_t bucket_size = max((size_t) min_bucket_size,buflen / ncpus * 3); // Each bucket should be twice as large as it would be given uniform // distribution. This is just an initial size; extending can happen. buckets[i][j].bufs.push_back(new char[bucket_size]); @@ -237,13 +246,12 @@ // Partition the data into the buckets using the hash function. Reason for // buckets: poor man's message passing. All the data from CPU i to CPU j goes // into bucket[i][j]. - pthread_t ts[ncpus]; + vector<pthread_t> ts(ncpus); for (unsigned int i = 0; i < ncpus; i++) { ts[i] = method_thread(this, &db::partition1, i, buckets[i]); } for (unsigned int i = 0; i < ncpus; i++) { - void *value; - check(pthread_join(ts[i], &value) == 0); + check(pthread_join(ts[i], NULL) == 0); } return const_cast<const bucket**>(buckets); // TODO why is this cast needed? @@ -257,11 +265,10 @@ } unsigned int -db::push_bucket(char **heads, bucket *bs, const char *s, const char *p, size_t nbytes) +db::push_bucket(vector<char*> & heads, bucket *bs, const char *s, const char *p, size_t nbytes) { size_t h = hash_djb2(s); unsigned int bucket = h % ncpus; - size_t bucket_size = max((size_t) min_bucket_size, buflen * 3 / ncpus); if (heads[bucket] + nbytes < bs[bucket].bufs.back() + bucket_size) { memcpy(heads[bucket], p, nbytes); heads[bucket] += nbytes; @@ -288,7 +295,7 @@ partstart - buf << ".." << partend - buf << endl; // Position the writing heads at the head of each bucket. (TODO find better // name than head) - char *heads[ncpus]; + vector<char*> heads(ncpus); for (unsigned int i = 0; i < ncpus; i++) { heads[i] = bs[i].bufs[0]; } @@ -323,7 +330,7 @@ // Position the writing heads at the head of each bucket. (TODO find better // name than head) - char *heads[ncpus]; + vector<char *> heads(ncpus); for (unsigned int i = 0; i < ncpus; i++) { heads[i] = bs[i].bufs[0]; } @@ -352,8 +359,7 @@ tmp[tmplen-1] = '\0'; // Copy the tuple into the correct local bucket. - unsigned int bbb; - bbb = push_bucket(heads, bs, title, tmp, tmplen); + push_bucket(heads, bs, title, tmp, tmplen); counter++; // End of tuple? @@ -376,7 +382,7 @@ const hmap * movdb::build(const bucket **movbucs) { - pthread_t ts[ncpus]; + vector<pthread_t> ts(ncpus); hmap *hs = new hmap[ncpus]; for (unsigned int i = 0; i < ncpus; i++) { ts[i] = method_thread(this, &movdb::build1, i, movbucs, &hs[i]); @@ -416,7 +422,7 @@ void actdb::probe(const hmap *hs, const bucket **actbucs, bool show_progress) { - pthread_t ts[ncpus]; + vector<pthread_t> ts(ncpus); for (unsigned int i = 0; i < ncpus; i++) { ts[i] = method_thread(this, &actdb::probe1, i, &hs[i], actbucs); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |