Thread: [Assorted-commits] SF.net SVN: assorted: [325] hash-join
Brought to you by:
yangzhang
From: <yan...@us...> - 2008-02-06 08:41:50
|
// hash-join/trunk/src/hashjoin.cc
//
// Provenance (from the SF.net SVN commit notification this file was archived
// from; "This was sent by the SourceForge.net collaborative development
// platform, the world's largest Open Source development site."):
//   Revision: 325  http://assorted.svn.sourceforge.net/assorted/?rev=325&view=rev
//   Author:   yangzhang
//   Date:     2008-02-06 00:41:56 -0800 (Wed, 06 Feb 2008)
//   Log message: added hash join
//   Added paths: hash-join/, hash-join/trunk/, hash-join/trunk/src/,
//                hash-join/trunk/src/Makefile, hash-join/trunk/src/hashjoin.cc
//
// Accompanying hash-join/trunk/src/Makefile (recipe lines are tab-indented):
//
//   all: hashjoin
//
//   hashjoin: hashjoin.cc
//       g++ -g3 -Wall -o hashjoin hashjoin.cc -lprofiler
//
//   clean:
//       rm -f hashjoin
//
//   .PHONY: clean

#include <cassert>
#include <cerrno>
#include <cstdio>
#include <cstring>
#include <iostream>
#include <exception>
#include <vector>

#include <ext/hash_map>

#include <sys/types.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <fcntl.h>
#include <unistd.h>

#include <pthread.h>

//
// c++ commons :: numa
// TODO document
//

using namespace std;

// Throws std::exception when cond is false.  All error checking in this file
// goes through here; the exception carries no message.
// TODO replace with a macro (to capture __FILE__/__LINE__)
inline void
check(bool cond)
{
  if (!cond) {
    throw exception();
  }
}

// Returns a pointer just past the n-th occurrence of c in the string at p
// (p itself when n <= 0).  Throws via check() if there are fewer than n.
inline const char *
strchrrep(const char *p, char c, int n)
{
  for (int i = 0; i < n; i++) {
    p = strchr(p, c);
    check(p != NULL);
    p++;
  }
  return p;
}

// Mutable-buffer overload of strchrrep().
inline char *
strchrrep(char *p, char c, int n)
{
  return const_cast<char *>(strchrrep(const_cast<const char *>(p), c, n));
}

// Wall-clock time in milliseconds since the Unix epoch (via gettimeofday).
inline long long current_time_millis() {
  struct timeval tv;
  gettimeofday(&tv, 0);
  long long t = tv.tv_sec;
  return t * 1000 + tv.tv_usec / 1000;
}

// Prints (to cout) the milliseconds elapsed since construction or since the
// previous print() call.
class timer
{
public:
  timer() : start(current_time_millis()), last(start) {}
  void print() {
    long long now = current_time_millis();
    cout << now - last << endl;
    last = now;
  }
private:
  long long start, last;
};

namespace {

// Closes a file descriptor on scope exit unless release()d, so a failed
// check() in load_file() does not leak the descriptor.
class fd_closer
{
public:
  explicit fd_closer(int fd) : fd_(fd) {}
  ~fd_closer() { if (fd_ >= 0) close(fd_); }
  // Gives up ownership and returns the descriptor.
  int release() { int fd = fd_; fd_ = -1; return fd; }
private:
  int fd_;
  // Non-copyable (declared, never defined).
  fd_closer(const fd_closer &);
  fd_closer &operator=(const fd_closer &);
};

// Reads exactly count bytes at file offset off from fd into dst.  pread() may
// return fewer bytes than requested, so loop; retry on EINTR, and throw on a
// read error or on hitting end of file early.
void
pread_fully(int fd, char *dst, size_t count, off_t off)
{
  while (count > 0) {
    ssize_t n = pread(fd, dst, count, off);
    if (n < 0 && errno == EINTR) {
      continue;
    }
    check(n > 0);
    dst += n;
    count -= static_cast<size_t>(n);
    off += n;
  }
}

} // namespace

