From: <tho...@us...> - 2010-11-23 16:30:25
Revision: 3980
          http://bigdata.svn.sourceforge.net/bigdata/?rev=3980&view=rev
Author:   thompsonbry
Date:     2010-11-23 16:30:18 +0000 (Tue, 23 Nov 2010)

Log Message:
-----------
javadoc edits on join graph and extensible hashing

Modified Paths:
--------------
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/htbl/TestExtensibleHashing.java

Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java	2010-11-23 15:22:27 UTC (rev 3979)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java	2010-11-23 16:30:18 UTC (rev 3980)
@@ -131,11 +131,38 @@
  * query optimizer SHOULD pay attention to these things and exploit their
  * conditional selectivity for the query plan.]
  *
- * @todo When there are optional join graphs, are we going to handle that by
- *       materializing a sample (or all) of the joins feeding that join graph
- *       and then apply the runtime optimizer to the optional join graph,
- *       getting out a sample to feed onto any downstream join graph?
+ * @todo Handle optional join graphs by first applying the runtime optimizer to
+ *       the main join graph and obtaining a sample for the selected join path.
+ *       That sample will then be fed into the optional join graph in order to
+ *       optimize the join order within the optional join graph (a join order
+ *       which is selective in the optional join graph is better since it will
+ *       result in faster rejections of intermediate results and hence do less
+ *       work).
+ *       <p>
+ *       This is very much related to accepting a collection of non-empty
+ *       binding sets when running the join graph. However, the optional join
+ *       graph should be presented in combination with the original join graph
+ *       and the starting paths must be constrained to have the selected join
+ *       path for the original join graph as a prefix. With this setup, the
+ *       original join graph has been locked in to a specific join path and the
+ *       sampling of edges and vertices for the optional join graph can proceed
+ *       normally.
+ *       <p>
+ *       True optionals will always be appended as part of the "tail plan" for
+ *       any join graph and can not be optimized as each optional join must run
+ *       regardless (as long as the intermediate solution survives the
+ *       non-optional joins).
  *
+ * @todo There are two cases where a join graph must be optimized against a
+ *       specific set of inputs. In one case, it is a sample (this is how
+ *       optimization of an optional join group proceeds per above). In the
+ *       other case, the set of inputs is fixed and is provided instead of a
+ *       single empty binding set as the starting condition. This second case
+ *       is actually a bit more complicated since we can not use a random
+ *       sample of vertices unless they do not share any variables with the
+ *       initial binding sets. When there is a shared variable, we need to do a
+ *       cutoff join of the edge with the initial binding sets. When there is
+ *       not a shared variable, we can sample the vertex and then do a cutoff
+ *       join.
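[Editor's note: a minimal, self-contained sketch of the cutoff join mentioned above -- an editor's illustration, not code from this commit. Binding sets are modeled as Maps and all names are assumptions. The join stops once a limit of solutions has been produced, so the #of outputs per input consumed can serve as a cardinality estimate; when initial binding sets share a variable with a vertex, the estimate is taken against those bindings rather than against a random sample of the vertex.]

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class CutoffJoinSketch {
        /**
         * Join the input binding sets against a relation on one shared
         * variable, stopping as soon as limit solutions have been produced
         * (the "cutoff"). Assumes the variable is bound on both sides.
         */
        static List<Map<String, Object>> cutoffJoin(
                final List<Map<String, Object>> input,
                final List<Map<String, Object>> relation,
                final String sharedVar, final int limit) {
            final List<Map<String, Object>> out =
                    new ArrayList<Map<String, Object>>();
            for (Map<String, Object> bset : input) {
                for (Map<String, Object> tuple : relation) {
                    if (tuple.get(sharedVar).equals(bset.get(sharedVar))) {
                        final Map<String, Object> solution =
                                new HashMap<String, Object>(bset);
                        solution.putAll(tuple);
                        out.add(solution);
                        if (out.size() >= limit)
                            return out; // cutoff: enough for an estimate
                    }
                }
            }
            return out;
        }
    }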
+ *
  * @todo When we run into a cardinality estimation underflow (the expected
  *       cardinality goes to zero) we could double the sample size for just
  *       those join paths which hit a zero estimated cardinality and re-run them

Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/htbl/TestExtensibleHashing.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/htbl/TestExtensibleHashing.java	2010-11-23 15:22:27 UTC (rev 3979)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/htbl/TestExtensibleHashing.java	2010-11-23 16:30:18 UTC (rev 3980)
@@ -35,176 +35,125 @@
  * Test suite for extensible hashing.
  *
  * <br>
- * (***) Persistence capable hash table for high volume hash joins.
  *
- * The data will be "rows" in a "relation" modeled using binding sets. We can
- * use dense encoding of these rows since they have a fixed schema (some columns
- * may allow nulls). There should also be a relationship to how we encode these
- * data for network IO.
+ * @todo Persistence capable hash table for high volume hash joins. The data
+ *       will be "rows" in a "relation" modeled using binding sets. We can use
+ *       dense encoding of these rows since they have a fixed schema (some
+ *       columns may allow nulls). There should also be a relationship to how
+ *       we encode these data for network IO.
+ *       <p>
+ * @todo Extensible hashing:
+ *       <p>
+ *       - hash(byte[] key) -> IRaba page. Use IRaba for keys/values and key
+ *       search.
+ *       <p>
+ *       - Split if overflows the bucket size (alternative is some versioning
+ *       where the computed hash value indexes into a logical address which is
+ *       then translated to an IRawStore address - does the RWStore help us out
+ *       here?)
+ *       <p>
+ *       - Ring buffer to wire in hot nodes (but expect random touches).
+ *       <p>
+ *       - Initially, no history (no versioning). Just replace the record when
+ *       it is evicted from the ring buffer.
+ *       <p>
+ *       What follows is a summary of an extensible hashing design for bigdata.
+ *       This covers most aspects of the hash map design, but does not drill
+ *       deeply into the question of scale-out hash maps. The immediate goal is
+ *       to develop a hash map which can be used for a variety of tasks,
+ *       primarily pertaining to analytic query as described above.
+ *       <p>
+ *       Extensible hashing is one form of dynamic hashing in which buckets are
+ *       split or coalesced as necessary and in which the reorganization is
+ *       performed on one bucket at a time.
+ *       <p>
+ *       Given a hash function h generating, e.g., int32 values, let b be the
+ *       #of bits in the hash code. At any point, we use 0 LTE i LTE b bits of
+ *       that hash code as an index into a table of bucket addresses. The value
+ *       of i will change as the #of buckets changes based on the scale of the
+ *       data to be addressed.
+ *       <p>
+ *       Given a key K, the bucket address table is indexed with i bits of the
+ *       hash code, h(K). The value at that index is the address of the hash
+ *       bucket. However, several consecutive entries in the hash table may
+ *       point to the same hash bucket (for example, the hash index may be
+ *       created with i=4, which would give 16 index values but only one
+ *       initial bucket). The bucket address table entries which map onto the
+ *       same hash bucket will have a common bit length, which may be LTE [i].
+ *       This bit length is not stored in the bucket address table, but each
+ *       bucket knows its bit length. Given a global bit length of [i] and a
+ *       bucket bit length of [j], there will be 2^(i-j) bucket address table
+ *       entries which point to the same bucket.
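[Editor's note: a small runnable sketch of the addressing scheme just described -- an editor's illustration, not the committed code. One conventional choice indexes the bucket address table with the top i bits of the int32 hash code using an unsigned right shift (the committed javadoc words this as a right shift without sign extension); this keeps the 2^(i-j) slots for a given bucket consecutive, which matches how doubleAddressSpace() below duplicates adjacent slots.]

    public class AddressTableSketch {

        static int globalHashBits = 4; // i: 2^4 = 16 address table slots

        static int[] addressMap = new int[1 << globalHashBits];

        /** Map a hash code onto a slot of the bucket address table. */
        static int addrOf(final int h) {
            // unsigned shift: keep the high-order i bits of the 32-bit hash.
            return h >>> (Integer.SIZE - globalHashBits);
        }

        public static void main(final String[] args) {
            // One initial bucket with local bit length j=0: all 2^(i-j) = 16
            // slots point at the same bucket address.
            java.util.Arrays.fill(addressMap, 0);
            System.out.println(addressMap[addrOf("key".hashCode())]); // 0
        }
    }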
+ *       <p>
+ *       Hash table versioning can be easily implemented by: (a) a checkpoint
+ *       record with the address of the bucket address table (which could be
+ *       broken into a two level table comprised of 4k pages in order to make
+ *       small updates faster); and (b) a store level policy such that we do
+ *       not overwrite the modified records directly (though they may be
+ *       recycled). This will give us the same consistent read behind behavior
+ *       as the B+Tree.
+ *       <p>
+ *       The IIndex interface will need to be partitioned appropriately such
+ *       that the IRangeScan interface is not part of the hash table indices
+ *       (an isBTree() and isHashMap() method might be added).
+ *       <p>
+ *       While the same read-through views for shards should work with hash
+ *       maps as work with B+Tree indices, a different scheme may be necessary
+ *       to locate those shards and we might need to use int64 hash codes in
+ *       scale-out or increase the page size (at least for the read-only hash
+ *       segment files, which would also need a batch build operation). The
+ *       AccessPath will also need to be updated to be aware of classes which
+ *       do not support key-range scans, but only whole relation scans.
+ *       <p>
+ *       Locking on hash tables without versioning should be much simpler than
+ *       locking on B+Trees since there is no hierarchy and more operations can
+ *       proceed without blocking in parallel.
+ *       <p>
+ *       We can represent tuples (key,value pairs) in an IRaba data structure
+ *       and reuse parts of the B+Tree infrastructure relating to compression
+ *       of IRaba, key search, etc. In fact, we might use the lazy reordering
+ *       notion from MonetDB cracking to only sort the keys in a bucket when it
+ *       is persisted. This is also a good opportunity to tackle splitting the
+ *       bucket if it overflows the target record size, e.g., 4k. We could
+ *       throw an exception if the sorted, serialized, and optionally
+ *       compressed record exceeds the target record size and then split the
+ *       bucket. All of this seems reasonable and we might be able to then back
+ *       port those concepts into the B+Tree.
+ *       <p>
+ *       We need to estimate the #of tuples which will fit within the bucket.
+ *       We can do this based on: (a) the byte length of the keys and values
+ *       (key compression is not going to help out much for a hash index since
+ *       the keys will be evenly distributed even if they are ordered within a
+ *       bucket); (b) the known per tuple overhead and per bucket overhead; (c)
+ *       an estimate of the compression ratio for raba encoding and record
+ *       compression. This estimate could be used to proactively split a bucket
+ *       before it is evicted. This is most critical before anything is evicted
+ *       as we would otherwise have a single very large bucket. So, let's make
+ *       this simple and split the bucket if the sum of the key + val bytes
+ *       exceeds 120% of the target record size (4k, 8k, etc). The target page
+ *       size can be a property of the hash index. [Note: There is an implicit
+ *       limit on the size of a tuple with this approach. The alternative is to
+ *       fix the #of tuples in the bucket and allow buckets to be of whatever
+ *       size they are for the specific data in that bucket.]
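[Editor's note: the 120% split heuristic described above, as a one-method sketch. The class, method, and parameter names are illustrative assumptions, not code from this commit.]

    class SplitHeuristic {
        /** True when raw key+value bytes exceed 120% of the target page size. */
        static boolean shouldSplit(final long sumKeyBytes,
                final long sumValBytes,
                final long targetPageSize /* 4k, 8k, etc. */) {
            // Split proactively, before the record is raba encoded and
            // compressed, so a single very large bucket never accumulates.
            return sumKeyBytes + sumValBytes > (targetPageSize * 120) / 100;
        }
    }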
  *
- * https://sourceforge.net/apps/trac/bigdata/ticket/203
+ * @todo RWStore integration notes:
+ *       <p>
+ *       - RWStore with "temporary" quality. Creates the backing file lazily on
+ *       eviction from the write service.
+ *       <p>
+ *       - RWStore with "RAM" only? (Can not exceed the #of allocated buffers
+ *       or can, but then it might force paging out to swap?)
+ *       <p>
+ *       - RWStore with "RAM" mostly. Converts to disk backed if it uses all
+ *       those buffers. Possibly just give the WriteCacheService a bunch of
+ *       write cache buffers (10-100) and have it evict to disk *lazily* rather
+ *       than eagerly (when the #of free buffers is down to 20%).
+ *       <p>
+ *       - RWStore with memory mapped file? As I recall, the problem is that we
+ *       can not guarantee extension or close of the file under Java. But some
+ *       people seem to make this work...
  *
- *
- * Extendable hash table:
- *
- * - hash(byte[] key) -> IRaba page. Use IRaba for keys/values and key search.
- *
- * - Split if overflows the bucket size (alternative is some versioning where
- * the computed hash value indexes into a logical address which is then
- * translated to an IRawStore address - does the RWStore help us out here?)
- *
- * - ring buffer to wire in hot nodes (but expect random touches).
- *
- * - initially, no history (no versioning). just replace the record when it is
- * evicted from the ring buffer.
- *
- * What follows is a summary of an extendable hash map design for bigdata. This
- * covers most aspects of the hash map design, but does not drill deeply into
- * the question of scale-out hash maps. The immediate goal is to develop a hash
- * map which can be used for a variety of tasks, primarily pertaining to
- * analytic query as described above.
- *
- * Extendable hashing is one form of dynamic hashing in which buckets are split
- * or coalesced as necessary and in which the reorganization is performed on one
- * bucket at a time.
- *
- * Given a hash function h generating, e.g., int32 values, let b be the #of
- * bits in the hash code. At any point, we use 0 LTE i LTE b bits of that hash
- * code as an index into a table of bucket addresses. The value of i will change
- * as the #of buckets changes based on the scale of the data to be addressed.
- *
- * Given a key K, the bucket address table is indexed with i bits of the hash
- * code, h(K). The value at that index is the address of the hash bucket.
- * However, several consecutive entries in the hash table may point to the same
- * hash bucket (for example, the hash index may be created with i=4, which would
- * give 16 index values but only one initial bucket). The bucket address table
- * entries which map onto the same hash bucket will have a common bit length,
- * which may be LTE [i]. This bit length is not stored in the bucket address
- * table, but each bucket knows its bit length. Given a global bit length of [i]
- * and a bucket bit length of [j], there will be 2^(i-j) bucket address table
- * entries which point to the same bucket.
- *
- * Lookup: Compute h(K) and right shift (w/o sign extension) by i bits. Use this
- * to index into the bucket address table. The address in the table is the
- * bucket address and may be used to directly read the bucket.
- *
- * Insert: Per lookup. On overflow, we need to split the bucket moving the
- * existing records (and the new record) into new buckets. How this proceeds
- * depends on whether the #of hash bits used in the bucket is equal to the #of
- * bits used to index into the bucket address table. There are two cases:
- *
- * Split case 1: If i (global bits of the hash which are in use) == j (bucket
- * bits of the hash which are in use), then the bucket address table is out of
- * space and needs to be resized. Let i := i+1.
- * This doubles the size of the
- * bucket address table. Each original entry becomes two entries in the new
- * table. For the specific bucket which is to be split, a new bucket is
- * allocated and the 2nd bucket address table entry for that bucket is set to
- * the address of the new bucket. The tuples are then assigned to the original
- * bucket and the new bucket by considering the additional bit of the hash code.
- * Assuming that all keys are distinct, then one split will always be sufficient
- * unless all tuples in the original bucket have the same hash code when their
- * (i+1)th bit is considered. In this case, we resort to an "overflow" bucket
- * (alternatively, the bucket is allowed to be larger than the target size and
- * gets treated as a blob).
- *
- * Split case 2: If i is GT j, then there will be at least two entries in the
- * bucket address table which point to the same bucket. One of those entries is
- * relabeled. Both the original bucket and the new bucket have their #of bits
- * incremented by one, but the #of global bits in use does not change. Of the
- * entries in the bucket address table which used to point to the original
- * bucket, the 1st half are left alone and the 2nd half are updated to point to
- * the new bucket. (Note that the #of entries depends on the global #of hash
- * bits in use and the bucket local #of hash bits in use and will be 2 if there
- * is a difference of one between those values but can be more than 2 and will
- * always be an even number). The entries in the original bucket are rehashed
- * and assigned based on the new #of hash bits to be considered to either the
- * original bucket or the new bucket. The record is then inserted based on the
- * new #of hash bits to be considered. If it still does not fit, then either
- * handle by case (1) or case (2) as appropriate.
- *
- * Note that records which are in themselves larger than the bucket size must
- * eventually be handled by: (A) using an overflow record; (B) allowing the
- * bucket to become larger than the target page size (using a larger allocation
- * slot or becoming a blob); or (C) recording the tuple as a raw record and
- * maintaining only the full hash code of the tuple and its raw record address
- * in the bucket (this would allow us to automatically promote long literals out
- * of the hash bucket and a similar approach might be used for a B+Tree leaf,
- * except that a long key will still cause a problem [also, this implies that
- * deleting a bucket or leaf on the unisolated index of the RWStore might
- * require a scan of the IRaba to identify blob references which must also be
- * deleted, so it makes sense to track those as part of the bucket/leaf
- * metadata]).
- *
- * Delete: Buckets may be removed no later than when they become empty and doing
- * this is a local operation with costs similar to splitting a bucket. Likewise,
- * it is clearly possible to coalesce buckets which underflow before they become
- * empty by scanning the 2^(i-j) buckets indexed from the entries in the bucket
- * address table using i bits from h(K). [I need to research handling deletes a
- * little more, including under what conditions it is cost effective to reduce
- * the size of the bucket address table itself.]
- *
- * Hash table versioning can be easily implemented by: (a) a checkpoint record
- * with the address of the bucket address table (which could be broken into a
- * two level table comprised of 4k pages in order to make small updates faster);
- * and (b) a store level policy such that we do not overwrite the modified
- * records directly (though they may be recycled). This will give us the same
- * consistent read behind behavior as the B+Tree.
- *
- * The IIndex interface will need to be partitioned appropriately such that the
- * IRangeScan interface is not part of the hash table indices (an isBTree() and
- * isHashMap() method might be added).
- *
- * While the same read-through views for shards should work with hash maps as
- * work with B+Tree indices, a different scheme may be necessary to locate those
- * shards and we might need to use int64 hash codes in scale-out or increase the
- * page size (at least for the read-only hash segment files, which would also
- * need a batch build operation). The AccessPath will also need to be updated to
- * be aware of classes which do not support key-range scans, but only whole
- * relation scans.
- *
- * Locking on hash tables without versioning should be much simpler than locking
- * on B+Trees since there is no hierarchy and more operations can proceed
- * without blocking in parallel.
- *
- * We can represent tuples (key,value pairs) in an IRaba data structure and
- * reuse parts of the B+Tree infrastructure relating to compression of IRaba,
- * key search, etc. In fact, we might use the lazy reordering notion from
- * MonetDB cracking to only sort the keys in a bucket when it is persisted. This
- * is also a good opportunity to tackle splitting the bucket if it overflows the
- * target record size, e.g., 4k. We could throw an exception if the sorted,
- * serialized, and optionally compressed record exceeds the target record size
- * and then split the bucket. All of this seems reasonable and we might be able
- * to then back port those concepts into the B+Tree.
- *
- * We need to estimate the #of tuples which will fit within the bucket. We can
- * do this based on: (a) the byte length of the keys and values (key compression
- * is not going to help out much for a hash index since the keys will be evenly
- * distributed even if they are ordered within a bucket); (b) the known per
- * tuple overhead and per bucket overhead; (c) an estimate of the compression
- * ratio for raba encoding and record compression. This estimate could be used
- * to proactively split a bucket before it is evicted. This is most critical
- * before anything is evicted as we would otherwise have a single very large
- * bucket. So, let's make this simple and split the bucket if the sum of the key
- * + val bytes exceeds 120% of the target record size (4k, 8k, etc). The target
- * page size can be a property of the hash index. [Note: There is an implicit
- * limit on the size of a tuple with this approach. The alternative is to fix
- * the #of tuples in the bucket and allow buckets to be of whatever size they
- * are for the specific data in that bucket.]
- *
- * - RWStore with "temporary" quality. Creates the backing file lazily on
- * eviction from the write service.
- *
- * - RWStore with "RAM" only? (Can not exceed the #of allocated buffers or can,
- * but then it might force paging out to swap?)
- *
- * - RWStore with "RAM" mostly. Converts to disk backed if it uses all those
- * buffers.
- * Possibly just give the WriteCacheService a bunch of write cache
- * buffers (10-100) and have it evict to disk *lazily* rather than eagerly (when
- * the #of free buffers is down to 20%).
- *
- * - RWStore with memory mapped file? As I recall, the problem is that we can
- * not guarantee extension or close of the file under Java. But some people seem
- * to make this work...
+ *
+ * @see https://sourceforge.net/apps/trac/bigdata/ticket/203
  */
 public class TestExtensibleHashing extends TestCase2 {
@@ -547,13 +496,18 @@
 		return bucketSize;
 	}
-	
+
 	/**
 	 * Return <code>true</code> iff the hash table contains the key.
+	 * <p>
+	 * Lookup: Compute h(K) and right shift (w/o sign extension) by i bits.
+	 * Use this to index into the bucket address table. The address in the
+	 * table is the bucket address and may be used to directly read the
+	 * bucket.
 	 *
 	 * @param key
 	 *            The key.
-	 * 
+	 *
 	 * @return <code>true</code> iff the key was found.
 	 */
 	public boolean contains(final int key) {
@@ -565,7 +519,12 @@

 	/**
 	 * Insert the key into the hash table. Duplicates are allowed.
+	 * <p>
+	 * Insert: Per lookup. On overflow, we need to split the bucket moving
+	 * the existing records (and the new record) into new buckets.
 	 *
+	 * @see #split(int, SimpleBucket)
+	 *
 	 * @param key
 	 *            The key.
 	 *
@@ -577,12 +536,155 @@
 		final int h = hash(key);
 		final int addr = addrOf(h);
 		final SimpleBucket b = getBucket(addr);
-		b.insert(h,key);
+		if (b.insert(h, key)) {
+			return;
+		}
+		// split the bucket and insert the record (recursive?)
+		split(key, b);
 	}
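[Editor's note: the committed insert() calls split(key, b) once and leaves the question "(recursive?)" open. One way to resolve it, sketched under assumptions rather than taken from the commit: retry in a loop, since a split can cascade when all tuples still land in the same half and the address map may have changed shape in the meantime. The helpers named here (hash, addrOf, getBucket, split, SimpleBucket) are the ones visible in this diff.]

	public void insert(final int key) {
		final int h = hash(key);
		while (true) {
			final SimpleBucket b = getBucket(addrOf(h));
			if (b.insert(h, key)) {
				return; // the tuple fit into the (possibly new) bucket
			}
			// Split and retry: the address map may have been doubled
			// (case 1) or some entries relabeled (case 2), so the target
			// bucket must be recomputed before the next attempt.
			split(key, b);
		}
	}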
 
+	/**
+	 * Split the bucket, adjusting the address map iff necessary. How this
+	 * proceeds depends on whether the #of hash bits used in the bucket is
+	 * equal to the #of bits used to index into the bucket address table.
+	 * There are two cases:
+	 * <p>
+	 * Case 1: If {@link #globalHashBits} EQ the
+	 * {@link SimpleBucket#localHashBits}, then the bucket address table is
+	 * out of space and needs to be resized.
+	 * <p>
+	 * Case 2: If {@link #globalHashBits} is GT
+	 * {@link SimpleBucket#localHashBits}, then there will be at least two
+	 * entries in the bucket address table which point to the same bucket.
+	 * One of those entries is relabeled. The record is then inserted based
+	 * on the new #of hash bits to be considered. If it still does not fit,
+	 * then either handle by case (1) or case (2) as appropriate.
+	 * <p>
+	 * Note that records which are in themselves larger than the bucket size
+	 * must eventually be handled by: (A) using an overflow record; (B)
+	 * allowing the bucket to become larger than the target page size (using
+	 * a larger allocation slot or becoming a blob); or (C) recording the
+	 * tuple as a raw record and maintaining only the full hash code of the
+	 * tuple and its raw record address in the bucket (this would allow us
+	 * to automatically promote long literals out of the hash bucket and a
+	 * similar approach might be used for a B+Tree leaf, except that a long
+	 * key will still cause a problem [also, this implies that deleting a
+	 * bucket or leaf on the unisolated index of the RWStore might require a
+	 * scan of the IRaba to identify blob references which must also be
+	 * deleted, so it makes sense to track those as part of the bucket/leaf
+	 * metadata]).
+	 *
+	 * @param key
+	 *            The key which triggered the split.
+	 * @param b
+	 *            The bucket lacking sufficient room for the key which
+	 *            triggered the split.
+	 *
+	 * @todo The caller will need an exclusive lock if this is to be thread
+	 *       safe.
+	 *
+	 * @todo Overflow buckets (or oversize buckets) are required when all
+	 *       hash bits considered by the local bucket are the same, when all
+	 *       keys in the local bucket are the same, and when the record to
+	 *       be inserted is larger than the bucket. In order to handle these
+	 *       cases we may need to more closely integrate the insert/split
+	 *       logic since detecting some of these cases requires transparency
+	 *       into the bucket.
+	 */
+	private void split(final int key, final SimpleBucket b) {
+		if (globalHashBits < b.localHashBits) {
+			// This condition should never arise.
+			throw new AssertionError();
+		}
+		if (globalHashBits == b.localHashBits) {
+			/*
+			 * The address table is out of space and needs to be resized.
+			 *
+			 * Let {@link #globalHashBits} := {@link #globalHashBits} + 1.
+			 * This doubles the size of the bucket address table. Each
+			 * original entry becomes two entries in the new table. For the
+			 * specific bucket which is to be split, a new bucket is
+			 * allocated and the 2nd bucket address table entry for that
+			 * bucket is set to the address of the new bucket. The tuples
+			 * are then assigned to the original bucket and the new bucket
+			 * by considering the additional bit of the hash code. Assuming
+			 * that all keys are distinct, then one split will always be
+			 * sufficient unless all tuples in the original bucket have the
+			 * same hash code when their (i+1)th bit is considered (this can
+			 * also occur if duplicate keys are allowed). In this case, we
+			 * resort to an "overflow" bucket (alternatively, the bucket is
+			 * allowed to be larger than the target size and gets treated as
+			 * a blob).
+			 */
+//			doubleAddressSpace();
+			/*
+			 * Create a new bucket and wire it into the 2nd entry for the
+			 * hash code for that key.
+			 */
+//			final int h = hash(key);
+//			final int addr1 = addrOf(h);
+//			final int addr2 = addr1 + 1;
+//			final SimpleBucket b1 = getBucket(addr1);
+//			if (b1.insert(h, key)) {
+//				return;
+//			}
			throw new UnsupportedOperationException();
+		}
+		if (globalHashBits > b.localHashBits) {
+			/*
+			 * There will be at least two entries in the address table which
+			 * point to this bucket. One of those entries is relabeled. Both
+			 * the original bucket and the new bucket have their {@link
+			 * SimpleBucket#localHashBits} incremented by one, but the
+			 * {@link #globalHashBits} does not change. Of the entries in
+			 * the bucket address table which used to point to the original
+			 * bucket, the 1st half are left alone and the 2nd half are
+			 * updated to point to the new bucket. (Note that the #of
+			 * entries depends on the global #of hash bits in use and the
+			 * bucket local #of hash bits in use and will be 2 if there is a
+			 * difference of one between those values but can be more than 2
+			 * and will always be an even number). The entries in the
+			 * original bucket are rehashed and assigned based on the new
+			 * #of hash bits to be considered to either the original bucket
+			 * or the new bucket. The record is then inserted based on the
+			 * new #of hash bits to be considered. If it still does not fit,
+			 * then either handle by case (1) or case (2) as appropriate.
+			 */
+			throw new UnsupportedOperationException();
+		}
+	}
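[Editor's note: the commit leaves case 2 as an UnsupportedOperationException. The following sketch shows what the relabeling could look like; it is an editor's illustration which assumes the contiguous slot layout produced by doubleAddressSpace(), and the helpers firstSlotOf(), allocateBucket(), addressOf() and rehash() are hypothetical.]

	private void splitCase2(final SimpleBucket b) {
		// 2^(i-j) consecutive slots point at b; always an even number >= 2.
		final int span = 1 << (globalHashBits - b.localHashBits);
		final int first = firstSlotOf(b); // hypothetical: lowest slot for b
		final SimpleBucket b2 = allocateBucket(); // hypothetical allocator
		// Both buckets now distinguish one more hash bit; globalHashBits is
		// unchanged in this case.
		b.localHashBits++;
		b2.localHashBits = b.localHashBits;
		// Leave the 1st half of the slots alone; relabel the 2nd half.
		for (int s = first + span / 2; s < first + span; s++) {
			addressMap[s] = addressOf(b2); // hypothetical address lookup
		}
		// Redistribute the tuples between b and b2 based on the newly
		// considered bit; the caller then retries the insert.
		rehash(b, b2); // hypothetical
	}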
+
+	/**
+	 * Doubles the address space.
+	 *
+	 * FIXME Review the exact rule for doubling the address space.
+	 */
+	private void doubleAddressSpace() {
+		globalHashBits += 1;
+		final int[] tmp = addressMap;
+		addressMap = new int[tmp.length << 1];
+		for (int i = 0, j = 0; i < tmp.length; i++) {
+			addressMap[j++] = tmp[i];
+			addressMap[j++] = tmp[i];
+		}
+	}
+
+	private void merge(final int h, final SimpleBucket b) {
+		throw new UnsupportedOperationException();
+	}
+
 	/**
 	 * Delete the key from the hash table (in the case of duplicates, a
 	 * random entry having that key is deleted).
+	 * <p>
+	 * Delete: Buckets may be removed no later than when they become empty
+	 * and doing this is a local operation with costs similar to splitting a
+	 * bucket. Likewise, it is clearly possible to coalesce buckets which
+	 * underflow before they become empty by scanning the 2^(i-j) buckets
+	 * indexed from the entries in the bucket address table using i bits
+	 * from h(K). [I need to research handling deletes a little more,
+	 * including under what conditions it is cost effective to reduce the
+	 * size of the bucket address table itself.]
 	 *
 	 * @param key
 	 *            The key.
@@ -590,6 +692,10 @@
 	 * @return <code>true</code> iff a tuple having that key was deleted.
 	 *
 	 * @todo return the deleted tuple.
+	 *
+	 * @todo merge buckets when they underflow/become empty? (but note that
+	 *       we do not delete anything from the hash map for a hash join,
+	 *       just insert, insert, insert).
 	 */
 	public boolean delete(final int key) {
 		final int h = hash(key);
@@ -676,8 +782,10 @@
 		 *            The hash code of the key.
 		 * @param key
 		 *            The key.
+		 *
+		 * @return <code>false</code> iff the bucket must be split.
 		 */
-		public void insert(final int h, final int key) {
+		public boolean insert(final int h, final int key) {

 			if (size == data.length) {

 				/*
 				 * manage the split. If the bucket handles splits, then we need
 				 * to pass in the table reference.
 				 */
-				throw new UnsupportedOperationException();
+				return false;
 			}

 			data[size++] = key;

+			return true;
+
 		}


This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.