This list is closed; nobody may subscribe to it.
| Year | Jan | Feb | Mar | Apr | May | Jun | Jul | Aug | Sep | Oct | Nov | Dec |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 2010 | | | | | | | 139 | 94 | 232 | 143 | 138 | 55 |
| 2011 | 127 | 90 | 101 | 74 | 148 | 241 | 169 | 121 | 157 | 199 | 281 | 75 |
| 2012 | 107 | 122 | 184 | 73 | 14 | 49 | 26 | 103 | 133 | 61 | 51 | 55 |
| 2013 | 59 | 72 | 99 | 62 | 92 | 19 | 31 | 138 | 47 | 83 | 95 | 111 |
| 2014 | 125 | 60 | 119 | 136 | 270 | 83 | 88 | 30 | 47 | 27 | 23 | |
| 2015 | | | | | | | | | 3 | | | |
| 2016 | | | 4 | 1 | | | | | | | | |
From: <tho...@us...> - 2010-11-22 21:12:29
Revision: 3976 http://bigdata.svn.sourceforge.net/bigdata/?rev=3976&view=rev Author: thompsonbry Date: 2010-11-22 21:12:22 +0000 (Mon, 22 Nov 2010) Log Message: ----------- Added a skeleton for an extensible hash map and unit tests. This gets as far as needing to split a bucket. The implementation in the test class uses int32 keys and exists just to gain familiarity with extensible hashing and prove out the control logic. Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/htbl/ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/htbl/TestExtensibleHashing.java Added: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/htbl/TestExtensibleHashing.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/htbl/TestExtensibleHashing.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/htbl/TestExtensibleHashing.java 2010-11-22 21:12:22 UTC (rev 3976) @@ -0,0 +1,877 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2007. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Nov 22, 2010 + */ +package com.bigdata.htbl; + +import java.util.ArrayList; +import java.util.Iterator; + +import junit.framework.TestCase2; + +/** + * Test suite for extensible hashing. + * + * <br> + * (***) Persistence capable hash table for high volume hash joins. + * + * The data will be "rows" in a "relation" modeled using binding sets. We can + * use dense encoding of these rows since they have a fixed schema (some columns + * may allow nulls). There should also be a relationship to how we encode these + * data for network IO. + * + * https://sourceforge.net/apps/trac/bigdata/ticket/203 + * + * + * Extendable hash table: + * + * - hash(byte[] key) -> IRaba page. Use IRaba for keys/values and key search. + * + * - Split if overflows the bucket size (alternative is some versioning where + * the computed hash value indexes into a logical address which is then + * translated to an IRawStore address - does the RWStore help us out here?) + * + * - ring buffer to wire in hot nodes (but expect random touches). + * + * - initially, no history (no versioning). just replace the record when it is + * evicted from the ring buffer. + * + * What follows is a summary of an extendable hash map design for bigdata. This + * covers most aspects of the hash map design, but does not drill deeply into + * the question of scale-out hash maps. The immediate goal is to develop a hash + * map which can be used for a variety of tasks, primarily pertaining to + * analytic query as described above. + * + * Extendable hashing is one form of dynamic hashing in which buckets are split + * or coalesced as necessary and in which the reorganization is performed on one + * bucket at a time. 
+ * + * Given a hash function h generating, e.g., int32 values where b is the #of + * bits in the hash code. At any point, we use 0 LTE i LTE b bits of that hash + * code as an index into a table of bucket addresses. The value of i will change + * as the #of buckets changes based on the scale of the data to be addressed. + * + * Given a key K, the bucket address table is indexed with i bits of the hash + * code, h(K). The value at that index is the address of the hash bucket. + * However, several consecutive entries in the hash table may point to the same + * hash bucket (for example, the hash index may be created with i=4, which would + * give 16 index values but only one initial bucket). The bucket address table + * entries which map onto the same hash bucket will have a common bit length, + * which may be LTE [i]. This bit length is not stored in the bucket address + * table, but each bucket knows its bit length. Given a global bit length of [i] + * and a bucket bit length of [j], there will be 2^(i-j) bucket address table + * entries which point to the same bucket. + * + * Lookup: Compute h(K) and right shift (w/o sign extension) by i bits. Use this + * to index into the bucket address table. The address in the table is the + * bucket address and may be used to directly read the bucket. + * + * Insert: Per lookup. On overflow, we need to split the bucket moving the + * existing records (and the new record) into new buckets. How this proceeds + * depends on whether the hash #of bits used in the bucket is equal to the #of + * bits used to index into the bucket address table. There are two cases: + * + * Split case 1: If i (global bits of the hash which are in use) == j (bucket + * bits of the hash which are in use), then the bucket address table is out of + * space and needs to be resized. Let i := i+1. This doubles the size of the + * bucket address table. Each original entry becomes two entries in the new + * table. For the specific bucket which is to be split, a new bucket is + * allocated and the 2nd bucket address table for that entry is set to the + * address of the new bucket. The tuples are then assigned to the original + * bucket and the new bucket by considering the additional bit of the hash code. + * Assuming that all keys are distinct, then one split will always be sufficient + * unless all tuples in the original bucket have the same hash code when their + * i+1 th bit is considered. In this case, we resort to an "overflow" bucket + * (alternatively, the bucket is allowed to be larger than the target size and + * gets treated as a blob). + * + * Split case 2: If i is GT j, then there will be at least two entries in the + * bucket address table which point to the same bucket. One of those entries is + * relabeled. Both the original bucket and the new bucket have their #of bits + * incremented by one, but the #of global bits in use does not change. Of the + * entries in the bucket address table which used to point to the original + * bucket, the 1st half are left alone and the 2nd half are updated to point to + * the new bucket. (Note that the #of entries depends on the global #of hash + * bits in use and the bucket local #of hash bits in use and will be 2 if there + * is a difference of one between those values but can be more than 2 and will + * always be an even number). The entries in the original bucket are rehashed + * and assigned based on the new #of hash bits to be considered to either the + * original bucket or the new bucket. 
The record is then inserted based on the + * new #of hash bits to be considered. If it still does not fit, then either + * handle by case (1) or case (2) as appropriate. + * + * Note that records which are in themselves larger than the bucket size must + * eventually be handled by: (A) using an overflow record; (B) allowing the + * bucket to become larger than the target page size (using a larger allocation + * slot or becoming a blob); or (C) recording the tuple as a raw record and + * maintaining only the full hash code of the tuple and its raw record address + * in the bucket (this would allow us to automatically promote long literals out + * of the hash bucket and a similar approach might be used for a B+Tree leaf, + * except that a long key will still cause a problem [also, this implies that + * deleting a bucket or leaf on the unisolated index of the RWStore might + * require a scan of the IRaba to identify blob references which must also be + * deleted, so it makes sense to track those as part of the bucket/leaf + * metadata). + * + * Delete: Buckets may be removed no later than when they become empty and doing + * this is a local operation with costs similar to splitting a bucket. Likewise, + * it is clearly possible to coalesce buckets which underflow before they become + * empty by scanning the 2^(i-j) buckets indexed from the entries in the bucket + * address table using i bits from h(K). [I need to research handling deletes a + * little more, including under what conditions it is cost effective to reduce + * the size of the bucket address table itself.] + * + * Hash table versioning can be easily implemented by: (a) a checkpoint record + * with the address of the bucket address table (which could be broken into a + * two level table comprised of 4k pages in order to make small updates faster); + * and (b) a store level policy such that we do not overwrite the modified + * records directly (though they may be recycled). This will give us the same + * consistent read behind behavior as the B+Tree. + * + * The IIndex interface will need to be partitioned appropriately such that the + * IRangeScan interface is not part of the hash table indices (an isBTree() and + * isHashMap() method might be added). + * + * While the same read-through views for shards should work with hash maps as + * work with B+Tree indices, a different scheme may be necessary to locate those + * shards and we might need to use int64 hash codes in scale-out or increase the + * page size (at least for the read-only hash segment files, which would also + * need a batch build operation). The AccessPath will also need to be updated to + * be aware of classes which do not support key-range scans, but only whole + * relation scans. + * + * Locking on hash tables without versioning should be much simpler than locking + * on B+Trees since there is no hierarchy and more operations can proceed + * without blocking in parallel. + * + * We can represent tuples (key,value pairs) in an IRaba data structure and + * reuse parts of the B+Tree infrastructure relating to compression of IRaba, + * key search, etc. In fact, we might use to lazy reordering notion from Monet + * DB cracking to only sort the keys in a bucket when it is persisted. This is + * also a good opportunity to tackling splitting the bucket if it overflows the + * target record size, e.g., 4k. We could throw out an exception if the sorted, + * serialized, and optionally compressed record exceeds the target record size + * and then split the bucket. 
All of this seems reasonable and we might be able + * to then back port those concepts into the B+Tree. + * + * We need to estimate the #of tuples which will fit within the bucket. We can + * do this based on: (a) the byte length of the keys and values (key compression + * is not going to help out much for a hash index since the keys will be evenly + * distributed even if they are ordered within a bucket); (b) the known per + * tuple overhead and per bucket overhead; (c) an estimate of the compression + * ratio for raba encoding and record compression. This estimate could be used + * to proactively split a bucket before it is evicted. This is most critical + * before anything is evicted as we would otherwise have a single very large + * bucket. So, let's make this simple and split the bucket if the sum of the key + * + val bytes exceeds 120% of the target record size (4k, 8k, etc). The target + * page size can be a property of the hash index. [Note: There is an implicit + * limit on the size of a tuple with this approach. The alternative is to fix + * the #of tuples in the bucket and allow buckets to be of whatever size they + * are for the specific data in that bucket.] + * + * - RWStore with "temporary" quality. Creates the backing file lazily on + * eviction from the write service. + * + * - RWStore with "RAM" only? (Can not exceed the #of allocated buffers or can, + * but then it might force paging out to swap?) + * + * - RWStore with "RAM" mostly. Converts to disk backed if uses all those + * buffers. Possibly just give the WriteCacheService a bunch of write cache + * buffers (10-100) and have it evict to disk *lazily* rather than eagerly (when + * the #of free buffers is down to 20%). + * + * - RWStore with memory mapped file? As I recall, the problem is that we can + * not guarantee extension or close of the file under Java. But some people seem + * to make this work... + */ +public class TestExtensibleHashing extends TestCase2 { + + public TestExtensibleHashing() { + } + + public TestExtensibleHashing(String name) { + super(name); + } + + /** + * Find the first power of two which is GTE the given value. This is used to + * compute the size of the address space (in bits) which is required to + * address a hash table with that many buckets. + */ + private static int getMapSize(final int initialCapacity) { + + if (initialCapacity <= 0) + throw new IllegalArgumentException(); + + int i = 1; + + while ((1 << i) < initialCapacity) + i++; + + return i; + + } + + /** + * Unit test for {@link #getMapSize(int)}. + */ + public void test_getMapSize() { + + assertEquals(1/* addressSpaceSize */, getMapSize(1)/* initialCapacity */); + assertEquals(1/* addressSpaceSize */, getMapSize(2)/* initialCapacity */); + assertEquals(2/* addressSpaceSize */, getMapSize(3)/* initialCapacity */); + assertEquals(2/* addressSpaceSize */, getMapSize(4)/* initialCapacity */); + assertEquals(3/* addressSpaceSize */, getMapSize(5)/* initialCapacity */); + assertEquals(3/* addressSpaceSize */, getMapSize(6)/* initialCapacity */); + assertEquals(3/* addressSpaceSize */, getMapSize(7)/* initialCapacity */); + assertEquals(3/* addressSpaceSize */, getMapSize(8)/* initialCapacity */); + assertEquals(4/* addressSpaceSize */, getMapSize(9)/* initialCapacity */); + + assertEquals(5/* addressSpaceSize */, getMapSize(32)/* initialCapacity */); + + assertEquals(10/* addressSpaceSize */, getMapSize(1024)/* initialCapacity */); + + } + + /** + * Return a bit mask which reveals only the low N bits of an int32 value. 
+ * + * @param nbits + * The #of bits to be revealed. + * @return The mask. + */ + private static int getMaskBits(final int nbits) { + + if (nbits < 0 || nbits > 32) + throw new IllegalArgumentException(); + +// int mask = 1; // mask +// int pof2 = 1; // power of two. +// while (pof2 < nbits) { +// pof2 = pof2 << 1; +// mask |= pof2; +// } + + int mask = 0; + int bit; + + for (int i = 0; i < nbits; i++) { + + bit = (1 << i); + + mask |= bit; + + } + +// System.err.println(nbits +" : "+Integer.toBinaryString(mask)); + + return mask; + + } + + /** + * Unit test for {@link #getMaskBits(int)} + */ + public void test_getMaskBits() { + + assertEquals(0x00000001, getMaskBits(1)); + assertEquals(0x00000003, getMaskBits(2)); + assertEquals(0x00000007, getMaskBits(3)); + assertEquals(0x0000000f, getMaskBits(4)); + assertEquals(0x0000001f, getMaskBits(5)); + assertEquals(0x0000003f, getMaskBits(6)); + assertEquals(0x0000007f, getMaskBits(7)); + assertEquals(0x000000ff, getMaskBits(8)); + + assertEquals(0x0000ffff, getMaskBits(16)); + + assertEquals(0xffffffff, getMaskBits(32)); + + } + +// private static int[] getMaskArray() { +// +// } + + /** + * Extensible hashing data structure. + * + * @todo allow duplicate tuples - caller can enforce distinct if they like. + * + * @todo automatically promote large tuples into raw record references, + * leaving the hash code of the key and the address of the raw record + * in the hash bucket. + * + * @todo initially manage the address table in an int[]. + * + * @todo use 4k buckets. split buckets when the sum of the data is GT 4k + * (reserve space for a 4byte checksum). use a compact record + * organization. if a tuple is deleted, bit flag it (but immediately + * delete the raw record if one is associated with the tuple). before + * splitting, compact the bucket to remove any deleted tuples. + * + * @todo the tuple / raw record promotion logic should be shared with the + * B+Tree. The only catch is that large B+Tree keys will always remain + * a stress factor. For example, TERM2ID will have large B+Tree keys + * if TERM is large and promoting to a blob will not help. In that + * case, we actually need to hash the TERM and store the hash as the + * key (or index only the first N bytes of the term). + */ + public static class ExtensibleHashBag { + + } + + /** + * An implementation of an extensible hash map using a 32 bit hash code and + * a fixed length int[] for the bucket. The keys are int32 values. The data + * stored in the hash map is just the key. Buckets provide a perfect fit for + * N keys. This is used to explore the dynamics of the extensible hashing + * algorithm using some well known examples. + * <p> + * This implementation is not thread-safe. I have not attempted to provide + * for visibility guarantees when resizing the map and I have not attempted + * to provide for concurrent updates. The implementation exists solely to + * explore the extensible hashing algorithm. + * <p> + * The hash code + */ + private static class SimpleExtensibleHashMap { + + /** + * The #of int32 positions which are available in a {@link SimpleBucket} + * . + */ + private final int bucketSize; + + /** + * The #of hash code bits which are in use by the {@link #addressMap}. + * Each hash bucket also as a local #of hash bits. Given <code>i</code> + * is the #of global hash bits and <code>j</code> is the number of hash + * bits in some bucket, there will be <code>2^(i-j)</code> addresses + * which point to the same bucket. 
+ */ + private int globalHashBits; + + /** + * The size of the address space (#of buckets addressable given the #of + * {@link #globalHashBits} in use). + */ + private int addressSpaceSize; + + /** + * The address map. You index into this map using + * {@link #globalHashBits} out of the hash code for a probe key. The + * value of the map is the index into the {@link #buckets} array of the + * bucket to which that key is hashed. + */ + private int[] addressMap; + + /** + * The buckets. The first bucket is pre-allocated when the address table + * is setup and all addresses in the table are initialized to point to + * that bucket. Thereafter, buckets are allocated when a bucket is + * split. + */ + private final ArrayList<SimpleBucket> buckets; + + /** + * An array of mask values. The index in the array is the #of bits of + * the hash code to be considered. The value at that index in the array + * is the mask to be applied to mask off to zero the high bits of the + * hash code which are to be ignored. + */ + private final int[] masks; + + /** + * The current mask for the current {@link #globalHashBits}. + */ + private int globalMask; + + /** + * + * @param initialCapacity + * The initial capacity is the #of buckets which may be + * stored in the hash table before it must be resized. It is + * expressed in buckets and not tuples because there is not + * (in general) a fixed relationship between the size of a + * bucket and the #of tuples which can be stored in that + * bucket. This will be rounded up to the nearest power of + * two. + * @param bucketSize + * The #of int tuples which may be stored in a bucket. + */ + public SimpleExtensibleHashMap(final int initialCapacity, final int bucketSize) { + + if (initialCapacity <= 0) + throw new IllegalArgumentException(); + + if (bucketSize <= 0) + throw new IllegalArgumentException(); + + this.bucketSize = bucketSize; + + /* + * Setup the hash table given the initialCapacity (in buckets). We + * need to find the first power of two which is GTE the + * initialCapacity. + */ + globalHashBits = getMapSize(initialCapacity); + + if (globalHashBits > 32) { + /* + * The map is restricted to 32-bit hash codes so we can not + * address this many buckets. + */ + throw new IllegalArgumentException(); + } + + // Populate the array of masking values. + masks = new int[32]; + + for (int i = 0; i < 32; i++) { + + masks[i] = getMaskBits(i); + + } + + // save the current masking value for the current #of global bits. + globalMask = masks[globalHashBits]; + + /* + * Now work backwards to determine the size of the address space (in + * buckets). + */ + addressSpaceSize = 1 << globalHashBits; + + /* + * Allocate and initialize the address space. All indices are + * initially mapped onto the same bucket. + */ + addressMap = new int[addressSpaceSize]; + + buckets = new ArrayList<SimpleBucket>(addressSpaceSize/* initialCapacity */); + + buckets.add(new SimpleBucket(1/* localHashBits */, bucketSize)); + + } + +// private void toString(StringBuilder sb) { +// sb.append("addressMap:"+Arrays.toString(addressMap)); +// } + + /** The hash of an int key is that int. */ + private int hash(final int key) { + return key; + } + + /** The bucket address given the hash code of a key. */ + private int addrOf(final int h) { + + final int maskedOffIndex = h & globalMask; + + return addressMap[maskedOffIndex]; + + } + + /** + * Return the pre-allocated bucket having the given address. + * + * @param addr + * The address. + * + * @return The bucket. 
+ */ + private SimpleBucket getBucket(final int addr) { + + return buckets.get(addr); + + } + + /** + * The #of hash bits which are being used by the address table. + */ + public int getGlobalHashBits() { + + return globalHashBits; + + } + + /** + * The size of the address space (the #of positions in the address + * table, which is NOT of necessity the same as the #of distinct buckets + * since many address positions can point to the same bucket). + */ + public int getAddressSpaceSize() { + + return addressSpaceSize; + + } + + /** + * The #of buckets backing the map. + */ + public int getBucketCount() { + + return buckets.size(); + + } + + /** + * The size of a bucket (the #of int32 values which may be stored + * in a bucket). + */ + public int getBucketSize() { + + return bucketSize; + + } + + /** + * Return <code>true</code> iff the hash table contains the key. + * + * @param key + * The key. + * + * @return <code>true</code> iff the key was found. + */ + public boolean contains(final int key) { + final int h = hash(key); + final int addr = addrOf(h); + final SimpleBucket b = getBucket(addr); + return b.contains(h,key); + } + + /** + * Insert the key into the hash table. Duplicates are allowed. + * + * @param key + * The key. + * + * @todo define a put() method which returns the old value (no + * duplicates). this could be just sugar over contains(), delete() + * and insert(). + */ + public void insert(final int key) { + final int h = hash(key); + final int addr = addrOf(h); + final SimpleBucket b = getBucket(addr); + b.insert(h,key); + } + + /** + * Delete the key from the hash table (in the case of duplicates, a + * random entry having that key is deleted). + * + * @param key + * The key. + * + * @return <code>true</code> iff a tuple having that key was deleted. + * + * @todo return the deleted tuple. + */ + public boolean delete(final int key) { + final int h = hash(key); + final int addr = addrOf(h); + final SimpleBucket b = getBucket(addr); + return b.delete(h,key); + } + + /** + * Visit the buckets. + * <p> + * Note: This is NOT thread-safe! + */ + public Iterator<SimpleBucket> buckets() { + return buckets.iterator(); + } + + } + + /** + * A (very) simple hash bucket. The bucket holds N int32 keys. + */ + private static class SimpleBucket { + + /** The #of hash code bits which are in use by this {@link SimpleBucket}. */ + int localHashBits; + + /** + * The #of keys stored in the bucket. The keys are stored in a dense + * array. For a given {@link #size}, the only indices of the array which + * have any data are [0:{@link #size}-1]. + */ + int size; + + /** + * The user data for the bucket. + */ + final int[] data; + + public SimpleBucket(final int localHashBits,final int bucketSize) { + + if (localHashBits <= 0 || localHashBits > 32) + throw new IllegalArgumentException(); + + this.localHashBits = localHashBits; + + this.data = new int[bucketSize]; + + } + + /** + * Return <code>true</code> if the bucket contains the key. + * + * @param h + * The hash code of the key. + * @param key + * The key. + * + * @return <code>true</code> if the key was found in the bucket. + * + * @todo passing in the hash code here makes sense when the bucket + * stores the hash values, e.g., if we always do that or if we + * have an out of bucket reference to a raw record because the + * tuple did not fit in the bucket. 
+ */ + public boolean contains(final int h, final int key) { + + for (int i = 0; i < size; i++) { + + if (data[i] == key) + return true; + + } + + return false; + + } + + /** + * Insert the key into the bucket (duplicates are allowed). It is an + * error if the bucket is full. + * + * @param h + * The hash code of the key. + * @param key + * The key. + */ + public void insert(final int h, final int key) { + + if (size == data.length) { + /* + * The bucket must be split, potentially recursively. + * + * Note: Splits need to be triggered based on information which + * is only available to the bucket when it considers the insert + * of a specific tuple, including whether the tuple is promoted + * to a raw record reference, whether the bucket has deleted + * tuples which can be compacted, etc. + * + * @todo I need to figure out where the control logic goes to + * manage the split. If the bucket handles splits, then we need + * to pass in the table reference. + */ + throw new UnsupportedOperationException(); + } + + data[size++] = key; + + } + + /** + * Delete a tuple having the specified key. If there is more than one + * such tuple, then a random tuple having the key is deleted. + * + * @param h + * The hash code of the key. + * @param key + * The key. + * + * @todo return the delete tuple. + */ + public boolean delete(final int h, final int key) { + + for (int i = 0; i < size; i++) { + + if (data[i] == key) { + + // #of tuples remaining beyond this point. + final int length = size - i - 1; + + if (length > 0) { + + // Keep the array dense by copying down by one. + System.arraycopy(data, i + 1/* srcPos */, data/* dest */, + i/* destPos */, length); + + } + + size--; + + return true; + + } + + } + + return false; + + } + + } + + /** + * Map constructor tests. + */ + public void test_ctor() { + + final SimpleExtensibleHashMap map = new SimpleExtensibleHashMap( + 1/* initialCapacity */, 3/* bucketSize */); + + assertEquals("globalHashBits", 1, map.getGlobalHashBits()); + + assertEquals("addressSpaceSize", 2, map.getAddressSpaceSize()); + + assertEquals("bucketCount", 1, map.getBucketCount()); + + assertEquals("bucketSize", 3, map.getBucketSize()); + + } + + /** + * Simple CRUD test operating against the initial bucket without triggering + * any splits. + */ + public void test_crud1() { + + final SimpleExtensibleHashMap map = new SimpleExtensibleHashMap( + 1/* initialCapacity */, 3/* bucketSize */); + + // a bunch of things which are not in the map. + for (int i : new int[] { 0, 1, -4, 31, -93, 912 }) { + + assertFalse(map.contains(i)); + + } + + /* + * Insert a record, then delete it, verifying that contains() reports + * true or false as appropriate for the pre-/post- conditions. + */ + + assertFalse(map.contains(83)); + + map.insert(83); + + assertTrue(map.contains(83)); + + map.delete(83); + + assertFalse(map.contains(83)); + + } + + /** + * CRUD test which inserts some duplicate tuples, but not enough to split + * the initial bucket, and the deletes them out again. + */ + public void test_crud2() { + + final SimpleExtensibleHashMap map = new SimpleExtensibleHashMap( + 1/* initialCapacity */, 3/* bucketSize */); + + assertEquals("bucketCount", 1, map.getBucketCount()); + + assertFalse(map.contains(83)); + + // insert once. + map.insert(83); + + assertTrue(map.contains(83)); + + // insert again. + map.insert(83); + + assertTrue(map.contains(83)); + + // did not split the bucket. + assertEquals("bucketCount", 1, map.getBucketCount()); + + // delete once. + map.delete(83); + + // still found. 
+ assertTrue(map.contains(83)); + + // delete again. + map.delete(83); + + // now gone. + assertFalse(map.contains(83)); + + } + + /** + * Test repeated insert of a key until the bucket splits. + */ + public void test_split() { + + final int bucketSize = 3; + + final SimpleExtensibleHashMap map = new SimpleExtensibleHashMap( + 1/* initialCapacity */, bucketSize); + + assertEquals("bucketCount", 1, map.getBucketCount()); + + map.insert(83); + map.insert(83); + map.insert(83); + + // still not split. + assertEquals("bucketCount", 1, map.getBucketCount()); + + // force a split. + map.insert(83); + + assertEquals("bucketCount", 2, map.getBucketCount()); + + } + + /** + * Unit test with the following configuration and insert / event sequence: + * <ul> + * <li>bucket size := 4k</li> + * <li></li> + * <li></li> + * <li></li> + * </ul> + * <pre> + * </pre> + */ + public void test_simple() { + + } + +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
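The commit above deliberately stops at the point where SimpleBucket.insert() throws UnsupportedOperationException on a full bucket. What follows is a rough, hypothetical sketch of how the two split cases described in the design notes could be wired into SimpleExtensibleHashMap; it assumes the fields and helpers introduced by this commit (globalHashBits, globalMask, masks, addressMap, addressSpaceSize, buckets, bucketSize, hash(), addrOf(), getBucket()) and the low-order-bit directory indexing used by addrOf(). It is not part of the committed code.

```java
// Hypothetical sketch; assumes the SimpleExtensibleHashMap fields from r3976.

/** Insert the key, splitting the target bucket if it is full. */
public void insert(final int key) {
    final int h = hash(key);
    SimpleBucket b = getBucket(addrOf(h));
    if (b.size == bucketSize) {
        if (b.localHashBits == globalHashBits) {
            doubleAddressSpace(); // split case 1: grow the directory first.
        }
        splitBucket(addrOf(h));   // split case 2: i > j for this bucket.
        b = getBucket(addrOf(h)); // re-probe; the key may map to the new bucket.
        if (b.size == bucketSize) {
            // Every tuple shares the same masked hash bits: this is where the
            // overflow-bucket (or oversized-bucket) strategy from the notes
            // above would be needed.
            throw new UnsupportedOperationException("overflow bucket required");
        }
    }
    b.insert(h, key);
}

/** Split case 1: double the bucket address table (i := i + 1). */
private void doubleAddressSpace() {
    final int oldSize = addressSpaceSize;
    final int[] newMap = new int[oldSize << 1];
    // With low-order-bit indexing, entry (k + oldSize) aliases entry k.
    System.arraycopy(addressMap, 0, newMap, 0, oldSize);
    System.arraycopy(addressMap, 0, newMap, oldSize, oldSize);
    addressMap = newMap;
    addressSpaceSize = newMap.length;
    globalHashBits++;
    globalMask = masks[globalHashBits];
}

/** Split case 2: move half of one bucket's tuples into a new bucket. */
private void splitBucket(final int addr) {
    final SimpleBucket oldBucket = getBucket(addr);
    final int j = oldBucket.localHashBits; // strictly less than globalHashBits here.
    final SimpleBucket newBucket = new SimpleBucket(j + 1, bucketSize);
    final int newAddr = buckets.size();
    buckets.add(newBucket);
    oldBucket.localHashBits = j + 1;
    // Of the 2^(i-j) directory entries pointing at the old bucket, redirect
    // those whose (j+1)th low-order bit is one to the new bucket.
    for (int m = 0; m < addressMap.length; m++) {
        if (addressMap[m] == addr && ((m >>> j) & 1) == 1) {
            addressMap[m] = newAddr;
        }
    }
    // Rehash the old bucket's tuples using the additional hash bit.
    final int[] keys = java.util.Arrays.copyOf(oldBucket.data, oldBucket.size);
    oldBucket.size = 0;
    for (int k : keys) {
        final SimpleBucket target =
                ((hash(k) >>> j) & 1) == 1 ? newBucket : oldBucket;
        target.data[target.size++] = k;
    }
}
```

Inserting the same key repeatedly (as test_split does) exercises exactly the caveat noted in the design discussion: after one split every duplicate still hashes to the same bucket, so a real implementation would need the overflow-bucket or oversized-bucket strategy rather than the exception used in this sketch.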
From: <tho...@us...> - 2010-11-22 19:22:11
Revision: 3973 http://bigdata.svn.sourceforge.net/bigdata/?rev=3973&view=rev Author: thompsonbry Date: 2010-11-22 19:22:05 +0000 (Mon, 22 Nov 2010) Log Message: ----------- Removed an informational log message used to track down large queue/chunk capacities. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/BlockingBuffer.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/BlockingBuffer.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/BlockingBuffer.java 2010-11-22 19:16:23 UTC (rev 3972) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/BlockingBuffer.java 2010-11-22 19:22:05 UTC (rev 3973) @@ -383,10 +383,10 @@ final int minimumChunkSize, final long chunkTimeout, final TimeUnit chunkTimeoutUnit, final boolean ordered) { - if (minimumChunkSize >= 1000 || queue.remainingCapacity() >= 1000) - log.fatal(new RuntimeException("queueCapacity=" - + queue.remainingCapacity() + ", minimumChunkSize=" - + minimumChunkSize)); +// if (minimumChunkSize >= 1000 || queue.remainingCapacity() >= 1000) +// log.fatal(new RuntimeException("queueCapacity=" +// + queue.remainingCapacity() + ", minimumChunkSize=" +// + minimumChunkSize)); if (queue == null) throw new IllegalArgumentException(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <mrp...@us...> - 2010-11-22 19:16:30
Revision: 3972 http://bigdata.svn.sourceforge.net/bigdata/?rev=3972&view=rev Author: mrpersonick Date: 2010-11-22 19:16:23 +0000 (Mon, 22 Nov 2010) Log Message: ----------- javadoc Modified Paths: -------------- trunk/bigdata-sails/src/test/com/bigdata/rdf/sail/TestSids.java Modified: trunk/bigdata-sails/src/test/com/bigdata/rdf/sail/TestSids.java =================================================================== --- trunk/bigdata-sails/src/test/com/bigdata/rdf/sail/TestSids.java 2010-11-22 17:42:16 UTC (rev 3971) +++ trunk/bigdata-sails/src/test/com/bigdata/rdf/sail/TestSids.java 2010-11-22 19:16:23 UTC (rev 3972) @@ -44,6 +44,8 @@ import com.bigdata.rdf.vocab.NoVocabulary; /** + * Test case for reverse lookup from SID to statement. + * * @author <a href="mailto:mrp...@us...">Mike Personick</a> * @version $Id$ */ This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <btm...@us...> - 2010-11-22 17:42:24
Revision: 3971 http://bigdata.svn.sourceforge.net/bigdata/?rev=3971&view=rev Author: btmurphy Date: 2010-11-22 17:42:16 +0000 (Mon, 22 Nov 2010) Log Message: ----------- [branch dev-btm]: CHECKPOINT - fixes and additions to QuorumPeerService ServiceImpl class and related config to support starting and stopping multiple instances, added new QuorumPeerServiceTest to verify the fixes and additions Modified Paths: -------------- branches/dev-btm/bigdata/src/java/com/bigdata/counters/AbstractCounterSet.java branches/dev-btm/bigdata/src/resources/logging/log4j.properties branches/dev-btm/bigdata-jini/src/java/com/bigdata/quorum/Constants.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/quorum/ServiceImpl.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/quorum/config/quorum.config branches/dev-btm/bigdata-jini/src/java/com/bigdata/util/config/ConfigDeployUtil.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/util/config/default-deploy.properties branches/dev-btm/bigdata-jini/src/java/com/bigdata/util/config/deploy.properties branches/dev-btm/build.xml Added Paths: ----------- branches/dev-btm/bigdata-jini/lib/apache/zookeeper-3.3.2.jar branches/dev-btm/bigdata-jini/src/test/com/bigdata/quorum/ branches/dev-btm/bigdata-jini/src/test/com/bigdata/quorum/QuorumPeerServiceTest.java Modified: branches/dev-btm/bigdata/src/java/com/bigdata/counters/AbstractCounterSet.java =================================================================== --- branches/dev-btm/bigdata/src/java/com/bigdata/counters/AbstractCounterSet.java 2010-11-19 18:12:50 UTC (rev 3970) +++ branches/dev-btm/bigdata/src/java/com/bigdata/counters/AbstractCounterSet.java 2010-11-22 17:42:16 UTC (rev 3971) @@ -217,6 +217,8 @@ strBuf.append( new String(new byte[] {pathBytes[i]}) ); } } + +//BTM - workaround for http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6935535 logger.warn("***** AbstractCounterSet.getPath: CONTAINS SLASH-SLASH: path CONVERTED = "+strBuf.toString()); } int slashSlashIndex = path.indexOf("//"); Modified: branches/dev-btm/bigdata/src/resources/logging/log4j.properties =================================================================== --- branches/dev-btm/bigdata/src/resources/logging/log4j.properties 2010-11-19 18:12:50 UTC (rev 3970) +++ branches/dev-btm/bigdata/src/resources/logging/log4j.properties 2010-11-22 17:42:16 UTC (rev 3971) @@ -211,8 +211,13 @@ #log4j.logger.com.bigdata.boot.launcher.ConfigReaderUnitTest=DEBUG #log4j.logger.com.bigdata.process.ProcessConfigXmlHandlerTest=DEBUG -log4j.logger.com.bigdata.loadbalancer=DEBUG -log4j.logger.com.bigdata.transaction=DEBUG -log4j.logger.com.bigdata.metadata=DEBUG -log4j.logger.com.bigdata.shard=DEBUG -log4j.logger.com.bigdata.executor=DEBUG +# For the quorum server tests +log4j.logger.com.bigdata.quorum.QuorumPeerServiceTest=INFO + +# For smart proxy implementations of the services +#log4j.logger.com.bigdata.loadbalancer=DEBUG +#log4j.logger.com.bigdata.transaction=DEBUG +#log4j.logger.com.bigdata.metadata=DEBUG +#log4j.logger.com.bigdata.shard=DEBUG +#log4j.logger.com.bigdata.executor=DEBUG +#log4j.logger.com.bigdata.quorum=DEBUG Added: branches/dev-btm/bigdata-jini/lib/apache/zookeeper-3.3.2.jar =================================================================== (Binary files differ) Property changes on: branches/dev-btm/bigdata-jini/lib/apache/zookeeper-3.3.2.jar ___________________________________________________________________ Added: svn:mime-type + application/octet-stream Modified: 
branches/dev-btm/bigdata-jini/src/java/com/bigdata/quorum/Constants.java =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/quorum/Constants.java 2010-11-19 18:12:50 UTC (rev 3970) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/quorum/Constants.java 2010-11-22 17:42:16 UTC (rev 3971) @@ -38,4 +38,9 @@ /* Name of this component; used in config entry retrieval and the logger.*/ String COMPONENT_NAME = ((Constants.class).getPackage()).getName(); String F_SEP = System.getProperty("file.separator"); + + // Time (in seconds) to wait for discovery of other peers on 1st start up + long LOWER_BOUND_PEER_DISCOVERY_PERIOD = 1L; + long UPPER_BOUND_PEER_DISCOVERY_PERIOD = Long.MAX_VALUE; + long DEFAULT_PEER_DISCOVERY_PERIOD = 5L*60L; } Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/quorum/ServiceImpl.java =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/quorum/ServiceImpl.java 2010-11-19 18:12:50 UTC (rev 3970) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/quorum/ServiceImpl.java 2010-11-22 17:42:16 UTC (rev 3971) @@ -31,6 +31,7 @@ import com.bigdata.attr.ServiceInfo; import com.bigdata.service.QuorumPeerService; import com.bigdata.service.QuorumPeerService.QuorumPeerData; +import com.bigdata.util.EntryUtil; import com.bigdata.util.Util; import com.bigdata.util.config.ConfigDeployUtil; import com.bigdata.util.config.LogUtil; @@ -332,12 +333,23 @@ joinMgr = new JoinManager(outerProxy, serviceAttrs, serviceId, ldm, null, config); - if((peerState.getNQuorumPeers() > 1L) && (peerState.getPeerId() == 0L)) - { - //discover all other peers to determine peer ids and server info - //so the entire quorum peer ensemble can be initialized & persisted - initQuorumPeerData(peerState, configStateInfo); + // If not standalone and if this is the very first time this + // service instance has started (that is, it has not previously + // persisted any state related to the other peers in the + // ensemble), then attempt to discover all other peers in the + // ensemble to determine peer ids and server info so the entire + // quorum peer ensemble can be initialized & persisted. 
+ if((peerState.getNQuorumPeers() > 1L)&&(peerState.getPeerId() == 0L)) { + long waitPeriod = + Config.getLongEntry(config, + COMPONENT_NAME, + "peerDiscoveryPeriod", + DEFAULT_PEER_DISCOVERY_PERIOD, + LOWER_BOUND_PEER_DISCOVERY_PERIOD, + UPPER_BOUND_PEER_DISCOVERY_PERIOD); + initQuorumPeerData(peerState, configStateInfo, waitPeriod); + //update the peerId of the QuorumPeerAttr QuorumPeerAttr tmplVal = new QuorumPeerAttr(); QuorumPeerAttr changeVal = new QuorumPeerAttr(); @@ -354,6 +366,18 @@ myidOut.flush(); myidOut.close(); + // zookeeperJmxLog4j item is not part of the persisted state + Boolean defaultJmxLog4j = + ConfigDeployUtil.getBoolean("federation.zookeeperJmxLog4j"); + boolean zookeeperJmxLog4j = (Boolean)config.getEntry + (COMPONENT_NAME, + "zookeeperJmxLog4j", + Boolean.class, + defaultJmxLog4j); + if (!zookeeperJmxLog4j) { + System.setProperty("zookeeper.jmx.log4j.disable", "true"); + } + this.quorumPeerMainTaskExecutor = Executors.newFixedThreadPool(1); this.quorumPeerMainTask = new QuorumPeerMainTask(configStateInfo); this.quorumPeerMainTaskExecutor.execute(this.quorumPeerMainTask); @@ -374,7 +398,8 @@ } private boolean initQuorumPeerData(QuorumPeerState peerState, - ConfigStateInfo configStateInfo) + ConfigStateInfo configStateInfo, + long peerDiscoveryPeriod) throws IOException, ConfigurationException { int nPeersInEnsemble = peerState.getNQuorumPeers(); @@ -387,21 +412,26 @@ Class[] peerTmplTypes = new Class[] { QuorumPeerService.class }; QuorumPeerAttr peerAttr = new QuorumPeerAttr(); - //match on all ports - peerAttr.peerPort = peerState.getPeerPort(); - peerAttr.electionPort = peerState.getElectionPort(); - peerAttr.clientPort = peerState.getClientPort(); + +//BTM Ports can all be different (which helps when running multiple +//BTM servers on the same node), so no need to do exact matching +//BTM on the client, peer, and election ports. Thus, discover all +//BTM servers in the ensemble by federation group and service type, +//BTM and wildcard the port values. + +//BTM peerAttr.peerPort = peerState.getPeerPort(); +//BTM peerAttr.electionPort = peerState.getElectionPort(); +//BTM peerAttr.clientPort = peerState.getClientPort(); + Entry[] peerTmplAttrs = new Entry[] { peerAttr }; - ServiceTemplate peerTmpl = new ServiceTemplate(peerTmplId, peerTmplTypes, peerTmplAttrs); - - long nWait = 5L*60L*1000L;//wait 5 minutes, then give up ServiceItem[] peerItems = null; try { peerItems = sdm.lookup(peerTmpl, nPeersInEnsemble, - nPeersInEnsemble, null, nWait); + nPeersInEnsemble, null, + peerDiscoveryPeriod); if((peerItems == null) || (peerItems.length < nPeersInEnsemble)) { return false; } @@ -411,34 +441,63 @@ // Found all peers, including self. Set peerId based on serviceId: // "smallest" serviceId is set to 1, next "smallest" set to 2, etc. + // unless a discovered service already has a non-zero peerId field + // in its QuroumPeerAttr attribute. // - // Use TreeSet to order the proxyId's from lowest to highest + // Use TreeMap to order the proxyId's from lowest to highest // (the UUID elements provide a compareTo method for consistent // ordering). - Set<UUID> orderedProxyIdSet = new TreeSet<UUID>(); - for(int i=0; i<peerItems.length; i++) { - orderedProxyIdSet.add - (((QuorumPeerService)(peerItems[i].service)).getServiceUUID()); - } + // + // Also, while populating the ordered map, determine this service's + // own proxyId and peerId so they can be used later when constructing + // and persisting the map of QuorumPeerData. 
+ UUID thisProxyId = peerState.getProxyId(); if(thisProxyId == null) { throw new NullPointerException("initQuorumPeerData: " +"null proxyId from peerState"); } + Long thisPeerId = 0L;//will replace this with non-zero below - // Determine this service's own peerId and create an ordered map - // that maps each service's proxyId to its corresponding peerId - // so that the QuorumPeerData map can be constructed and persisted. - long thisPeerId = 0L; + // Populate the map with the ordered proxy id keys and either + // a non-zero peer id (indicating the discovered service had a + // previously initialized - and persisted - peerId) or 0 to + // indicate that the discovered service has been started for + // the very first time + Map<UUID, Long> orderedPeerIdMap = new TreeMap<UUID, Long>(); - Iterator<UUID> proxyItr = orderedProxyIdSet.iterator(); - for(long peerId=1; proxyItr.hasNext(); peerId++) { - UUID nextProxyId = proxyItr.next(); - orderedPeerIdMap.put(nextProxyId, peerId); - if( thisProxyId.equals(nextProxyId) ) { - thisPeerId = peerId; + for(int i=0; i<peerItems.length; i++) { + UUID proxyId = + ((QuorumPeerService)(peerItems[i].service)).getServiceUUID(); + Entry[] attrs = peerItems[i].attributeSets; + QuorumPeerAttr quorumPeerAttr = + EntryUtil.getEntryByType(attrs, QuorumPeerAttr.class); + Long peerId = (quorumPeerAttr == null ? 0L:quorumPeerAttr.peerId); + orderedPeerIdMap.put(proxyId, peerId); + logger.debug("PUT peerId >>> ["+proxyId+", "+peerId+"]"); + } + + // Replace any 0-valued peer ids from above with a non-zero value + // in the correct order. + + Set<Map.Entry<UUID, Long>> orderedSet = orderedPeerIdMap.entrySet(); + Iterator<Map.Entry<UUID, Long>> itr = orderedSet.iterator(); + for(Long peerIdCntr=1L; itr.hasNext(); peerIdCntr++ ) { + Map.Entry<UUID, Long> pair = itr.next(); + UUID curProxyId = pair.getKey(); + Long curPeerId = pair.getValue(); + if (curPeerId == 0L) { + curPeerId = peerIdCntr; + orderedPeerIdMap.put(curProxyId, curPeerId);//replace + logger.debug("REPLACE peerId >>> " + +"["+curProxyId+", "+curPeerId+"]"); } + if( thisProxyId.equals(curProxyId) ) {//determine own peerId + thisPeerId = curPeerId; + } } + + //verify this service's peerId was indeed determined above if(thisPeerId == 0) return false; peerState.setPeerId(thisPeerId); @@ -462,6 +521,8 @@ ( new InetSocketAddress(peerAddress, peerPort) ); peerData.setElectionAddress ( new InetSocketAddress(peerAddress, electionPort) ); + + peerDataMap.put(peerId, peerData); } peerState.setPeerDataMap(peerDataMap); @@ -718,6 +779,7 @@ (new Integer(peerState.getMaxClientCnxns())).toString()); Map<Long, QuorumPeerData> peerDataMap = peerState.getPeerDataMap(); + logger.debug("peerDataMap.size() = "+peerDataMap.size()); for(QuorumPeerData peerData : peerDataMap.values()) { long peerId = peerData.getPeerId(); InetSocketAddress pAddr = peerData.getPeerAddress(); @@ -728,6 +790,7 @@ String serverKey = "server."+peerId; String serverVal = peerAddr.getHostAddress() +":"+peerPort+":"+electionPort; + logger.debug("serverKey="+serverKey+", serverVal="+serverVal); configProps.setProperty(serverKey, serverVal); } return configProps; @@ -885,10 +948,10 @@ //clientPort //for zookeeper 3.2.1 - this.peerState.setClientPort(zConfig.getClientPort()); +// this.peerState.setClientPort(zConfig.getClientPort()); //for zookeeper 3.3.0+ -// this.peerState.setClientPort -// (zConfig.getClientPortAddress().getPort()); + this.peerState.setClientPort + (zConfig.getClientPortAddress().getPort()); //dataDir 
this.peerState.setDataDir(zConfig.getDataDir()); @@ -977,6 +1040,7 @@ if( peerIdFound ) break; } } + this.peerState.setPeerDataMap(peerDataMap); this.peerState.setNQuorumPeers(peerDataMap.size()); } @@ -984,13 +1048,17 @@ } else {//retrieve from jini config logger.log(Level.DEBUG, "INITIAL START: " - +"[use jini config]"); + +"[use deployment config]"); //clientPort + Integer defaultClientPort = + ConfigDeployUtil.getInt + ("federation.zookeeperClientPort"); Integer zClientPort = (Integer)config.getEntry(COMPONENT_NAME, "zookeeperClientPort", - Integer.class, 2181); + Integer.class, + defaultClientPort); if(zClientPort == null) { throw new ConfigurationException ("null zookeeperClientPort"); @@ -998,7 +1066,9 @@ this.peerState.setClientPort(zClientPort); //dataDir - String defaultDataDir = "data"; + String defaultDataDir = + ConfigDeployUtil.getString + ("federation.zookeeperDataDir"); String zDataDir = persistBaseStr + F_SEP + (String)config.getEntry(COMPONENT_NAME, "zookeeperDataDir", @@ -1011,7 +1081,9 @@ this.peerState.setDataDir(zDataDir); //dataLogDir - String defaultDataLogDir = "data.log"; + String defaultDataLogDir = + ConfigDeployUtil.getString + ("federation.zookeeperDataLogDir"); String zDataLogDir = persistBaseStr + F_SEP + (String)config.getEntry (COMPONENT_NAME, @@ -1025,10 +1097,14 @@ this.peerState.setDataLogDir(zDataLogDir); //tickTime + Integer defaultTickTime = + ConfigDeployUtil.getInt + ("federation.zookeepeTickTime"); Integer zTickTime = (Integer)config.getEntry(COMPONENT_NAME, "zookeeperTickTime", - Integer.class, 2000); + Integer.class, + defaultTickTime); if(zTickTime == null) { throw new ConfigurationException ("null zookeeperTickTime"); @@ -1036,6 +1112,9 @@ this.peerState.setTickTime(zTickTime); //initLimit + Integer defaultInitLimit = + ConfigDeployUtil.getInt + ("federation.zookeeperInitLimit"); Integer zInitLimit = (Integer)config.getEntry(COMPONENT_NAME, "zookeeperInitLimit", @@ -1047,10 +1126,14 @@ this.peerState.setInitLimit(zInitLimit); //syncLimit + Integer defaultSyncLimit = + ConfigDeployUtil.getInt + ("federation.zookeeperSyncLimit"); Integer zSyncLimit = (Integer)config.getEntry(COMPONENT_NAME, "zookeeperSyncLimit", - Integer.class, 2); + Integer.class, + defaultSyncLimit); if(zSyncLimit == null) { throw new ConfigurationException ("null zookeeperSyncLimit"); @@ -1058,10 +1141,14 @@ this.peerState.setSyncLimit(zSyncLimit); //electionAlg + Integer defaultElectionAlg = + ConfigDeployUtil.getInt + ("federation.zookeeperElectionAlg"); Integer zElectionAlg = (Integer)config.getEntry(COMPONENT_NAME, "zookeeperElectionAlg", - Integer.class, 3); + Integer.class, + defaultElectionAlg); if(zElectionAlg == null) { throw new ConfigurationException ("null zookeeperElectionAlg"); @@ -1069,10 +1156,14 @@ this.peerState.setElectionAlg(zElectionAlg); //maxClientCnxns + Integer defaultMaxClientCnxns = + ConfigDeployUtil.getInt + ("federation.zookeeperMaxClientCnxns"); Integer zMaxClientCnxns = (Integer)config.getEntry(COMPONENT_NAME, "zookeeperMaxClientCnxns", - Integer.class, 10); + Integer.class, + defaultMaxClientCnxns); if(zMaxClientCnxns == null) { throw new ConfigurationException ("null zookeeperMaxClientCnxns"); @@ -1080,10 +1171,11 @@ this.peerState.setMaxClientCnxns(zMaxClientCnxns); // Because this is the first time this service is started, - // and because the config is retrieved from a jini config, - // there is no knowledge (yet) of the other zookeeper - // servers, other than the total number of peers that - // are expected to make up the ensemble. 
If the ensemble + // and because the config is retrieved from a deployment + // specific configuration, there is no knowledge (yet) + // of the other zookeeper servers, other than the total + // number of peers that are expected to make up the + // ensemble (zookeeperEnsembleSize). If the ensemble // will consist of only this service peer, then the // peerConfigMap can be created and populated with this // service's server information. But if the ensemble will @@ -1092,21 +1184,31 @@ // servers are discovered and a leader is elected. //zookeeperNetwork (peerAddress) - String zookeeperNetwork = NicUtil.getIpAddress("default.nic", ConfigDeployUtil.getString("node.serviceNetwork"), false); + String zookeeperNetwork = + NicUtil.getIpAddress + ("default.nic", + ConfigDeployUtil.getString + ("node.serviceNetwork"), + false); if(zookeeperNetwork == null) { throw new ConfigurationException ("null zookeeperNetwork"); } InetAddress peerAddress = - NicUtil.getInetAddress(zookeeperNetwork, 0, null, true); + NicUtil.getInetAddress + (zookeeperNetwork, 0, null, true); this.peerState.setPeerAddress(peerAddress); //peerPort + Integer defaultPeerPort = + ConfigDeployUtil.getInt + ("federation.zookeeperPeerPort"); Integer peerPort = (Integer)config.getEntry(COMPONENT_NAME, "zookeeperPeerPort", - Integer.class, 2888); + Integer.class, + defaultPeerPort); if(peerPort == null) { throw new ConfigurationException ("null zookeeperPeerPort"); @@ -1114,6 +1216,9 @@ this.peerState.setPeerPort(peerPort); //electionPort + Integer defaultElectionPort = + ConfigDeployUtil.getInt + ("federation.zookeeperElectionPort"); Integer electionPort = (Integer)config.getEntry(COMPONENT_NAME, "zookeeperElectionPort", @@ -1125,10 +1230,14 @@ this.peerState.setElectionPort(electionPort); //nQuorumPeers + Integer defaultEnsembleSize = + ConfigDeployUtil.getInt + ("federation.zookeeperEnsembleSize"); Integer nQuorumPeers = (Integer)config.getEntry(COMPONENT_NAME, - "nQuorumPeers", - Integer.class, 1); + "zookeeperEnsembleSize", + Integer.class, + defaultEnsembleSize); if(nQuorumPeers == null) { throw new ConfigurationException ("null nQuorumPeers"); @@ -1140,7 +1249,18 @@ this.peerState.setNQuorumPeers(nQuorumPeers); if(nQuorumPeers > 1) { - this.peerState.setPeerId(0L);//0 - no peers discovered + + // nQuorumPeers > 1 means that the ensemble is + // configured to not be standalone; in which case, + // this service's peer id is initially set to 0 + // to indicate to this service's init method that + // the initQuorumPeerData method must be invoked + // to dicover the other peers so in the ensemble + // so that each peer's id (myid) can be set to a + // unique value between 1 and nQuorumPeers + + this.peerState.setPeerId(0L); + } else {//nQuorumPeers == 1, populate peerConfigMap long peerId = 1L; this.peerState.setPeerId(peerId); Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/quorum/config/quorum.config =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/quorum/config/quorum.config 2010-11-19 18:12:50 UTC (rev 3970) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/quorum/config/quorum.config 2010-11-22 17:42:16 UTC (rev 3971) @@ -58,14 +58,7 @@ ( new String[] { System.getProperty("app.home", "${user.dir}"), "${/}var${/}state${/}quorumState" } ); - zookeeperDataDir = "data"; - zookeeperDataLogDir = "data.log"; - zookeeperClientPort = new Integer(2181); - zookeeperTickTime = new Integer(2000); - zookeeperInitLimit = new Integer(5); - 
zookeeperSyncLimit = new Integer(2); - zookeeperElectionAlg = new Integer(3);//0 = udp, 3 = tcp - zookeeperMaxClientCnxns = new Integer(10); + peerDiscoveryPeriod = 300000L;// wait 5 minutes for peer discovery // If standard zookeeper config is specified, // it will override jini config; for example, Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/util/config/ConfigDeployUtil.java =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/util/config/ConfigDeployUtil.java 2010-11-19 18:12:50 UTC (rev 3970) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/util/config/ConfigDeployUtil.java 2010-11-22 17:42:16 UTC (rev 3971) @@ -400,10 +400,10 @@ throws ConfigurationException { String validValuesStr = - (String) getDeploymentProperties().get(parameter + STRINGVALS); + (String) getDeploymentProperties().get(parameter + STRINGVALS); if (validValuesStr != null) { - String[] validValues = validValuesStr.split(","); + String[] validValues = (validValuesStr.trim()).split(","); if (!Arrays.asList(validValues).contains(value)) { throw new ConfigurationException ("invalid string parameter ["+parameter+"] in " @@ -417,11 +417,11 @@ throws ConfigurationException { String validValuesStr = - (String)(getDeploymentProperties().get(parameter + STRINGVALS)); + (String)(getDeploymentProperties().get(parameter + STRINGVALS)); String[] values = value.split(","); if (validValuesStr != null) { - String[] validValues = validValuesStr.split(","); + String[] validValues = (validValuesStr.trim()).split(","); List validValuesList = Arrays.asList(validValues); for (int i=0; i<values.length; i++) { if (!validValuesList.contains(values[i])) { @@ -434,6 +434,39 @@ return values; } +//NOTE: there seems to be a problem with NumberFormat.parse; in particular, +// with parsing strings representing integers. During testing +// there were numerous occasions where, when parsing a string value +// from the properties (from either default-deploy.properties or +// from deploy.properties), either a NumberFormatException or +// a ParseException would occur; even when the value being parsed +// was valid. These exceptions, which occurred randomly, typically +// would indicate that value being parsed was the empty string ("") +// for the case of a NumberFormatException, or the resulting parsed +// max (or min) value did not satisfy the desired criteria. For +// example, when a strvalue of "2181" was input, and the maximum +// value retrieved from the properties was "65353", upon parsing the +// string "65353", the value 1024 was returned; thus, because 1024 +// is less than 2181, a ParseException was thrown. In other cases, +// although a NumberFormatException was thrown by the call to +// NumberFormat.parse because that method interpretted the string +// "65353" as the empty string. +// +// Upon doing a search of the various bug databases and related +// user reports, there seems to be some indication that the +// parse method may at some point invoke the indexOf method on +// the string that is being parsed. Thus, the problem being +// described here may be related to JDK bug described at the url, +// +// http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6935535 +// +// (Or it may be related to the locale being used by the tests that +// encountered this issue?) 
+// +// As a work around, the validateInt and validateLong methods each +// parse the given string using Integer.parseInt and Long.parseLong +// respectively; at least until this issue is resolved. + private static int validateInt(String parameter, String strvalue) throws ConfigurationException { @@ -445,33 +478,42 @@ int value = str2int(strvalue); if (maxString != null) { +/* see note ************************************************ try { - int max = numberFormat.parse(maxString).intValue(); - if (value > max) { - throw new ConfigurationException - ("parameter ["+parameter+"] " - +"exceeds maximum ["+max+"]"); - } + int max = numberFormat.parse(maxString.trim()).intValue(); } catch (ParseException e) { throw new NumberFormatException ("invalid maximum integer for parameter: " +parameter); } +************************************************************* */ +int max = Integer.parseInt(maxString.trim()); + if (value > max) { + throw new ConfigurationException + ("parameter ["+parameter+"] exceeds maximum " + +"["+strvalue+" > "+max + +" (maxString="+maxString+")]"); + } } if (minString != null) { +/* see note ************************************************ try { - int min = numberFormat.parse(minString).intValue(); - if (value < min) { - throw new ConfigurationException - ("parameter ["+parameter+"] " - +"is less than maximum ["+min+"]"); - } + int min = numberFormat.parse(minString.trim()).intValue(); } catch (ParseException e) { throw new NumberFormatException ("invalid minimum integer for parameter: " +parameter); } +************************************************************* */ +int min = Integer.parseInt(minString.trim()); + if (value < min) { + throw new ConfigurationException + ("parameter ["+parameter+"] is less than minimum " + +"["+strvalue+" < "+min + +" (minString="+minString+")]"); + } } + return value; } @@ -486,32 +528,40 @@ long value = str2long(strvalue); if (maxString != null) { +/* see note ************************************************ try { - long max = numberFormat.parse(maxString).longValue(); - if (value > max) { - throw new ConfigurationException - ("parameter ["+parameter+"] " - +"exceeds maximum ["+max+"]"); - } + long max = numberFormat.parse(maxString.trim()).longValue(); } catch (ParseException e) { throw new NumberFormatException ("invalid maximum long for parameter: " +parameter); } +************************************************************* */ +long max = Long.parseLong(maxString.trim()); + if (value > max) { + throw new ConfigurationException + ("parameter ["+parameter+"] exceeds maximum " + +"["+strvalue+" > "+max + +" (maxString="+maxString+")]"); + } } if (minString != null) { +/* see note ************************************************ try { - long min = numberFormat.parse(minString).longValue(); - if (value < min) { - throw new ConfigurationException - ("parameter ["+parameter+"] " - +"is less than manimum ["+min+"]"); - } + long min = numberFormat.parse(minString.trim()).longValue(); } catch (ParseException e) { throw new NumberFormatException ("invalid minimum long for parameter: " +parameter); } +************************************************************* */ +long min = Long.parseLong(minString.trim()); + if (value < min) { + throw new ConfigurationException + ("parameter ["+parameter+"] is less than minimum " + +"["+strvalue+" < "+min + +" (minString="+minString+")]"); + } } return value; } @@ -603,7 +653,7 @@ //BTM - when the new version str2long was used. 
Thus, at //BTM - least temporarily, until the problem can be //BTM - diagnosed and fixed, the old versions of those -//BTM - methods are still be used below. +//BTM - methods are still being used below. //BTM private static int str2int(String argx) { //BTM Number n = null; //BTM try { Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/util/config/default-deploy.properties =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/util/config/default-deploy.properties 2010-11-19 18:12:50 UTC (rev 3970) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/util/config/default-deploy.properties 2010-11-22 17:42:16 UTC (rev 3971) @@ -16,12 +16,72 @@ federation.minNodes.max=1000 federation.minNodes.type=long -federation.quorumSize.description=Number of service instances required for a quorum -federation.quorumSize.default=1 -federation.quorumSize.min=1 -federation.quorumSize.max=11 -federation.quorumSize.type=long +federation.zookeeperEnsembleSize.description=Number of quorum servers making up the federation's ZooKeeper ensemble +federation.zookeeperEnsembleSize.default=1 +federation.zookeeperEnsembleSize.min=1 +federation.zookeeperEnsembleSize.max=11 +federation.zookeeperEnsembleSize.type=int +federation.zookeeperClientPort.description=Port on which each quorum server should listen for connection requests from clients +federation.zookeeperClientPort.default=2181 +federation.zookeeperClientPort.min=1024 +federation.zookeeperClientPort.max=65353 +federation.zookeeperClientPort.type=int + +federation.zookeeperDataDir.description=Path to the directory where each quorum server should persist its in-memory database snapshots, as well as its transaction logs, unless the federation.zookeeperDataLogDir parameter is set to a different value +federation.zookeeperDataDir.default=data +federation.zookeeperDataDir.type=string + +federation.zookeeperDataLogDir.description=Path to the directory where each quorum server should persist its transaction logs +federation.zookeeperDataLogDir.default=data +federation.zookeeperDataLogDir.type=string + +federation.zookeepeTickTime.description=Length of the basic time unit (in milliseconds) used by each quorum server to regulate the heartbeats and timeouts enforced by each server +federation.zookeepeTickTime.default=2000 +federation.zookeepeTickTime.min=1 +federation.zookeepeTickTime.max=2147483647 +federation.zookeepeTickTime.type=int + +federation.zookeeperInitLimit.description=Amount of time (number of ticks) each quorum server should allow for followers to connect and sync with the leader in the federation's ensemble; which should be increased as needed, when the amount of data being managed by the ensemble is large +federation.zookeeperInitLimit.default=5 +federation.zookeeperInitLimit.min=1 +federation.zookeeperInitLimit.max=2147483647 +federation.zookeeperInitLimit.type=int + +federation.zookeeperSyncLimit.description=Amount of time (number of ticks) each quorum server should allow for followers to sync with the leader in the federation's ensemble; where if a given follower falls too far behind the leader, the follower is dropped +federation.zookeeperSyncLimit.default=2 +federation.zookeeperSyncLimit.min=1 +federation.zookeeperSyncLimit.max=2147483647 +federation.zookeeperSyncLimit.type=int + +federation.zookeeperElectionAlgorithm.description=Leader election algorithm the federation's ensemble should use when electing a leader; where a value of 0 corresponds to the original UDP-based 
version, 1 corresponds to the non-authenticated UDP-based version of fast leader election, 2 corresponds to the authenticated UDP-based version of fast leader election, and 3 corresponds to the TCP-based version of leader election (note that only 0 and 3 are currently supported) +federation.zookeeperElectionAlg.default=3 +federation.zookeeperElectionAlg.min=0 +federation.zookeeperElectionAlg.max=3 +federation.zookeeperElectionAlg.type=int + +federation.zookeeperMaxClientCnxns.description=Maximum number of concurrent connections (at the socket level) that a single client, identified by IP address, is allowed to make to a single member of the federation's ensemble; where 0 corresponds to no limit +federation.zookeeperMaxClientCnxns.default=0 +federation.zookeeperMaxClientCnxns.min=0 +federation.zookeeperMaxClientCnxns.max=2147483647 +federation.zookeeperMaxClientCnxns.type=int + +federation.zookeeperPeerPort.description=Port on which quorum servers acting as followers in the federation's ensemble request a connection to the leader in the ensemble +federation.zookeeperPeerPort.default=2888 +federation.zookeeperPeerPort.min=1024 +federation.zookeeperPeerPort.max=65353 +federation.zookeeperPeerPort.type=int + +federation.zookeeperElectionPort.description=Port on which connections are requested by each quorum server when participating in leader election +federation.zookeeperElectionPort.default=3888 +federation.zookeeperElectionPort.min=1024 +federation.zookeeperElectionPort.max=65353 +federation.zookeeperElectionPort.type=int + +federation.zookeeperJmxLog4j.description=When true, each peer in the federation's ensemble registers MBeans that allow one to manage log4j settings using JMX +federation.zookeeperJmxLog4j.type=boolean +federation.zookeeperJmxLog4j.default=false + # Node properties node.name.description=logical name assigned to the node node.name.default= @@ -95,4 +155,3 @@ rmi.connectTimeout.type=long rmi.connectTimeout.default=10000 rmi.connectTimeout.min=0 - Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/util/config/deploy.properties =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/util/config/deploy.properties 2010-11-19 18:12:50 UTC (rev 3970) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/util/config/deploy.properties 2010-11-22 17:42:16 UTC (rev 3971) @@ -3,7 +3,18 @@ # Federation properties #federation.name=com.bigdata.group.0 #federation.minNodes=1 -#federation.quorumSize=1 +#federation.zookeeperEnsembleSize=1 +#federation.zookeeperClientPort=2181 +#federation.zookeeperDataDir=data +#federation.zookeeperDataLogDir=data +#federation.zookeepeTickTime=2000 +#federation.zookeeperInitLimit=5 +#federation.zookeeperSyncLimit=2 +#federation.zookeeperElectionAlg=3 +#federation.zookeeperMaxClientCnxns=0 +#federation.zookeeperPeerPort=2888 +#federation.zookeeperElectionPort=3888 +#federation.zookeeperJmxLog4j=false # Node properties #node.name=myHost Added: branches/dev-btm/bigdata-jini/src/test/com/bigdata/quorum/QuorumPeerServiceTest.java =================================================================== --- branches/dev-btm/bigdata-jini/src/test/com/bigdata/quorum/QuorumPeerServiceTest.java (rev 0) +++ branches/dev-btm/bigdata-jini/src/test/com/bigdata/quorum/QuorumPeerServiceTest.java 2010-11-22 17:42:16 UTC (rev 3971) @@ -0,0 +1,844 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2007. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... 
+ +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +package com.bigdata.quorum; + +// NOTE: remove commented out references to org.junit and annotations +// when/if the junit infrastructure is upgraded to a version that +// supports those constructs. + +import static junit.framework.Assert.*; + +//import static org.junit.Assert.*; +//import org.junit.After; +//import org.junit.BeforeClass; +//import org.junit.Test; + +import junit.framework.TestCase; +import junit.framework.TestSuite; + +import com.bigdata.service.QuorumPeerService; +import com.bigdata.util.Util; +import com.bigdata.util.config.NicUtil; +import com.bigdata.util.config.ConfigDeployUtil; +import com.bigdata.util.config.ConfigurationUtil; +import com.bigdata.util.config.LogUtil; + +import com.sun.jini.admin.DestroyAdmin; +import com.sun.jini.start.NonActivatableServiceDescriptor; +import com.sun.jini.start.NonActivatableServiceDescriptor.Created; +import net.jini.admin.Administrable; +import net.jini.config.AbstractConfiguration; +import net.jini.config.ConfigurationException; +import net.jini.core.discovery.LookupLocator; +import net.jini.core.lookup.ServiceItem; +import net.jini.core.lookup.ServiceRegistrar; +import net.jini.core.lookup.ServiceTemplate; +import net.jini.discovery.DiscoveryManagement; +import net.jini.discovery.DiscoveryGroupManagement; +import net.jini.discovery.DiscoveryLocatorManagement; +import net.jini.discovery.DiscoveryListener; +import net.jini.discovery.DiscoveryEvent; +import net.jini.discovery.LookupDiscoveryManager; +import net.jini.lookup.LookupCache; +import net.jini.lookup.ServiceDiscoveryEvent; +import net.jini.lookup.ServiceDiscoveryListener; +import net.jini.lookup.ServiceDiscoveryManager; +import net.jini.security.BasicProxyPreparer; +import net.jini.security.ProxyPreparer; + +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +import org.apache.zookeeper.ZooKeeper; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.rmi.RMISecurityManager; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +/* + * Tests the QuorumPeerService smart proxy implementation that wraps the + * ZooKeeper QuorumPeerMain server. Tests include starting and stopping + * multiple instances of the service, discovering the service as a Jini + * service, and verification of various client interactions with the + * service. 
+ * + * NOTE: The intent of this test class is to start a single ensemble + * of N QuorumPeerService instances once, when this class is + * instantiated, allowing all test methods of this class to interact + * with only that one ensemble, and then shutdown the ensemble + * only after all tests have been run (or this class exits on + * failure); rather than start and stop a new ensemble with each + * test method executed. With more recent versions of junit, this + * is possible using annotations such as @BeforeClass and @After. + * Unfortunately, the current version of junit that is being used + * does not support such annotations. Additionally, the current + * design of the test infrastructure expects that this test class + * sub-classes org.junit.TestCase, which generally requires that + * constructors be provided with this class. + * + * In order to achieve the desired intent described above, while + * adhering to the requirements imposed by the current test + * framework, this class provide a method (beforeClassSetup) that + * is invoked once (from within the constructor), prior to the + * execution of any of the test methods; and a method that is + * invoked only after all the test methods of this class have been + * invoked (afterClassTeardown). + * + * When/if the junit infrastructure used by this test framework is + * ever upgraded to a version that supports annotations, and this test + * class is changed so that it no longer has to extend TestCase, then + * the appropriate changes should be made to this class to exploit + * the features of that new version; as indicated in the documentation + * below. + */ +public class QuorumPeerServiceTest extends TestCase { + + private static String pSep = System.getProperty("path.separator"); + private static String fSep = System.getProperty("file.separator"); + private static String tmpDir = System.getProperty("java.io.tmpdir"); + private static String userDir = System.getProperty("user.dir"); + private static String policy = + System.getProperty("java.security.policy"); + private static String log4jJar = System.getProperty("log4j.jar"); + private static String jskPlatformJar = + System.getProperty("jsk-platform.jar"); + private static String jskLibJar = System.getProperty("jsk-lib.jar"); + private static String zookeeperJar = System.getProperty("zookeeper.jar"); + private static String federationName = + System.getProperty("federation.name"); + private static String bigdataRoot = System.getProperty("app.home"); + private static String stateBase = tmpDir+fSep+"state"; + private static String codebasePortStr = System.getProperty + ("codebase.port","23333"); + private static int codebasePort = Integer.parseInt(codebasePortStr); + + private static Logger logger; + + // for starting smart proxy implementation of ZooKeeper quorum server + private static String thisHost; + private static String jskCodebase; + private static String groups;//for overriding the configured groups + + private static int[] quorumClientPort = {2180, 2181, 2182}; + private static int[] quorumPeerPort = {2887, 2888, 2889}; + private static int[] quorumElectionPort = {3887, 3888, 3889}; + private static String quorumServerCodebase; + private static String quorumCodebase; + private static String quorumClasspath = jskPlatformJar+pSep + +jskLibJar+pSep + +zookeeperJar+pSep + +log4jJar; + private static String quorumImplName = "com.bigdata.quorum.ServiceImpl"; + private static String quorumConfig = bigdataRoot+fSep + +"dist"+fSep + +"bigdata"+fSep + +"var"+fSep + +"config"+fSep + 
+"jini"+fSep + +"quorum.config"; + private static String[] quorumStateDir = + { + stateBase+fSep+"quorumState"+"."+quorumClientPort[0] + +"."+quorumPeerPort[0] + +"."+quorumElectionPort[0], + stateBase+fSep+"quorumState"+"."+quorumClientPort[1] + +"."+quorumPeerPort[1] + +"."+quorumElectionPort[1], + stateBase+fSep+"quorumState"+"."+quorumClientPort[2] + +"."+quorumPeerPort[2] + +"."+quorumElectionPort[2] + }; + private static String[] quorumPersistenceOverride = + { + "com.bigdata.quorum.persistenceDirectory=new String(" + +"\""+quorumStateDir[0]+"\")", + "com.bigdata.quorum.persistenceDirectory=new String(" + +"\""+quorumStateDir[1]+"\")", + "com.bigdata.quorum.persistenceDirectory=new String(" + +"\""+quorumStateDir[2]+"\")" + }; + private static int nQuorumServicesExpected = + quorumPersistenceOverride.length; + private static boolean quorumsAlreadyStarted = false; + private static HashSet<QuorumPeerService> quorumSet = + new HashSet<QuorumPeerService>(); + + private static String[] groupsToDiscover = new String[] {"qaQuorumGroup"}; + private static LookupLocator[] locsToDiscover = new LookupLocator[0]; + private static DiscoveryManagement ldm; + protected static ServiceDiscoveryManager sdm; + private static CacheListener cacheListener; + private static LookupCache quorumCache; + + private static ExecutorService serviceStarterTaskExecutor = + Executors.newFixedThreadPool(quorumStateDir.length); + + private static boolean setupOnceAlready = false; + private static boolean exceptionInSetup = false; + private static boolean lastTest = false; + + // When using ServiceStarter to start the desired service instances, + // need to hold references to each service instance that is created + // to prevent distributed garbage collection on each service ref. The + // map below is used to hold those references; where the map's key + // is the persistence directory path for the corresponding service + // reference. + private static Map<String, Created> refMap = + new ConcurrentHashMap<String, Created>(); + + private String testName; + private boolean testPassed; + + // NOTE: remove constructors and when/if the junit infrastructure + // is upgraded to a version that supports annotations and this test + // is changed so that it no longer has to extend TestCase. + public QuorumPeerServiceTest() throws Exception { + beforeClassSetup(); + } + + public QuorumPeerServiceTest(String name) throws Exception { + super(name); + beforeClassSetup(); + } + + // Test framework methods ------------------------------------------------ + + // Intended to be run before any test methods are executed. This method + // starts all services and creates any resources whose life cycles + // are intended to span all tests; rather than being set up and torn down + // from test to test. + // + // NOTE: use the @BeforeClass annotation when/if the junit framework is + // upgraded to a version that supports annotations. +// @BeforeClass public static void beforeClassSetup() { + public static synchronized void beforeClassSetup() throws Exception { + if (setupOnceAlready) return; + setupOnceAlready = true; + try { + String logConfigFile = userDir+fSep+"bigdata"+fSep+"src"+fSep + +"resources"+fSep+"logging"+fSep + +"log4j.properties"; + System.setProperty("log4j.configuration", logConfigFile); + logger = LogUtil.getLog4jLogger + ( (QuorumPeerServiceTest.class).getName() ); + logger.debug("\n\n-- beforeClassSetup ENTER ----------\n"); + + // Setup both lookup & service discovery, plus groups, codebase ... 
+ setupDiscovery(); + + // Start the services making up the ensemble + + String ensembleSize = "com.bigdata.quorum.zookeeperEnsembleSize=" + +"new Integer("+quorumStateDir.length+")"; + long discoveryPeriod = 10L*1000L; + String discoveryPeriodStr = + "com.bigdata.quorum.peerDiscoveryPeriod="+discoveryPeriod; + String jmxLog4j = + "com.bigdata.quorum.zookeeperJmxLog4j=new Boolean("+false+")"; + for (int i=0; i<quorumStateDir.length; i++) { + String clientPort = + "com.bigdata.quorum.zookeeperClientPort=" + +"new Integer("+quorumClientPort[i]+")"; + String peerPort = + "com.bigdata.quorum.zookeeperPeerPort=" + +"new Integer("+quorumPeerPort[i]+")"; + String electionPort = + "com.bigdata.quorum.zookeeperElectionPort=" + +"new Integer("+quorumElectionPort[i]+")"; + String joinGroups = + "com.bigdata.quorum.groupsToJoin=new String[] " + +groups; + + serviceStarterTaskExecutor.execute + ( new ServiceStarterTask(quorumStateDir[i], + ensembleSize, + clientPort, + peerPort, + electionPort, + quorumPersistenceOverride[i], + joinGroups, + discoveryPeriodStr, + jmxLog4j) ); + } + + // Give the services time to start & rendezvous with each other + int nWait = 10; + for (int i=0; i<nWait; i++) { + if (refMap.size() == quorumStateDir.length) break; + Util.delayMS(1L*1000L); + } + if (refMap.size() != quorumStateDir.length) { + throw new Exception("did not start all expected services " + +"[expected="+quorumStateDir.length + +", actual="+refMap.size()+"]"); + } + } catch(Exception e) { + exceptionInSetup = true; + throw e; + } + logger.debug("\n\n-- beforeClassSetup EXIT ----------\n"); + } + + // Intended to be run after all test methods have completed executing. + // This method terminates all services and cleans up resources whose + // life cycles span all tests; rather than being set up and torn down + // from test to test. + // + // NOTE: use the @AfterClass annotation when/if the junit framework is + // upgraded to a version that supports annotations. +// @AfterClass public void afterClassTearDown() { + public void afterClassTearDown() { + logger.debug("\n\n-- afterClassTearDown ENTER ----------\n"); + if(sdm != null) { + try { + sdm.terminate(); + logger.log(Level.INFO, "terminated sdm"); + } catch(Throwable t) { } + } + + if(ldm != null) { + try { + ldm.terminate(); + logger.log(Level.INFO, "terminated ldm"); + } catch(Throwable t) { } + } + + stopAllQuorums(); + + // Clean up persistence directories. Note that a delay is injected + // before attempting to delete the persistence directories. This + // is done to allow ZooKeeper to fully complete whatever clean up + // processing it performs. Through trial and error it has been + // found that without such a delay, although this test class + // seems to complete its processing successfully, the next test + // class that is run often times exits before executing any + // test methods; without any indication of what caused the + // premature exit. Thus, until it can be determined what is + // actually causing this issue, and how to address it, the delay + // performed below will be used to allow ZooKeeper to clean + // up appropriately. + + Util.delayMS(1L*1000L);//delay 1 second + for (int i=0; i<quorumStateDir.length; i++) { + String persistenceDirectory = quorumStateDir[i]; +... [truncated message content] |
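The workaround spelled out in the NOTE above reduces to a simple pattern: trim the raw property strings and do the bounds check with Integer.parseInt (or Long.parseLong) instead of NumberFormat.parse. The following self-contained sketch illustrates that pattern under stated assumptions: the ".min"/".max" property suffixes mirror the default-deploy.properties entries shown above, but the class name, the use of java.util.Properties, and the plain IllegalArgumentException (standing in for jini's ConfigurationException) are illustrative, not the actual ConfigDeployUtil code.

import java.util.Properties;

// Sketch of the validateInt workaround: trim and use Integer.parseInt for the
// range check rather than NumberFormat.parse. Property names follow the
// ".min"/".max" convention from default-deploy.properties; everything else is
// illustrative.
public class ValidateIntSketch {

    static int validateInt(final Properties props, final String parameter,
            final String strvalue) {
        final int value = Integer.parseInt(strvalue.trim());
        final String maxString = props.getProperty(parameter + ".max");
        final String minString = props.getProperty(parameter + ".min");
        if (maxString != null) {
            final int max = Integer.parseInt(maxString.trim());
            if (value > max)
                throw new IllegalArgumentException("parameter [" + parameter
                        + "] exceeds maximum [" + value + " > " + max + "]");
        }
        if (minString != null) {
            final int min = Integer.parseInt(minString.trim());
            if (value < min)
                throw new IllegalArgumentException("parameter [" + parameter
                        + "] is less than minimum [" + value + " < " + min + "]");
        }
        return value;
    }

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.setProperty("federation.zookeeperClientPort.min", "1024");
        props.setProperty("federation.zookeeperClientPort.max", "65353");
        // 2181 is in range; 80 is rejected.
        System.out.println(validateInt(props, "federation.zookeeperClientPort", "2181"));
        try {
            validateInt(props, "federation.zookeeperClientPort", "80");
        } catch (IllegalArgumentException expected) {
            System.out.println("rejected: " + expected.getMessage());
        }
    }
}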
From: <tho...@us...> - 2010-11-19 18:12:56
Revision: 3970 http://bigdata.svn.sourceforge.net/bigdata/?rev=3970&view=rev Author: thompsonbry Date: 2010-11-19 18:12:50 +0000 (Fri, 19 Nov 2010) Log Message: ----------- Added a skeleton for tests for binary compatibility. Added Paths: ----------- trunk/bigdata-compatibility/ trunk/bigdata-compatibility/src/ trunk/bigdata-compatibility/src/test/ trunk/bigdata-compatibility/src/test/com/ trunk/bigdata-compatibility/src/test/com/bigdata/ trunk/bigdata-compatibility/src/test/com/bigdata/journal/ trunk/bigdata-compatibility/src/test/com/bigdata/journal/TestBinaryCompatibility.java Added: trunk/bigdata-compatibility/src/test/com/bigdata/journal/TestBinaryCompatibility.java =================================================================== --- trunk/bigdata-compatibility/src/test/com/bigdata/journal/TestBinaryCompatibility.java (rev 0) +++ trunk/bigdata-compatibility/src/test/com/bigdata/journal/TestBinaryCompatibility.java 2010-11-19 18:12:50 UTC (rev 3970) @@ -0,0 +1,276 @@ +/* + +Copyright (C) SYSTAP, LLC 2006-2008. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +*/ +/* + * Created on Nov 19, 2010 + */ +package com.bigdata.journal; + +import java.io.File; +import java.io.IOException; +import java.util.Properties; +import java.util.UUID; + +import junit.framework.TestCase2; + +import com.bigdata.Banner; +import com.bigdata.btree.IIndex; +import com.bigdata.btree.IndexMetadata; + +/** + * Test suite for binary compatibility, portability, and forward compatibility + * or automated migration of persistent stores and persistence or serialization + * capable objects across different bigdata releases. The tests in this suite + * rely on artifacts which are archived within SVN. + * + * @todo create w/ small extent and truncate (RW store does not support + * truncate). + * + * @todo test binary migration and forward compatibility. + * + * @todo stubs to create and organize artifacts,etc. + * + * @todo data driven test suite? + * + * @todo create artifact for each release, name the artifacts systematically, + * e.g., test.release.(RW|WORM).jnl or test.release.seg. Collect a list of + * the created artifacts and run each test against each of the versions of + * the artifact. + * + * @todo Force artifact file name case for file system compatibility? + * + * @todo test journal (WORM and RW), btree, index segment, row store, persistent + * data structures (checkpoints, index metadata, tuple serializers, etc.), + * RDF layer, RMI message formats, etc. + * + * @todo Specific tests for + * <p> + * Name2Addr and DefaultKeyBuilderFactory portability problem. See + * https://sourceforge.net/apps/trac/bigdata/ticket/193 + * <p> + * WORM global row store resolution problem introduced in the + * JOURNAL_HA_BRANCH. 
See + * https://sourceforge.net/apps/trac/bigdata/ticket/171#comment:5 + * <p> + * Sparse row store JDK encoding problem: + * https://sourceforge.net/apps/trac/bigdata/ticket/107 + */ +public class TestBinaryCompatibility extends TestCase2 { + + /** + * + */ + public TestBinaryCompatibility() { + } + + /** + * @param name + */ + public TestBinaryCompatibility(String name) { + super(name); + } + + /** + * @todo munge the release version into a name that is compatibility with + * the file system ("." to "_"). Store artifacts at each release? At + * each release in which an incompatibility is introduced? At each + * release in which a persistence capable data structure or change is + * introduced? + */ + static protected final File artifactDir = new File( + "bigdata-compatibility/src/resources/artifacts"); + + protected static class Version { + private final String version; + private final String revision; + public Version(String version,String revision) { + this.version = version; + this.revision = revision; + } + + /** + * The bigdata version number associated with the release. This is in + * the form <code>xx.yy.zz</code> + */ + public String getVersion() { + return version; + } + + /** + * The SVN repository revision associated with the release. This is in + * the form <code>####</code>. + */ + public String getRevision() { + return revision; + } + } + + /** + * Known release versions. + */ + protected static Version V_0_83_2 = new Version("0.83.2", "3349"); + + /** + * Tested Versions. + */ + protected Version[] versions = new Version[] { + V_0_83_2 + }; + + protected void setUp() throws Exception { + + Banner.banner(); + + super.setUp(); + + if (!artifactDir.exists()) { + + if (!artifactDir.mkdirs()) { + + throw new IOException("Could not create: " + artifactDir); + + } + + } + + for (Version version : versions) { + + final File versionDir = new File(artifactDir, version.getVersion()); + + if (!versionDir.exists()) { + + if (!versionDir.mkdirs()) { + + throw new IOException("Could not create: " + versionDir); + + } + + } + + } + + } + + protected void tearDown() throws Exception { + + super.tearDown(); + + } + + /** + * @throws Throwable + * + * @todo Each 'test' should run an instance of a class which knows how to + * create the appropriate artifacts and how to test them. + */ + public void test_WORM_compatibility_with_JOURNAL_HA_BRANCH() + throws Throwable { + + final Version version = V_0_83_2; + + final File versionDir = new File(artifactDir, version.getVersion()); + + final File artifactFile = new File(versionDir, getName() + + BufferMode.DiskWORM + Journal.Options.JNL); + + if (!artifactFile.exists()) { + + createArtifact(artifactFile); + + } + + verifyArtifact(artifactFile); + + } + + protected void createArtifact(final File artifactFile) throws Throwable { + + if (log.isInfoEnabled()) + log.info("Creating: " + artifactFile); + + final Properties properties = new Properties(); + + properties.setProperty(Journal.Options.FILE, artifactFile.toString()); + + properties.setProperty(Journal.Options.INITIAL_EXTENT, "" + + Journal.Options.minimumInitialExtent); + + final Journal journal = new Journal(properties); + + try { + + final IndexMetadata md = new IndexMetadata(UUID.randomUUID()); + + final IIndex ndx = journal.registerIndex("kb.spo.SPO", md); + + ndx.insert(1,1); + + journal.commit(); + + // reduce to minimum footprint. 
+ journal.truncate(); + + } catch (Throwable t) { + + journal.destroy(); + + throw new RuntimeException(t); + + } finally { + + if (journal.isOpen()) + journal.close(); + + } + + } + + protected void verifyArtifact(final File artifactFile) throws Throwable { + + if (log.isInfoEnabled()) + log.info("Verifying: " + artifactFile); + + final Properties properties = new Properties(); + + properties.setProperty(Journal.Options.FILE, artifactFile.toString()); + + final Journal journal = new Journal(properties); + + try { + + final IIndex ndx = journal.getIndex("kb.spo.SPO"); + + assertNotNull(ndx); + + assertEquals(1,ndx.lookup(1)); + + } finally { + + journal.close(); + + } + + } + +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
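The create/verify pattern in TestBinaryCompatibility is essentially a round trip against the Journal API: build the artifact once under an older release, commit it under a fixed file name, then reopen that same file under a newer release and assert that the index and its tuple are still readable. The condensed sketch below uses only calls that appear in the diff above; the hard-coded artifact file name is illustrative (the real test derives it from the release version, the BufferMode and Journal.Options.JNL), and the equality check simply mirrors the assertion in verifyArtifact().

import java.io.File;
import java.util.Properties;
import java.util.UUID;

import com.bigdata.btree.IIndex;
import com.bigdata.btree.IndexMetadata;
import com.bigdata.journal.Journal;

// Condensed create-then-verify round trip for a binary compatibility artifact.
public class BinaryCompatibilitySketch {

    public static void main(final String[] args) {
        final File artifact = new File("test.0_83_2.DiskWORM.jnl"); // illustrative name

        final Properties p = new Properties();
        p.setProperty(Journal.Options.FILE, artifact.toString());

        if (!artifact.exists()) {
            // Create the artifact (in practice, with an older release on the classpath).
            final Journal createJournal = new Journal(p);
            try {
                final IIndex ndx = createJournal.registerIndex("kb.spo.SPO",
                        new IndexMetadata(UUID.randomUUID()));
                ndx.insert(1, 1);
                createJournal.commit();
            } finally {
                createJournal.close();
            }
        }

        // Verify the artifact with the current release on the classpath.
        final Journal verifyJournal = new Journal(p);
        try {
            final IIndex ndx = verifyJournal.getIndex("kb.spo.SPO");
            if (ndx == null || !Integer.valueOf(1).equals(ndx.lookup(1)))
                throw new AssertionError("artifact not readable: " + artifact);
        } finally {
            verifyJournal.close();
        }
    }
}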
From: <tho...@us...> - 2010-11-18 19:26:25
Revision: 3969 http://bigdata.svn.sourceforge.net/bigdata/?rev=3969&view=rev Author: thompsonbry Date: 2010-11-18 19:26:19 +0000 (Thu, 18 Nov 2010) Log Message: ----------- Renamed m_bits to m_live to be in keeping with the wiki documentation. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java 2010-11-18 19:18:43 UTC (rev 3968) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java 2010-11-18 19:26:19 UTC (rev 3969) @@ -31,7 +31,7 @@ /** * Bit maps for an allocator. The allocator is a bit map managed as int[]s. * - * @todo change to make {@link #m_transients}, {@link #m_bits}, and + * @todo change to make {@link #m_transients}, {@link #m_live}, and * {@link #m_commit} final fields and then modify {@link FixedAllocator} * to use {@link System#arraycopy(Object, int, Object, int, int)} to copy * the data rather than cloning it. @@ -70,7 +70,7 @@ * Just the newly allocated bits. This will be copied onto {@link #m_commit} * when the current native transaction commits. */ - final int m_bits[]; + final int m_live[]; /** * All of the bits from the commit point on entry to the current native * transaction plus any newly allocated bits. @@ -86,7 +86,7 @@ // m_writeCache = cache; m_ints = bitSize; m_commit = new int[bitSize]; - m_bits = new int[bitSize]; + m_live = new int[bitSize]; m_transients = new int[bitSize]; } @@ -98,7 +98,7 @@ // Now check to see if it allocated final int bit = (addr - m_addr) / size; - return RWStore.tstBit(m_bits, bit); + return RWStore.tstBit(m_live, bit); } public boolean addressInRange(final int addr, final int size) { @@ -116,7 +116,7 @@ } public boolean freeBit(final int bit) { - if (!RWStore.tstBit(m_bits, bit)) { + if (!RWStore.tstBit(m_live, bit)) { throw new IllegalArgumentException("Freeing bit not set"); } @@ -129,7 +129,7 @@ * output to the file by removing any pending write to the now freed * address. On large transaction scopes this may be significant. 
*/ - RWStore.clrBit(m_bits, bit); + RWStore.clrBit(m_live, bit); if (!RWStore.tstBit(m_commit, bit)) { RWStore.clrBit(m_transients, bit); @@ -162,7 +162,7 @@ final int bit = RWStore.fndBit(m_transients, m_ints); if (bit != -1) { - RWStore.setBit(m_bits, bit); + RWStore.setBit(m_live, bit); RWStore.setBit(m_transients, bit); return bit; @@ -173,7 +173,7 @@ public boolean hasFree() { for (int i = 0; i < m_ints; i++) { - if (m_bits[i] != 0xFFFFFFFF) { + if (m_live[i] != 0xFFFFFFFF) { return true; } } @@ -185,7 +185,7 @@ int total = m_ints * 32; int allocBits = 0; for (int i = 0; i < total; i++) { - if (RWStore.tstBit(m_bits, i)) { + if (RWStore.tstBit(m_live, i)) { allocBits++; } } @@ -211,7 +211,7 @@ final int total = m_ints * 32; for (int i = 0; i < total; i++) { - if (RWStore.tstBit(m_bits, i)) { + if (RWStore.tstBit(m_live, i)) { addrs.add(new Integer(rootAddr - i)); } } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-11-18 19:18:43 UTC (rev 3968) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-11-18 19:26:19 UTC (rev 3969) @@ -61,7 +61,7 @@ if (log.isDebugEnabled()) log.debug("Restored index " + index + " with " + getStartAddr() - + "[" + fb.m_bits[0] + "] from " + m_diskAddr); + + "[" + fb.m_live[0] + "] from " + m_diskAddr); m_index = index; } @@ -111,7 +111,7 @@ final int bit = offset % allocBlockRange; - if (RWStore.tstBit(block.m_bits, bit)) { + if (RWStore.tstBit(block.m_live, bit)) { return RWStore.convertAddr(block.m_addr) + ((long) m_size * bit); } else { return 0L; @@ -170,7 +170,7 @@ try { final AllocBlock fb = m_allocBlocks.get(0); if (log.isDebugEnabled()) - log.debug("writing allocator " + m_index + " for " + getStartAddr() + " with " + fb.m_bits[0]); + log.debug("writing allocator " + m_index + " for " + getStartAddr() + " with " + fb.m_live[0]); final byte[] buf = new byte[1024]; final DataOutputStream str = new DataOutputStream(new FixedOutputStream(buf)); try { @@ -182,11 +182,11 @@ str.writeInt(block.m_addr); for (int i = 0; i < m_bitSize; i++) { - str.writeInt(block.m_bits[i]); + str.writeInt(block.m_live[i]); } // if (!m_store.isSessionPreserved()) { - block.m_transients = block.m_bits.clone(); + block.m_transients = block.m_live.clone(); // } /** @@ -196,11 +196,11 @@ if (m_context != null) { assert block.m_saveCommit != null; - block.m_saveCommit = block.m_bits.clone(); + block.m_saveCommit = block.m_live.clone(); // } else if (m_store.isSessionPreserved()) { // block.m_commit = block.m_transients.clone(); } else { - block.m_commit = block.m_bits.clone(); + block.m_commit = block.m_live.clone(); } } // add checksum @@ -243,17 +243,17 @@ block.m_addr = str.readInt(); for (int i = 0; i < m_bitSize; i++) { - block.m_bits[i] = str.readInt(); + block.m_live[i] = str.readInt(); /** * Need to calc how many free blocks are available, minor * optimization by checking against either empty or full to * avoid scanning every bit unnecessarily **/ - if (block.m_bits[i] == 0) { // empty + if (block.m_live[i] == 0) { // empty m_freeBits += 32; - } else if (block.m_bits[i] != 0xFFFFFFFF) { // not full - final int anInt = block.m_bits[i]; + } else if (block.m_live[i] != 0xFFFFFFFF) { // not full + final int anInt = block.m_live[i]; for (int bit = 0; bit < 32; bit++) { if ((anInt & (1 << bit)) == 0) { m_freeBits++; @@ -262,8 +262,8 @@ } 
} - block.m_transients = (int[]) block.m_bits.clone(); - block.m_commit = (int[]) block.m_bits.clone(); + block.m_transients = (int[]) block.m_live.clone(); + block.m_commit = (int[]) block.m_live.clone(); if (m_startAddr == 0) { m_startAddr = block.m_addr; @@ -642,7 +642,7 @@ final int bit = offset % allocBlockRange; - return RWStore.tstBit(block.m_bits, bit); + return RWStore.tstBit(block.m_live, bit); } public boolean isCommitted(int offset) { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
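All three of the int[] fields touched by this rename (m_live, m_transients, m_commit) are treated as flat bit maps and manipulated through RWStore.tstBit, setBit, clrBit and fndBit. Those helpers are not shown in the diff, so the standalone sketch below is an assumption about their behaviour (bit i is taken to live in word i/32 at position i%32); it is included only to make the allocator code above easier to read, not as a copy of the RWStore implementation.

// Assumed int[]-backed bit-map helpers, equivalent in spirit to
// RWStore.tstBit/setBit/clrBit/fndBit as used by AllocBlock.
public class BitMapSketch {

    static boolean tstBit(final int[] bits, final int bit) {
        return (bits[bit / 32] & (1 << (bit % 32))) != 0;
    }

    static void setBit(final int[] bits, final int bit) {
        bits[bit / 32] |= (1 << (bit % 32));
    }

    static void clrBit(final int[] bits, final int bit) {
        bits[bit / 32] &= ~(1 << (bit % 32));
    }

    // Find the first clear (free) bit, or -1 if every bit in the map is set.
    static int fndBit(final int[] bits, final int ints) {
        for (int w = 0; w < ints; w++) {
            if (bits[w] != 0xFFFFFFFF) {
                for (int b = 0; b < 32; b++) {
                    if ((bits[w] & (1 << b)) == 0)
                        return w * 32 + b;
                }
            }
        }
        return -1;
    }

    public static void main(final String[] args) {
        final int[] live = new int[2]; // 64 bits, all free
        final int bit = fndBit(live, live.length);
        setBit(live, bit);
        System.out.println("allocated bit " + bit + ", set=" + tstBit(live, bit));
        clrBit(live, bit);
        System.out.println("freed bit " + bit + ", set=" + tstBit(live, bit));
    }
}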
From: <tho...@us...> - 2010-11-18 19:18:50
Revision: 3968 http://bigdata.svn.sourceforge.net/bigdata/?rev=3968&view=rev Author: thompsonbry Date: 2010-11-18 19:18:43 +0000 (Thu, 18 Nov 2010) Log Message: ----------- Updated to leave the version which works uncommented. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/samples/com/bigdata/samples/btree/JournalReadOnlyTxExample.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/samples/com/bigdata/samples/btree/JournalReadOnlyTxExample.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/samples/com/bigdata/samples/btree/JournalReadOnlyTxExample.java 2010-11-18 19:14:05 UTC (rev 3967) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/samples/com/bigdata/samples/btree/JournalReadOnlyTxExample.java 2010-11-18 19:18:43 UTC (rev 3968) @@ -182,9 +182,9 @@ * try to read it. */ // Obtaining a tx here protects against recycling. -// final long tx2 = store.newTx(ITx.READ_COMMITTED); + final long tx2 = store.newTx(ITx.READ_COMMITTED); // Using a historical read w/o a tx does NOT protect against recycling. - final long tx2 = store.getLastCommitTime(); +// final long tx2 = store.getLastCommitTime(); try { // lookup the UNISOLATED B+Tree. This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
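The point of the example is the difference between a read-only transaction, which registers with the transaction service and therefore pins the commit point it reads against, and a bare historical read against getLastCommitTime(), which holds no read lock at all. A stripped-down restatement of the two choices follows, using only identifiers that already appear in JournalReadOnlyTxExample; how tx2 is eventually released is not shown in the diff and is likewise omitted here.

// Protected: the transaction pins its commit point, so the RWStore will not
// recycle the records that back this view while tx2 is open.
final long tx2 = store.newTx(ITx.READ_COMMITTED);

// Unprotected alternative: a bare historical read. Once later commits recycle
// storage, reads against this view can fail (see the
// PhysicalAddressResolutionException described in r3967, further down in this
// archive).
// final long tx2 = store.getLastCommitTime();

final BTree readOnlyBTree = (BTree) store.getIndex(name, tx2);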
From: <tho...@us...> - 2010-11-18 19:14:12
Revision: 3967 http://bigdata.svn.sourceforge.net/bigdata/?rev=3967&view=rev Author: thompsonbry Date: 2010-11-18 19:14:05 +0000 (Thu, 18 Nov 2010) Log Message: ----------- Added exception to help identify problems where the caller reading against a historical commit point is not using a read-only transaction and hence is not protected by a read-lock. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java branches/JOURNAL_HA_BRANCH/bigdata/src/samples/com/bigdata/samples/btree/JournalReadOnlyTxExample.java Added Paths: ----------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PhysicalAddressResolutionException.java Added: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PhysicalAddressResolutionException.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PhysicalAddressResolutionException.java (rev 0) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PhysicalAddressResolutionException.java 2010-11-18 19:14:05 UTC (rev 3967) @@ -0,0 +1,54 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Nov 18, 2010 + */ + +package com.bigdata.rwstore; + +/** + * Exception thrown when a logical address maps onto a physical address which is + * not currently allocated. The most common cause of this exception is a read on + * the database using a historical commit point which is not protected by a read + * lock. You should be using a read-only transaction rather than a bare + * historical read in order to be protected by a read lock. 
+ * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class PhysicalAddressResolutionException extends + IllegalArgumentException { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public PhysicalAddressResolutionException(final long addr) { + + super("Address did not resolve to physical address: " + addr); + + } + +} Property changes on: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PhysicalAddressResolutionException.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-18 18:33:30 UTC (rev 3966) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-18 19:14:05 UTC (rev 3967) @@ -1258,12 +1258,7 @@ assertAllocators(); - final String msg = "Address did not resolve to physical address: " - + addr; - - log.warn(msg); - - throw new IllegalArgumentException(msg); + throw new PhysicalAddressResolutionException(addr); } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/samples/com/bigdata/samples/btree/JournalReadOnlyTxExample.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/samples/com/bigdata/samples/btree/JournalReadOnlyTxExample.java 2010-11-18 18:33:30 UTC (rev 3966) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/samples/com/bigdata/samples/btree/JournalReadOnlyTxExample.java 2010-11-18 19:14:05 UTC (rev 3967) @@ -171,10 +171,20 @@ } - /* - * Open a read-only transaction on the last commit time. - */ - final long tx2 = store.newTx(ITx.READ_COMMITTED); + /* + * Open a read-only transaction on the last commit time. + * + * Note: If you use store.getLastCommitTime() here instead you will + * have a read-historical view of the same data, but that view is + * NOT protected by a read lock. Running the example with this + * change will cause the RWStore to throw out an exception since the + * writes will have overwritten the historical data by the time you + * try to read it. + */ + // Obtaining a tx here protects against recycling. +// final long tx2 = store.newTx(ITx.READ_COMMITTED); + // Using a historical read w/o a tx does NOT protect against recycling. + final long tx2 = store.getLastCommitTime(); try { // lookup the UNISOLATED B+Tree. @@ -186,30 +196,20 @@ store.commit(); /* - * Verify that the read-only view has not seen those changes. - */ - if(false) { - final BTree readOnlyBTree = (BTree) store.getIndex(name, tx2); - - verifyWriteSet1(readOnlyBTree); - - } - - /* * Write some new records on the unisolated index. */ writeSet2(unisolatedBTree); store.commit(); - - /* - * Verify that the read-only view has not seen those changes. - * - * Note: This probably hits the index cache and everything in - * the read-only B+Tree was already materialized when we - * verified it just before writing writeSet2 onto the unisolated - * index, so this is not really testing anything. - */ + + /* + * Verify that the read-only view has not seen those changes. + * + * Note: If you used a historical read rather than a read-only + * tx then this is where the RWStore will throw out an exception + * because the recycled has reused some of the records + * associated with the historical revision of the BTree. 
+ */ { final BTree readOnlyBTree = (BTree) store.getIndex(name, tx2); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
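The new class simply gives a name to an existing failure mode: at the point where a logical address is translated to a physical one, an unresolvable address now raises a self-describing exception instead of a generic IllegalArgumentException. A minimal self-contained sketch of that guard pattern follows; the resolve() stand-in and the local exception body are illustrative, not RWStore code.

// Guard pattern: fail with a self-describing exception when a logical address
// no longer maps to allocated storage.
public class AddressGuardSketch {

    static class PhysicalAddressResolutionException extends IllegalArgumentException {
        PhysicalAddressResolutionException(final long addr) {
            super("Address did not resolve to physical address: " + addr);
        }
    }

    // Stand-in for the allocator lookup: 0L means "not allocated".
    static long resolve(final long addr) {
        return 0L; // simulate a record that has been recycled
    }

    static long physicalAddress(final long addr) {
        final long paddr = resolve(addr);
        if (paddr == 0L)
            throw new PhysicalAddressResolutionException(addr);
        return paddr;
    }

    public static void main(final String[] args) {
        try {
            physicalAddress(42L);
        } catch (PhysicalAddressResolutionException e) {
            // Usually means a historical read was not protected by a
            // read-only transaction and the RWStore recycled the record.
            System.out.println(e.getMessage());
        }
    }
}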
From: <tho...@us...> - 2010-11-18 18:33:37
Revision: 3966 http://bigdata.svn.sourceforge.net/bigdata/?rev=3966&view=rev Author: thompsonbry Date: 2010-11-18 18:33:30 +0000 (Thu, 18 Nov 2010) Log Message: ----------- Found a fence post in RWStore#checkDeferredFrees(). It needed to add a total of (2) to the release time. +1 since we want to read the delete blocks for the first commit record which we MAY NOT release. +1 for the inclusive upper bound semantics for the range scan of the index. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/ITransactionService.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalTransactionService.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/service/AbstractTransactionService.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/ITransactionService.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/ITransactionService.java 2010-11-18 18:17:22 UTC (rev 3965) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/ITransactionService.java 2010-11-18 18:33:30 UTC (rev 3966) @@ -247,8 +247,9 @@ * derived from the timestamp of the earliest running transaction MINUS the * minimum release age and is updated whenever the earliest running * transaction terminates. This value is monotonically increasing. It will - * never be GT the last commit time. It will never be negative. It MAY be - * ZERO (0L) and will be ZERO (0L) on startup. + * always be LT the last non-zero last commit time. It will never be + * negative. It MAY be ZERO (0L) and will be ZERO (0L) on startup (unless + * explicitly set by the database to the last known commit time). */ public long getReleaseTime() throws IOException; Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalTransactionService.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalTransactionService.java 2010-11-18 18:17:22 UTC (rev 3965) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalTransactionService.java 2010-11-18 18:33:30 UTC (rev 3966) @@ -31,9 +31,7 @@ import java.io.IOException; import java.util.Properties; import java.util.UUID; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import com.bigdata.service.AbstractFederation; import com.bigdata.service.AbstractTransactionService; @@ -444,15 +442,18 @@ } - /** - * Ignored since the {@link Journal} records the last commit time - * in its root blocks. + /* @todo This is only true for the WORM. For the RWStore, the release time + * will advance normally and things can get aged out of the store. */ - public void notifyCommit(long commitTime) { - - // NOP - - } +// /** +// * Ignored since the {@link Journal} records the last commit time +// * in its root blocks. +// */ +// public void notifyCommit(long commitTime) { +// +// // NOP +// +// } /* @todo This is only true for the WORM. For the RWStore, the release time * will advance normally and things can get aged out of the store. @@ -501,42 +502,42 @@ } - /** - * Invoke a method with the {@link AbstractTransactionService}'s lock held. 
- * - * @param <T> - * @param callable - * @return - * @throws Exception - */ - public <T> T callWithLock(final Callable<T> callable) throws Exception { - lock.lock(); - try { - return callable.call(); - } finally { - lock.unlock(); - } - } +// /** +// * Invoke a method with the {@link AbstractTransactionService}'s lock held. +// * +// * @param <T> +// * @param callable +// * @return +// * @throws Exception +// */ +// public <T> T callWithLock(final Callable<T> callable) throws Exception { +// lock.lock(); +// try { +// return callable.call(); +// } finally { +// lock.unlock(); +// } +// } +// +// /** +// * Invoke a method with the {@link AbstractTransactionService}'s lock held. +// * +// * But throw immediate exception if try fails. +// * +// * @param <T> +// * @param callable +// * @return +// * @throws Exception +// */ +// public <T> T tryCallWithLock(final Callable<T> callable, long waitFor, TimeUnit unit) throws Exception { +// if (!lock.tryLock(waitFor,unit)) { +// throw new RuntimeException("Lock not available"); +// } +// try { +// return callable.call(); +// } finally { +// lock.unlock(); +// } +// } - /** - * Invoke a method with the {@link AbstractTransactionService}'s lock held. - * - * But throw immediate exception if try fails. - * - * @param <T> - * @param callable - * @return - * @throws Exception - */ - public <T> T tryCallWithLock(final Callable<T> callable, long waitFor, TimeUnit unit) throws Exception { - if (!lock.tryLock(waitFor,unit)) { - throw new RuntimeException("Lock not available"); - } - try { - return callable.call(); - } finally { - lock.unlock(); - } - } - } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-18 18:17:22 UTC (rev 3965) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-18 18:33:30 UTC (rev 3966) @@ -216,6 +216,9 @@ * (but not yet committed). * <p> * Read-only mode. + * <p> + * Unit tests looking for persistent memory leaks (e.g., all allocated + * space can be reclaimed). */ public class RWStore implements IStore { @@ -2051,10 +2054,20 @@ // the effective release time. long latestReleasableTime = transactionService.getReleaseTime(); - // add one to give this inclusive upper bound semantics. + /* + * add one because we want to read the delete blocks for all commit + * points up to and including the first commit point that we may not + * release. + */ latestReleasableTime++; /* + * add one to give this inclusive upper bound semantics to the range + * scan. + */ + latestReleasableTime++; + + /* * Free deferrals. * * Note: This adds one to the lastDeferredReleaseTime to give Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/service/AbstractTransactionService.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/service/AbstractTransactionService.java 2010-11-18 18:17:22 UTC (rev 3965) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/service/AbstractTransactionService.java 2010-11-18 18:33:30 UTC (rev 3966) @@ -1120,7 +1120,7 @@ * (commitTime-1) then compute and set the new releaseTime. * <p> * Note: This method was historically part of {@link #notifyCommit(long)}. 
- * It was moved into its own method so it can be overriden for some unit + * It was moved into its own method so it can be overridden for some unit * tests. * * @throws IllegalMonitorStateException This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
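The fence post is easiest to see with concrete numbers, so here is a worked example assuming the transaction service reports a release time of 100. Commit records at or below 100 are releasable; the delete blocks of the first commit record above that, which may not itself be released, still need to be read; and one more increment is needed because adding one is what gives the range scan its inclusive upper bound.

// Worked example of the r3966 fence post, assuming releaseTime = 100.
public class FencePostSketch {
    public static void main(final String[] args) {
        long latestReleasableTime = 100L;  // transactionService.getReleaseTime()

        // +1: also read the delete blocks of the first commit record that may
        //     NOT itself be released.
        latestReleasableTime++;            // 101

        // +1: adding one gives the range scan inclusive upper bound semantics,
        //     so the commit record at 101 is actually visited.
        latestReleasableTime++;            // 102

        System.out.println("scan bound = " + latestReleasableTime
                + " (delete blocks read through commit time 101)");
    }
}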
From: <tho...@us...> - 2010-11-18 18:17:28
Revision: 3965 http://bigdata.svn.sourceforge.net/bigdata/?rev=3965&view=rev Author: thompsonbry Date: 2010-11-18 18:17:22 +0000 (Thu, 18 Nov 2010) Log Message: ----------- Synching code to Martyn looking at blob alloc/release/deferred frees Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/Journal.java branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/Journal.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/Journal.java 2010-11-18 17:49:55 UTC (rev 3964) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/Journal.java 2010-11-18 18:17:22 UTC (rev 3965) @@ -285,6 +285,7 @@ /** * Extended to shutdown the embedded transaction service. */ + @Override public void shutdown() { ((JournalTransactionService) getTransactionService()) @@ -297,6 +298,7 @@ /** * Extended to shutdown the embedded transaction service. */ + @Override public void shutdownNow() { ((JournalTransactionService) getTransactionService()) Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java 2010-11-18 17:49:55 UTC (rev 3964) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java 2010-11-18 18:17:22 UTC (rev 3965) @@ -851,15 +851,15 @@ try { - byte[] buf = new byte[1024 * 2048]; // 2Mb buffer of random data + final byte[] buf = new byte[1024 * 2048]; // 2Mb buffer of random data r.nextBytes(buf); - ByteBuffer bb = ByteBuffer.wrap(buf); + final ByteBuffer bb = ByteBuffer.wrap(buf); - RWStrategy bs = (RWStrategy) store + final RWStrategy bs = (RWStrategy) store .getBufferStrategy(); - RWStore rw = bs.getRWStore(); + final RWStore rw = bs.getRWStore(); long faddr = bs.write(bb); // rw.alloc(buf, buf.length); @@ -870,12 +870,16 @@ assertEquals(bb, rdBuf); // now delete the memory - bs.delete(faddr); // immediateFree! - + bs.delete(faddr); + + // verify immediateFree! + assertEquals(0L,bs.getPhysicalAddress(faddr)); + + // allocate another address, might (or might not) be the same. faddr = bs.write(bb); // rw.alloc(buf, buf.length); bb.position(0); - System.out.println("Now commit to disk"); + System.out.println("Now commit to disk (1)"); store.commit(); @@ -887,13 +891,26 @@ // now delete the memory bs.delete(faddr); + + // Must not have been immediately freed. + assertNotSame(0L, bs.getPhysicalAddress(faddr)); + + /* + * Commit before testing for deferred frees. Since there is a + * prior commit point, we are not allowed to immediately free + * any record from that commit point in order to preserve the + * consistency of the last commit point, so we have to commit + * first then test for deferred frees. + */ + System.out.println("Now commit to disk (2)"); - // since deferred frees, we must commit in order to ensure the - // address in invalid, indicating it is available for store.commit(); - rw.checkDeferredFrees(true, store); - + // Request release of deferred frees. 
+ rw.checkDeferredFrees(true/* freeNow */, store); + + assertEquals(0L, bs.getPhysicalAddress(faddr)); + try { rdBuf = bs.read(faddr); // should fail with illegal argument throw new RuntimeException("Fail"); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
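Taken together, the assertions added to TestRWJournal trace the full life cycle of an allocation under the RWStore: a record freed before any commit is recycled immediately, while a record that belongs to a commit point must be deferred and only becomes recyclable after a later commit plus a pass over the deferred frees. The fragment below condenses that life cycle using only the calls and assertions visible in the diff; it assumes store is a Journal opened in RW mode and bb is a ByteBuffer of test data, as in the test itself.

final RWStrategy bs = (RWStrategy) store.getBufferStrategy();
final RWStore rw = bs.getRWStore();

long faddr = bs.write(bb);
bs.delete(faddr);
// No commit has intervened, so the free is immediate.
assertEquals(0L, bs.getPhysicalAddress(faddr));

bb.position(0); // rewind before re-writing the same buffer
faddr = bs.write(bb);
store.commit();
bs.delete(faddr);
// The record belongs to the last commit point, so the free must be deferred
// to keep that commit point consistent.
assertNotSame(0L, bs.getPhysicalAddress(faddr));

store.commit();
rw.checkDeferredFrees(true /* freeNow */, store);
// The deferred free has now been recycled.
assertEquals(0L, bs.getPhysicalAddress(faddr));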
From: <tho...@us...> - 2010-11-18 17:50:01
Revision: 3964 http://bigdata.svn.sourceforge.net/bigdata/?rev=3964&view=rev Author: thompsonbry Date: 2010-11-18 17:49:55 +0000 (Thu, 18 Nov 2010) Log Message: ----------- +1 for rnd.next(n) to avoid zero size allocs. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java 2010-11-18 17:49:36 UTC (rev 3963) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java 2010-11-18 17:49:55 UTC (rev 3964) @@ -415,7 +415,7 @@ ArrayList<Integer> sizes = new ArrayList<Integer>(); TreeMap<Long, Integer> paddrs = new TreeMap<Long, Integer>(); for (int i = 0; i < 100000; i++) { - int s = r.nextInt(250); + int s = r.nextInt(250)+1; sizes.add(s); int a = rw.alloc(s, null); long pa = rw.physicalAddress(a); @@ -424,7 +424,7 @@ } for (int i = 0; i < 50; i++) { - int s = r.nextInt(500); + int s = r.nextInt(500)+1; sizes.add(s); int a = rw.alloc(s, null); long pa = rw.physicalAddress(a); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
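The off-by-one matters because Random.nextInt(n) is uniform over the half-open range [0, n), so without the adjustment roughly one test allocation in 250 (or 500) would request zero bytes, which the allocator now rejects (the trap added in r3963, the next message in this archive). Adding one shifts the range to [1, n]. A tiny sketch:

import java.util.Random;

public class AllocSizeSketch {
    public static void main(final String[] args) {
        final Random r = new Random(0L);
        for (int i = 0; i < 5; i++) {
            final int s = r.nextInt(250) + 1; // uniform over [1, 250], never zero
            System.out.println("alloc size = " + s);
        }
    }
}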
From: <tho...@us...> - 2010-11-18 17:49:42
Revision: 3963 http://bigdata.svn.sourceforge.net/bigdata/?rev=3963&view=rev Author: thompsonbry Date: 2010-11-18 17:49:36 +0000 (Thu, 18 Nov 2010) Log Message: ----------- Added trap for non-positive sizes on alloc. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-11-18 17:44:52 UTC (rev 3962) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-11-18 17:49:36 UTC (rev 3963) @@ -461,8 +461,13 @@ * must therefore handle the */ public int alloc(final RWStore store, final int size, final IAllocationContext context) { - int addr = -1; + if (size <= 0) + throw new IllegalArgumentException( + "Allocate requires positive size, got: " + size); + + int addr = -1; + final Iterator<AllocBlock> iter = m_allocBlocks.iterator(); int count = -1; while (addr == -1 && iter.hasNext()) { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
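The trap itself is a plain fail-fast precondition at the top of the allocation path: reject the request before any bit-map state is touched so that a bad caller surfaces immediately rather than later as a corrupted allocator. An equivalent sketch of the guard, with the rest of the method elided:

// Fail-fast precondition equivalent to the guard added to FixedAllocator.alloc.
static int alloc(final int size) {
    if (size <= 0)
        throw new IllegalArgumentException(
                "Allocate requires positive size, got: " + size);
    // ... find a free bit, mark it live and transient, return its address ...
    return -1; // placeholder result for the sketch
}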
From: <tho...@us...> - 2010-11-18 17:44:59
Revision: 3962 http://bigdata.svn.sourceforge.net/bigdata/?rev=3962&view=rev Author: thompsonbry Date: 2010-11-18 17:44:52 +0000 (Thu, 18 Nov 2010) Log Message: ----------- Integrated the RWStore with the AbstractTransactionService so that it will defer frees if there is an open tx. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/Journal.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalTransactionService.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/service/AbstractTransactionService.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/Journal.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/Journal.java 2010-11-18 17:12:23 UTC (rev 3961) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/Journal.java 2010-11-18 17:44:52 UTC (rev 3962) @@ -238,6 +238,23 @@ final JournalTransactionService abstractTransactionService = new JournalTransactionService( properties, this) { + + { + + final long lastCommitTime = Journal.this.getLastCommitTime(); + + if (lastCommitTime != 0L) { + + /* + * Notify the transaction service on startup so it can set + * the effective release time based on the last commit time + * for the store. + */ + updateReleaseTimeForBareCommit(lastCommitTime); + + } + + } protected void activateTx(final TxState state) { final IBufferStrategy bufferStrategy = Journal.this.getBufferStrategy(); Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalTransactionService.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalTransactionService.java 2010-11-18 17:12:23 UTC (rev 3961) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/JournalTransactionService.java 2010-11-18 17:44:52 UTC (rev 3962) @@ -127,7 +127,7 @@ } - protected long findNextCommitTime(long commitTime) { + protected long findNextCommitTime(final long commitTime) { final ICommitRecord commitRecord = journal.getCommitRecordIndex() .findNext(commitTime); @@ -454,17 +454,20 @@ } - /** - * Always returns ZERO (0L) since history can not be released on the - * {@link Journal}. + /* @todo This is only true for the WORM. For the RWStore, the release time + * will advance normally and things can get aged out of the store. */ - @Override - public long getReleaseTime() { +// /** +// * Always returns ZERO (0L) since history can not be released on the +// * {@link Journal}. +// */ +// @Override +// public long getReleaseTime() { +// +// return 0L; +// +// } - return 0L; - - } - /** * Throws exception since distributed transactions are not used for a single * {@link Journal}. 
Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-18 17:12:23 UTC (rev 3961) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-18 17:44:52 UTC (rev 3962) @@ -1442,8 +1442,8 @@ alwaysDefer = context == null && !m_contexts.isEmpty(); if (alwaysDefer) if (log.isDebugEnabled()) - log.debug("Should defer " + physicalAddress(addr)); - if (alwaysDefer || !alloc.canImmediatelyFree(addr, sze, context)) { + log.debug("Should defer " + addr + " real: " + physicalAddress(addr)); + if (alwaysDefer || !alloc.canImmediatelyFree(addr, sze, context)) { deferFree(addr, sze); } else { immediateFree(addr, sze); @@ -1505,8 +1505,10 @@ if (alloc == null) { throw new IllegalArgumentException("Invalid address provided to immediateFree: " + addr + ", size: " + sze); } - final long pa = alloc.getPhysicalAddress(addrOffset); - alloc.free(addr, sze); + final long pa = alloc.getPhysicalAddress(addrOffset); + if (log.isTraceEnabled()) + log.trace("Freeing allocation at " + addr + ", physical address: " + pa); + alloc.free(addr, sze); // must clear after free in case is a blobHdr that requires reading! // the allocation lock protects against a concurrent re-allocation // of the address before the cache has been cleared @@ -2038,16 +2040,16 @@ final JournalTransactionService transactionService = (JournalTransactionService) journal .getLocalTransactionManager().getTransactionService(); - // the previous commit point. - long latestReleasableTime = journal.getLastCommitTime(); - - if (latestReleasableTime == 0L) { - // Nothing committed. - return; - } +// // the previous commit point. +// long lastCommitTime = journal.getLastCommitTime(); +// +// if (lastCommitTime == 0L) { +// // Nothing committed. +// return; +// } - // subtract out the retention period. - latestReleasableTime -= transactionService.getMinReleaseAge(); + // the effective release time. + long latestReleasableTime = transactionService.getReleaseTime(); // add one to give this inclusive upper bound semantics. 
latestReleasableTime++; @@ -4219,6 +4221,8 @@ m_allocationLock.lock(); try { m_activeTxCount++; + if(log.isInfoEnabled()) + log.info("#activeTx="+m_activeTxCount); } finally { m_allocationLock.unlock(); } @@ -4228,6 +4232,8 @@ m_allocationLock.lock(); try { m_activeTxCount--; + if(log.isInfoEnabled()) + log.info("#activeTx="+m_activeTxCount); } finally { m_allocationLock.unlock(); } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/service/AbstractTransactionService.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/service/AbstractTransactionService.java 2010-11-18 17:12:23 UTC (rev 3961) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/service/AbstractTransactionService.java 2010-11-18 17:44:52 UTC (rev 3962) @@ -1128,9 +1128,11 @@ */ protected void updateReleaseTimeForBareCommit(final long commitTime) { - if(!lock.isHeldByCurrentThread()) - throw new IllegalMonitorStateException(); - +// if(!lock.isHeldByCurrentThread()) +// throw new IllegalMonitorStateException(); + + lock.lock(); + try { synchronized (startTimeIndex) { if (this.releaseTime < (commitTime - 1) @@ -1159,6 +1161,9 @@ } } + } finally { + lock.unlock(); + } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
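The net effect of this change is that the RWStore no longer computes its own recycling horizon; it asks the transaction service for the effective release time. A simplified standalone reading of that policy (the field and method names below are illustrative, and the real AbstractTransactionService logic is more involved):

public class ReleaseTimeExample {

    private final long minReleaseAge; // milliseconds of history to retain

    public ReleaseTimeExample(final long minReleaseAge) {
        this.minReleaseAge = minReleaseAge;
    }

    /**
     * Commit points with a commit time LT the returned value may have their
     * storage recycled. The RWStore adds one to this value to obtain
     * inclusive upper-bound semantics.
     */
    public long getReleaseTime(final long lastCommitTime,
            final long earliestActiveTxStartTime) {
        if (lastCommitTime == 0L)
            return 0L; // nothing committed yet, nothing to release
        // retain at least minReleaseAge of history ...
        long t = lastCommitTime - minReleaseAge;
        // ... and never release past the earliest active transaction.
        if (earliestActiveTxStartTime != 0L && earliestActiveTxStartTime < t)
            t = earliestActiveTxStartTime;
        return t;
    }

    public static void main(String[] args) {
        final ReleaseTimeExample svc = new ReleaseTimeExample(5000);
        System.out.println(svc.getReleaseTime(100000L, 0L));     // 95000: age-based only
        System.out.println(svc.getReleaseTime(100000L, 94000L)); // 94000: bounded by the open tx
    }
}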
From: <tho...@us...> - 2010-11-18 17:12:30
Revision: 3961 http://bigdata.svn.sourceforge.net/bigdata/?rev=3961&view=rev Author: thompsonbry Date: 2010-11-18 17:12:23 +0000 (Thu, 18 Nov 2010) Log Message: ----------- deactivateTx() was incrementing rather than decrementing. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-18 17:10:17 UTC (rev 3960) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-18 17:12:23 UTC (rev 3961) @@ -4227,7 +4227,7 @@ public void deactivateTx() { m_allocationLock.lock(); try { - m_activeTxCount++; + m_activeTxCount--; } finally { m_allocationLock.unlock(); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
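A regression check for this kind of copy/paste slip is cheap to write. A standalone sketch assuming a lock-guarded counter shaped like the one introduced in r3960 (the next message down); TxCounter is illustrative, not the bigdata class:

import java.util.concurrent.locks.ReentrantLock;

public class TxCounterTest {

    static class TxCounter {
        private final ReentrantLock lock = new ReentrantLock();
        private int activeTxCount = 0;

        void activateTx() {
            lock.lock();
            try { activeTxCount++; } finally { lock.unlock(); }
        }

        void deactivateTx() {
            lock.lock();
            // the r3961 fix: decrement, not increment
            try { activeTxCount--; } finally { lock.unlock(); }
        }

        int count() {
            lock.lock();
            try { return activeTxCount; } finally { lock.unlock(); }
        }
    }

    public static void main(String[] args) {
        final TxCounter c = new TxCounter();
        c.activateTx();
        c.activateTx();
        c.deactivateTx();
        c.deactivateTx();
        if (c.count() != 0)
            throw new AssertionError("activate/deactivate not symmetric: " + c.count());
        System.out.println("ok");
    }
}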
From: <tho...@us...> - 2010-11-18 17:10:24
Revision: 3960 http://bigdata.svn.sourceforge.net/bigdata/?rev=3960&view=rev Author: thompsonbry Date: 2010-11-18 17:10:17 +0000 (Thu, 18 Nov 2010) Log Message: ----------- Hacked in an intgration with the RWStore to make it aware of the #of active transactions. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/Journal.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java branches/JOURNAL_HA_BRANCH/bigdata/src/samples/com/bigdata/samples/btree/JournalReadOnlyTxExample.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/Journal.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/Journal.java 2010-11-18 16:50:22 UTC (rev 3959) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/Journal.java 2010-11-18 17:10:17 UTC (rev 3960) @@ -237,8 +237,26 @@ protected AbstractLocalTransactionManager newLocalTransactionManager() { final JournalTransactionService abstractTransactionService = new JournalTransactionService( - properties, this).start(); + properties, this) { + + protected void activateTx(final TxState state) { + final IBufferStrategy bufferStrategy = Journal.this.getBufferStrategy(); + if(bufferStrategy instanceof RWStrategy) { + ((RWStrategy)bufferStrategy).getRWStore().activateTx(); + } + super.activateTx(state); + } + protected void deactivateTx(final TxState state) { + super.deactivateTx(state); + final IBufferStrategy bufferStrategy = Journal.this.getBufferStrategy(); + if(bufferStrategy instanceof RWStrategy) { + ((RWStrategy)bufferStrategy).getRWStore().deactivateTx(); + } + } + + }.start(); + return new AbstractLocalTransactionManager() { public AbstractTransactionService getTransactionService() { Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-18 16:50:22 UTC (rev 3959) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-18 17:10:17 UTC (rev 3960) @@ -444,6 +444,14 @@ // * the same txReleaseTime. // private static final int MAX_DEFERRED_FREE = 4094; // fits in 16k block private final long m_minReleaseAge; + + /** + * The #of open transactions (read-only or read-write). + * + * This is guarded by the {@link #m_allocationLock}. + */ + private int m_activeTxCount = 0; + private volatile long m_lastDeferredReleaseTime = 0L; // private final ArrayList<Integer> m_currentTxnFreeList = new ArrayList<Integer>(); private final PSOutputStream m_deferredFreeOut; @@ -1428,7 +1436,8 @@ * FIXME We need unit test when MIN_RELEASE_AGE is ZERO AND * there are open read-only transactions. 
*/ - boolean alwaysDefer = m_minReleaseAge > 0L; + boolean alwaysDefer = m_minReleaseAge > 0L + || m_activeTxCount > 0; if (!alwaysDefer) alwaysDefer = context == null && !m_contexts.isEmpty(); if (alwaysDefer) @@ -4206,4 +4215,22 @@ return m_storageStats; } + public void activateTx() { + m_allocationLock.lock(); + try { + m_activeTxCount++; + } finally { + m_allocationLock.unlock(); + } + } + + public void deactivateTx() { + m_allocationLock.lock(); + try { + m_activeTxCount++; + } finally { + m_allocationLock.unlock(); + } + } + } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/samples/com/bigdata/samples/btree/JournalReadOnlyTxExample.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/samples/com/bigdata/samples/btree/JournalReadOnlyTxExample.java 2010-11-18 16:50:22 UTC (rev 3959) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/samples/com/bigdata/samples/btree/JournalReadOnlyTxExample.java 2010-11-18 17:10:17 UTC (rev 3960) @@ -183,10 +183,12 @@ // First, remove the existing tuples. removeWriteSet(unisolatedBTree); + store.commit(); + /* * Verify that the read-only view has not seen those changes. */ - { + if(false) { final BTree readOnlyBTree = (BTree) store.getIndex(name, tx2); verifyWriteSet1(readOnlyBTree); @@ -198,6 +200,8 @@ */ writeSet2(unisolatedBTree); + store.commit(); + /* * Verify that the read-only view has not seen those changes. * This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
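The integration works by overriding protected hooks on the transaction service from an anonymous subclass created by the Journal. A self-contained sketch of that hook pattern (TxService and Store are stand-ins, not the bigdata classes):

public class TxHookExample {

    static class Store {
        private int activeTxCount;
        void activateTx()   { activeTxCount++; }
        void deactivateTx() { activeTxCount--; }
        int activeTxCount() { return activeTxCount; }
    }

    static class TxService {
        long newTx() { final long tx = System.nanoTime(); activateTx(tx); return tx; }
        void abort(long tx) { deactivateTx(tx); }
        protected void activateTx(long tx) { /* default: no-op */ }
        protected void deactivateTx(long tx) { /* default: no-op */ }
    }

    public static void main(String[] args) {
        final Store store = new Store();
        // Wire the hooks to the store, mirroring the anonymous subclass in
        // Journal.newLocalTransactionManager() above.
        final TxService svc = new TxService() {
            protected void activateTx(long tx) { store.activateTx(); super.activateTx(tx); }
            protected void deactivateTx(long tx) { super.deactivateTx(tx); store.deactivateTx(); }
        };
        final long tx = svc.newTx();
        System.out.println("active=" + store.activeTxCount()); // 1
        svc.abort(tx);
        System.out.println("active=" + store.activeTxCount()); // 0
    }
}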
From: <tho...@us...> - 2010-11-18 16:50:29
Revision: 3959 http://bigdata.svn.sourceforge.net/bigdata/?rev=3959&view=rev Author: thompsonbry Date: 2010-11-18 16:50:22 +0000 (Thu, 18 Nov 2010) Log Message: ----------- Added logic to notice when MIN_RELEASE_AGE is non-zero and always defer frees. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-18 15:29:14 UTC (rev 3958) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-18 16:50:22 UTC (rev 3959) @@ -50,6 +50,7 @@ import com.bigdata.btree.ITupleIterator; import com.bigdata.btree.IndexMetadata; import com.bigdata.btree.BTree.Counter; +import com.bigdata.config.LongValidator; import com.bigdata.counters.CounterSet; import com.bigdata.counters.Instrument; import com.bigdata.counters.striped.StripedCounters; @@ -73,6 +74,7 @@ import com.bigdata.journal.ha.HAWriteMessage; import com.bigdata.quorum.Quorum; import com.bigdata.rawstore.IRawStore; +import com.bigdata.service.AbstractTransactionService; import com.bigdata.util.ChecksumUtility; /** @@ -441,6 +443,7 @@ // * serves as a BLOB header when there is more than a single entry with // * the same txReleaseTime. // private static final int MAX_DEFERRED_FREE = 4094; // fits in 16k block + private final long m_minReleaseAge; private volatile long m_lastDeferredReleaseTime = 0L; // private final ArrayList<Integer> m_currentTxnFreeList = new ArrayList<Integer>(); private final PSOutputStream m_deferredFreeOut; @@ -525,6 +528,14 @@ if (fileMetadata == null) throw new IllegalArgumentException(); + this.m_minReleaseAge = LongValidator.GTE_ZERO.parse( + AbstractTransactionService.Options.MIN_RELEASE_AGE, + AbstractTransactionService.Options.DEFAULT_MIN_RELEASE_AGE); + + if (log.isInfoEnabled()) + log.info(AbstractTransactionService.Options.MIN_RELEASE_AGE + "=" + + m_minReleaseAge); + cDefaultMetaBitsSize = Integer.valueOf(fileMetadata.getProperty( Options.META_BITS_SIZE, Options.DEFAULT_META_BITS_SIZE)); @@ -1404,19 +1415,26 @@ freeBlob(addr, sze, context); } else { final Allocator alloc = getBlockByAddress(addr); - /* - * There are a few conditions here. If the context owns the - * allocator and the allocation was made by this context then - * it can be freed immediately. - * The problem comes when the context is null and the allocator - * is NOT owned, BUT there are active AllocationContexts, in this - * situation, the free must ALWAYS be deferred. - */ - final boolean alwaysDefer = context == null && m_contexts.size() > 0; - if (alwaysDefer) + /* + * There are a few conditions here. If the context owns the + * allocator and the allocation was made by this context then it + * can be freed immediately. The problem comes when the context + * is null and the allocator is NOT owned, BUT there are active + * AllocationContexts, in this situation, the free must ALWAYS + * be deferred. + * + * FIXME We need unit tests when MIN_RELEASE_AGE is GT ZERO. + * + * FIXME We need unit test when MIN_RELEASE_AGE is ZERO AND + * there are open read-only transactions. 
+ */ + boolean alwaysDefer = m_minReleaseAge > 0L; + if (!alwaysDefer) + alwaysDefer = context == null && !m_contexts.isEmpty(); + if (alwaysDefer) if (log.isDebugEnabled()) log.debug("Should defer " + physicalAddress(addr)); - if (/*alwaysDefer ||*/ !alloc.canImmediatelyFree(addr, sze, context)) { + if (alwaysDefer || !alloc.canImmediatelyFree(addr, sze, context)) { deferFree(addr, sze); } else { immediateFree(addr, sze); @@ -3487,6 +3505,12 @@ private ContextAllocation establishContextAllocation( final IAllocationContext context) { + + /* + * The allocation lock MUST be held to make changes in the membership of + * m_contexts atomic with respect to free(). + */ + assert m_allocationLock.isHeldByCurrentThread(); ContextAllocation ret = m_contexts.get(context); @@ -3500,8 +3524,9 @@ } - log.warn("Context: ncontexts=" + m_contexts.size() + ", context=" - + context); + if (log.isInfoEnabled()) + log.info("Context: ncontexts=" + m_contexts.size() + + ", context=" + context); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
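The free-path decision reduces to a small predicate: defer when history must be retained or when shadow allocation contexts (and, after r3960/r3962 earlier in this archive, open transactions) are in play, otherwise recycle immediately. A standalone sketch with illustrative names; the real code also asks the allocator itself via canImmediatelyFree():

import java.util.ArrayList;
import java.util.List;

public class DeferFreeExample {

    private final long minReleaseAge;
    private final List<Object> contexts = new ArrayList<Object>(); // active allocation contexts
    private int activeTxCount = 0; // maintained by activateTx()/deactivateTx()

    public DeferFreeExample(final long minReleaseAge) {
        this.minReleaseAge = minReleaseAge;
    }

    boolean mustDefer(final Object context) {
        boolean alwaysDefer = minReleaseAge > 0L || activeTxCount > 0;
        if (!alwaysDefer)
            alwaysDefer = context == null && !contexts.isEmpty();
        return alwaysDefer;
    }

    public static void main(String[] args) {
        System.out.println(new DeferFreeExample(0L).mustDefer(null));    // false: recycle immediately
        System.out.println(new DeferFreeExample(5000L).mustDefer(null)); // true: retention window
    }
}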
From: <tho...@us...> - 2010-11-18 15:29:20
Revision: 3958 http://bigdata.svn.sourceforge.net/bigdata/?rev=3958&view=rev Author: thompsonbry Date: 2010-11-18 15:29:14 +0000 (Thu, 18 Nov 2010) Log Message: ----------- Updated javadoc for showAllocators. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-18 14:32:43 UTC (rev 3957) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-18 15:29:14 UTC (rev 3958) @@ -2521,16 +2521,18 @@ * <dl> * <dt>AllocatorSize</dt><dd>The #of bytes in the allocated slots issued by this allocator.</dd> * <dt>AllocatorCount</dt><dd>The #of fixed allocators for that slot size.</dd> + * <dt>SlotsInUse</dt><dd>The difference between the two previous columns (net slots in use for this slot size).</dd> + * <dt>SlotsReserved</dt><dd>The #of slots in this slot size which have had storage reserved for them.</dd> * <dt>SlotsAllocated</dt><dd>Cumulative allocation of slots to date in this slot size (regardless of the transaction outcome).</dd> * <dt>SlotsRecycled</dt><dd>Cumulative recycled slots to date in this slot size (regardless of the transaction outcome).</dd> - * <dt>SlotsInUse</dt><dd>The difference between the two previous columns (net slots in use for this slot size).</dd> - * <dt>SlotsReserved</dt><dd>The #of slots in this slot size which have had storage reserved for them.</dd> + * <dt>SlotsChurn</dt><dd>How frequently slots of this size are re-allocated (SlotsInUse/SlotsAllocated).</dd> + * <dt>%SlotsUnused</dt><dd>The percentage of slots of this size which are not in use (1-(SlotsInUse/SlotsReserved)).</dd> * <dt>BytesReserved</dt><dd>The space reserved on the backing file for those allocation slots</dd> * <dt>BytesAppData</dt><dd>The #of bytes in the allocated slots which are used by application data (including the record checksum).</dd> - * <dt>%SlotWaste</dt><dd>BytesAppData/(SlotsInUse*AllocatorSize) : How well the application data fits in the slots.</dd> - * <dt>%StoreWaste</dt><dd>BytesAppData/BytesReserved : How much of the reserved space is in use by application data.</dd> - * <dt>%AppData</dt><dd>BytesAppData/Sum(BytesAppData). This is how much of your data is stored by each allocator.</dd> - * <dt>%BackingFile</dt><dd>BytesReserved/Sum(BytesReserved). This is how much of the backing file is reserved for each allocator.</dd> + * <dt>%SlotWaste</dt><dd>How well the application data fits in the slots (BytesAppData/(SlotsInUse*AllocatorSize)).</dd> + * <dt>%AppData</dt><dd>How much of your data is stored by each allocator (BytesAppData/Sum(BytesAppData)).</dd> + * <dt>%StoreFile</dt><dd>How much of the backing file is reserved for each allocator (BytesReserved/Sum(BytesReserved)).</dd> + * <dt>%StoreWaste</dt><dd>How much of the total waste on the store is waste for this allocator size ((BytesReserved-BytesAppData)/(Sum(BytesReserved)-Sum(BytesAppData))).</dd> * </dl> */ public void showAllocators(final StringBuilder str) { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
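A worked example of the waste columns may help when reading showAllocators output. The computation below mirrors the slotWaste() arithmetic in the StorageStats class (r3956, further down this archive); the sample numbers are made up:

import java.math.BigDecimal;
import java.math.RoundingMode;

public class AllocatorWasteExample {

    // %SlotWaste: the fraction of in-use slot space not occupied by application data.
    static float slotWaste(long allocatorSize, long slotsInUse, long bytesAppData) {
        if (bytesAppData == 0) return 0.0f;
        final BigDecimal size = new BigDecimal(allocatorSize * slotsInUse);
        BigDecimal store = new BigDecimal(100 * bytesAppData);
        store = store.divide(size, 2, RoundingMode.HALF_UP);
        return new BigDecimal(100).subtract(store).floatValue();
    }

    public static void main(String[] args) {
        // e.g. a 128-byte slot size with 1000 slots in use holding 100,000
        // bytes of application data (record checksums included):
        System.out.println(slotWaste(128, 1000, 100000) + "%"); // 21.87% of the slot space is padding
    }
}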
From: <mar...@us...> - 2010-11-18 14:32:50
Revision: 3957 http://bigdata.svn.sourceforge.net/bigdata/?rev=3957&view=rev Author: martyncutcher Date: 2010-11-18 14:32:43 +0000 (Thu, 18 Nov 2010) Log Message: ----------- Integrate StorageStats collection Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-11-18 14:32:08 UTC (rev 3956) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-11-18 14:32:43 UTC (rev 3957) @@ -31,6 +31,7 @@ import org.apache.log4j.Logger; import com.bigdata.rwstore.RWStore.AllocationStats; +import com.bigdata.rwstore.StorageStats.Bucket; import com.bigdata.util.ChecksumUtility; /** @@ -52,6 +53,8 @@ */ volatile private int m_diskAddr; volatile private int m_index; + + Bucket m_statsBucket = null; public void setIndex(final int index) { final AllocBlock fb = (AllocBlock) m_allocBlocks.get(0); @@ -109,7 +112,7 @@ final int bit = offset % allocBlockRange; if (RWStore.tstBit(block.m_bits, bit)) { - return RWStore.convertAddr(block.m_addr) + ((long)m_size * bit); + return RWStore.convertAddr(block.m_addr) + ((long) m_size * bit); } else { return 0L; } @@ -428,6 +431,10 @@ } else { m_freeTransients++; } + + if (m_statsBucket != null) { + m_statsBucket.delete(size); + } return true; } else if (addr >= m_startAddr && addr < m_endAddr) { @@ -463,7 +470,11 @@ final AllocBlock block = iter.next(); if (block.m_addr == 0) { - int blockSize = 32 * m_bitSize * m_size; + int blockSize = 32 * m_bitSize; + if (m_statsBucket != null) { + m_statsBucket.addSlots(blockSize); + } + blockSize *= m_size; blockSize >>= RWStore.ALLOCATION_SCALEUP; block.m_addr = store.allocBlock(blockSize); @@ -498,6 +509,10 @@ addr += (count * 32 * m_bitSize); final int value = -((m_index << RWStore.OFFSET_BITS) + addr); + + if (m_statsBucket != null) { + m_statsBucket.allocate(size); + } return value; } else { @@ -651,4 +666,8 @@ return false; } } + + public void setBucketStats(Bucket b) { + m_statsBucket = b; + } } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-18 14:32:08 UTC (rev 3956) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-18 14:32:43 UTC (rev 3957) @@ -344,7 +344,7 @@ * @todo examine concurrency and lock usage for {@link #m_alloc} and the * rest of these lists. */ - private final ArrayList<Allocator> m_allocs; + private final ArrayList<FixedAllocator> m_allocs; /** * A fixed length array of lists of free {@link FixedAllocator}s with one @@ -453,6 +453,12 @@ private volatile BufferedWrite m_bufferedWrite; + /** + * Our StoreageStats objects + */ + private StorageStats m_storageStats; + private long m_storageStatsAddr = 0; + /** * <code>true</code> iff the backing store is open. 
*/ @@ -556,7 +562,7 @@ m_commitList = new ArrayList<Allocator>(); - m_allocs = new ArrayList<Allocator>(); + m_allocs = new ArrayList<FixedAllocator>(); // m_freeBlobs = new ArrayList<BlobAllocator>(); @@ -607,6 +613,8 @@ m_maxFixedAlloc = m_allocSizes[m_allocSizes.length-1]*64; m_minFixedAlloc = m_allocSizes[0]*64; + + m_storageStats = new StorageStats(m_allocSizes); // commitChanges(null); } else { @@ -615,6 +623,20 @@ m_maxFixedAlloc = m_allocSizes[m_allocSizes.length-1]*64; m_minFixedAlloc = m_allocSizes[0]*64; + + if (m_storageStatsAddr != 0) { + long statsAddr = m_storageStatsAddr >> 16; + int statsLen = ((int) m_storageStatsAddr) & 0xFFFF; + byte[] stats = new byte[statsLen + 4]; // allow for checksum + getData(statsAddr, stats); + DataInputStream instr = new DataInputStream(new ByteArrayInputStream(stats)); + m_storageStats = new StorageStats(instr); + + for (FixedAllocator fa: m_allocs) { + m_storageStats.register(fa); + } + } + } final int maxBlockLessChk = m_maxFixedAlloc-4; @@ -806,8 +828,8 @@ cDefaultMetaBitsSize = strBuf.readInt(); final int allocBlocks = strBuf.readInt(); - strBuf.readInt(); // reserved5 - strBuf.readInt(); // reserved6 + m_storageStatsAddr = strBuf.readLong(); + strBuf.readInt(); // reserved7 strBuf.readInt(); // reserved8 strBuf.readInt(); // reserved9 @@ -951,7 +973,7 @@ final DataInputStream strBuf = new DataInputStream(new ByteArrayInputStream(buf)); final int allocSize = strBuf.readInt(); // if Blob < 0 - final Allocator allocator; + final FixedAllocator allocator; final ArrayList<? extends Allocator> freeList; assert allocSize > 0; @@ -970,6 +992,10 @@ allocator.setFreeList(freeList); m_allocs.add(allocator); + + if (m_storageStats != null) { + m_storageStats.register(allocator); + } } } @@ -1016,6 +1042,10 @@ m_allocs.add(allocator); + if (m_storageStats != null) { + m_storageStats.register(allocator); + } + return allocator; } else { return list.remove(0); @@ -1370,7 +1400,7 @@ } m_allocationLock.lock(); try { - if (sze > m_maxFixedAlloc) { + if (sze > m_maxFixedAlloc-4) { freeBlob(addr, sze, context); } else { final Allocator alloc = getBlockByAddress(addr); @@ -1402,7 +1432,11 @@ if (sze < (m_maxFixedAlloc-4)) throw new IllegalArgumentException("Unexpected address size"); - final int alloc = m_maxFixedAlloc-4; + if (m_storageStats != null) { + m_storageStats.deleteBlob(sze); + } + + final int alloc = m_maxFixedAlloc-4; final int blcks = (alloc - 1 + sze)/alloc; // read in header block, then free each reference @@ -1413,9 +1447,11 @@ new ByteArrayInputStream(hdr, 0, hdr.length-4) ); try { final int allocs = instr.readInt(); + int rem = sze; for (int i = 0; i < allocs; i++) { final int nxt = instr.readInt(); - free(nxt, m_maxFixedAlloc); + free(nxt, rem < alloc ? rem : alloc); + rem -= alloc; } free(hdr_addr, hdr.length); @@ -1439,6 +1475,9 @@ try { final Allocator alloc = getBlockByAddress(addr); final int addrOffset = getOffset(addr); + if (alloc == null) { + throw new IllegalArgumentException("Invalid address provided to immediateFree: " + addr + ", size: " + sze); + } final long pa = alloc.getPhysicalAddress(addrOffset); alloc.free(addr, sze); // must clear after free in case is a blobHdr that requires reading! 
@@ -1501,7 +1540,7 @@ m_allocationLock.lock(); try { try { - final Allocator allocator; + final FixedAllocator allocator; final int i = fixedAllocatorIndex(size); if (context != null) { allocator = establishContextAllocation(context).getFreeFixed(i); @@ -1520,6 +1559,10 @@ log.trace("New FixedAllocator for " + block); m_allocs.add(allocator); + + if (m_storageStats != null) { + m_storageStats.register(allocator, true); + } } else { // Verify free list only has allocators with free bits if (log.isDebugEnabled()){ @@ -1533,7 +1576,7 @@ tsti++; } } - allocator = (Allocator) list.get(0); + allocator = list.get(0); } } @@ -1611,6 +1654,10 @@ if (log.isTraceEnabled()) log.trace("BLOB ALLOC: " + size); + + if (m_storageStats != null) { + m_storageStats.allocateBlob(size); + } final PSOutputStream psout = PSOutputStream.getNew(this, m_maxFixedAlloc, context); @@ -1802,10 +1849,9 @@ str.writeInt(cVersion); str.writeLong(m_lastDeferredReleaseTime); str.writeInt(cDefaultMetaBitsSize); - str.writeInt(m_allocSizes.length); - - str.writeInt(0); // reserved5 - str.writeInt(0); // reserved6 + str.writeInt(m_allocSizes.length); + str.writeLong(m_storageStatsAddr); + str.writeInt(0); // reserved7 str.writeInt(0); // reserved8 str.writeInt(0); // reserved9 @@ -1849,7 +1895,19 @@ try { checkDeferredFrees(true, journal); // free now if possible - + + // free old storageStatsAddr + if (m_storageStatsAddr != 0) { + int len = (int) (m_storageStatsAddr & 0xFFFF); + int addr = (int) (m_storageStatsAddr >> 16); + immediateFree(addr, len); + } + if (m_storageStats != null) { + byte[] buf = m_storageStats.getData(); + long addr = alloc(buf, buf.length, null); + m_storageStatsAddr = (addr << 16) + buf.length; + } + // Allocate storage for metaBits final long oldMetaBits = m_metaBitsAddr; final int oldMetaBitsSize = (m_metaBits.length + m_allocSizes.length + 1) * 4; @@ -1859,9 +1917,13 @@ if (physicalAddress(m_metaBitsAddr) == 0) { throw new IllegalStateException("Returned MetaBits Address not valid!"); } + + // TODO: assert that m_deferredFreeOut is empty! + assert m_deferredFreeOut.getBytesWritten() == 0; // Call immediateFree - no need to defer freeof metaBits, this // has to stop somewhere! + // No more allocations must be made immediateFree((int) oldMetaBits, oldMetaBitsSize); // save allocation headers @@ -1907,7 +1969,8 @@ // m_commitCallback.commitComplete(); // } - m_reopener.reopenChannel().force(false); // TODO, check if required! + // The Journal handles the force in doubleSync + // m_reopener.reopenChannel().force(false); // TODO, check if required! 
} catch (IOException e) { throw new StorageTerminalError("Unable to commit transaction", e); } finally { @@ -2476,7 +2539,7 @@ stats[i] = new AllocationStats(m_allocSizes[i]*64); } - final Iterator<Allocator> allocs = m_allocs.iterator(); + final Iterator<FixedAllocator> allocs = m_allocs.iterator(); while (allocs.hasNext()) { Allocator alloc = (Allocator) allocs.next(); alloc.appendShortStats(str, stats); @@ -2612,7 +2675,7 @@ return getBlock(addr); } - final Iterator<Allocator> allocs = m_allocs.iterator(); + final Iterator<FixedAllocator> allocs = m_allocs.iterator(); Allocator alloc = null; while (allocs.hasNext()) { @@ -3044,7 +3107,7 @@ */ public int getFixedAllocatorCount() { int fixed = 0; - final Iterator<Allocator> allocs = m_allocs.iterator(); + final Iterator<FixedAllocator> allocs = m_allocs.iterator(); while (allocs.hasNext()) { if (allocs.next() instanceof FixedAllocator) { fixed++; @@ -3059,7 +3122,7 @@ */ public int getAllocatedBlocks() { int allocated = 0; - final Iterator<Allocator> allocs = m_allocs.iterator(); + final Iterator<FixedAllocator> allocs = m_allocs.iterator(); while (allocs.hasNext()) { final Allocator alloc = allocs.next(); if (alloc instanceof FixedAllocator) { @@ -3075,12 +3138,10 @@ */ public long getFileStorage() { long allocated = 0; - final Iterator<Allocator> allocs = m_allocs.iterator(); + final Iterator<FixedAllocator> allocs = m_allocs.iterator(); while (allocs.hasNext()) { - final Allocator alloc = allocs.next(); - if (alloc instanceof FixedAllocator) { - allocated += ((FixedAllocator) alloc).getFileStorage(); - } + final FixedAllocator alloc = allocs.next(); + allocated += ((FixedAllocator) alloc).getFileStorage(); } return allocated; @@ -3093,7 +3154,7 @@ */ public long getAllocatedSlots() { long allocated = 0; - final Iterator<Allocator> allocs = m_allocs.iterator(); + final Iterator<FixedAllocator> allocs = m_allocs.iterator(); while (allocs.hasNext()) { final Allocator alloc = allocs.next(); if (alloc instanceof FixedAllocator) { @@ -3221,7 +3282,8 @@ immediateFree(-nxtAddr, bloblen); } else { - immediateFree(nxtAddr, 0); // size ignored for FreeAllocators + // The lack of size messes with the stats + immediateFree(nxtAddr, 1); // size ignored for FixedAllocators } nxtAddr = strBuf.readInt(); @@ -4112,5 +4174,9 @@ public int getMaxBlobSize() { return m_maxBlobAllocSize-4; // allow for checksum } + + public StorageStats getStorageStats() { + return m_storageStats; + } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
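One subtle fix above is in freeBlob(): the last chunk of a blob is now accounted with the remaining byte count rather than the full slot size, which keeps the per-bucket statistics balanced. A standalone sketch of that chunking arithmetic (the actual addresses and free calls are omitted):

public class BlobChunkExample {

    static void freeBlob(final int sze, final int maxFixedAlloc) {
        final int alloc = maxFixedAlloc - 4;         // usable bytes per chunk (less checksum)
        final int blcks = (alloc - 1 + sze) / alloc; // #of chunks, rounded up
        int rem = sze;
        for (int i = 0; i < blcks; i++) {
            final int thisChunk = rem < alloc ? rem : alloc;
            System.out.println("free chunk " + i + " accounting " + thisChunk + " bytes");
            rem -= alloc;
        }
    }

    public static void main(String[] args) {
        freeBlob(10000, 4096); // 3 chunks: 4092 + 4092 + 1816 bytes
    }
}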
From: <mar...@us...> - 2010-11-18 14:32:15
Revision: 3956 http://bigdata.svn.sourceforge.net/bigdata/?rev=3956&view=rev Author: martyncutcher Date: 2010-11-18 14:32:08 +0000 (Thu, 18 Nov 2010) Log Message: ----------- Integrate StorageStats collection object Added Paths: ----------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/StorageStats.java Added: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/StorageStats.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/StorageStats.java (rev 0) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/StorageStats.java 2010-11-18 14:32:08 UTC (rev 3956) @@ -0,0 +1,399 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +package com.bigdata.rwstore; + +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.util.ArrayList; + +/** + * Maintains stats on the RWStore allocations, useful for tuning Allocator + * sizes and tracking store efficiency. + * + * It can also track reallocation patterns that are lost in static snapshots + * of current usage. + * + * Stats are retained on external requests and also on internal allocator + * use. + * + * totalSlots represents the total reserved slots in all the AllocationBlocks + * for the associated FixedAllocators. 
+ * + * slotAllocations is the total of all allocations made + * + * slotDeletes is the total of all slot deletions + * + * Therefore the total of currently allocated slots is + * slotAllocations - slotDeletes + * + * sizeAllocations is the total in bytes of the actual data stored in the + * slots + * + * sizeDeletes is the total in bytes of the actual data that has been deleted + * + * Therefore the size of the total in bytes stored in currently allocated slots is + * sizeAllocations - sizeDeletes + * + * @author Martyn Cutcher + * + */ +public class StorageStats { + final int m_maxFixed; + + public class BlobBucket { + final int m_size; + long m_allocations; + long m_deletes; + + public BlobBucket(final int size) { + m_size = size; + } + public BlobBucket(DataInputStream instr) throws IOException { + m_size = instr.readInt(); + m_allocations = instr.readLong(); + m_deletes = instr.readLong(); + } + public void write(DataOutputStream outstr) throws IOException { + outstr.writeInt(m_size); + outstr.writeLong(m_allocations); + outstr.writeLong(m_deletes); + } + public void delete() { + m_deletes++; + } + public void allocate() { + m_allocations++; + } + } + + public class Bucket { + final int m_size; + int m_allocators; + long m_totalSlots; + long m_slotAllocations; + long m_slotDeletes; + long m_sizeAllocations; + long m_sizeDeletes; + + public Bucket(final int size) { + m_size = size; + } + public Bucket(DataInputStream instr) throws IOException { + m_size = instr.readInt(); + m_allocators = instr.readInt(); + m_slotAllocations = instr.readLong(); + m_slotDeletes = instr.readLong(); + m_totalSlots = instr.readLong(); + m_sizeAllocations = instr.readLong(); + m_sizeDeletes = instr.readLong(); + } + public void write(DataOutputStream outstr) throws IOException { + outstr.writeInt(m_size); + outstr.writeInt(m_allocators); + outstr.writeLong(m_slotAllocations); + outstr.writeLong(m_slotDeletes); + outstr.writeLong(m_totalSlots); + outstr.writeLong(m_sizeAllocations); + outstr.writeLong(m_sizeDeletes); + } + public void delete(int sze) { + if (sze < 0) + throw new IllegalArgumentException("delete requires positive size, got: " + sze); + + if (sze > m_size) { + // sze = ((sze - 1 + m_maxFixed)/ m_maxFixed) * 4; // Blob header + + throw new IllegalArgumentException("Deletion of address with size greater than slot - " + sze + " > " + m_size); + } + + m_sizeDeletes += sze; + m_slotDeletes++; + } + public void allocate(int sze) { + if (sze <= 0) + throw new IllegalArgumentException("allocate requires positive size, got: " + sze); + + m_sizeAllocations += sze; + m_slotAllocations++; + } + + public void addSlots(int slots) { + m_totalSlots += slots; + } + + public long usedSlots() { + return m_slotAllocations - m_slotDeletes; + } + + public long usedStore() { + return m_sizeAllocations - m_sizeDeletes; + } + + // return as percentage + public float slotWaste() { + if (usedStore() == 0) + return 0.0f; + + BigDecimal size = new BigDecimal(m_size * usedSlots()); + BigDecimal store = new BigDecimal(100 * usedStore()); + store = store.divide(size, 2, RoundingMode.HALF_UP); + BigDecimal total = new BigDecimal(100); + + return total.subtract(store).floatValue(); + } + public float totalWaste() { + if (usedStore() == 0) + return 0.0f; + + BigDecimal size = new BigDecimal(m_size * m_totalSlots); + BigDecimal store = new BigDecimal(100 * usedStore()); + store = store.divide(size, 2, RoundingMode.HALF_UP); + BigDecimal total = new BigDecimal(100); + + return total.subtract(store).floatValue(); + } + public long 
reservedStore() { + return m_size * m_totalSlots; + } + public void addAlocator() { + m_allocators++; + } + } + + final ArrayList<Bucket> m_buckets; + final ArrayList<BlobBucket> m_blobBuckets; + + // store total bytes allocated/deleted as blobs + long m_blobAllocation; + long m_blobDeletion; + + /** + * + * @param buckets - the slot sizes used by the FixedAllocators + */ + public StorageStats(final int[] buckets) { + m_buckets = new ArrayList<Bucket>(); + for (int i = 0; i < buckets.length; i++) { + m_buckets.add(new Bucket(buckets[i]*64)); // slot sizes are 64 multiples + } + // last fixed allocator needed to compute BlobBuckets + m_maxFixed = m_buckets.get(buckets.length-1).m_size; + m_blobBuckets = new ArrayList<BlobBucket>(); + int curInc = m_maxFixed; + int nxtBlob = m_maxFixed; + final int cMaxBucket = 64 * 1024 * 1024; // 64 Mb + while (nxtBlob < cMaxBucket) { + nxtBlob += curInc; + m_blobBuckets.add(new BlobBucket(nxtBlob)); + curInc *= 2; + } + m_blobBuckets.add(new BlobBucket(Integer.MAX_VALUE)); // catch all + } + + /** + * + * @param instr restore from reopen + * + * @throws IOException + */ + public StorageStats(final DataInputStream instr) throws IOException { + m_buckets = new ArrayList<Bucket>(); + int nbuckets = instr.readInt(); + for (int i = 0; i < nbuckets; i++) { + m_buckets.add(new Bucket(instr)); + } + m_maxFixed = m_buckets.get(m_buckets.size()-1).m_size; + m_blobBuckets = new ArrayList<BlobBucket>(); + int nblobbuckets = instr.readInt(); + for (int i = 0; i < nblobbuckets; i++) { + m_blobBuckets.add(new BlobBucket(instr)); + } + m_blobAllocation = instr.readLong(); + m_blobDeletion = instr.readLong(); + } + + public byte[] getData() throws IOException { + ByteArrayOutputStream outb = new ByteArrayOutputStream(); + DataOutputStream outd = new DataOutputStream(outb); + + outd.writeInt(m_buckets.size()); + + for (Bucket b : m_buckets) { + b.write(outd); + } + + outd.writeInt(m_blobBuckets.size()); + + for (BlobBucket b : m_blobBuckets) { + b.write(outd); + } + + outd.writeLong(m_blobAllocation); + outd.writeLong(m_blobDeletion); + + outd.flush(); + + return outb.toByteArray(); + } + + public void allocateBlob(int sze) { + m_blobAllocation += sze; + + // increment blob bucket + findBlobBucket(sze).allocate(); + } + + public void deleteBlob(int sze) { + m_blobDeletion -= sze; + + // decrement blob bucket + findBlobBucket(sze).delete(); + } + + private BlobBucket findBlobBucket(final int sze) { + for (BlobBucket b : m_blobBuckets) { + if (sze < b.m_size) + return b; + } + + throw new IllegalStateException("BlobBuckets have not been correctly set"); + } + + public void register(FixedAllocator alloc, boolean init) { + int block = alloc.getBlockSize(); + for (Bucket b : m_buckets) { + if (b.m_size == block) { + alloc.setBucketStats(b); + if (init) + b.addAlocator(); + return; + } + } + + throw new IllegalArgumentException("FixedAllocator with unexpected block size"); + } + + public void register(FixedAllocator alloc) { + register(alloc, false); + } + + public void showStats(StringBuilder str) { + str.append("\n-------------------------\n"); + str.append("RWStore Allocator Summary\n"); + str.append("-------------------------\n"); + str.append(padRight("AllocatorSize", 16)); + str.append(padLeft("AllocatorCount", 16)); + str.append(padLeft("SlotsAllocated", 16)); + str.append(padLeft("SlotsRecycled", 16)); + str.append(padLeft("SlotsInUse", 16)); + str.append(padLeft("SlotsReserved", 16)); + str.append(padLeft("BytesReserved", 16)); + str.append(padLeft("BytesAppData", 16)); 
+ str.append(padLeft("%SlotWaste", 16)); + str.append(padLeft("%StoreWaste", 16)); + str.append(padLeft("%AppData", 16)); + str.append(padLeft("%StoreFile", 16)); + str.append("\n"); + + long totalAppData = 0; + long totalFileStore = 0; + for (Bucket b: m_buckets) { + totalAppData += b.usedStore(); + totalFileStore += b.reservedStore(); + } + for (Bucket b: m_buckets) { + str.append(padRight("" + b.m_size, 16)); + str.append(padLeft("" + b.m_allocators, 16)); + str.append(padLeft("" + b.m_slotAllocations, 16)); + str.append(padLeft("" + b.m_slotDeletes, 16)); + str.append(padLeft("" + b.usedSlots(), 16)); + str.append(padLeft("" + b.m_totalSlots, 16)); + str.append(padLeft("" + b.reservedStore(), 16)); + str.append(padLeft("" + b.usedStore(), 16)); + str.append(padLeft("" + b.slotWaste() + "%", 16)); + str.append(padLeft("" + b.totalWaste() + "%", 16)); + str.append(padLeft("" + dataPercent(b.usedStore(), totalAppData) + "%", 16)); + str.append(padLeft("" + dataPercent(b.reservedStore(), totalFileStore) + "%", 16)); + str.append("\n"); + } + + str.append("\n-------------------------\n"); + str.append("BLOBS\n"); + str.append("-------------------------\n"); + str.append(padRight("Bucket", 10)); + str.append(padLeft("Allocations", 12)); + str.append(padLeft("Deletes", 12)); + str.append(padLeft("Current", 12)); + str.append("\n"); + + for (BlobBucket b: m_blobBuckets) { + str.append(padRight("" + (b.m_size/1024) + "K", 10)); + str.append(padLeft("" + b.m_allocations, 12)); + str.append(padLeft("" + b.m_deletes, 12)); + str.append(padLeft("" + (b.m_allocations - b.m_deletes), 12)); + str.append("\n"); + } + + } + + private float dataPercent(long usedData, long totalData) { + BigDecimal used = new BigDecimal(100 * usedData); + BigDecimal total = new BigDecimal(totalData); + + return used.divide(total, 2, RoundingMode.HALF_UP).floatValue(); + } + + public static String padLeft(String str, int minlen) { + if (str.length() >= minlen) + return str; + + StringBuffer out = new StringBuffer(); + int pad = minlen - str.length(); + while (pad-- > 0) { + out.append(' '); + } + out.append(str); + + return out.toString(); + } + + public static String padRight(String str, int minlen) { + if (str.length() >= minlen) + return str; + + StringBuffer out = new StringBuffer(); + out.append(str); + int pad = minlen - str.length(); + while (pad-- > 0) { + out.append(' '); + } + + return out.toString(); + } +} Property changes on: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/StorageStats.java ___________________________________________________________________ Added: svn:mime-type + text/plain Added: svn:keywords + Id Date Revision Author HeadURL This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-11-18 14:22:13
Revision: 3955 http://bigdata.svn.sourceforge.net/bigdata/?rev=3955&view=rev Author: thompsonbry Date: 2010-11-18 14:22:07 +0000 (Thu, 18 Nov 2010) Log Message: ----------- Added an example using read-only transactions in combination with unisolated index writes. This works with the RWStore or the WORM, but the point is to illustrate how to safely interact with a zero retention configuration of the RWStore. Added Paths: ----------- branches/JOURNAL_HA_BRANCH/bigdata/src/samples/com/bigdata/samples/btree/JournalReadOnlyTxExample.java Added: branches/JOURNAL_HA_BRANCH/bigdata/src/samples/com/bigdata/samples/btree/JournalReadOnlyTxExample.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/samples/com/bigdata/samples/btree/JournalReadOnlyTxExample.java (rev 0) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/samples/com/bigdata/samples/btree/JournalReadOnlyTxExample.java 2010-11-18 14:22:07 UTC (rev 3955) @@ -0,0 +1,325 @@ +/* + * Created on Jul 10, 2009 + */ + +package com.bigdata.samples.btree; + +import java.io.File; +import java.io.IOException; +import java.util.Properties; +import java.util.UUID; + +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +import com.bigdata.btree.BTree; +import com.bigdata.btree.IndexMetadata; +import com.bigdata.journal.BufferMode; +import com.bigdata.journal.ITx; +import com.bigdata.journal.Journal; +import com.bigdata.service.AbstractTransactionService; + +/** + * This example illustrates how to safely interact with a zero retention + * configuration of the RWStore using a mixture of read-only transactions + * and unisolated index writes. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id: JournalTxExample.java 2265 2009-10-26 12:51:06Z thompsonbry $ + */ +public class JournalReadOnlyTxExample { + + private static final Logger log = Logger.getLogger(JournalReadOnlyTxExample.class); + + public static void main(String[] args) throws IOException { + + // enable logging + log.setLevel(Level.INFO); + + final Properties properties = new Properties(); + + final File tmpDir = new File(System.getProperty("java.io.tmpdir")); + + if (tmpDir.exists()) { + tmpDir.mkdirs(); + } + + final File journalFile = new File(tmpDir, + JournalReadOnlyTxExample.class.getSimpleName() + "jnl"); + + if (journalFile.exists()) { + if (!journalFile.delete()) { + throw new IOException("Could not delete old file: " + + journalFile); + } + } + + // this example uses the RWStore, but it also works with the WORM. + properties.setProperty(Journal.Options.BUFFER_MODE, // + BufferMode.DiskRW.toString() +// BufferMode.DiskWORM.toString() + ); + + properties.setProperty(Journal.Options.FILE, journalFile + .getCanonicalPath()); + + /* + * Immediate recycling of committed data unless protected by a read-lock + * (ZERO milliseconds of retained data associated historical commit + * points). + * + * Note: this is the default behavior for the RWStore and the bigdata + * federation. + */ + properties.setProperty( + AbstractTransactionService.Options.MIN_RELEASE_AGE, "0"); + + Journal store = new Journal(properties); + + // The name of the index used in this example. + final String name = "testIndex"; + + try { + + /* + * Register the index. Each store can hold multiple named indices. + */ + final long commitTime1; + { + + final IndexMetadata indexMetadata = new IndexMetadata( + name, UUID.randomUUID()); + + // Note: isolatable indices are NOT required for read-only txs. 
+ indexMetadata.setIsolatable(false); + + // register the index. + store.registerIndex(indexMetadata); + + // commit the store so the B+Tree can be found on restart. + commitTime1 = store.commit(); + + } + + /* + * Write data using the UNISOLATED view of the B+Tree. + */ + { + + final BTree unisolatedBTree = store.getIndex(name); + + writeSet1(unisolatedBTree); + + verifyWriteSet1(unisolatedBTree); + + /* + * Note: The unisolated index has not been committed yet. + */ + + } + + /* + * The data are not visible on read-only B+Tree reading from the + * most recent commit point. + */ + { + + final long tx1 = store.newTx(ITx.READ_COMMITTED); + + try { + + final BTree readOnlyBTree = (BTree) store.getIndex(name, + tx1); + + verifyWriteSetNotFound(readOnlyBTree); + + } finally { + + store.abort(tx1); + + } + + } + + // Commit the write set for the UNISOLATED index. + final long commitTime2 = store.commit(); + + // Verify writes are now visible on the UNISOLATED B+Tree. + { + + final BTree unisolatedBTree = store.getIndex(name); + + verifyWriteSet1(unisolatedBTree); + + } + + // Show that the changes were restart safe. + if (true) { + + // close the journal. + store.close(); + log.info("Store closed."); + + // re-open the journal. + store = new Journal(properties); + log.info("Store re-opened."); + + // lookup the B+Tree. + final BTree unisolatedBTree = store.getIndex(name); + + verifyWriteSet1(unisolatedBTree); + + } + + /* + * Open a read-only transaction on the last commit time. + */ + final long tx2 = store.newTx(ITx.READ_COMMITTED); + try { + + // lookup the UNISOLATED B+Tree. + final BTree unisolatedBTree = store.getIndex(name); + + // First, remove the existing tuples. + removeWriteSet(unisolatedBTree); + + /* + * Verify that the read-only view has not seen those changes. + */ + { + final BTree readOnlyBTree = (BTree) store.getIndex(name, tx2); + + verifyWriteSet1(readOnlyBTree); + + } + + /* + * Write some new records on the unisolated index. + */ + writeSet2(unisolatedBTree); + + /* + * Verify that the read-only view has not seen those changes. + * + * Note: This probably hits the index cache and everything in + * the read-only B+Tree was already materialized when we + * verified it just before writing writeSet2 onto the unisolated + * index, so this is not really testing anything. + */ + { + final BTree readOnlyBTree = (BTree) store.getIndex(name, tx2); + + verifyWriteSet1(readOnlyBTree); + } + + } finally { + // release that read-only transaction. + store.abort(tx2); + } + + log.info("Done."); + + } finally { + + // destroy the backing store. + store.destroy(); + + } + + } + + /** + * Write a set of tuples having keys in [0:1000) and values equal to the + * keys. + */ + private static void writeSet1(BTree btree) { + + log.info(""); + + for (int i = 0; i < 1000; i++) { + + btree.insert(i, i); + + } + + } + + /** + * Write a set of tuples having keys in [0:1000) and values equal to the + * twice the numerical value of the keys. + * + * @param btree + */ + private static void writeSet2(BTree btree) { + + log.info(""); + + for (int i = 0; i < 1000; i++) { + + btree.insert(i, i * 2); + + } + + } + + /** + * Verify the set of tuples written by {@link #writeSet1(BTree)}. 
+ */ + private static void verifyWriteSet1(BTree btree) { + + log.info(""); + + for (int i = 0; i < 1000; i++) { + + final Object val = btree.lookup(i); + + if (!Integer.valueOf(i).equals(val)) { + + throw new RuntimeException("Not found: key=" + i+", val="+val); + + } + + } + + } + + /** + * Verify that the write set is not found (no keys in [0:1000)). + */ + private static void verifyWriteSetNotFound(BTree btree) { + + log.info(""); + + for (int i = 0; i < 1000; i++) { + + if (btree.contains(i)) { + + throw new RuntimeException("Not expecting: key=" + i); + + } + + } + + } + + /** + * Delete the tuples written by {@link #writeSet(BTree)} or + * {@link #writeSet2(BTree). + */ + private static void removeWriteSet(BTree btree) { + + log.info(""); + + for (int i = 0; i < 1000; i++) { + + if (btree.remove(i) == null) { + + throw new RuntimeException("Not found: key=" + i); + + } + + } + + } + +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
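Condensed, the read-lock idiom this example demonstrates is the fragment below. It uses only calls that appear in the diff (Journal.newTx, Journal.getIndex, Journal.abort); setup, error handling, and the surrounding class are omitted, so this is a fragment rather than a runnable program:

// Pin the most recent commit point with a read-only transaction so that a
// zero-retention RWStore cannot recycle the storage it depends on, and
// release the read lock in a finally block.
final long tx = store.newTx(ITx.READ_COMMITTED);
try {
    final BTree readOnlyBTree = (BTree) store.getIndex(name, tx);
    // ... reads against readOnlyBTree see a stable snapshot even while
    // unisolated writes and commits recycle storage elsewhere ...
} finally {
    store.abort(tx); // release the read lock
}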
From: <tho...@us...> - 2010-11-18 13:09:47
Revision: 3954 http://bigdata.svn.sourceforge.net/bigdata/?rev=3954&view=rev Author: thompsonbry Date: 2010-11-18 13:09:41 +0000 (Thu, 18 Nov 2010) Log Message: ----------- Added a getTransactionService() method on Journal. There is also a newTx(long) method available directly. However, it would be best to share an interface with IBigdataFederation which declares getTransactionService(). Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/Journal.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/Journal.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/Journal.java 2010-11-17 21:37:33 UTC (rev 3953) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/Journal.java 2010-11-18 13:09:41 UTC (rev 3954) @@ -715,8 +715,7 @@ try { - return localTransactionManager.getTransactionService().newTx( - timestamp); + return getTransactionService().newTx(timestamp); } catch (IOException e) { @@ -964,7 +963,13 @@ return concurrencyManager.getTransactionManager(); } + + public ITransactionService getTransactionService() { + + return getTransactionManager().getTransactionService(); + } + public WriteExecutorService getWriteService() { return concurrencyManager.getWriteService(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-11-17 21:37:40
Revision: 3953 http://bigdata.svn.sourceforge.net/bigdata/?rev=3953&view=rev Author: thompsonbry Date: 2010-11-17 21:37:33 +0000 (Wed, 17 Nov 2010) Log Message: ----------- JoinGraph - javadoc identifying some issues from a call with MikeP. BOpStats - now tracks the #of tasks which have been executed for a given operator. QueryLog - added the opCount column and some code reorganization. QueryEngineTestAnnotations - javadoc. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngineTestAnnotations.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryLog.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java 2010-11-17 17:32:16 UTC (rev 3952) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java 2010-11-17 21:37:33 UTC (rev 3953) @@ -123,7 +123,38 @@ * Since the join graph is fed the vertices (APs), it does not have access * to the annotated joins so we need to generated appropriately annotated * joins when sampling an edge and when evaluation a subquery. + * <p> + * One solution would be to always use the unpartitioned views of the + * indices for the runtime query optimizer, which is how we are estimating + * the range counts of the access paths right now. [Note that the static + * query optimizer ignores named and default graphs, while the runtime + * query optimizer SHOULD pay attention to these things and exploit their + * conditional selectivity for the query plan.] * + * @todo When there are optional join graphs, are we going to handle that by + * materializing a sample (or all) of the joins feeding that join graph + * and then apply the runtime optimizer to the optional join graph, + * getting out a sample to feed onto any downstream join graph? + * + * @todo When we run into a cardinality estimation underflow (the expected + * cardinality goes to zero) we could double the sample size for just + * those join paths which hit a zero estimated cardinality and re-run them + * within the round. This would imply that we keep per join path limits. + * The vertex and edge samples are already aware of the limit at which + * they were last sampled so this should not cause any problems there. + * + * @todo When comparing choices among join paths having fully bound tails where + * the estimated cardinality has also gone to zero, we should prefer to + * evaluate vertices in the tail with better index locality first. For + * example, if one vertex had one variable in the original plan while + * another had two variables, then solutions which reach the 2-var vertex + * could be spread out over a much wider range of the selected index than + * those which reach the 1-var vertex. [In order to support this, we would + * need a means to indicate that a fully bound access path should use an + * index specified by the query optimizer rather than the primary index + * for the relation. In addition, this suggests that we should keep bloom + * filters for more than just the SPO(C) index in scale-out.] + * * @todo Examine behavior when we do not have perfect covering indices. 
This * will mean that some vertices can not be sampled using an index and that * estimation of their cardinality will have to await the estimation of Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java 2010-11-17 17:32:16 UTC (rev 3952) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java 2010-11-17 21:37:33 UTC (rev 3953) @@ -54,6 +54,13 @@ */ final public CAT elapsed = new CAT(); + /** + * The #of instances of a given operator which have been created for a given + * query. This provides interesting information about the #of task instances + * for each operator which were required to execute a query. + */ + final public CAT opCount = new CAT(); + /** * #of chunks in. */ @@ -83,7 +90,9 @@ * Constructor. */ public BOpStats() { - + + opCount.increment(); + } /** @@ -98,21 +107,18 @@ return; } elapsed.add(o.elapsed.get()); + opCount.add(o.opCount.get()); chunksIn.add(o.chunksIn.get()); unitsIn.add(o.unitsIn.get()); unitsOut.add(o.unitsOut.get()); chunksOut.add(o.chunksOut.get()); -// chunksIn.addAndGet(o.chunksIn.get()); -// unitsIn.addAndGet(o.unitsIn.get()); -// unitsOut.addAndGet(o.unitsOut.get()); -// chunksOut.addAndGet(o.chunksOut.get()); } - public String toString() { final StringBuilder sb = new StringBuilder(); sb.append(super.toString()); sb.append("{elapsed=" + elapsed.get()); + sb.append(",opCount=" + opCount.get()); sb.append(",chunksIn=" + chunksIn.get()); sb.append(",unitsIn=" + unitsIn.get()); sb.append(",chunksOut=" + chunksOut.get()); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngineTestAnnotations.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngineTestAnnotations.java 2010-11-17 17:32:16 UTC (rev 3952) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngineTestAnnotations.java 2010-11-17 21:37:33 UTC (rev 3953) @@ -69,9 +69,20 @@ boolean DEFAULT_ONE_MESSAGE_PER_CHUNK = false; + /** + * This option may be used to place an optional limit on the #of concurrent + * tasks which may run for the same (bopId,shardId) for a given query. The + * query is guaranteed to make progress as long as this is some positive + * integer. Limiting this value can limit the concurrency with which certain + * operators are evaluated and that can have a negative effect on the + * throughput for a given query. + */ String MAX_CONCURRENT_TASKS_PER_OPERATOR_AND_SHARD = QueryEngineTestAnnotations.class.getName() + ".maxConcurrentTasksPerOperatorAndShard"; + /** + * The default is essentially unlimited. 
+ */ int DEFAULT_MAX_CONCURRENT_TASKS_PER_OPERATOR_AND_SHARD = Integer.MAX_VALUE; } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryLog.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryLog.java 2010-11-17 17:32:16 UTC (rev 3952) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryLog.java 2010-11-17 21:37:33 UTC (rev 3953) @@ -56,6 +56,10 @@ .getLogger(QueryLog.class); static { + logTableHeader(); + } + + static public void logTableHeader() { if(log.isInfoEnabled()) log.info(QueryLog.getTableHeader()); } @@ -74,27 +78,10 @@ try { -// if (log.isDebugEnabled()) { + logDetailRows(q); - /* - * Detail row for each operator in the query. - */ - final Integer[] order = BOpUtility.getEvaluationOrder(q - .getQuery()); - - int orderIndex = 0; - for (Integer bopId : order) { - log - .info(getTableRow(q, orderIndex, bopId, false/* summary */)); - orderIndex++; - } - -// } - - // summary row. - log.info(getTableRow(q, -1/* orderIndex */, q.getQuery().getId(), - true/* summary */)); - + logSummaryRow(q); + } catch (RuntimeException t) { log.error(t,t); @@ -105,6 +92,34 @@ } + /** + * Log a detail row for each operator in the query. + */ + static private void logDetailRows(final IRunningQuery q) { + + final Integer[] order = BOpUtility.getEvaluationOrder(q.getQuery()); + + int orderIndex = 0; + + for (Integer bopId : order) { + + log.info(getTableRow(q, orderIndex, bopId, false/* summary */)); + + orderIndex++; + + } + + } + + /** + * Log a summary row for the query. + */ + static private void logSummaryRow(final IRunningQuery q) { + + log.info(getTableRow(q, -1/* orderIndex */, q.getQuery().getId(), true/* summary */)); + + } + static private String getTableHeader() { final StringBuilder sb = new StringBuilder(); @@ -135,6 +150,7 @@ // dynamics (aggregated for totals as well). sb.append("\tfanIO"); sb.append("\tsumMillis"); // cumulative milliseconds for eval of this operator. + sb.append("\topCount"); // cumulative #of invocations of tasks for this operator. sb.append("\tchunksIn"); sb.append("\tunitsIn"); sb.append("\tchunksOut"); @@ -305,6 +321,8 @@ sb.append('\t'); sb.append(stats.elapsed.get()); sb.append('\t'); + sb.append(stats.opCount.get()); + sb.append('\t'); sb.append(stats.chunksIn.get()); sb.append('\t'); sb.append(stats.unitsIn.get()); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-11-17 17:32:22
Revision: 3952 http://bigdata.svn.sourceforge.net/bigdata/?rev=3952&view=rev Author: thompsonbry Date: 2010-11-17 17:32:16 +0000 (Wed, 17 Nov 2010) Log Message: ----------- Mostly javadoc changes, including documentation for the showAllocators table. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-11-17 13:33:32 UTC (rev 3951) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-11-17 17:32:16 UTC (rev 3952) @@ -123,6 +123,13 @@ return m_size; } + /** + * The free list for the allocation slot size serviced by this allocator. + * This is a reference back into the corresponding free list as managed by + * the RWStore. + * + * @see #setFreeList(ArrayList) + */ private ArrayList m_freeList; public void setFreeList(ArrayList list) { @@ -243,7 +250,7 @@ if (block.m_bits[i] == 0) { // empty m_freeBits += 32; } else if (block.m_bits[i] != 0xFFFFFFFF) { // not full - int anInt = block.m_bits[i]; + final int anInt = block.m_bits[i]; for (int bit = 0; bit < 32; bit++) { if ((anInt & (1 << bit)) == 0) { m_freeBits++; @@ -425,9 +432,9 @@ return true; } else if (addr >= m_startAddr && addr < m_endAddr) { - final Iterator iter = m_allocBlocks.iterator(); + final Iterator<AllocBlock> iter = m_allocBlocks.iterator(); while (iter.hasNext()) { - final AllocBlock block = (AllocBlock) iter.next(); + final AllocBlock block = iter.next(); if (block.free(addr, m_size)) { m_freeTransients++; @@ -506,7 +513,8 @@ } public void addAddresses(ArrayList addrs) { - Iterator blocks = m_allocBlocks.iterator(); + + final Iterator blocks = m_allocBlocks.iterator(); // FIXME int baseAddr = -((m_index << 16) + 4); // bit adjust int baseAddr = -(m_index << 16); // bit adjust?? Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-17 13:33:32 UTC (rev 3951) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-17 17:32:16 UTC (rev 3952) @@ -190,12 +190,14 @@ * * FIXME Release checklist: * <p> - * Checksum metabits header record? + * Add metabits header record checksum field and verify on read back. * <p> - * Checksum fixed allocators? + * Checksum fixed allocators (needs to be tested on read back). * <p> - * Checksum delete blocks / blob records? + * Add version field to the fixed allocator. * <p> + * Done. Checksum delete blocks / blob records. + * <p> * PSOutputStream - remove caching logic. It is unused and makes this * class much more complex. A separate per-RWStore caching class for * recycling PSOutputStreams can be added later. @@ -204,6 +206,14 @@ * declare more fields to be final. See notes on {@link AllocBlock}. * <p> * Implement logic to "abort" a shadow allocation context. 
+ * <p> + * Unit test to verify that we do not recycle allocations from the last + * commit point even when the retention time is zero such that it is + * always possible to re-open the store from the alternative root block + * even after you have allocated things against the current root block + * (but not yet committed). + * <p> + * Read-only mode. */ public class RWStore implements IStore { @@ -336,10 +346,18 @@ */ private final ArrayList<Allocator> m_allocs; - /** lists of free alloc blocks. */ + /** + * A fixed length array of lists of free {@link FixedAllocator}s with one + * entry in the array for each configured allocator size. An allocator is + * put onto this free list when it is initially created. When the store is + * opened, it will be added to this list if {@link Allocator#hasFree()} + * returns true. It will be removed when it has no free space remaining. It + * will be added back to the free list when its free slots exceeds a + * configured threshold. + */ private ArrayList<FixedAllocator> m_freeFixed[]; - /** lists of free blob allocators. */ +// /** lists of free blob allocators. */ // private final ArrayList<BlobAllocator> m_freeBlobs; /** lists of blocks requiring commitment. */ @@ -2039,7 +2057,7 @@ private int m_metaTransientBits[]; // volatile private int m_metaStartAddr; private volatile int m_metaBitsAddr; - + // @todo javadoc please. volatile private boolean m_recentAlloc = false; /** @@ -2437,6 +2455,20 @@ * Collected statistics are against each Allocation Block size: * total number of slots | store size * number of filled slots | store used + * <dl> + * <dt>AllocatorSize</dt><dd>The #of bytes in the allocated slots issued by this allocator.</dd> + * <dt>AllocatorCount</dt><dd>The #of fixed allocators for that slot size.</dd> + * <dt>SlotsAllocated</dt><dd>Cumulative allocation of slots to date in this slot size (regardless of the transaction outcome).</dd> + * <dt>SlotsRecycled</dt><dd>Cumulative recycled slots to date in this slot size (regardless of the transaction outcome).</dd> + * <dt>SlotsInUse</dt><dd>The difference between the two previous columns (net slots in use for this slot size).</dd> + * <dt>SlotsReserved</dt><dd>The #of slots in this slot size which have had storage reserved for them.</dd> + * <dt>BytesReserved</dt><dd>The space reserved on the backing file for those allocation slots</dd> + * <dt>BytesAppData</dt><dd>The #of bytes in the allocated slots which are used by application data (including the record checksum).</dd> + * <dt>%SlotWaste</dt><dd>BytesAppData/(SlotsInUse*AllocatorSize) : How well the application data fits in the slots.</dd> + * <dt>%StoreWaste</dt><dd>BytesAppData/BytesReserved : How much of the reserved space is in use by application data.</dd> + * <dt>%AppData</dt><dd>BytesAppData/Sum(BytesAppData). This is how much of your data is stored by each allocator.</dd> + * <dt>%BackingFile</dt><dd>BytesReserved/Sum(BytesReserved). This is how much of the backing file is reserved for each allocator.</dd> + * </dl> */ public void showAllocators(final StringBuilder str) { final AllocationStats[] stats = new AllocationStats[m_allocSizes.length]; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-11-17 13:33:39
Revision: 3951 http://bigdata.svn.sourceforge.net/bigdata/?rev=3951&view=rev Author: thompsonbry Date: 2010-11-17 13:33:32 +0000 (Wed, 17 Nov 2010) Log Message: ----------- PSOutputStream - modified to rethrow an exception which was being dumped on the console and ignored. made several method variables final. RWStore - mainly javadoc, including adding a checklist for a release at the top of the file. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java 2010-11-17 02:14:08 UTC (rev 3950) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/PSOutputStream.java 2010-11-17 13:33:32 UTC (rev 3951) @@ -330,23 +330,23 @@ m_writingHdr = true; // ensure that header CAN be a BLOB // m_blobHeader[m_blobHdrIdx++] = addr; m_blobHeader.add(addr); - final int precount = m_count; +// final int precount = m_count; m_count = 0; try { // writeInt(m_blobHdrIdx); // for (int i = 0; i < m_blobHdrIdx; i++) { // writeInt(m_blobHeader[i]); // } - int hdrBufSize = 4*(m_blobHeader.size() + 1); - ByteArrayOutputStream hdrbuf = new ByteArrayOutputStream(hdrBufSize); - DataOutputStream hdrout = new DataOutputStream(hdrbuf); + final int hdrBufSize = 4*(m_blobHeader.size() + 1); + final ByteArrayOutputStream hdrbuf = new ByteArrayOutputStream(hdrBufSize); + final DataOutputStream hdrout = new DataOutputStream(hdrbuf); hdrout.writeInt(m_blobHeader.size()); for (int i = 0; i < m_blobHeader.size(); i++) { hdrout.writeInt(m_blobHeader.get(i)); } hdrout.flush(); - byte[] outbuf = hdrbuf.toByteArray(); + final byte[] outbuf = hdrbuf.toByteArray(); addr = (int) m_store.alloc(outbuf, hdrBufSize, m_context); // if (m_blobHdrIdx != ((m_blobThreshold - 1 + m_bytesWritten - m_count) / m_blobThreshold)) { @@ -363,7 +363,8 @@ // DO NOT USE BLOB ALLOCATOR // addr = m_store.registerBlob(addr); // returns handle } catch (IOException e) { - e.printStackTrace(); +// e.printStackTrace(); + throw new RuntimeException(e); } } finally { m_writingHdr = false; Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-17 02:14:08 UTC (rev 3950) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-17 13:33:32 UTC (rev 3951) @@ -105,25 +105,24 @@ * <p> * The method of storing the allocation headers has been changed from always * allocating at the end of the file (and moving them on file extend) to - * allocation of fixed areas. The meta-allocation data, containing the bitmap + * allocation of fixed areas. The meta-allocation data, containing the bitmap * that controls these allocations, is itself stored in the heap, and is now * structured to include both the bit data and the list of meta-storage * addresses. * <p> - * Sizing: - * 256 allocators would reference approximately 2M objects/allocations. At 1K - * per allocator this would require 250K of store. The meta-allocation data - * would therefore need a start address plus 32 bytes (or 8 ints) to represent - * the meta-allocation bits. 
An array of such data referencing sequentially - * allocated storage areas completes the meta-allocation requirements. + * Sizing: 256 allocators would reference approximately 2M objects/allocations. + * At 1K per allocator this would require 250K of store. The meta-allocation + * data would therefore need a start address plus 32 bytes (or 8 ints) to + * represent the meta-allocation bits. An array of such data referencing + * sequentially allocated storage areas completes the meta-allocation + * requirements. * <p> * A meta-allocation address can therefore be represented as a single bit offset - * from which the block, providing start address, and bit offset can be - * directly determined. + * from which the block, providing start address, and bit offset can be directly + * determined. * <p> - * The m_metaBits int array used to be fully used as allocation bits, but - * now stores both the start address plus the 8 ints used to manage that data - * block. + * The m_metaBits int array used to be fully used as allocation bits, but now + * stores both the start address plus the 8 ints used to manage that data block. * <p> * Allocation is reduced to sets of allocator objects which have a start address * and a bitmap of allocated storage maps. @@ -136,9 +135,9 @@ * the BitSet class. * <p> * Using the meta-allocation bits, it is straightforward to load ALL the - * allocation headers. A total of (say) 100 allocation headers might provide - * up to 4000 allocations each -> 400 000 objects, while 1000 headers -> 4m - * objects and 2000 -> 8m objects. + * allocation headers. A total of (say) 100 allocation headers might provide up + * to 4000 allocations each -> 400 000 objects, while 1000 headers -> 4m objects + * and 2000 -> 8m objects. * <p> * The allocators are split into a set of FixedAllocators and then * BlobAllocation. The FixedAllocators will allocate from 128 to 32K objects, @@ -167,28 +166,44 @@ * All data is checksummed, both allocated/saved data and the allocation blocks. * <p> * BLOB allocation is not handled using chained data buffers but with a blob - * header record. This is indicated with a BlobAllocator that provides indexed + * header record. This is indicated with a BlobAllocator that provides indexed * offsets to the header record (the address encodes the BlobAllocator and the * offset to the address). The header record stores the number of component * allocations and the address of each. * <p> * This approach makes for much more efficient freeing/re-allocation of Blob - * storage, in particular avoiding the need to read in the component blocks - * to determine chained blocks for freeing. This is particularly important - * for larger stores where a disk cache could be flushed through simply freeing - * BLOB allocations. + * storage, in particular avoiding the need to read in the component blocks to + * determine chained blocks for freeing. This is particularly important for + * larger stores where a disk cache could be flushed through simply freeing BLOB + * allocations. * <h2> - * Deferred Free List - * </h2> + * Deferred Free List</h2> * <p> * The previous implementation has been amended to associate a single set of - * deferredFree blocks with each CommitRecord. The CommitRecordIndex will - * then provide access to the CommitRecords to support the deferred freeing - * of allocations based on age/earliestTxReleaseTime. + * deferredFree blocks with each CommitRecord. 
The CommitRecordIndex will then + * provide access to the CommitRecords to support the deferred freeing of + * allocations based on age/earliestTxReleaseTime. * <p> * The last release time processed is held with the MetaAllocation data * * @author Martyn Cutcher + * + * FIXME Release checklist: + * <p> + * Checksum metabits header record? + * <p> + * Checksum fixed allocators? + * <p> + * Checksum delete blocks / blob records? + * <p> + * PSOutputStream - remove caching logic. It is unused and makes this + * class much more complex. A separate per-RWStore caching class for + * recycling PSOutputStreams can be added later. + * <p> + * Modify FixedAllocator to use arrayCopy() rather than clone and + * declare more fields to be final. See notes on {@link AllocBlock}. + * <p> + * Implement logic to "abort" a shadow allocation context. */ public class RWStore implements IStore { @@ -281,8 +296,13 @@ static final int ALLOCATION_SCALEUP = 16; // multiplier to convert allocations based on minimum allocation of 32k static private final int META_ALLOCATION = 8; // 8 * 32K is size of meta Allocation - // Maximum fixed allocs in a BLOB, but do restrict to size that will fit within a single fixed allocation - // Ignored + /** + * Maximum fixed allocs in a BLOB, but do restrict to size that will fit + * within a single fixed allocation Ignored. + * + * FIXME Javadoc. Is this ignored or not? (what is the Ignored doing at the + * end of the comment above?) Is this in units of int32 values or bytes? + */ static final int BLOB_FIXED_ALLOCS = 2048; // private ICommitCallback m_commitCallback; // @@ -751,7 +771,7 @@ if (metaBitsStore > 0) { rawmbaddr >>= 16; - + // RWStore now restore metabits final byte[] buf = new byte[metaBitsStore * 4]; @@ -1063,8 +1083,8 @@ * direct read will be the blob header record. In this case we should hand * over the streaming to a PSInputStream. * - * FIXME: For now we do not use the PSInputStream but instead process - * directly + * FIXME: Javadoc update (was: For now we do not use the PSInputStream but instead process + * directly...) * * If it is a BlobAllocation, then the BlobAllocation address points to the * address of the BlobHeader record. @@ -1299,47 +1319,6 @@ return out.toString(); } -// /** -// * FIXME: This method is not currently used with BigData, if needed then -// * the address mangling needs re-working -// */ -// public int getDataSize(long addr, byte buf[]) { -// throw new UnsupportedOperationException(); -// -//// synchronized (this) { -//// m_writes.flush(); -//// -//// if (addr == 0) { -//// return 0; -//// } -//// -//// try { -//// int size = addr2Size((int) addr); -//// synchronized (m_raf) { -////// m_raf.seek(physicalAddress((int) addr)); -////// m_raf.readFully(buf, 0, size); -//// m_raf.getChannel().read(ByteBuffer.wrap(buf, 0, size), physicalAddress((int) addr)); -//// } -//// -//// return size; -//// } catch (IOException e) { -//// throw new StorageTerminalError("Unable to read data", e); -//// } -//// } -// } - -// /** -// * Always returns ZERO (0L). -// * <p> -// * This is intended to support the core functionality of a WormStore, other -// * stores should return zero, indicating no previous versions available -// */ -// public long getPreviousAddress(final long laddr) { -// -// return 0; -// -// } - public void free(final long laddr, final int sze) { free(laddr, sze, null/* AlocationContext */); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
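[Editor's note] The blob header record touched by this revision is laid out as an int count followed by the int address of each component allocation, serialized through a DataOutputStream and then stored as its own allocation. The following is a simplified extraction of that serialization step only, not the actual PSOutputStream code path; it also mirrors the rethrow-instead-of-swallow change made in this commit.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;

// Sketch of the blob header layout used by PSOutputStream in the diff above:
// an int count followed by the int address of each component allocation.
final class BlobHeaderSketch {

    static byte[] encode(final List<Integer> componentAddrs) {

        final int hdrBufSize = 4 * (componentAddrs.size() + 1);

        final ByteArrayOutputStream hdrbuf = new ByteArrayOutputStream(hdrBufSize);

        final DataOutputStream hdrout = new DataOutputStream(hdrbuf);

        try {
            hdrout.writeInt(componentAddrs.size());
            for (int addr : componentAddrs) {
                hdrout.writeInt(addr);
            }
            hdrout.flush();
        } catch (IOException e) {
            // rethrow rather than dump on the console, as changed in this revision
            throw new RuntimeException(e);
        }

        // The caller would hand this byte[] to the store's alloc() and record
        // the returned address as the blob's logical address.
        return hdrbuf.toByteArray();

    }

}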
From: <tho...@us...> - 2010-11-17 02:14:18
Revision: 3950 http://bigdata.svn.sourceforge.net/bigdata/?rev=3950&view=rev Author: thompsonbry Date: 2010-11-17 02:14:08 +0000 (Wed, 17 Nov 2010) Log Message: ----------- More work on the runtime query optimizer. It is generating useful plans for LUBM Q2, Q8 and Q9. The runtime cost of the generated plans is close to the runtime cost of the plans produced by the static query optimizer. LUBM data are pretty regular so the runtime query optimizer is not able to exploit unexpected correlations in the joins. The runtime query optimizer tends to have cardinality estimation underflow for Q2 which suggests that we need to deepen the search on paths with low estimated cardinality. This bears further investigation. When we have estimation underflow in the runtime query optimizer that means that plans extending that point are picked at random. This is similar to, but not the same, as the problem encounterd by the static query optimizer, which is unable to estimate the "as bound" cardinality after making some initial decision about the join ordering. There may very well be a role for hybrid of both the static and runtime query optimizer which plays to their different strengths. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IPredicate.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryLog.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/Journal.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/ap/TestPredicateAccessPath.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/ap/TestSampleIndex.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestRemoteAccessPath.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/relation/rule/eval/TestDefaultEvaluationPlan.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/joinGraph/TestJoinGraphOnLubm.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/rules/TestContextAdvancer.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpIdFactory.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IdFactory.java Removed Paths: ------------- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/AdaptiveQueryOptimization.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java 
=================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java 2010-11-15 18:13:06 UTC (rev 3949) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java 2010-11-17 02:14:08 UTC (rev 3950) @@ -31,6 +31,8 @@ import java.util.List; import java.util.Map; +import com.bigdata.bop.IPredicate.Annotations; + import cutthecrap.utils.striterators.IPropertySet; /** @@ -180,25 +182,29 @@ */ BOpEvaluationContext getEvaluationContext(); - /** - * Return <code>true</code> iff this operator is an access path which writes - * on the database. - * - * @see Annotations#MUTATION - */ - boolean isMutation(); +// /** +// * Return <code>true</code> iff this operator is an access path which writes +// * on the database. +// * +// * @see com.bigdata.bop.IPredicate.Annotations#MUTATION +// * +// * @todo Move to {@link IPredicate}? +// */ +// boolean isMutation(); +// +// /** +// * The timestamp or transaction identifier on which the operator will read +// * or write. +// * +// * @see Annotations#TIMESTAMP +// * +// * @throws IllegalStateException +// * if {@link Annotations#TIMESTAMP} was not specified. +// * +// * @todo move to {@link IPredicate}? +// */ +// long getTimestamp(); - /** - * The timestamp or transaction identifier on which the operator will read - * or write. - * - * @see Annotations#TIMESTAMP - * - * @throws IllegalStateException - * if {@link Annotations#TIMESTAMP} was not specified. - */ - long getTimestamp(); - // /** // * Compare this {@link BOp} with another {@link BOp}. // * @@ -240,37 +246,6 @@ long DEFAULT_TIMEOUT = Long.MAX_VALUE; /** - * Boolean property whose value is <code>true</code> iff this operator - * writes on a database. - * <p> - * Most operators operate solely on streams of elements or binding sets. - * Some operators read or write on the database using an access path, - * which is typically described by an {@link IPredicate}. This property - * MUST be <code>true</code> when access path is used to write on the - * database. - * <p> - * Operators which read or write on the database must declare the - * {@link Annotations#TIMESTAMP} associated with that operation. - * - * @see #TIMESTAMP - * - * @todo Move to {@link IPredicate}? - */ - String MUTATION = BOp.class.getName() + ".mutation"; - - boolean DEFAULT_MUTATION = false; - - /** - * The timestamp (or transaction identifier) used by this operator if it - * reads or writes on the database (no default). - * - * @see #MUTATION - * - * @todo Move to {@link IPredicate}? - */ - String TIMESTAMP = BOp.class.getName() + ".timestamp"; - - /** * This annotation determines where an operator will be evaluated * (default {@value #DEFAULT_EVALUATION_CONTEXT}). */ Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2010-11-15 18:13:06 UTC (rev 3949) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2010-11-17 02:14:08 UTC (rev 3950) @@ -582,18 +582,6 @@ } - public final boolean isMutation() { - - return getProperty(Annotations.MUTATION, Annotations.DEFAULT_MUTATION); - - } - - public final long getTimestamp() { - - return (Long) getRequiredProperty(Annotations.TIMESTAMP); - - } - /* * Note: I've played around with a few hash functions and senses of * equality. 
Predicate (before the bops were introduced) used to have a Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java 2010-11-15 18:13:06 UTC (rev 3949) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java 2010-11-17 02:14:08 UTC (rev 3950) @@ -165,8 +165,7 @@ final IIndexManager tmp = getFederation() == null ? getIndexManager() : getFederation(); - final long timestamp = (Long) pred - .getRequiredProperty(BOp.Annotations.TIMESTAMP); + final long timestamp = pred.getTimestamp(); return (IRelation<E>) tmp.getResourceLocator().locate( pred.getOnlyRelationName(), timestamp); Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpIdFactory.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpIdFactory.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpIdFactory.java 2010-11-17 02:14:08 UTC (rev 3950) @@ -0,0 +1,29 @@ +package com.bigdata.bop; + +import java.util.LinkedHashSet; + +/** + * A factory which may be used when some identifiers need to be reserved. + */ +public class BOpIdFactory implements IdFactory { + + private final LinkedHashSet<Integer> ids = new LinkedHashSet<Integer>(); + + private int nextId = 0; + + public void reserve(int id) { + ids.add(id); + } + + public int nextId() { + + while (ids.contains(nextId)) { + + nextId++; + + } + + return nextId++; + } + +} \ No newline at end of file Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java 2010-11-15 18:13:06 UTC (rev 3949) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java 2010-11-17 02:14:08 UTC (rev 3950) @@ -27,7 +27,6 @@ package com.bigdata.bop; -import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.LinkedHashMap; @@ -699,7 +698,7 @@ for (BOp arg : bop.args()) { - if (arg.arity() > 0) { + if (!(arg instanceof IVariableOrConstant<?>)) { toString(arg, sb, indent+1); @@ -798,29 +797,33 @@ return true; } - - /** - * Copy binding sets from the source to the sink(s). - * - * @param source - * The source. - * @param sink - * The sink (required). - * @param sink2 - * Another sink (optional). - * @param constraints - * Binding sets which fail these constraints will NOT be copied - * (optional). - * @param stats - * The {@link BOpStats#chunksIn} and {@link BOpStats#unitsIn} - * will be updated during the copy (optional). - */ - static public void copy( + + /** + * Copy binding sets from the source to the sink(s). + * + * @param source + * The source. + * @param sink + * The sink (required). + * @param sink2 + * Another sink (optional). + * @param constraints + * Binding sets which fail these constraints will NOT be copied + * (optional). + * @param stats + * The {@link BOpStats#chunksIn} and {@link BOpStats#unitsIn} + * will be updated during the copy (optional). + * + * @return The #of binding sets copied. 
+ */ + static public long copy( final IAsynchronousIterator<IBindingSet[]> source, final IBlockingBuffer<IBindingSet[]> sink, final IBlockingBuffer<IBindingSet[]> sink2, final IConstraint[] constraints, final BOpStats stats) { + long nout = 0; + while (source.hasNext()) { final IBindingSet[] chunk = source.next(); @@ -841,13 +844,19 @@ // copy accepted binding sets to the default sink. sink.add(tmp); + nout += chunk.length; + if (sink2 != null) { - // copy accepted binding sets to the alt sink. + + // copy accepted binding sets to the alt sink. sink2.add(tmp); + } } + return nout; + } /** @@ -946,5 +955,5 @@ return out; } - + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IPredicate.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IPredicate.java 2010-11-15 18:13:06 UTC (rev 3949) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IPredicate.java 2010-11-17 02:14:08 UTC (rev 3950) @@ -42,13 +42,12 @@ import com.bigdata.btree.filter.Advancer; import com.bigdata.btree.filter.TupleFilter; import com.bigdata.mdi.PartitionLocator; -import com.bigdata.rawstore.Bytes; import com.bigdata.relation.IRelation; import com.bigdata.relation.accesspath.AccessPath; import com.bigdata.relation.accesspath.ElementFilter; import com.bigdata.relation.accesspath.IAccessPath; +import com.bigdata.relation.rule.IAccessPathExpander; import com.bigdata.relation.rule.IRule; -import com.bigdata.relation.rule.IAccessPathExpander; import com.bigdata.relation.rule.eval.IEvaluationPlan; import com.bigdata.relation.rule.eval.pipeline.JoinMasterTask; import com.bigdata.service.ndx.IClientIndex; @@ -69,9 +68,12 @@ */ public interface IPredicate<E> extends BOp, Cloneable, Serializable { - /** - * Interface declaring well known annotations. - */ + /** + * Interface declaring well known annotations. + * + * FIXME All of these annotations should be in the {@link IPredicate} + * namespace. + */ public interface Annotations extends BOp.Annotations, BufferAnnotations { /** @@ -289,6 +291,35 @@ // | IRangeQuery.PARALLEL ; + /** + * Boolean property whose value is <code>true</code> iff this operator + * writes on a database. + * <p> + * Most operators operate solely on streams of elements or binding sets. + * Some operators read or write on the database using an access path, + * which is typically described by an {@link IPredicate}. This property + * MUST be <code>true</code> when access path is used to write on the + * database. + * <p> + * Operators which read or write on the database must declare the + * {@link Annotations#TIMESTAMP} associated with that operation. + * + * @see Annotations#TIMESTAMP + */ + String MUTATION = BOp.class.getName() + ".mutation"; + + boolean DEFAULT_MUTATION = false; + + /** + * The timestamp (or transaction identifier) used by this operator if it + * reads or writes on the database (no default). + * + * @see com.bigdata.bop.IPredicate.Annotations#MUTATION + * + * @todo Move to {@link IPredicate}? + */ + String TIMESTAMP = BOp.class.getName() + ".timestamp"; + } /** @@ -637,4 +668,23 @@ */ public IPredicate<E> setBOpId(int bopId); + /** + * Return <code>true</code> iff this operator is an access path which writes + * on the database. + * + * @see Annotations#MUTATION + */ + boolean isMutation(); + + /** + * The timestamp or transaction identifier on which the operator will read + * or write. 
+ * + * @see Annotations#TIMESTAMP + * + * @throws IllegalStateException + * if {@link Annotations#TIMESTAMP} was not specified. + */ + long getTimestamp(); + } Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IdFactory.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IdFactory.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IdFactory.java 2010-11-17 02:14:08 UTC (rev 3950) @@ -0,0 +1,10 @@ +package com.bigdata.bop; + +/** + * An interface for a bop identifier factory. + */ +public interface IdFactory { + + public int nextId(); + +} Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java 2010-11-15 18:13:06 UTC (rev 3949) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java 2010-11-17 02:14:08 UTC (rev 3950) @@ -636,4 +636,16 @@ } + public final boolean isMutation() { + + return getProperty(IPredicate.Annotations.MUTATION, IPredicate.Annotations.DEFAULT_MUTATION); + + } + + public final long getTimestamp() { + + return (Long) getRequiredProperty(IPredicate.Annotations.TIMESTAMP); + + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java 2010-11-15 18:13:06 UTC (rev 3949) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java 2010-11-17 02:14:08 UTC (rev 3950) @@ -34,6 +34,7 @@ import java.util.Comparator; import java.util.Formatter; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; @@ -50,6 +51,8 @@ import com.bigdata.bop.BOpContext; import com.bigdata.bop.BOpContextBase; import com.bigdata.bop.BOpEvaluationContext; +import com.bigdata.bop.BOpIdFactory; +import com.bigdata.bop.BOpUtility; import com.bigdata.bop.IBindingSet; import com.bigdata.bop.IElement; import com.bigdata.bop.IPredicate; @@ -64,8 +67,11 @@ import com.bigdata.bop.join.PipelineJoin; import com.bigdata.bop.join.PipelineJoin.PipelineJoinStats; import com.bigdata.bop.rdf.join.DataSetJoin; +import com.bigdata.bop.solutions.SliceOp; import com.bigdata.relation.IRelation; +import com.bigdata.relation.accesspath.BufferClosedException; import com.bigdata.relation.accesspath.IAccessPath; +import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.ThickAsynchronousIterator; import com.bigdata.relation.rule.Rule; import com.bigdata.striterator.Dechunkerator; @@ -176,13 +182,6 @@ } - /** - * - * TODO How can join constraints be moved around? Just attach them where - * ever a variable becomes bound? And when do we filter out variables which - * are not required downstream? Once we decide on a join path and execute it - * fully (rather than sampling that join path). - */ public JoinGraph(final BOp[] args, final Map<String, Object> anns) { super(args, anns); @@ -204,11 +203,6 @@ } -// /** -// * Used to assign row identifiers. -// */ -// static private final IVariable<Integer> ROWID = Var.var("__rowid"); - /** * A sample of a {@link Vertex} (an access path). */ @@ -334,9 +328,10 @@ } /** - * Take a sample of the vertex. 
If the sample is already exact, then - * this is a NOP. If the vertex was already sampled to that limit, then - * this is a NOP (you have to raise the limit to re-sample the vertex). + * Take a sample of the vertex, updating {@link #sample} as a + * side-effect. If the sample is already exact, then this is a NOP. If + * the vertex was already sampled to that limit, then this is a NOP (you + * have to raise the limit to re-sample the vertex). * * @param limit * The sample cutoff. @@ -677,6 +672,11 @@ /** * The last sample for this edge and <code>null</code> if the edge has * not been sampled. + * <p> + * Note: This sample is only the one-step cutoff evaluation of the edge + * given a sample of its vertex having the lesser cardinality. It is NOT + * the cutoff sample of a join path having this edge except for the + * degenerate case where the edge is the first edge in the join path. */ public EdgeSample sample = null; @@ -696,14 +696,23 @@ } /** + * The edge label is formed from the {@link BOp.Annotations#BOP_ID} of + * its ordered vertices (v1,v2). + */ + public String getLabel() { + + return "(" + v1.pred.getId() + "," + v2.pred.getId() + ")"; + + } + + /** * Note: The vertices of the edge are labeled using the * {@link BOp.Annotations#BOP_ID} associated with the {@link IPredicate} * for each vertex. */ public String toString() { - return "Edge{ (V" + v1.pred.getId() + ",V" + v2.pred.getId() - + "), estCard=" + return "Edge{ "+getLabel()+", estCard=" + (sample == null ? "N/A" : sample.estimatedCardinality) + ", shared=" + shared.toString() + ", sample=" + sample + "}"; @@ -790,27 +799,48 @@ } /** - * Estimate the cardinality of the edge. + * Estimate the cardinality of the edge, updating {@link #sample} as a + * side-effect. This is a NOP if the edge has already been sampled at + * that <i>limit</i>. This is a NOP if the edge sample is exact. * * @param context * - * @return The estimated cardinality of the edge. + * @return The new {@link EdgeSample} (this is also updated on + * {@link #sample} as a side-effect). * * @throws Exception */ - public long estimateCardinality(final QueryEngine queryEngine, + public EdgeSample estimateCardinality(final QueryEngine queryEngine, final int limit) throws Exception { if (limit <= 0) throw new IllegalArgumentException(); - /* - * Note: There is never a need to "re-sample" the edge. Unlike ROX, - * we always can sample a vertex. This means that we can sample the - * edges exactly once, during the initialization of the join graph. - */ - if (sample != null) - throw new RuntimeException(); +// /* +// * Note: There is never a need to "re-sample" the edge. Unlike ROX, +// * we always can sample a vertex. This means that we can sample the +// * edges exactly once, during the initialization of the join graph. +// */ +// if (sample != null) +// throw new RuntimeException(); + + if (sample != null) { + + if (sample.limit >= limit) { + + // Already sampled at that limit. + return sample; + + } + + if (sample.estimateEnum == EstimateEnum.Exact) { + + // Sample is exact (fully materialized result). + return sample; + + } + + } /* * Figure out which vertex has the smaller cardinality. The sample @@ -832,27 +862,6 @@ } /* - * TODO This is awkward to setup because we do not have a concept - * (or class) corresponding to a fly weight relation and we do not - * have a general purpose relation, just arrays or sequences of - * IBindingSets. Also, all relations are persistent. 
Temporary - * relations are on a temporary store and are locatable by their - * namespace rather than being Objects. - * - * The algorithm presupposes fly weight / temporary relations this - * both to wrap the sample and to store the computed intermediate - * results. - * - * Note: The PipelineJoin does not have a means to halt after a - * limit is satisfied. In order to achieve this, we have to wrap it - * with a SliceOp. - * - * Together, this means that we are dealing with IBindingSet[]s for - * both the input and the output of the cutoff evaluation of the - * edge rather than rows of the materialized relation. - */ - - /* * Convert the source sample into an IBindingSet[]. * * TODO We might as well do this when we sample the vertex. @@ -872,12 +881,16 @@ v.sample.rangeCount, v.sample.exact, v.sample.limit, sourceSample); - return sample.estimatedCardinality; + return sample; } /** - * Estimate the cardinality of the edge. + * Estimate the cardinality of the edge given a sample of either a + * vertex or a join path leading up to that edge. + * <p> + * Note: The caller is responsible for protecting against needless + * re-sampling. * * @param queryEngine * @param limit @@ -908,10 +921,6 @@ if (limit <= 0) throw new IllegalArgumentException(); -// // Inject a rowId column. -// sourceSample = BOpUtility.injectRowIdColumn(ROWID, 1/* start */, -// sourceSample); - /* * Note: This sets up a cutoff pipeline join operator which makes an * accurate estimate of the #of input solutions consumed and the #of @@ -928,6 +937,12 @@ * predicate) will not reduce the effort to compute the join, but * they can reduce the cardinality of the join and that is what we * are trying to estimate here. + * + * TODO How can join constraints be moved around? Just attach them + * where ever a variable becomes bound? And when do we filter out + * variables which are not required downstream? Once we decide on a + * join path and execute it fully (rather than sampling that join + * path). */ final int joinId = 1; final PipelineJoin joinOp = new PipelineJoin(new BOp[] {}, // @@ -953,22 +968,8 @@ */ new NV(PipelineJoin.Annotations.SHARED_STATE,true), new NV(PipelineJoin.Annotations.EVALUATION_CONTEXT,BOpEvaluationContext.CONTROLLER) -// // make sure the chunks are large enough to hold the result. -// new NV(PipelineJoin.Annotations.CHUNK_CAPACITY,limit), -// // no chunk timeout -// new NV(PipelineJoin.Annotations.CHUNK_TIMEOUT,Long.MAX_VALUE) ); -// BOpContext context = new BOpContext(runningQuery, partitionId, stats, source, sink, sink2); -// joinOp.eval(context); - -// final SliceOp sliceOp = new SliceOp(new BOp[] { joinOp },// -// NV.asMap(// -// new NV(BOp.Annotations.BOP_ID, 2), // -// new NV(SliceOp.Annotations.LIMIT, (long) limit), // -// new NV(BOp.Annotations.EVALUATION_CONTEXT, -// BOpEvaluationContext.CONTROLLER))); - final PipelineOp queryOp = joinOp; // run the cutoff sampling of the edge. @@ -980,10 +981,6 @@ new ThickAsynchronousIterator<IBindingSet[]>( new IBindingSet[][] { sourceSample }))); -// // #of source samples consumed. -// int inputCount; -// // #of output samples generated. -// int outputCount = 0; final List<IBindingSet> result = new LinkedList<IBindingSet>(); try { try { @@ -993,15 +990,8 @@ runningQuery.iterator()); while (itr.hasNext()) { bset = itr.next(); -// final int rowid = (Integer) bset.get(ROWID).get(); -// if (rowid > inputCount) -// inputCount = rowid; result.add(bset); -// outputCount++; } -// // #of input rows consumed. -// inputCount = bset == null ? 
0 : ((Integer) bset.get(ROWID) -// .get()); } finally { // verify no problems. runningQuery.get(); @@ -1014,8 +1004,8 @@ final PipelineJoinStats joinStats = (PipelineJoinStats) runningQuery .getStats().get(joinId); - if (log.isDebugEnabled()) - log.debug(joinStats.toString()); + if (log.isTraceEnabled()) + log.trace(joinStats.toString()); /* * TODO Improve comments here. See if it is possible to isolate a @@ -1032,8 +1022,8 @@ (int) joinStats.outputSolutions.get(), // result.toArray(new IBindingSet[result.size()])); - if (log.isTraceEnabled()) - log.trace("edge=" + this + ", sample=" + edgeSample); + if (log.isDebugEnabled()) + log.debug(getLabel() + " : newSample=" + edgeSample); return edgeSample; @@ -1081,8 +1071,7 @@ for (Edge e : edges) { if (!first) sb.append(","); - sb.append("(" + e.v1.pred.getId() + "," + e.v2.pred.getId() - + ")"); + sb.append(e.getLabel()); first = false; } sb.append(",cumEstCard=" + cumulativeEstimatedCardinality @@ -1176,68 +1165,6 @@ return false; } -// /** -// * Return <code>true</code> if this path is an unordered super set of -// * the given path. In the case where both paths have the same vertices -// * this will also return <code>true</code>. -// * -// * @param p -// * Another path. -// * -// * @return <code>true</code> if this path is an unordered super set of -// * the given path. -// */ -// public boolean isUnorderedSuperSet(final Path p) { -// -// if (p == null) -// throw new IllegalArgumentException(); -// -// if (edges.size() < p.edges.size()) { -// /* -// * Fast rejection. This assumes that each edge after the first -// * adds one distinct vertex to the path. That assumption is -// * enforced by #addEdge(). -// */ -// return false; -// } -// -// final Vertex[] v1 = getVertices(); -// final Vertex[] v2 = p.getVertices(); -// -// if (v1.length < v2.length) { -// // Proven false since the other set is larger. -// return false; -// } -// -// /* -// * Scan the vertices of the caller's path. If any of those vertices -// * are NOT found in this path then the caller's path can not be a -// * subset of this path. -// */ -// for (int i = 0; i < v2.length; i++) { -// -// final Vertex tmp = v2[i]; -// -// boolean found = false; -// for (int j = 0; j < v1.length; j++) { -// -// if (v1[j] == tmp) { -// found = true; -// break; -// } -// -// } -// -// if (!found) { -// return false; -// } -// -// } -// -// return true; -// -// } - /** * Return <code>true</code> if this path is an unordered variant of the * given path (same vertices in any order). @@ -1302,21 +1229,100 @@ } /** - * Return the vertices in this path (in path order). + * Return the vertices in this path (in path order). For the first edge, + * the minimum cardinality vertex is always reported first (this is + * critical for producing the correct join plan). For the remaining + * edges in the path, the unvisited is reported. * * @return The vertices (in path order). * * TODO This could be rewritten without the toArray() using a * method which visits the vertices of a path in any order. + * + * @todo unit test for the first vertex to be reported. */ public Vertex[] getVertices() { + final Set<Vertex> tmp = new LinkedHashSet<Vertex>(); + for (Edge e : edges) { + + if (tmp.isEmpty()) { + /* + * The first edge is handled specially in order to report + * the minimum cardinality vertex first. 
+ */ + tmp.add(e.getMinimumCardinalityVertex()); + tmp.add(e.getMaximumCardinalityVertex()); + + } else { + + tmp.add(e.v1); + + tmp.add(e.v2); + + } + + } + + final Vertex[] a = tmp.toArray(new Vertex[tmp.size()]); + + return a; + + } + + /** + * Return the {@link IPredicate}s associated with the vertices of the + * join path in path order. + * + * @see #getVertices() + */ + public IPredicate[] getPredicates() { + + // The vertices in the selected evaluation order. + final Vertex[] vertices = getVertices(); + + // The predicates in the same order as the vertices. + final IPredicate[] preds = new IPredicate[vertices.length]; + + for (int i = 0; i < vertices.length; i++) { + + preds[i] = vertices[i].pred; + + } + + return preds; + + } + + /** + * Return the {@link BOp} identifiers of the predicates associated with + * each vertex in path order. + */ + static public int[] getVertexIds(final List<Edge> edges) { + + final Set<Vertex> tmp = new LinkedHashSet<Vertex>(); + + for (Edge e : edges) { + tmp.add(e.v1); + tmp.add(e.v2); + } + final Vertex[] a = tmp.toArray(new Vertex[tmp.size()]); - return a; + + final int[] b = new int[a.length]; + + for (int i = 0; i < a.length; i++) { + + b[i] = a[i].pred.getId(); + + } + + return b; + } /** @@ -1350,14 +1356,18 @@ /** * Add an edge to a path, computing the estimated cardinality of the new - * path, and returning the new path. + * path, and returning the new path. The cutoff join is performed using + * the {@link #sample} of <i>this</i> join path and the actual access + * path for the target vertex. * * @param queryEngine * @param limit * @param e * The edge. * - * @return The new path. + * @return The new path. The materialized sample for the new path is the + * sample obtained by the cutoff join for the edge added to the + * path. * * @throws Exception */ @@ -1432,63 +1442,12 @@ final Path tmp = new Path(edges, cumulativeEstimatedCardinality, edgeSample); - // tmp.stopVertex = e.getMaximumCardinalityVertex(); - return tmp; } } - // /** - // * Equality is defined by comparison of the unordered set of edges. - // */ - // public boolean equals(final Object o) { - // if (this == o) - // return true; - // if (!(o instanceof Path)) - // return false; - // final Path t = (Path) o; - // if (edges.length != t.edges.length) - // return false; - // for (Edge e : edges) { - // boolean found = false; - // for (Edge x : t.edges) { - // if (x.equals(e)) { - // found = true; - // break; - // } - // } - // if (!found) - // return false; - // } - // return true; - // } - // - // /** - // * The hash code of path is defined as the bit-wise XOR of the hash - // * codes of the edges in that path. - // */ - // public int hashCode() { - // - // if (hash == 0) { - // - // int result = 0; - // - // for(Edge e : edges) { - // - // result ^= e.hashCode(); - // - // } - // - // hash = result; - // - // } - // return hash; - // - // } - // private int hash; - } /** @@ -1582,31 +1541,60 @@ } /** - * A join graph (data structure and methods only). + * A runtime optimizer for a join graph. The {@link JoinGraph} bears some + * similarity to ROX (Runtime Optimizer for XQuery), but has several + * significant differences: + * <ol> + * <li> + * 1. ROX starts from the minimum cardinality edge of the minimum + * cardinality vertex. The {@link JoinGraph} starts with one or more low + * cardinality vertices.</li> + * <li> + * 2. ROX always extends the last vertex added to a given join path. 
The + * {@link JoinGraph} extends all vertices having unexplored edges in each + * breadth first expansion.</li> + * <li> + * 3. ROX is designed to interleave operator-at-once evaluation of join path + * segments which dominate other join path segments. The {@link JoinGraph} + * is designed to prune all join paths which are known to be dominated by + * other join paths for the same set of vertices in each round and iterates + * until a join path is identified which uses all vertices and has the + * minimum expected cumulative estimated cardinality. Join paths which + * survive pruning are re-sampled as necessary in order to obtain better + * information about edges in join paths which have a low estimated + * cardinality in order to address a problem with underflow of the + * cardinality estimates.</li> + * </ol> * - * Note: ROX was stated in terms of materialization of intermediate results. - * Bigdata was originally designed to support pipelined join evaluation in - * which the zero investment property is true (there exists an index for the - * join). While support is being developed for operator-at-once joins (e.g., - * hash joins), that support is aimed at more efficient evaluation of high - * cardinality joins using multi-block IO. Therefore, unlike ROX, the - * runtime query optimizer does not materialize the intermediate results - * when chain sampling. Instead, it feeds a sample into a cutoff pipeline - * evaluation for the join path. Since some join paths can eliminate a lot - * of intermediate solutions and hence take a long time to satisfy the - * cutoff, we also specify a timeout for the cutoff evaluation of a join - * path. Given the zero investment property (an index exists for the join), - * if the cutoff is not satisfied within the timeout, then the join has a - * low correlation. If no solutions are generated within the timeout, then - * the estimate of the correlation "underflows". + * TODO For join graphs with a large number of vertices we may need to + * constrain the #of vertices which are explored in parallel. This could be + * done by only branching the N lowest cardinality vertices from the already + * connected edges. Since fewer vertices are being explored in parallel, + * paths are more likely to converge onto the same set of vertices at which + * point we can prune the dominated paths. * - * Note: timeouts are a bit tricky when you are not running on a real-time - * platform. In particular, heavy swapping or heavy GC workloads could both - * cause a timeout to expire because no work was done on sampling the join - * path rather than because there was a lot of work to be done. Therefore, - * the timeout should be used to protect against join paths which take a - * long time to materialize <i>cutoff</i> solutions rather than to fine tune - * the running time of the query optimizer. + * TODO Compare the cumulative expected cardinality of a join path with the + * expected cost of a join path. The latter allows us to also explore + * alternative join strategies, such as the parallel subquery versus scan + * and filter decision for named graph and default graph SPARQL queries. + * + * TODO Coalescing duplicate access paths can dramatically reduce the work + * performed by a pipelined nested index subquery. (A hash join eliminates + * all duplicate access paths using a scan and filter approach.) If we will + * run a pipeline nested index subquery join, then should the runtime query + * optimizer prefer paths with duplicate access paths? 
+ * + * TODO How can we handle things like lexicon joins. A lexicon join is is + * only evaluated when the dynamic type of a variable binding indicates that + * the RDF Value must be materialized by a join against the ID2T index. + * Binding sets having inlined values can simply be routed around the join + * against the ID2T index. Routing around saves network IO in scale-out + * where otherwise we would route binding sets having identifiers which do + * not need to be materialized to the ID2T shards. + * + * @see <a + * href="http://www-db.informatik.uni-tuebingen.de/files/research/pathfinder/publications/rox-demo.pdf"> + * ROX </a> */ public static class JGraph { @@ -1641,10 +1629,6 @@ } sb.append("\n]}"); return sb.toString(); - - // return super.toString() + "{V=" + Arrays.toString(V) + ",E=" - // + Arrays.toString(E) + - // ", executedVertices="+executedVertices+"}"; } public JGraph(final IPredicate[] v) { @@ -1707,7 +1691,7 @@ * * @throws Exception */ - public void runtimeOptimizer(final QueryEngine queryEngine, + public Path runtimeOptimizer(final QueryEngine queryEngine, final int limit) throws Exception { // Setup the join graph. @@ -1732,26 +1716,11 @@ } - /* - * FIXME Choose the best join path and execute it (or return the - * evaluation order to the caller). - * - * FIXME This must either recognize each time a join path is known - * to dominate all other join paths and then execute it or iterator - * until the total join path is decided and then execute the - * original query using that join path. - * - * @todo When executing the query, it is actually being executed as - * a subquery. Therefore we have to take appropriate care to ensure - * that the results are copied out of the subquery and into the - * parent query. See SubqueryTask for how this is done. - * - * @todo When we execute the query, we should clear the references - * to the sample (unless they are exact, in which case they can be - * used as is) in order to release memory associated with those - * samples if the query is long running. - */ - + // Should be one winner. + assert paths.length == 1; + + return paths[0]; + } /** @@ -1831,14 +1800,14 @@ */ estimateEdgeWeights(queryEngine, limit); - if (log.isInfoEnabled()) { + if (log.isDebugEnabled()) { final StringBuilder sb = new StringBuilder(); sb.append("Edges:\n"); for (Edge e : E) { sb.append(e.toString()); sb.append("\n"); } - log.info(sb.toString()); + log.debug(sb.toString()); } /* @@ -1887,52 +1856,215 @@ throw new IllegalArgumentException(); // increment the limit by itself in each round. - final int limit = round * limitIn; - - final List<Path> tmp = new LinkedList<Path>(); + final int limit = (round + 1) * limitIn; - // First, copy all existing paths. + if (log.isDebugEnabled()) + log.debug("round=" + round + ", limit=" + limit + + ", #paths(in)=" + a.length); + +// final List<Path> tmp = new LinkedList<Path>(); +// +// // First, copy all existing paths. +// for (Path x : a) { +// tmp.add(x); +// } + + /* + * Re-sample all vertices which are part of any of the existing + * paths. + * + * Note: A request to re-sample a vertex is a NOP unless the limit + * has been increased since the last time the vertex was sampled. It + * is also a NOP if the vertex has been fully materialized. + * + * TODO We only really need to deepen those paths where we have a + * low estimated join hit ratio. Paths with a higher join hit ratio + * already have a decent estimate of the cardinality and a decent + * sample size and can be explored without resampling. 
+ */ + if (log.isDebugEnabled()) + log.debug("Re-sampling in-use vertices: limit=" + limit); + for (Path x : a) { - tmp.add(x); + + for(Edge e : x.edges) { + + e.v1.sample(queryEngine, limit); + e.v2.sample(queryEngine, limit); + + } + } - // Vertices are inserted into this collection when they are resampled. - final Set<Vertex> resampled = new LinkedHashSet<Vertex>(); + /* + * Re-sample the cutoff join for each edge in each of the existing + * paths using the newly re-sampled vertices. + * + * Note: The only way to increase the accuracy of our estimates for + * edges as we extend the join paths is to re-sample each edge in + * the join path in path order. + * + * Note: An edge must be sampled for each distinct join path prefix + * in which it appears within each round. However, it is common for + * surviving paths to share a join path prefix, so do not re-sample + * a given path prefix more than once per round. Also, do not + * re-sample paths which are from rounds before the immediately + * previous round as those paths will not be extended in this round. + */ + if (log.isDebugEnabled()) + log.debug("Re-sampling in-use path segments: limit=" + limit); - // Then expand each path. + final Map<int[], EdgeSample> edgePaths = new LinkedHashMap<int[], EdgeSample>(); + for (Path x : a) { - final int nedges = x.edges.size(); + // The edges which we have visited in this path. + final List<Edge> edges = new LinkedList<Edge>(); + + // The vertices which we have visited in this path. + final Set<Vertex> vertices = new LinkedHashSet<Vertex>(); + + EdgeSample priorEdgeSample = null; + + for(Edge e : x.edges) { + + // Add edge to the visited set for this join path. + edges.add(e); - if (nedges < round) { + // Generate unique key for this join path segment. + final int[] ids = Path.getVertexIds(edges); - // Path is from a previous round. - continue; - - } + if (priorEdgeSample == null) { - /* - * The only way to increase the accuracy of our estimates for - * edges as we extend the join paths is to re-sample each edge - * in the join path in path order. - * - * Note: An edge must be sampled for each distinct join path - * prefix in which it appears within each round. However, it is - * common for surviving paths to share a join path prefix, so do - * not re-sample a given path prefix more than once per round. - * Also, do not re-sample paths which are from rounds before the - * immediately previous round as those paths will not be - * extended in this round. - * - * FIXME Find all vertices in use by all paths which survived - * into this round. Re-sample those vertices to the new limit - * (resampling a vertex is a NOP if it has been resampled to the - * desired limit so we can do this incrementally rather than up - * front). For each edge of each path in path order, re-sample - * the edge. Shared prefix samples should be reused, but samples - * of the same edge with a different prefix must not be shared. - */ + /* + * This is the first edge in the path. + * + * Test our local table of join path segment estimates + * to see if we have already re-sampled that edge. If + * not, then re-sample it now. + */ + + // Test sample cache. + EdgeSample edgeSample = edgePaths.get(ids); + + if (edgeSample == null) { + if (e.sample != null && e.sample.limit >= limit) { + + // The existing sample for that edge is fine. + edgeSample = e.sample; + + } else { + + /* + * Re-sample the edge, updating the sample on + * the edge as a side-effect. 
The cutoff sample + * is based on the vertex sample for the minimum + * cardinality vertex. + */ + + edgeSample = e.estimateCardinality(queryEngine, + limit); + + } + + // Cache the sample. + if (edgePaths.put(ids, edgeSample) != null) + throw new AssertionError(); + + } + + // Add both vertices to the visited set. + vertices.add(e.v1); + vertices.add(e.v2); + + // Save sample. It will be used to re-sample the next edge. + priorEdgeSample = edgeSample; + + continue; + + } + + final boolean v1Found = vertices.contains(e.v1); + + // The source vertex for the new edge. + final Vertex sVertex = v1Found ? e.v1 : e.v2; + + // The target vertex for the new edge. + final Vertex tVertex = v1Found ? e.v2 : e.v1; + + // Look for sample for this path in our cache. + EdgeSample edgeSample = edgePaths.get(ids); + + if (edgeSample == null) { + + /* + * This is some N-step edge in the path, where N is + * greater than ONE (1). The source vertex is the vertex + * which already appears in the prior edges of this join + * path. The target vertex is the next vertex which is + * visited by the join path. The sample pass in is the + * prior edge sample - that is, the sample from the path + * segment less the target vertex. This is the sample + * that we just updated when we visited the prior edge + * of the path. + */ + + edgeSample = e + .estimateCardinality( + queryEngine, + limit, + sVertex, + tVertex,// + priorEdgeSample.estimatedCardinality,// + priorEdgeSample.estimateEnum == EstimateEnum.Exact, + priorEdgeSample.limit,// + priorEdgeSample.sample// + ); + + if (log.isDebugEnabled()) + log.debug("Resampled: " + Arrays.toString(ids) + + " : " + edgeSample); + + if (edgePaths.put(ids, edgeSample) != null) + throw new AssertionError(); + + } + + // Save sample. It will be used to re-sample the next edge. + priorEdgeSample = edgeSample; + + // Add target vertex to the visited set. + vertices.add(tVertex); + + } // next Edge [e] in Path [x] + + // Save the result on the path. + x.sample = priorEdgeSample; + } // next Path [x]. + + /* + * Expand each path one step from each vertex which branches to an + * unused vertex. + */ + + if (log.isDebugEnabled()) + log.debug("Expanding paths: limit=" + limit + ", #paths(in)=" + + a.length); + + final List<Path> tmp = new LinkedList<Path>(); + + for (Path x : a) { + +// final int nedges = x.edges.size(); +// +// if (nedges < round) { +// +// // Path is from a previous round. +// continue; +// +// } + // The set of vertices used to expand this path in this round. final Set<Vertex> used = new LinkedHashSet<Vertex>(); @@ -1969,13 +2101,8 @@ // add the new vertex to the set of used vertices. used.add(tVertex); - if (resampled.add(tVertex)) { - /* - * (Re-)sample this vertex before we sample a new edge - * which targets this vertex. - */ - tVertex.sample(queryEngine, limit); - } + // (Re-)sample vertex before we sample a new edge + tVertex.sample(queryEngine, limit); // Extend the path to the new vertex. final Path p = x.addEdge(queryEngine, limit, edgeInGraph); @@ -2355,30 +2482,26 @@ // Create the join graph. final JGraph g = new JGraph(getVertices()); - // Run it. - g.runtimeOptimizer(context.getRunningQuery().getQueryEngine(), limit); + // Find the best join path. + final Path p = g.runtimeOptimizer(context.getRunningQuery() + .getQueryEngine(), limit); + // Factory avoids reuse of bopIds assigned to the predicates. + final BOpIdFactory idFactory = new BOpIdFactory(); + + // Generate the query from the join path. 
+ final PipelineOp queryOp = JoinGraph.getQuery(idFactory, p + .getPredicates()); + + // Run the query, blocking until it is done. + JoinGraph.runSubquery(context, queryOp); + return null; } - } + } // class JoinGraphTask -// @todo Could be used to appropriately ignore false precision in cardinality estimates. -// private static double roundToSignificantFigures(final double num, -// final int n) { -// if (num == 0) { -// return 0; -// } -// -// final double d = Math.ceil(Math.log10(num < 0 ? -num : num)); -// final int power = n - (int) d; -// -// final double magnitude = Math.pow(10, power); -// final long shifted = Math.round(num * magnitude); -// return shifted / magnitude; -// } - /** * Places vertices into order by the {@link BOp#getId()} associated with * their {@link IPredicate}. @@ -2436,4 +2559,191 @@ } + /* + * Static methods: + * + * @todo Keep with JGraph or move to utility class. However, the precise + * manner in which the query plan is generated is still up in the air since + * we are not yet handling anything except standard joins in the runtime + * optimizer. + */ + + /** + * Generate a query plan from an ordered collection of predicates. + * + * @param p + * The join path. + * + * @return The query plan. + */ + static public PipelineOp getQuery(final BOpIdFactory idFactory, + final IPredicate[] preds) { + + final PipelineJoin[] joins = new PipelineJoin[preds.length]; + +// final PipelineOp startOp = new StartOp(new BOp[] {}, +// NV.asMap(new NV[] {// +// new NV(Predicate.Annotations.BOP_ID, idFactory +// .nextId()),// +// new NV(SliceOp.Annotations.EVALUATION_CONTEXT, +// BOpEvaluationContext.CONTROLLER),// +// })); +// +// PipelineOp lastOp = startOp; + PipelineOp lastOp = null; + +// final Set<IVariable> vars = new LinkedHashSet<IVariable>(); +// for(IPredicate p : preds) { +// for(BOp arg : p.args()) { +// if(arg instanceof IVariable) { +// vars.add((IVariable)arg); +// } +// } +// } + + for (int i = 0; i < preds.length; i++) { + + // The next vertex in the selected join order. + final IPredicate p = preds[i]; + + final List<NV> anns = new LinkedList<NV>(); + + anns.add(new NV(PipelineJoin.Annotations.PREDICATE, p)); + + anns.add(new NV(PipelineJoin.Annotations.BOP_ID, idFactory + .nextId())); + +// anns.add(new NV(PipelineJoin.Annotations.EVALUATION_CONTEXT, BOpEvaluationContext.ANY)); +// +// anns.add(new NV(PipelineJoin.Annotations.SELECT, vars.toArray(new IVariable[vars.size()]))); + + final PipelineJoin joinOp = new PipelineJoin( + lastOp == null ? new BOp[0] : new BOp[] { lastOp }, + anns.toArray(new NV[anns.size()])); + + joins[i] = joinOp; + + lastOp = joinOp; + + } + +// final PipelineOp queryOp = lastOp; + + /* + * FIXME Why does wrapping with this slice appear to be + * necessary? (It is causing runtime errors when not wrapped). + * Is this a bopId collision which is not being detected? + */ + final PipelineOp queryOp = new SliceOp(new BOp[] { lastOp }, NV + .asMap(new NV[] { + new NV(JoinGraph.Annotations.BOP_ID, idFactory.nextId()), // + new NV(JoinGraph.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.CONTROLLER) }) // + ); + + return queryOp; + + } + + /** + * Execute the selected join path. + * <p> + * Note: When executing the query, it is actually being executed as a + * subquery. Therefore we have to take appropriate care to ensure that the + * results are copied out of the subquery and into the parent query. See + * {@link AbstractSubqueryOp} for how this is done. 
+ * + * @todo When we execute the query, we should clear the references to the + * samples (unless they are exact, in which case they can be used as + * is) in order to release memory associated with those samples if the + * query is long running. Samples must be held until we have + * identified the final join path since each vertex will be used by + * each maximum length join path and we use the samples from the + * vertices to re-sample the surviving join paths in each round. + * + * @todo If there is a slice on the outer query, then the query result may + * well be materialized by now. + * + * @todo If there are source binding sets then they need to be applied above + * (when we are sampling) and below (when we evaluate the selected + * join path). + * + * FIXME runQuery() is not working correctly. The query is being + * halted by a {@link BufferClosedException} which appears before it + * has materialized the necessary results. + */ + static public void runSubquery(final BOpContext<IBindingSet> parentContext, + final PipelineOp queryOp) { + + IAsynchronousIterator<IBindingSet[]> subquerySolutionItr = null; + + try { + + if (log.isInfoEnabled()) + log.info("Running: " + BOpUtility.toString(queryOp)); + + final PipelineOp startOp = (PipelineOp) BOpUtility + .getPipelineStart(queryOp); + + if (log.isInfoEnabled()) + log.info("StartOp: " + BOpUtility.toString(startOp)); + + // Run the query. + final UUID queryId = UUID.randomUUID(); + + final QueryEngine queryEngine = parentContext.getRunningQuery() + .getQueryEngine(); + + final RunningQuery runningQuery = queryEngine + .eval( + queryId, + queryOp, + new LocalChunkMessage<IBindingSet>( + queryEngine, + queryId, + startOp.getId()/* startId */, + -1 /* partitionId */, + /* + * @todo pass in the source binding sets + * here and also when sampling the + * vertices. + */ + new ThickAsynchronousIterator<IBindingSet[]>( + new IBindingSet[][] { new IBindingSet[] { new HashBindingSet() } }))); + + // Iterator visiting the subquery solutions. + subquerySolutionItr = runningQuery.iterator(); + + // Copy solutions from the subquery to the query. + final long nout = BOpUtility + .copy(subquerySolutionItr, parentContext.getSink(), + null/* sink2 */, null/* constraints */, null/* stats */); + + System.out.println("nout=" + nout); + + // verify no problems. + runningQuery.get(); + + System.out.println("Future Ok"); + + } catch (Throwable t) { + + log.error(t,t); + + /* + * If a subquery fails, then propagate the error to the parent + * and rethrow the first cause error out of the subquery. 
+ */ + throw new RuntimeException(parentContext.getRunningQuery() + .halt(t)); + + } finally { + + if (subquerySolutionItr != null) + subquerySolutionItr.close(); + + } + + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryLog.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryLog.java 2010-11-15 18:13:06 UTC (rev 3949) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryLog.java 2010-11-17 02:14:08 UTC (rev 3950) @@ -72,15 +72,33 @@ if (log.isInfoEnabled()) { - final Integer[] order = BOpUtility.getEvaluationOrder(q.getQuery()); + try { - log.info(getTableRow(q, -1/* orderIndex */, q.getQuery().getId(), - true/* summary */)); +// if (log.isDebugEnabled()) { - int orderIndex = 0; - for (Integer bopId : order) { - log.info(getTableRow(q, orderIndex, bopId, false/* summary */)); - orderIndex++; + /* + * Detail row for each operator in the query. + */ + final Integer[] order = BOpUtility.getEvaluationOrder(q + .getQuery()); + + int orderIndex = 0; + for (Integer bopId : order) { + log + .info(getTableRow(q, orderIndex, bopId, false/* summary */)); + orderIndex++; + } + +// } + + // summary row. + log.info(getTableRow(q, -1/* orderIndex */, q.getQuery().getId(), + true/* summary */)); + + } catch (RuntimeException t) { + + log.error(t,t); + } } @@ -107,6 +125,7 @@ */ sb.append("\tevalOrder"); // [0..n-1] sb.append("\tbopId"); + sb.append("\tpredId"); sb.append("\tevalContext"); sb.append("\tcontroller"); // metadata considered by the static optimizer. @@ -120,7 +139,7 @@ sb.append("\tunitsIn"); sb.append("\tchunksOut"); sb.append("\tunitsOut"); - sb.append("\tmultipler"); // expansion rate multipler in the solution count. + sb.append("\tjoinRatio"); // expansion rate multipler in the solution count. sb.append("\taccessPathDups"); sb.append("\taccessPathCount"); sb.append("\taccessPathChunksIn"); @@ -146,7 +165,8 @@ * @param summary <code>true</code> iff the summary for the query should be written. * @return The row of the table. */ - static private String getTableRow(final IRunningQuery q, final int evalOrder, final Integer bopId, final boolean summary) { + static private String getTableRow(final IRunningQuery q, + final int evalOrder, final Integer bopId, final boolean summary) { final StringBuilder sb = new StringBuilder(); @@ -190,16 +210,32 @@ * keep this from breaking the table format. */ sb.append(BOpUtility.toString(q.getQuery()).replace('\n', ' ')); + sb.append('\t'); + sb.append("total"); // summary line. } else { - // Otherwise how just this bop. + // Otherwise show just this bop. sb.append(bopIndex.get(bopId).toString()); + sb.append('\t'); + sb.append(evalOrder); // eval order for this bop. } sb.append('\t'); - sb.append(evalOrder); - sb.append('\t'); sb.append(Integer.toString(bopId)); sb.append('\t'); + { + /* + * Show the predicate identifier if this is a Join operator. + * + * @todo handle other kinds of join operators when added using a + * shared interface. 
+ */ + final IPredicate<?> pred = (IPredicate<?>) bop + .getProperty(PipelineJoin.Annotations.PREDICATE); + if (pred != null) { + sb.append(Integer.toString(pred.getId())); + } + } + sb.append('\t'); sb.append(bop.getEvaluationContext()); sb.append('\t'); sb.append(bop.getProperty(BOp.Annotations.CONTROLLER, Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-11-15 18:13:06 UTC (rev 3949) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-11-17 02:14:08 UTC (rev 3950) @@ -491,8 +491,16 @@ pop... [truncated message content] |
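One small caveat on the runtimeOptimizer() post-condition introduced above: assert paths.length == 1 only fires when the JVM is started with -ea, so in a production run a malformed result would silently fall through to paths[0]. A minimal sketch of an explicit check; the helper name and the choice of IllegalStateException are illustrative, not part of this patch:

    // Sketch only: a post-condition that does not depend on the -ea flag.
    // The generic parameter stands in for the JGraph Path type in this patch.
    static <T> T requireSingleWinner(final T[] paths) {
        if (paths == null || paths.length != 1)
            throw new IllegalStateException(
                    "Expected exactly one winning join path, found "
                            + (paths == null ? 0 : paths.length));
        return paths[0];
    }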
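A note on the edgePaths cache used while re-sampling path segments: it is keyed by int[] (the ordered vertex ids of a join path prefix), and Java arrays inherit identity-based hashCode()/equals(), so edgePaths.get(ids) only hits when Path.getVertexIds() returns the same array instance for the same prefix, which this patch does not show. If that is not guaranteed, a value-equality wrapper restores the intended prefix sharing. A sketch, with JoinPathKey as a hypothetical name:

    import java.util.Arrays;

    // Hypothetical key type for Map<JoinPathKey, EdgeSample>: two prefixes with
    // the same ordered vertex ids hash and compare equal, which a bare int[] does not.
    final class JoinPathKey {

        private final int[] ids;

        JoinPathKey(final int[] ids) {
            this.ids = ids.clone(); // defensive copy; the caller may reuse its array
        }

        @Override
        public int hashCode() {
            return Arrays.hashCode(ids);
        }

        @Override
        public boolean equals(final Object o) {
            return o instanceof JoinPathKey
                    && Arrays.equals(ids, ((JoinPathKey) o).ids);
        }
    }

    // Usage sketch:
    //   final Map<JoinPathKey, EdgeSample> edgePaths =
    //           new LinkedHashMap<JoinPathKey, EdgeSample>();
    //   edgePaths.put(new JoinPathKey(Path.getVertexIds(edges)), edgeSample);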
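For readers following the joinRatio column added to QueryLog above (the expansion rate of the solution count across a join), cutoff samples typically feed a path cardinality estimate by chaining the per-join ratios onto the cardinality of the first vertex. The sketch below is illustrative only; the Sample type and the exact way EdgeSample records units in and out are assumptions, not the API from this patch:

    import java.util.Arrays;
    import java.util.List;

    // Illustrative estimator: each cutoff join that reads unitsIn solutions and
    // writes unitsOut contributes a hit ratio of unitsOut / unitsIn, and the path
    // estimate is the first vertex's cardinality scaled by the chained ratios.
    class PathCardinalitySketch {

        static class Sample {
            final long unitsIn;
            final long unitsOut;
            Sample(final long unitsIn, final long unitsOut) {
                this.unitsIn = unitsIn;
                this.unitsOut = unitsOut;
            }
        }

        static long estimate(final long firstVertexCardinality,
                final List<Sample> samples) {
            double estimate = firstVertexCardinality;
            for (Sample s : samples) {
                estimate *= (double) s.unitsOut / s.unitsIn; // per-join "joinRatio"
            }
            return (long) estimate;
        }

        public static void main(final String[] args) {
            // e.g. first vertex ~1000 rows, joins with ratios 2.0 and 0.1 -> ~200 rows
            System.out.println(estimate(1000, Arrays.asList(
                    new Sample(100, 200), new Sample(100, 10))));
        }
    }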