// Reads the whole file at path into a new[]-allocated, NUL-terminated buffer.
// On success buf owns len + 1 bytes (the caller must delete[] it) and len is
// the file size.  The file is read as ncpus contiguous chunks (sequentially
// for now); ncpus == 0 is treated as 1.  Throws std::exception on any failure
// without leaking the descriptor or the buffer.
void
load_file(const char *path, char *&buf, size_t & len, unsigned int ncpus) {
  if (ncpus == 0) {
    ncpus = 1;  // avoid dividing by zero below
  }

  int fd = open(path, O_RDONLY);
  check(fd >= 0);
  fd_closer closer(fd);

  struct stat sb;
  check(fstat(fd, &sb) == 0);
  // Strictly below 0xffffffff so that len + 1 cannot wrap a 32-bit size_t.
  check(sb.st_size >= 0 && sb.st_size < 0xffffffff);
  len = static_cast<size_t>(sb.st_size);

  buf = new char[len + 1];
  try {
    // TODO use threads (pthread_create/pthread_join per chunk, as originally
    // sketched) to pull data to the correct initial NUMA locations.
    size_t chunk_len = len / ncpus;
    for (unsigned int i = 0; i < ncpus; i++) {
      // size_t, not int: offsets past 2 GiB must not overflow.
      size_t off = i * chunk_len;
      // The last chunk also takes the len % ncpus remainder; otherwise those
      // trailing bytes would never be read and stay uninitialized.
      size_t n = (i + 1 == ncpus) ? len - off : chunk_len;
      pread_fully(fd, buf + off, n, static_cast<off_t>(off));
    }
    check(close(closer.release()) == 0);
  } catch (...) {
    delete[] buf;
    buf = NULL;
    throw;
  }

  buf[len] = '\0'; // don't let strcmp() run off the end
}

//
// hashjoin
//

// Equality predicate comparing C strings by content rather than by address.
struct eqstr
{
  bool operator()(const char* s1, const char* s2) const
  {
    return strcmp(s1, s2) == 0;
  }
};

// TODO dependency injection
const unsigned int ncpus = 2;
// Movie title -> release-date string.  Keys and values point into the movie
// database's buffer, so the map must not outlive that database.  The
// __gnu_cxx names are spelled out because, in C++11 and later, `hash` would
// be ambiguous between std:: and __gnu_cxx:: under two using-directives.
typedef __gnu_cxx::hash_map<const char *, const void *,
                            __gnu_cxx::hash<const char *>, eqstr> hmap;
const hmap::size_type map_size = 1000000;

// Owns an in-memory, NUL-terminated copy of one input file.
class db {
public:
  explicit db(const char *path) : buf(NULL), buflen(0) {
    load_file(path, buf, buflen, ncpus);
  }
  ~db() { delete[] buf; }
protected:
  // TODO smart pointer
  char *buf;
  size_t buflen;
private:
  // Non-copyable: a copy would delete[] the same buffer twice.
  db(const db &);
  db &operator=(const db &);
};

// The movies list; build() fills a title -> release-date hash map from it.
class movdb : db {
public:
  explicit movdb(const char *path) : db(path) {}
  void build(hmap &h);
};

// The actresses list; probe() looks each credited title up in the map.
class actdb : db {
public:
  explicit actdb(const char *path) : db(path) {}
  void probe(const hmap &h, bool show_progress);
};

// Sketch of a hand-rolled hash table (not compiled):
//
// template <typename T>
// class bucket {
// public:
//   bucket(int count) {
//     ts = new (T*)[count];
//   }
// private:
//   T *ts;
// }
//
// typedef vector<const char *> bucket;
//
// class hmap {
// public:
//   hmap(int nbuckets) : nbuckets(nbuckets), nentries(0) {
//     buckets = new bucket[nbuckets];
//     check(buckets);
//   }
//   hmap() : nbuckets(nbuckets_default);
// private:
//   bucket *buckets;
//   int nbuckets;
//   int nentries;
// };

// Loads both lists, builds the hash table from the movies and probes it with
// the actresses, printing the elapsed milliseconds after each phase.
int
main(int argc, char *argv[])
{
  timer t;

  cout << "loading movies" << endl;
  movdb movdb("../movie-data/movies.list");
  t.print();

  cout << "loading actresses" << endl;
  actdb actdb("../movie-data/actresses.list");
  t.print();

  hmap h(map_size);

  cout << "building with movies" << endl;
  movdb.build(h);
  t.print();

  cout << "probing with actresses" << endl;
  actdb.probe(h, true);
  t.print();

  cout << "done" << endl;
  return 0;
}

// Inserts title -> release date for every movie line into h.  Titles and
// dates are NUL-terminated in place, so this mutates buf and h ends up
// holding pointers into it.
void
movdb::build(hmap &h)
{
  char *p = buf, *end = buf + buflen;
  // Skip 20 lines (presumably the list's preamble -- TODO confirm).
  p = strchrrep(p, '\n', 20);
  while (p < end) {
    // Search for end of title and null-terminate it.
    char *title = strsep(&p, "\t");
    if (p == NULL) {
      // The last line is ----------- so ignore it.
      break;
    }
    // Skip the run of tabs separating the title from the release date.
    // (strcspn would stop at the *next* tab, possibly on a later line, and
    // the newline search below would then skip whole lines.)
    char *release = p + strspn(p, "\t");
    // Find the end of the line and NUL-terminate the release date there.
    char *newline = strchr(release, '\n');
    check(newline != NULL);
    *newline = '\0';
    // Insert into hash map.
    h[title] = release;
    p = newline + 1;
  }
}

// Called once per matching (movie title, actress name) pair; currently a
// no-op placeholder.
void
join(const char *movie, const char *actress)
{
  // cout << "JOINED: " << movie << " WITH " << actress << endl;
}

// Probes h with every (actress, title) credit in the buffer, counting hits
// and misses.  Each actress block is "name\ttitle...\n" followed by
// "\t...title...\n" continuation lines and ends with a blank line.  Titles are
// NUL-terminated in place.  With show_progress, prints running hit/miss counts
// every 100000 actresses.
void
actdb::probe(const hmap &h, bool show_progress)
{
  char *p = buf, *end = buf + buflen;
  // Skip 240 lines (presumably the list's preamble -- TODO confirm).
  p = strchrrep(p, '\n', 240);
  int counter = 0, hits = 0, misses = 0;
  while (p < end) {
    counter++;
    // Search for end of name and null-terminate it.
    char *name = strsep(&p, "\t");
    if (p == NULL) {
      // The last line is ------------ so ignore it.
      break;
    }
    if (show_progress && counter % 100000 == 0) {
      cout << hits << " " << misses << " " << name << endl;
    }
    while (true) {
      // Search for start of movie title.
      char *title = p + strspn(p, "\t");
      // Terminate this line first so the double-space search cannot run
      // into later lines.
      char *newline = strchr(title, '\n');
      check(newline != NULL);
      *newline = '\0';
      // The title ends at a double space (before any role notes) or at the
      // end of the line.
      char *title_end = strstr(title, "  ");
      if (title_end != NULL) {
        *title_end = '\0';
      }
      // Emit any joined tuple.
      hmap::const_iterator i = h.find(title);
      if (i != h.end()) {
        hits++;
        join(i->first, name);
      } else {
        misses++;
      }
      // The next line always starts right after this line's newline.
      // (Advancing from title_end + 2 skipped a whole line whenever the
      // title ended at the newline.)
      p = newline + 1;
      // Two consecutive newlines means new actress.
      if (*p == '\n') {
        p++;
        break;
      } else {
        check(*p == '\t');
      }
    }
  }
}

// vim:et:sw=2:ts=2