From: <mar...@us...> - 2010-12-20 12:06:14
Revision: 4024
          http://bigdata.svn.sourceforge.net/bigdata/?rev=4024&view=rev
Author:   martyncutcher
Date:     2010-12-20 12:06:07 +0000 (Mon, 20 Dec 2010)

Log Message:
-----------
Ensure deletions trigger RWStore commit required, and remove unnecessary
transaction bracketing from concurrency tests.

Modified Paths:
--------------
    branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java
    branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/StressTestConcurrentUnisolatedIndices.java
    branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/TestConcurrentJournal.java
    branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java

Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java	2010-12-20 08:57:44 UTC (rev 4023)
+++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java	2010-12-20 12:06:07 UTC (rev 4024)
@@ -1589,7 +1589,7 @@
          * FIXME We need unit test when MIN_RELEASE_AGE is ZERO AND
          * there are open read-only transactions.
          */
-        if (false&&m_minReleaseAge == 0) {
+        if (m_minReleaseAge == 0) {
             /*
              * The session protection is complicated by the mix of
              * transaction protection and isolated AllocationContexts.
@@ -1737,6 +1737,8 @@
             if (!m_commitList.contains(alloc)) {
                 m_commitList.add(alloc);
+
+                m_recentAlloc = true;
             }
         } finally {
             m_allocationLock.unlock();

Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/StressTestConcurrentUnisolatedIndices.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/StressTestConcurrentUnisolatedIndices.java	2010-12-20 08:57:44 UTC (rev 4023)
+++ branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/StressTestConcurrentUnisolatedIndices.java	2010-12-20 12:06:07 UTC (rev 4024)
@@ -103,10 +103,10 @@
         final Journal journal = new Journal(properties);

-        final IBufferStrategy bufferStrategy = journal.getBufferStrategy();
-        if (bufferStrategy instanceof RWStrategy) {
-            ((RWStrategy)bufferStrategy).getRWStore().activateTx();
-        }
+//        final IBufferStrategy bufferStrategy = journal.getBufferStrategy();
+//        if (bufferStrategy instanceof RWStrategy) {
+//            ((RWStrategy)bufferStrategy).getRWStore().activateTx();
+//        }

         try {
@@ -134,9 +134,9 @@
             );
         } finally {

-            if (bufferStrategy instanceof RWStrategy) {
-                ((RWStrategy)bufferStrategy).getRWStore().deactivateTx();
-            }
+//            if (bufferStrategy instanceof RWStrategy) {
+//                ((RWStrategy)bufferStrategy).getRWStore().deactivateTx();
+//            }

             journal.destroy();

Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/TestConcurrentJournal.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/TestConcurrentJournal.java	2010-12-20 08:57:44 UTC (rev 4023)
+++ branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/TestConcurrentJournal.java	2010-12-20 12:06:07 UTC (rev 4024)
@@ -1653,10 +1653,10 @@
         properties.setProperty(Options.WRITE_SERVICE_MAXIMUM_POOL_SIZE, "1");

         final Journal journal = new Journal(properties);

-        final IBufferStrategy bufferStrategy = journal.getBufferStrategy();
-        if (bufferStrategy instanceof RWStrategy) {
-            ((RWStrategy)bufferStrategy).getRWStore().activateTx();
-        }
+//        final IBufferStrategy bufferStrategy = journal.getBufferStrategy();
+//        if (bufferStrategy instanceof RWStrategy) {
+//            ((RWStrategy)bufferStrategy).getRWStore().activateTx();
+//        }

         try {
@@ -1805,9 +1805,9 @@
         } finally {

-            if (bufferStrategy instanceof RWStrategy) {
-                ((RWStrategy)bufferStrategy).getRWStore().deactivateTx();
-            }
+//            if (bufferStrategy instanceof RWStrategy) {
+//                ((RWStrategy)bufferStrategy).getRWStore().deactivateTx();
+//            }

             journal.destroy();

Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java	2010-12-20 08:57:44 UTC (rev 4023)
+++ branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java	2010-12-20 12:06:07 UTC (rev 4024)
@@ -997,26 +997,35 @@
             bs.delete(faddr);

             // Must not have been immediately freed if history is retained.
-            if (rw.getHistoryRetention() != 0)
+            if (rw.getHistoryRetention() != 0) {
                 assertEquals(pa, bs.getPhysicalAddress(faddr));
-            else
-                assertEquals(0L, bs.getPhysicalAddress(faddr));

-            /*
-             * Commit before testing for deferred frees. Since there is a
-             * prior commit point, we are not allowed to immediately free
-             * any record from that commit point in order to preserve the
-             * consistency of the last commit point, so we have to commit
-             * first then test for deferred frees.
-             */
-            System.out.println("Now commit to disk (2)");
+                /*
+                 * Commit before testing for deferred frees. Since there is a
+                 * prior commit point, we are not allowed to immediately free
+                 * any record from that commit point in order to preserve the
+                 * consistency of the last commit point, so we have to commit
+                 * first then test for deferred frees.
+                 */
+                System.out.println("Now commit to disk (2)");

-            store.commit();
+                store.commit();

-            Thread.currentThread().sleep(10);
+                Thread.currentThread().sleep(10); // to force deferredFrees

-            // Request release of deferred frees.
-            rw.checkDeferredFrees(true/* freeNow */, store);
+                // Request release of deferred frees.
+                rw.checkDeferredFrees(true/* freeNow */, store);
+
+                // Now commit() to ensure the deferrals can be recycled
+                store.commit();
+            } else {
+                // The address is deleted, but will still return a valid
+                // address since it is committed
+                assertEquals(pa, bs.getPhysicalAddress(faddr));
+
+                // Now commit() to ensure the deferrals can be recycled
+                store.commit();
+            }

             assertEquals(0L, bs.getPhysicalAddress(faddr));
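The heart of r4024 is the added m_recentAlloc = true; in RWStore: without it, a write set consisting only of deletions never raised the store's "commit required" flag, so the following commit could be skipped and the frees lost. Below is a minimal sketch of that dirty-flag pattern; it is illustrative only, and SketchStore, touchAllocator() and requiresCommit() are stand-in names, not the actual RWStore API.

// Minimal sketch (not the RWStore API) of the r4024 pattern: every
// operation that dirties an allocator, deletes included, must set a
// "commit required" flag under the allocation lock, so that a
// delete-only write set still forces a real commit.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

class SketchStore {

    private final List<Object> commitList = new ArrayList<>();
    private final ReentrantLock allocationLock = new ReentrantLock();
    private volatile boolean recentAlloc = false; // cleared by commit()

    /** Called from both the allocation and the free paths. */
    void touchAllocator(final Object alloc) {
        allocationLock.lock();
        try {
            if (!commitList.contains(alloc)) {
                commitList.add(alloc);
                // The r4024 fix, in miniature: mark the store dirty here.
                recentAlloc = true;
            }
        } finally {
            allocationLock.unlock();
        }
    }

    boolean requiresCommit() {
        return recentAlloc;
    }

    void commit() {
        // ... a real store would write the dirty allocators here ...
        commitList.clear();
        recentAlloc = false;
    }

    public static void main(String[] args) {
        final SketchStore store = new SketchStore();
        store.touchAllocator("allocator-7"); // e.g. reached via a delete
        System.out.println(store.requiresCommit()); // true: commit must run
        store.commit();
        System.out.println(store.requiresCommit()); // false again
    }
}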
From: <mar...@us...> - 2010-12-20 08:57:52
Revision: 4023
          http://bigdata.svn.sourceforge.net/bigdata/?rev=4023&view=rev
Author:   martyncutcher
Date:     2010-12-20 08:57:44 +0000 (Mon, 20 Dec 2010)

Log Message:
-----------
fix to AllocBlock releaseSession for higher bit allocations and addition
test_stressSessionProtection

Modified Paths:
--------------
    branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java
    branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java

Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java	2010-12-19 23:36:02 UTC (rev 4022)
+++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java	2010-12-20 08:57:44 UTC (rev 4023)
@@ -321,11 +321,12 @@
         m_transients[i] = m_live[i] | m_commit[i];

         chkbits &= ~m_transients[i];

+        final int startBit = i * 32;
         if (chkbits != 0) { // there are writes to clear
             for (int b = 0; b < 32; b++) {
                 if ((chkbits & (1 << b)) != 0) {
-                    long clr = RWStore.convertAddr(m_addr) + ((long) m_allocator.m_size * b);
+                    long clr = RWStore.convertAddr(m_addr) + ((long) m_allocator.m_size * (startBit + b));

                     if (log.isTraceEnabled())
                         log.trace("releasing address: " + clr);

Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java	2010-12-19 23:36:02 UTC (rev 4022)
+++ branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java	2010-12-20 08:57:44 UTC (rev 4023)
@@ -20,7 +20,7 @@
 You should have received a copy of the GNU General Public License
 along with this program; if not, write to the Free Software
 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-*/
+ */
 /*
  * Created on Oct 14, 2006
  */
@@ -37,6 +37,10 @@
 import junit.extensions.proxy.ProxyTestSuite;
 import junit.framework.Test;

+import com.bigdata.btree.IIndex;
+import com.bigdata.btree.ITuple;
+import com.bigdata.btree.ITupleIterator;
+import com.bigdata.btree.IndexMetadata;
 import com.bigdata.config.LongValidator;
 import com.bigdata.journal.AbstractInterruptsTestCase;
 import com.bigdata.journal.AbstractJournalTestCase;
@@ -44,7 +48,10 @@
 import com.bigdata.journal.AbstractMROWTestCase;
 import com.bigdata.journal.AbstractRestartSafeTestCase;
 import com.bigdata.journal.BufferMode;
+import com.bigdata.journal.CommitRecordIndex;
+import com.bigdata.journal.CommitRecordSerializer;
 import com.bigdata.journal.DiskOnlyStrategy;
+import com.bigdata.journal.ICommitRecord;
 import com.bigdata.journal.Journal;
 import com.bigdata.journal.RWStrategy;
 import com.bigdata.journal.TestJournalBasics;
@@ -56,617 +63,623 @@
 /**
  * Test suite for {@link BufferMode#DiskRW} journals.
* - * TODO: must modify RWStore to use DirectBufferPool to allocate and release buffers, - * Once done then ensure the write cache is enabled when running test suite + * TODO: must modify RWStore to use DirectBufferPool to allocate and release + * buffers, Once done then ensure the write cache is enabled when running test + * suite * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public class TestRWJournal extends AbstractJournalTestCase { - public TestRWJournal() { - super(); - } + public TestRWJournal() { + super(); + } - public TestRWJournal(String name) { - super(name); - } + public TestRWJournal(String name) { + super(name); + } - public static Test suite() { + public static Test suite() { - final TestRWJournal delegate = new TestRWJournal(); // !!!! THIS CLASS !!!! + final TestRWJournal delegate = new TestRWJournal(); // !!!! THIS CLASS + // !!!! - /* - * Use a proxy test suite and specify the delegate. - */ + /* + * Use a proxy test suite and specify the delegate. + */ - final ProxyTestSuite suite = new ProxyTestSuite(delegate, - "Disk RW Journal Test Suite"); + final ProxyTestSuite suite = new ProxyTestSuite(delegate, "Disk RW Journal Test Suite"); - /* - * List any non-proxied tests (typically bootstrapping tests). - */ - - // tests defined by this class. - suite.addTestSuite(TestRWJournal.class); + /* + * List any non-proxied tests (typically bootstrapping tests). + */ - // test suite for the IRawStore api. - suite.addTestSuite(TestRawStore.class); + // tests defined by this class. + suite.addTestSuite(TestRWJournal.class); - // test suite for handling asynchronous close of the file channel. - suite.addTestSuite(TestInterrupts.class); + // test suite for the IRawStore api. + suite.addTestSuite(TestRawStore.class); - // test suite for MROW correctness. - suite.addTestSuite(TestMROW.class); + // test suite for handling asynchronous close of the file channel. + suite.addTestSuite(TestInterrupts.class); - // test suite for MRMW correctness. - suite.addTestSuite(TestMRMW.class); + // test suite for MROW correctness. + suite.addTestSuite(TestMROW.class); - /* - * Pickup the basic journal test suite. This is a proxied test suite, so - * all the tests will run with the configuration specified in this test - * class and its optional .properties file. - */ - suite.addTest(TestJournalBasics.suite()); - - return suite; + // test suite for MRMW correctness. + suite.addTestSuite(TestMRMW.class); - } + /* + * Pickup the basic journal test suite. This is a proxied test suite, so + * all the tests will run with the configuration specified in this test + * class and its optional .properties file. 
+ */ + suite.addTest(TestJournalBasics.suite()); - public Properties getProperties() { + return suite; - final Properties properties = super.getProperties(); + } - properties.setProperty(Options.BUFFER_MODE, BufferMode.DiskRW.toString()); - // properties.setProperty(Options.BUFFER_MODE, BufferMode.TemporaryRW.toString()); + public Properties getProperties() { - // properties.setProperty(Options.CREATE_TEMP_FILE, "true"); - - // properties.setProperty(Options.FILE, "/Volumes/SSDData/TestRW/tmp.rw"); + final Properties properties = super.getProperties(); - properties.setProperty(Options.DELETE_ON_EXIT, "true"); + properties.setProperty(Options.BUFFER_MODE, BufferMode.DiskRW.toString()); + // properties.setProperty(Options.BUFFER_MODE, + // BufferMode.TemporaryRW.toString()); - properties.setProperty(Options.WRITE_CACHE_ENABLED, "" - + writeCacheEnabled); + // properties.setProperty(Options.CREATE_TEMP_FILE, "true"); - // number of bits in FixedAllocators - properties.setProperty(com.bigdata.rwstore.RWStore.Options.DEFAULT_FREE_BITS_THRESHOLD, "1000"); - - // Size of META_BITS_BLOCKS - properties.setProperty(com.bigdata.rwstore.RWStore.Options.DEFAULT_META_BITS_SIZE, "9"); + // properties.setProperty(Options.FILE, + // "/Volumes/SSDData/TestRW/tmp.rw"); - // properties.setProperty(RWStore.Options.ALLOCATION_SIZES, "1,2,3,5,8,12,16,32"); // 2K max - properties.setProperty(RWStore.Options.ALLOCATION_SIZES, "1,2,3,5,8,12,16"); // 1K + properties.setProperty(Options.DELETE_ON_EXIT, "true"); - // ensure history retention to force deferredFrees - // properties.setProperty(AbstractTransactionService.Options.MIN_RELEASE_AGE, "1"); // Non-zero + properties.setProperty(Options.WRITE_CACHE_ENABLED, "" + writeCacheEnabled); - return properties; + // number of bits in FixedAllocators + properties.setProperty(com.bigdata.rwstore.RWStore.Options.DEFAULT_FREE_BITS_THRESHOLD, "1000"); - } - - /** - * Verify normal operation and basic assumptions when creating a new journal - * using {@link BufferMode#DiskRW}. - * - * @throws IOException - */ - public void test_create_disk01() throws IOException { + // Size of META_BITS_BLOCKS + properties.setProperty(com.bigdata.rwstore.RWStore.Options.DEFAULT_META_BITS_SIZE, "9"); - File file = null; + // properties.setProperty(RWStore.Options.ALLOCATION_SIZES, + // "1,2,3,5,8,12,16,32"); // 2K max + properties.setProperty(RWStore.Options.ALLOCATION_SIZES, "1,2,3,5,8,12,16"); // 1K - final Properties properties = getProperties(); + // ensure history retention to force deferredFrees + // properties.setProperty(AbstractTransactionService.Options.MIN_RELEASE_AGE, + // "1"); // Non-zero - final Journal journal = new Journal(properties); + return properties; - try { + } - final RWStrategy bufferStrategy = (RWStrategy) journal - .getBufferStrategy(); + /** + * Verify normal operation and basic assumptions when creating a new journal + * using {@link BufferMode#DiskRW}. 
+ * + * @throws IOException + */ + public void test_create_disk01() throws IOException { - assertTrue("isStable", bufferStrategy.isStable()); - assertFalse("isFullyBuffered", bufferStrategy.isFullyBuffered()); - // assertEquals(Options.FILE, properties.getProperty(Options.FILE), - // bufferStrategy.file.toString()); - assertEquals(Options.INITIAL_EXTENT, Long - .parseLong(Options.DEFAULT_INITIAL_EXTENT), bufferStrategy - .getInitialExtent()); - assertEquals(Options.MAXIMUM_EXTENT, - 0L/* soft limit for disk mode */, bufferStrategy - .getMaximumExtent()); -// assertNotNull("raf", bufferStrategy.getRandomAccessFile()); - assertEquals(Options.BUFFER_MODE, BufferMode.DiskRW, bufferStrategy - .getBufferMode()); + File file = null; - file = journal.getFile(); - - } finally { + final Properties properties = getProperties(); - journal.destroy(); + final Journal journal = new Journal(properties); - } + try { - if(file != null && file.exists()) - fail("Did not delete the backing file: "+file); - - } - - /** - * Unit test verifies that {@link Options#CREATE} may be used to initialize - * a journal on a newly created empty file. - * - * @throws IOException - */ - public void test_create_emptyFile() throws IOException { - - final File file = File.createTempFile(getName(), Options.JNL); + final RWStrategy bufferStrategy = (RWStrategy) journal.getBufferStrategy(); - final Properties properties = new Properties(); + assertTrue("isStable", bufferStrategy.isStable()); + assertFalse("isFullyBuffered", bufferStrategy.isFullyBuffered()); + // assertEquals(Options.FILE, properties.getProperty(Options.FILE), + // bufferStrategy.file.toString()); + assertEquals(Options.INITIAL_EXTENT, Long.parseLong(Options.DEFAULT_INITIAL_EXTENT), bufferStrategy + .getInitialExtent()); + assertEquals(Options.MAXIMUM_EXTENT, 0L/* soft limit for disk mode */, bufferStrategy.getMaximumExtent()); + // assertNotNull("raf", bufferStrategy.getRandomAccessFile()); + assertEquals(Options.BUFFER_MODE, BufferMode.DiskRW, bufferStrategy.getBufferMode()); - properties.setProperty(Options.BUFFER_MODE, BufferMode.DiskRW.toString()); + file = journal.getFile(); - properties.setProperty(Options.FILE, file.toString()); + } finally { - properties.setProperty(Options.WRITE_CACHE_ENABLED, "" - + writeCacheEnabled); + journal.destroy(); - final Journal journal = new Journal(properties); + } - try { + if (file != null && file.exists()) + fail("Did not delete the backing file: " + file); - assertEquals(file, journal.getFile()); + } - } finally { + /** + * Unit test verifies that {@link Options#CREATE} may be used to initialize + * a journal on a newly created empty file. + * + * @throws IOException + */ + public void test_create_emptyFile() throws IOException { - journal.destroy(); + final File file = File.createTempFile(getName(), Options.JNL); - } + final Properties properties = new Properties(); - } + properties.setProperty(Options.BUFFER_MODE, BufferMode.DiskRW.toString()); - /** - * Test suite integration for {@link AbstractRestartSafeTestCase}. - * - * @todo there are several unit tests in this class that deal with - * {@link DiskOnlyStrategy#allocate(int)} and - * {@link DiskOnlyStrategy#update(long, int, ByteBuffer)}. If those - * methods are added to the {@link IRawStore} API then move these unit - * tests into {@link AbstractRawStoreTestCase}. 
- * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ - */ - public static class TestRawStore extends AbstractRestartSafeTestCase { - - public TestRawStore() { - super(); - } + properties.setProperty(Options.FILE, file.toString()); - public TestRawStore(String name) { - super(name); - } + properties.setProperty(Options.WRITE_CACHE_ENABLED, "" + writeCacheEnabled); - protected BufferMode getBufferMode() { - - return BufferMode.DiskRW; - // return BufferMode.TemporaryRW; - - } + final Journal journal = new Journal(properties); - public Properties getProperties() { - - System.out.println("TestRWJournal:getProperties"); + try { - final Properties properties = super.getProperties(); + assertEquals(file, journal.getFile()); - properties.setProperty(Options.BUFFER_MODE, BufferMode.DiskRW.toString()); - // properties.setProperty(Options.BUFFER_MODE, BufferMode.TemporaryRW.toString()); + } finally { - properties.setProperty(Options.CREATE_TEMP_FILE, "true"); - - // properties.setProperty(Options.FILE, "/Volumes/SSDData/TestRW/tmp.rw"); -// properties.setProperty(RWStore.Options.ALLOCATION_SIZES, "1,2,3,5,8,12,16,32,48,64,128,192,320,512,832,1344,2176,3520"); - // properties.setProperty(Options.RW_ALLOCATIONS, "1,2,3,5,8,12,16,32,48,64"); + journal.destroy(); - properties.setProperty(Options.DELETE_ON_EXIT, "true"); + } - properties.setProperty(Options.WRITE_CACHE_ENABLED, "" - + writeCacheEnabled); + } - // number of bits in FixedAllocators - properties.setProperty(RWStore.Options.FREE_BITS_THRESHOLD, "50"); - - // Size of META_BITS_BLOCKS - properties.setProperty(RWStore.Options.META_BITS_SIZE, "9"); + /** + * Test suite integration for {@link AbstractRestartSafeTestCase}. + * + * @todo there are several unit tests in this class that deal with + * {@link DiskOnlyStrategy#allocate(int)} and + * {@link DiskOnlyStrategy#update(long, int, ByteBuffer)}. If those + * methods are added to the {@link IRawStore} API then move these unit + * tests into {@link AbstractRawStoreTestCase}. + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + * @version $Id: TestRWJournal.java 4010 2010-12-16 12:44:43Z martyncutcher + * $ + */ + public static class TestRawStore extends AbstractRestartSafeTestCase { - // properties.setProperty(RWStore.Options.ALLOCATION_SIZES, "1,2,3,5,8,12,16,32,48,64,128"); // 8K - max blob = 2K * 8K = 16M - // properties.setProperty(RWStore.Options.ALLOCATION_SIZES, "1,2,3,5,8,12,16,32,48,64,128"); // 2K max - properties.setProperty(RWStore.Options.ALLOCATION_SIZES, "1,2,3,5,8,12,16"); // 2K max + public TestRawStore() { + super(); + } - // ensure history retention to force deferredFrees - // properties.setProperty(AbstractTransactionService.Options.MIN_RELEASE_AGE, "1"); // Non-zero + public TestRawStore(String name) { + super(name); + } - return properties; + protected BufferMode getBufferMode() { - } - - protected IRawStore getStore() { - - return new Journal(getProperties()); - - } - -// /** -// * Test that allocate() pre-extends the store when a record is allocated -// * which would overflow the current user extent. 
-// */ -// public void test_allocPreExtendsStore() { -// -// final Journal store = (Journal) getStore(); -// -// try { -// -// final DiskOnlyStrategy bufferStrategy = (DiskOnlyStrategy) store -// .getBufferStrategy(); -// -// final long nextOffset = store.getRootBlockView() -// .getNextOffset(); -// -// final long length = store.size(); -// -// final long headerSize = FileMetadata.headerSize0; -// -// // #of bytes remaining in the user extent before overflow. -// final long nfree = length - (headerSize + nextOffset); -// -// if (nfree >= Integer.MAX_VALUE) { -// -// /* -// * The test is trying to allocate a single record that will -// * force the store to be extended. This will not work if the -// * store file already has a huge user extent with nothing -// * allocated on it. -// */ -// -// fail("Can't allocate a record with: " + nfree + " bytes"); -// -// } -// -// final int nbytes = (int) nfree; -// -// final long addr = bufferStrategy.allocate(nbytes); -// -// assertNotSame(0L, addr); -// -// assertEquals(nbytes, store.getByteCount(addr)); -// -// // store file was extended. -// assertTrue(store.size() > length); -// -// } finally { -// -// store.destroy(); -// -// } -// -// } + return BufferMode.DiskRW; + // return BufferMode.TemporaryRW; - /** - * Test allocate()+read() where the record was never written (the data - * are undefined unless written so there is nothing really to test here - * except for exceptions which might be through for this condition). - */ - public void test_allocate_then_read() {} + } - /** - * Reallocates the same object several times, then commits and tests read back. - * - * - */ - public void test_reallocate() { - final Journal store = (Journal) getStore(); + public Properties getProperties() { - try { + System.out.println("TestRWJournal:getProperties"); - byte[] buf = new byte[1024]; // 2Mb buffer of random data - r.nextBytes(buf); - - ByteBuffer bb = ByteBuffer.wrap(buf); + final Properties properties = super.getProperties(); - RWStrategy bs = (RWStrategy) store - .getBufferStrategy(); + properties.setProperty(Options.BUFFER_MODE, BufferMode.DiskRW.toString()); + // properties.setProperty(Options.BUFFER_MODE, + // BufferMode.TemporaryRW.toString()); - RWStore rw = bs.getRWStore(); - - long faddr1 = bs.write(bb); - bb.position(0); - //bs.delete(faddr); - - long faddr2 = bs.write(bb); - bb.position(0); - - store.commit(); - - rw.reset(); - - ByteBuffer inbb1 = bs.read(faddr1); - ByteBuffer inbb2 = bs.read(faddr2); - - assertEquals(bb, inbb1); - assertEquals(bb, inbb2); - - } finally { - store.destroy(); - } - - } - - /** - * Test write of a record and then update of a slice of that record. - * <p> - * Note: Since the record was written but not flushed it will be found - * in the write cache by update(). - */ - public void test_write_plus_update() {} - - /** - * Ensures the allocation of unique addresses by mapping allocated address with uniqueness - * assertion against physical address. 
- */ - public void test_addressing() { - - final Journal store = (Journal) getStore(); + properties.setProperty(Options.CREATE_TEMP_FILE, "true"); - try { + // properties.setProperty(Options.FILE, + // "/Volumes/SSDData/TestRW/tmp.rw"); + // properties.setProperty(RWStore.Options.ALLOCATION_SIZES, + // "1,2,3,5,8,12,16,32,48,64,128,192,320,512,832,1344,2176,3520"); + // properties.setProperty(Options.RW_ALLOCATIONS, + // "1,2,3,5,8,12,16,32,48,64"); - RWStrategy bufferStrategy = (RWStrategy) store - .getBufferStrategy(); + properties.setProperty(Options.DELETE_ON_EXIT, "true"); - RWStore rw = bufferStrategy.getRWStore(); - ArrayList<Integer> sizes = new ArrayList<Integer>(); - TreeMap<Long, Integer> paddrs = new TreeMap<Long, Integer>(); - for (int i = 0; i < 100000; i++) { - int s = r.nextInt(250)+1; - sizes.add(s); - int a = rw.alloc(s, null); - long pa = rw.physicalAddress(a); - assertTrue(paddrs.get(pa) == null); - paddrs.put(pa, a); - } - - for (int i = 0; i < 50; i++) { - int s = r.nextInt(500)+1; - sizes.add(s); - int a = rw.alloc(s, null); - long pa = rw.physicalAddress(a); - paddrs.put(pa, a); - } - - } finally { + properties.setProperty(Options.WRITE_CACHE_ENABLED, "" + writeCacheEnabled); - store.destroy(); - - } + // number of bits in FixedAllocators + properties.setProperty(RWStore.Options.FREE_BITS_THRESHOLD, "50"); - - } - - /** - * Basic allocation test to ensure the FixedAllocators are operating efficiently. - * - * A 90 byte allocation is expected to fit in a 128byte block. If we only allocate - * this fixed block size, then we would expect the physical address to increase by 128 bytes - * for each allocation. - */ - public void test_allocations() { - - Journal store = (Journal) getStore(); + // Size of META_BITS_BLOCKS + properties.setProperty(RWStore.Options.META_BITS_SIZE, "9"); - try { + // properties.setProperty(RWStore.Options.ALLOCATION_SIZES, + // "1,2,3,5,8,12,16,32,48,64,128"); // 8K - max blob = 2K * 8K = 16M + // properties.setProperty(RWStore.Options.ALLOCATION_SIZES, + // "1,2,3,5,8,12,16,32,48,64,128"); // 2K max + properties.setProperty(RWStore.Options.ALLOCATION_SIZES, "1,2,3,5,8,12,16"); // 2K + // max - RWStrategy bufferStrategy = (RWStrategy) store - .getBufferStrategy(); + // ensure history retention to force deferredFrees + // properties.setProperty(AbstractTransactionService.Options.MIN_RELEASE_AGE, + // "1"); // Non-zero - RWStore rw = bufferStrategy.getRWStore(); - long numAllocs = rw.getTotalAllocations(); - long startAllocations = rw.getTotalAllocationsSize(); - long faddr = allocBatch(rw, 1000, 275, 320); - faddr = allocBatch(rw, 10000, 90, 128); - faddr = allocBatch(rw, 20000, 45, 64); - - System.out.println("Final allocation: " + faddr - + ", allocations: " + (rw.getTotalAllocations() - numAllocs) - + ", allocated bytes: " + (rw.getTotalAllocationsSize() - startAllocations)); - - store.commit(); - - // Confirm that we can re-open the journal after commit - store = (Journal) reopenStore(store); - - } finally { + return properties; - store.destroy(); - - } + } - - } - - /** - * Not so much a test as a code coverage exercise. - * - * The output from showAllocReserve confirms the relative merits of - * optimising for space vs density. The DirectFixedAllocators will - * allocate from DirectBuffers, where locality of reference is less - * important than efficient management of the memory, which is optimised - * by allocations in smaller amounts that match the demands at a finer - * granularity. 
- */ - public void testAllocationReserves() { - final int cReserve16K = 16 * 1024; - final int cReserve128K = 32 * 1024; - - showAllocReserve(false, 64, cReserve16K, cReserve16K); - showAllocReserve(false, 128, cReserve16K, cReserve16K); - showAllocReserve(false, 1024, cReserve16K, cReserve16K); - showAllocReserve(false, 2048, cReserve16K, cReserve16K); - showAllocReserve(false, 3072, cReserve16K, cReserve16K); - showAllocReserve(false, 4096, cReserve16K, cReserve16K); - showAllocReserve(false, 8192, cReserve16K, cReserve16K); - - showAllocReserve(true, 64, cReserve128K, cReserve16K); - showAllocReserve(true, 128, cReserve128K, cReserve16K); - showAllocReserve(true, 1024, cReserve128K, cReserve16K); - showAllocReserve(true, 2048, cReserve128K, cReserve16K); - showAllocReserve(true, 3072, cReserve128K, cReserve16K); - showAllocReserve(true, 4096, cReserve128K, cReserve16K); - showAllocReserve(true, 8192, cReserve128K, cReserve16K); - } - private void showAllocReserve(final boolean optDensity, final int slotSize, final int reserve, final int mod) { - final int ints = FixedAllocator.calcBitSize(optDensity, slotSize, reserve, mod); - // there are max 254 ints available to a FixedAllocator - final int maxuse = (254/(ints+1)) * ints; - System.out.println("Allocate " + ints + ":" + (32 * ints * slotSize) + " for " + slotSize + " in " + reserve + " using " + maxuse + " of 254 possible"); - } - - long allocBatch(RWStore rw, int bsize, int asze, int ainc) { - long curAddress = rw.physicalAddress(rw.alloc(asze, null)); - for (int i = 1; i < bsize; i++) { - int a = rw.alloc(asze, null); - long nxt = rw.physicalAddress(a); - assertTrue("Problem with index: " + i, diff(curAddress, nxt) == ainc || (nxt % 8192 == 0)); - curAddress = nxt; - } - - return curAddress; - } - - int diff(final long cur, final long nxt) { - int ret = (int) (nxt - cur); - return ret < 0 ? -ret : ret; - } + protected IRawStore getStore() { - int[] allocBatchBuffer(RWStore rw, int bsize, int base, int scope) { - int[] retaddrs = new int[bsize]; - - byte[] batchBuffer = new byte[base+scope]; - r.nextBytes(batchBuffer); - for (int i = 0; i < bsize; i++) { - int as = base + r.nextInt(scope); - retaddrs[i] = (int) rw.alloc(batchBuffer, as, null); - } - - return retaddrs; - } + return new Journal(getProperties()); - - /** - * Reallocation tests the freeing of allocated address and the re-use - * within a transaction. - * - * The repeated runs with full reopening of the store check the - * initialization of the allocators on reload. - * - * @throws IOException - */ - public void test_reallocation() throws IOException { - final Properties properties = getProperties(); - File tmpfile = File.createTempFile("TestRW", "rw"); - properties.setProperty(Options.FILE, tmpfile.getAbsolutePath()); - properties.remove(Options.CREATE_TEMP_FILE); - Journal store = new Journal(properties); + } - try { + // /** + // * Test that allocate() pre-extends the store when a record is + // allocated + // * which would overflow the current user extent. + // */ + // public void test_allocPreExtendsStore() { + // + // final Journal store = (Journal) getStore(); + // + // try { + // + // final DiskOnlyStrategy bufferStrategy = (DiskOnlyStrategy) store + // .getBufferStrategy(); + // + // final long nextOffset = store.getRootBlockView() + // .getNextOffset(); + // + // final long length = store.size(); + // + // final long headerSize = FileMetadata.headerSize0; + // + // // #of bytes remaining in the user extent before overflow. 
+ // final long nfree = length - (headerSize + nextOffset); + // + // if (nfree >= Integer.MAX_VALUE) { + // + // /* + // * The test is trying to allocate a single record that will + // * force the store to be extended. This will not work if the + // * store file already has a huge user extent with nothing + // * allocated on it. + // */ + // + // fail("Can't allocate a record with: " + nfree + " bytes"); + // + // } + // + // final int nbytes = (int) nfree; + // + // final long addr = bufferStrategy.allocate(nbytes); + // + // assertNotSame(0L, addr); + // + // assertEquals(nbytes, store.getByteCount(addr)); + // + // // store file was extended. + // assertTrue(store.size() > length); + // + // } finally { + // + // store.destroy(); + // + // } + // + // } - RWStrategy bufferStrategy = (RWStrategy) store - .getBufferStrategy(); + /** + * Test allocate()+read() where the record was never written (the data + * are undefined unless written so there is nothing really to test here + * except for exceptions which might be through for this condition). + */ + public void test_allocate_then_read() { + } - RWStore rw = bufferStrategy.getRWStore(); - long numAllocs = rw.getTotalAllocations(); - long startAllocations = rw.getTotalAllocationsSize(); - - reallocBatch(rw, 1000, 275, 1000); - - store.commit(); - store.close(); - store = new Journal(properties); - bufferStrategy = (RWStrategy) store.getBufferStrategy(); - rw = bufferStrategy.getRWStore(); - - reallocBatch(rw, 1000, 100, 10000); - - store.commit(); - store.close(); - store = new Journal(properties); - bufferStrategy = (RWStrategy) store.getBufferStrategy(); - rw = bufferStrategy.getRWStore(); + /** + * Reallocates the same object several times, then commits and tests + * read back. + * + * + */ + public void test_reallocate() { + final Journal store = (Journal) getStore(); - reallocBatch(rw, 1000, 100, 10000); - - store.commit(); - store.close(); - store = new Journal(properties); - bufferStrategy = (RWStrategy) store.getBufferStrategy(); - rw = bufferStrategy.getRWStore(); + try { - System.out.println("Final allocations: " + (rw.getTotalAllocations() - numAllocs) - + ", allocated bytes: " + (rw.getTotalAllocationsSize() - startAllocations) - + ", file length: " + rw.getStoreFile().length()); - } finally { + byte[] buf = new byte[1024]; // 2Mb buffer of random data + r.nextBytes(buf); - store.destroy(); - - } + ByteBuffer bb = ByteBuffer.wrap(buf); - - } + RWStrategy bs = (RWStrategy) store.getBufferStrategy(); - private long reallocBatch(RWStore rw, int tsts, int sze, int grp) { - long[] addr = new long[grp]; - for (int i = 0; i < grp; i++) { - addr[i] = rw.alloc(2 + r.nextInt(sze), null); - } - for (int t = 0; t < tsts; t++) { - for (int i = 0; i < grp; i++) { - long old = addr[i]; - int asze = 2 + r.nextInt(sze); - addr[i] = rw.alloc(asze, null); - - if (i % 2 == 0) - rw.free(old, 1); // dunno what the real size is - } - } - - return 0L; + RWStore rw = bs.getRWStore(); + + long faddr1 = bs.write(bb); + bb.position(0); + // bs.delete(faddr); + + long faddr2 = bs.write(bb); + bb.position(0); + + store.commit(); + + rw.reset(); + + ByteBuffer inbb1 = bs.read(faddr1); + ByteBuffer inbb2 = bs.read(faddr2); + + assertEquals(bb, inbb1); + assertEquals(bb, inbb2); + + } finally { + store.destroy(); + } + } - - public void test_reallocationWithReadAndReopen() { - - Journal store = (Journal) getStore(); - try { + /** + * Test write of a record and then update of a slice of that record. 
+ * <p> + * Note: Since the record was written but not flushed it will be found + * in the write cache by update(). + */ + public void test_write_plus_update() { + } + /** + * Ensures the allocation of unique addresses by mapping allocated + * address with uniqueness assertion against physical address. + */ + public void test_addressing() { + + final Journal store = (Journal) getStore(); + + try { + RWStrategy bufferStrategy = (RWStrategy) store.getBufferStrategy(); RWStore rw = bufferStrategy.getRWStore(); - + ArrayList<Integer> sizes = new ArrayList<Integer>(); + TreeMap<Long, Integer> paddrs = new TreeMap<Long, Integer>(); + for (int i = 0; i < 100000; i++) { + int s = r.nextInt(250) + 1; + sizes.add(s); + int a = rw.alloc(s, null); + long pa = rw.physicalAddress(a); + assertTrue(paddrs.get(pa) == null); + paddrs.put(pa, a); + } + + for (int i = 0; i < 50; i++) { + int s = r.nextInt(500) + 1; + sizes.add(s); + int a = rw.alloc(s, null); + long pa = rw.physicalAddress(a); + paddrs.put(pa, a); + } + + } finally { + + store.destroy(); + + } + + } + + /** + * Basic allocation test to ensure the FixedAllocators are operating + * efficiently. + * + * A 90 byte allocation is expected to fit in a 128byte block. If we + * only allocate this fixed block size, then we would expect the + * physical address to increase by 128 bytes for each allocation. + */ + public void test_allocations() { + + Journal store = (Journal) getStore(); + + try { + + RWStrategy bufferStrategy = (RWStrategy) store.getBufferStrategy(); + + RWStore rw = bufferStrategy.getRWStore(); + long numAllocs = rw.getTotalAllocations(); + long startAllocations = rw.getTotalAllocationsSize(); + long faddr = allocBatch(rw, 1000, 275, 320); + faddr = allocBatch(rw, 10000, 90, 128); + faddr = allocBatch(rw, 20000, 45, 64); + + System.out.println("Final allocation: " + faddr + ", allocations: " + + (rw.getTotalAllocations() - numAllocs) + ", allocated bytes: " + + (rw.getTotalAllocationsSize() - startAllocations)); + + store.commit(); + + // Confirm that we can re-open the journal after commit + store = (Journal) reopenStore(store); + + } finally { + + store.destroy(); + + } + + } + + /** + * Not so much a test as a code coverage exercise. + * + * The output from showAllocReserve confirms the relative merits of + * optimising for space vs density. The DirectFixedAllocators will + * allocate from DirectBuffers, where locality of reference is less + * important than efficient management of the memory, which is optimised + * by allocations in smaller amounts that match the demands at a finer + * granularity. 
+ */ + public void testAllocationReserves() { + final int cReserve16K = 16 * 1024; + final int cReserve128K = 32 * 1024; + + showAllocReserve(false, 64, cReserve16K, cReserve16K); + showAllocReserve(false, 128, cReserve16K, cReserve16K); + showAllocReserve(false, 1024, cReserve16K, cReserve16K); + showAllocReserve(false, 2048, cReserve16K, cReserve16K); + showAllocReserve(false, 3072, cReserve16K, cReserve16K); + showAllocReserve(false, 4096, cReserve16K, cReserve16K); + showAllocReserve(false, 8192, cReserve16K, cReserve16K); + + showAllocReserve(true, 64, cReserve128K, cReserve16K); + showAllocReserve(true, 128, cReserve128K, cReserve16K); + showAllocReserve(true, 1024, cReserve128K, cReserve16K); + showAllocReserve(true, 2048, cReserve128K, cReserve16K); + showAllocReserve(true, 3072, cReserve128K, cReserve16K); + showAllocReserve(true, 4096, cReserve128K, cReserve16K); + showAllocReserve(true, 8192, cReserve128K, cReserve16K); + } + + private void showAllocReserve(final boolean optDensity, final int slotSize, final int reserve, final int mod) { + final int ints = FixedAllocator.calcBitSize(optDensity, slotSize, reserve, mod); + // there are max 254 ints available to a FixedAllocator + final int maxuse = (254 / (ints + 1)) * ints; + System.out.println("Allocate " + ints + ":" + (32 * ints * slotSize) + " for " + slotSize + " in " + + reserve + " using " + maxuse + " of 254 possible"); + } + + long allocBatch(RWStore rw, int bsize, int asze, int ainc) { + long curAddress = rw.physicalAddress(rw.alloc(asze, null)); + for (int i = 1; i < bsize; i++) { + int a = rw.alloc(asze, null); + long nxt = rw.physicalAddress(a); + assertTrue("Problem with index: " + i, diff(curAddress, nxt) == ainc || (nxt % 8192 == 0)); + curAddress = nxt; + } + + return curAddress; + } + + int diff(final long cur, final long nxt) { + int ret = (int) (nxt - cur); + return ret < 0 ? -ret : ret; + } + + int[] allocBatchBuffer(RWStore rw, int bsize, int base, int scope) { + int[] retaddrs = new int[bsize]; + + byte[] batchBuffer = new byte[base + scope]; + r.nextBytes(batchBuffer); + for (int i = 0; i < bsize; i++) { + int as = base + r.nextInt(scope); + retaddrs[i] = (int) rw.alloc(batchBuffer, as, null); + } + + return retaddrs; + } + + /** + * Reallocation tests the freeing of allocated address and the re-use + * within a transaction. + * + * The repeated runs with full reopening of the store check the + * initialization of the allocators on reload. 
+ * + * @throws IOException + */ + public void test_reallocation() throws IOException { + final Properties properties = getProperties(); + File tmpfile = File.createTempFile("TestRW", "rw"); + properties.setProperty(Options.FILE, tmpfile.getAbsolutePath()); + properties.remove(Options.CREATE_TEMP_FILE); + Journal store = new Journal(properties); + + try { + + RWStrategy bufferStrategy = (RWStrategy) store.getBufferStrategy(); + + RWStore rw = bufferStrategy.getRWStore(); + long numAllocs = rw.getTotalAllocations(); + long startAllocations = rw.getTotalAllocationsSize(); + + reallocBatch(rw, 1000, 275, 1000); + + store.commit(); + store.close(); + store = new Journal(properties); + bufferStrategy = (RWStrategy) store.getBufferStrategy(); + rw = bufferStrategy.getRWStore(); + + reallocBatch(rw, 1000, 100, 10000); + + store.commit(); + store.close(); + store = new Journal(properties); + bufferStrategy = (RWStrategy) store.getBufferStrategy(); + rw = bufferStrategy.getRWStore(); + + reallocBatch(rw, 1000, 100, 10000); + + store.commit(); + store.close(); + store = new Journal(properties); + bufferStrategy = (RWStrategy) store.getBufferStrategy(); + rw = bufferStrategy.getRWStore(); + + System.out.println("Final allocations: " + (rw.getTotalAllocations() - numAllocs) + + ", allocated bytes: " + (rw.getTotalAllocationsSize() - startAllocations) + ", file length: " + + rw.getStoreFile().length()); + } finally { + + store.destroy(); + + } + + } + + private long reallocBatch(RWStore rw, int tsts, int sze, int grp) { + long[] addr = new long[grp]; + for (int i = 0; i < grp; i++) { + addr[i] = rw.alloc(2 + r.nextInt(sze), null); + } + for (int t = 0; t < tsts; t++) { + for (int i = 0; i < grp; i++) { + long old = addr[i]; + int asze = 2 + r.nextInt(sze); + addr[i] = rw.alloc(asze, null); + + if (i % 2 == 0) + rw.free(old, 1); // dunno what the real size is + } + } + + return 0L; + } + + public void test_reallocationWithReadAndReopen() { + + Journal store = (Journal) getStore(); + + try { + + RWStrategy bufferStrategy = (RWStrategy) store.getBufferStrategy(); + + RWStore rw = bufferStrategy.getRWStore(); + final int tcount = 2000; // increase to ramp up stress levels long numAllocs = rw.getTotalAllocations(); long startAllocations = rw.getTotalAllocationsSize(); // reallocBatchWithRead(bufferStrategy, 100000, 275, 5); - reallocBatchWithRead(store, 1, 100, 250, tcount, true, true); + reallocBatchWithRead(store, 1, 100, 250, tcount, true, true); store.close(); - + // added to try and foce bug System.out.println("Re-open Journal"); store = (Journal) getStore(); reallocBatchWithRead(store, 1, 800, 1500, tcount, true, true); reallocBatchWithRead(store, 1, 50, 250, tcount, true, true); - reallocBatchWithRead(store, 1, 50, 250, tcount, true, true); + reallocBatchWithRead(store, 1, 50, 250, tcount, true, true); store.close(); // .. 
end add to force bug - + System.out.println("Re-open Journal"); store = (Journal) getStore(); reallocBatchWithRead(store, 1, 2000, 10000, tcount, true, true); @@ -676,7 +689,7 @@ store = (Journal) getStore(); reallocBatchWithRead(store, 1, 800, 1256, tcount, true, true); reallocBatchWithRead(store, 1, 50, 250, tcount, true, true); - reallocBatchWithRead(store, 1, 50, 250, tcount, true, true); + reallocBatchWithRead(store, 1, 50, 250, tcount, true, true); showStore(store); store.close(); System.out.println("Re-open Journal"); @@ -699,88 +712,87 @@ } finally { store.destroy(); - - } - - } - - void showStore(Journal store) { - RWStrategy bufferStrategy = (RWStrategy) store.getBufferStrategy(); + } + } + + void showStore(Journal store) { + RWStrategy bufferStrategy = (RWStrategy) store.getBufferStrategy(); + RWStore rw = bufferStrategy.getRWStore(); - System.out.println("Fixed Allocators: " + rw.getFixedAllocatorCount() - + ", heap allocated: " + rw.getFileStorage() - + ", utilised bytes: " + rw.getAllocatedSlots() - + ", file length: " + rw.getStoreFile().length()); + System.out.println("Fixed Allocators: " + rw.getFixedAllocatorCount() + ", heap allocated: " + + rw.getFileStorage() + ", utilised bytes: " + rw.getAllocatedSlots() + ", file length: " + + rw.getStoreFile().length()); - } - - // Only realloc 1/5 - byte allocChar = 0; - private long reallocBatchWithRead(Journal store, int tsts, int min, int sze, int grp, boolean commit, boolean reopen) { - allocChar = (byte) (allocChar+1); - - RWStrategy bs = (RWStrategy) store - .getBufferStrategy(); + } - byte[] buf = new byte[sze+4]; // extra for checksum - // r.nextBytes(buf); - for (int i = 0; i < buf.length; i++) { - buf[i] = allocChar; - } - - - RWStore rw = bs.getRWStore(); - - long[] addr = new long[grp/5]; - int[] szes = new int[grp]; - for (int i = 0; i < grp; i++) { - szes[i] = min + r.nextInt(sze-min); - ByteBuffer bb = ByteBuffer.wrap(buf, 0, szes[i]); - if (i % 5 == 0) - addr[i/5] = bs.write(bb); - } - - if (commit) { - store.commit(); - } - - for (int t = 0; t < tsts; t++) { - for (int i = 0; i < (grp/5); i++) { - long old = addr[i]; - try { - bs.read(old); - } catch (Exception e) { - throw new RuntimeException("problem handling read: " + i + " in test: " + t + " from address: " + old, e); - } - ByteBuffer bb = ByteBuffer.wrap(buf, 0, szes[i]); - addr[i] = bs.write(bb); - bb.flip(); - bs.delete(old); - } - } - - if (commit) { - store.commit(); - - if (reopen) - rw.reset(); - - } - return 0L; + // Only realloc 1/5 + byte allocChar = 0; + + private long reallocBatchWithRead(Journal store, int tsts, int min, int sze, int grp, boolean commit, + boolean reopen) { + allocChar = (byte) (allocChar + 1); + + RWStrategy bs = (RWStrategy) store.getBufferStrategy(); + + byte[] buf = new byte[sze + 4]; // extra for checksum + // r.nextBytes(buf); + for (int i = 0; i < buf.length; i++) { + buf[i] = allocChar; + } + + RWStore rw = bs.getRWStore(); + + long[] addr = new long[grp / 5]; + int[] szes = new int[grp]; + for (int i = 0; i < grp; i++) { + szes[i] = min + r.nextInt(sze - min); + ByteBuffer bb = ByteBuffer.wrap(buf, 0, szes[i]); + if (i % 5 == 0) + addr[i / 5] = bs.write(bb); + } + + if (commit) { + store.commit(); + } + + for (int t = 0; t < tsts; t++) { + for (int i = 0; i < (grp / 5); i++) { + long old = addr[i]; + try { + bs.read(old); + } catch (Exception e) { + throw new RuntimeException("problem handling read: " + i + " in test: " + t + " from address: " + + old, e); + } + ByteBuffer bb = ByteBuffer.wrap(buf, 0, szes[i]); + addr[i] 
= bs.write(bb); + bb.flip(); + bs.delete(old); + } + } + + if (commit) { + store.commit(); + + if (reopen) + rw.reset(); + + } + return 0L; } - /** - * Adjust tcount to increase stress levels - */ - public void test_stressReallocationWithRead() { - - Journal store = (Journal) getStore(); + /** + * Adjust tcount to increase stress levels + */ + public void test_stressReallocationWithRead() { - try { + Journal store = (Journal) getStore(); + try { + final int tcount = 2000; // increase to ramp up stress levels RWStrategy bufferStrategy = (RWStrategy) store.getBufferStrategy(); @@ -790,16 +802,16 @@ long numAllocs = rw.getTotalAllocations(); long startAllocations = rw.getTotalAllocationsSize(); // reallocBatchWithRead(bufferStrategy, 100000, 275, 5); - reallocBatchWithRead(store, 1, 50, 250, tcount, false, false); - - reallocBatchWithRead(store, 1, 50, 250, tcount, false, false); - reallocBatchWithRead(store, 1, 50, 250, tcount, false, false); + reallocBatchWithRead(store, 1, 50, 250, tcount, false, false); + reallocBatchWithRead(store, 1, 50, 250, tcount, false, false); + reallocBatchWithRead(store, 1, 50, 250, tcount, false, false); + reallocBatchWithRead(store, 1, 5000, 10000, tcount, false, false); reallocBatchWithRead(store, 1, 800, 1500, tcount, false, false); reallocBatchWithRead(store, 1, 50, 250, tcount, false, false); - reallocBatchWithRead(store, 1, 50, 250, tcount, false, false); + reallocBatchWithRead(store, 1, 50, 250, tcount, false, false); // Extend file with sizeable allocations reallocBatchWithRead(store, 1, 5000, 10000, tcount, false, false); @@ -807,14 +819,14 @@ reallocBatchWithRead(store, 1, 5000, 10000, tcount, false, false); reallocBatchWithRead(store, 1, 5000, 10000, tcount, false, false); reallocBatchWithRead(store, 1, 5000, 10000, tcount, false, false); - + reallocBatchWithRead(store, 1, 250, 500, tcount, false, false); reallocBatchWithRead(store, 1, 5000, 10000, tcount, false, false); reallocBatchWithRead(store, 1, 800, 1256, tcount, false, false); reallocBatchWithRead(store, 1, 50, 250, tcount, false, false); - reallocBatchWithRead(store, 1, 50, 250, tcount, false, false); + reallocBatchWithRead(store, 1, 50, 250, tcount, false, false); reallocBatchWithRead(store, 1, 5000, 10000, tcount, false, false); @@ -829,296 +841,295 @@ reallocBatchWithRead(store, 1, 5000, 10000, tcount, false, false); reallocBatchWithRead(store, 1, 5000, 10000, tcount, false, false); reallocBatchWithRead(store, 1, 5000, 10000, tcount, false, false); - + reallocBatchWithRead(store, 1, 500, 1000, tcount, false, false); reallocBatchWithRead(store, 1, 1000, 2000, tcount, false, false); reallocBatchWithRead(store, 1, 500, 1000, tcount, false, false); - + store.commit(); - + showStore(store); - + store.close(); - + store = (Journal) getStore(); showStore(store); } finally { store.destroy(); - - } - - } + } - /** - * Test of blob allocation, does not check on read back, just the allocation - */ - public void test_blob_allocs() { - if (false) { - return; - } - - final Journal store = (Journal) getStore(); + } - try { + /** + * Test of blob allocation, does not check on read back, just the + * allocation + */ + public void test_blob_allocs() { + if (false) { + return; + } - RWStrategy bufferStrategy = (RWStrategy) store - .getBufferStrategy(); + final Journal store = (Journal) getStore(); - RWStore rw = bufferStrategy.getRWStore(); - long numAllocs = rw.getTotalAllocations(); - long startAllocations = rw.getTotalAllocationsSize(); - int startBlob = 1024 * 256; - int endBlob = 1024 * 1256; - int[] 
faddrs = allocBatchBuffer(rw, 100, startBlob, endBlob); - + try { + + RWStrategy bufferStrategy = (RWStrategy) store.getBufferStrategy(); + + RWStore rw = bufferStrategy.getRWStore(); + long numAllocs = rw.getTotalAllocations(); + long startAllocations = rw.getTotalAllocationsSize(); + int startBlob = 1024 * 256; + int endBlob = 1024 * 1256; + int[] faddrs = allocBatchBuffer(rw, 100, startBlob, endBlob); + final StringBuilder str = new StringBuilder(); rw.getStorageStats().showStats(str); - System.out.println(str); - } finally { + System.out.println(str); + } finally { - store.destroy(); - - } - - } - /** - * Test of blob allocation and read-back, firstly from cache and then from disk. - */ - public void test_blob_readBack() { - - final Journal store = (Journal) getStore(); + store.destroy(); - try { - final RWStrategy bs = (RWStrategy) store - .getBufferStrategy(); + } - final RWStore rw = bs.getRWStore(); - + } - byte[] buf = new byte[2 * 1024 * 1024]; // 5Mb buffer of random data - r.nextBytes(buf); - - ByteBuffer bb = ByteBuffer.wrap(buf); + /** + * Test of blob allocation and read-back, firstly from cache and then + * from disk. + */ + public void test_blob_readBack() { - long faddr = bs.write(bb); // rw.alloc(buf, buf.length); - - log.info("Blob Allocation at " + rw.convertFromAddr(faddr)); - - bb.position(0); - - ByteBuffer rdBuf = bs.read(faddr); - - assertEquals(bb, rdBuf); - - System.out.println("Now commit to disk"); - - store.commit(); - - // Now reset - clears writeCache and reinits from disk - rw.reset(); - - rdBuf = bs.read(faddr); - assertEquals(bb, rdBuf); + final Journal store = (Journal) getStore(); - } finally { + try { + final RWStrategy bs = (RWStrategy) store.getBufferStrategy(); - store.destroy(); - - } - - } - - /** - * Test of blob allocation and read-back, firstly from cache and then from disk. - * @throws InterruptedException - */ - public void test_blob_realloc() throws InterruptedException { - - final Journal store = (Journal) getStore(); + final RWStore rw = bs.getRWStore(); - try { + byte[] buf = new byte[2 * 1024 * 1024]; // 5Mb buffer of random + // data + r.nextBytes(buf); - final byte[] buf = new byte[1024 * 2048]; // 2Mb buffer of random data - r.nextBytes(buf); - - final ByteBuffer bb = ByteBuffer.wrap(buf); + ByteBuffer bb = ByteBuffer.wrap(buf); - final RWStrategy bs = (RWStrategy) store - .getBufferStrategy(); + long faddr = bs.write(bb); // rw.alloc(buf, buf.length); - final RWStore rw = bs.getRWStore(); - - long faddr = bs.write(bb); // rw.alloc(buf, buf.length); - - bb.position(0); - - ByteBuffer rdBuf = bs.read(faddr); - - assertEquals(bb, rdBuf); - - // now delete the memory - bs.delete(faddr); - - // verify immediateFree! - assertEquals(0L,bs.getPhysicalAddress(faddr)); - - // allocate another address, might (or might not) be the same. - faddr = bs.write(bb); // rw.alloc(buf, buf.length); - final long pa = bs.getPhysicalAddress(faddr); - bb.position(0); - - System.out.println("Now commit to disk (1)"); - - store.commit(); - - // Now reset - clears writeCache and reinits from disk - rw.reset(); - - rdBuf = bs.read(faddr); - assertEquals(bb, rdBuf); + log.info("Blob Allocation at " + rw.convertFromAddr(faddr)); - // now delete the memory - bs.delete(faddr); + bb.position(0); - // Must not have been immediately freed if history is retained. 
- if (rw.getHistoryRetention() != 0) - assertEquals(pa, bs.getPhysicalAddress(faddr)); - else - assertEquals(0L, bs.getPhysicalAddress(faddr)); - + ByteBuffer rdBuf = bs.read(faddr); - /* - * Commit before testing for deferred frees. Since there is a - * prior commit point, we are not allowed to immediately free - * any record from that commit point in order to preserve the - * consistency of the last commit point, so we have to commit - * first then test for deferred frees. - */ - System.out.println("Now commit to disk (2)"); - - store.commit(); - - Thread.currentThread().sleep(10); - - // Request release of deferred frees. - rw.checkDeferredFrees(true/* freeNow */, store); + assertEquals(bb, rdBuf); - assertEquals(0L, bs.getPhysicalAddress(faddr)); + System.out.println("Now commit to disk"); - try { - rdBuf = bs.read(faddr); // should fail with illegal argument - throw new RuntimeException("Fail"); - } catch (Exception ise) { - assertTrue("Expected IllegalArgumentException reading from " + (faddr >> 32) + " instead got: " + ise, ise instanceof IllegalArgumentException); - } - - } finally { + store.commit(); - store.destroy(); - - } - - } + // Now reset - clears writeCache and reinits from disk + rw.reset(); - + rdBuf = bs.read(faddr); + assertEquals(bb, rdBuf); + + } finally { + + store.destroy(); + + } + + } + /** - * Ttest write() + flush() + update() - for this case the data have been - * flushed from the write cache so the update will be a random write on - * the file rather than being buffered by the write cache. - */ - public void test_write_flush_update() { - - final Journal store = (Journal) getStore(); + * Test of blob allocation and read-back, firstly from cache and then + * from disk. + * + * @throws InterruptedException + */ + public void test_blob_realloc() throws InterruptedException { - try { + final Journal store = (Journal) getStore(); - RWStrategy bufferStrategy = (RWStrategy) store - .getBufferStrategy(); + try { - final int nbytes = 60; + final byte[] buf = new byte[1024 * 2048]; // 2Mb buffer of + // random data + r.nextBytes(buf); - // random data. - byte[] a = new byte[nbytes]; - r.nextBytes(a); - - // write a new record. - final long addr = bufferStrategy.write(ByteBuffer.wrap(a)); + final ByteBuffer bb = ByteBuffer.wrap(buf); - assertEquals(nbytes, store.getByteCount(addr)); - - // Note: This will result flush the write cache. - store.commit(); - - /* - * Read back the record and verify the update is visible. - */ - { - - final ByteBuffer b = bufferStrategy.read(addr); - - assertNotNull(b); - - for(int i=20; i<40; i++) { - - assertEquals("data differs at offset=" + i, a[i], b - .get(i)); - - } - - } - - } finally { + final RWStrategy bs = (RWStrategy) store.getBufferStrategy(); - store.destroy(); - - } + final RWStore rw = bs.getRWStore(); - } + long faddr = bs.write(bb); // rw.alloc(buf, buf.length); - public void test_metaAlloc() { - - Journal store = (Journal) getStore(); + bb.position(... [truncated message content] |
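The substantive change in r4023 is the AllocBlock.releaseSession fix above: when scanning the i-th 32-bit word of the allocation bitmap, the released physical address was computed from the bit position b alone, so any allocation whose bit sat in a word beyond the first (the "higher bit allocations" of the log message) was mapped back onto a slot in the first word. Folding in startBit = i * 32 yields the true slot index. Here is a self-contained sketch of the corrected arithmetic; slotAddress, blockBaseAddr and slotSize are illustrative stand-ins for RWStore.convertAddr(m_addr) and m_allocator.m_size.

// Sketch of the r4023 address computation; names are illustrative.
final class SlotAddressSketch {

    /** Physical address of the slot at (word, bit) in a block's bitmap. */
    static long slotAddress(final long blockBaseAddr, final int slotSize,
            final int word, final int bit) {
        final int startBit = word * 32; // the fix: include the word offset
        return blockBaseAddr + (long) slotSize * (startBit + bit);
    }

    public static void main(String[] args) {
        final long base = 1L << 20; // a pretend block base address
        final int slotSize = 64;

        // Bit 3 of word 0: slot 3, correct both before and after the fix.
        System.out.println(slotAddress(base, slotSize, 0, 3));

        // Bit 3 of word 2: slot 67. Before the fix this computed the same
        // address as word 0 / bit 3 and so released the wrong slot.
        System.out.println(slotAddress(base, slotSize, 2, 3));
    }
}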
From: <tho...@us...> - 2010-12-19 23:36:08
Revision: 4022 http://bigdata.svn.sourceforge.net/bigdata/?rev=4022&view=rev Author: thompsonbry Date: 2010-12-19 23:36:02 +0000 (Sun, 19 Dec 2010) Log Message: ----------- Adding a (modified) version of IsLiteral. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/constraints/IsLiteral.java Modified: branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/constraints/IsLiteral.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/constraints/IsLiteral.java 2010-12-19 21:52:17 UTC (rev 4021) +++ branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/constraints/IsLiteral.java 2010-12-19 23:36:02 UTC (rev 4022) @@ -1,74 +1,88 @@ -/* - -Copyright (C) SYSTAP, LLC 2006-2007. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -*/ -package com.bigdata.rdf.internal.constraints; - -import com.bigdata.rdf.internal.IV; -import com.bigdata.relation.rule.IBindingSet; -import com.bigdata.relation.rule.IConstant; -import com.bigdata.relation.rule.IConstraint; -import com.bigdata.relation.rule.IVariable; - -/** - * Imposes the constraint <code>isLiteral(x)</code>. - */ -public class IsLiteral implements IConstraint { - - /** - * - */ - private static final long serialVersionUID = 3125106876006900339L; - - private final IVariable x; - - public IsLiteral(final IVariable x) { - - if (x == null) - throw new IllegalArgumentException(); - - this.x = x; - - } - - public boolean accept(IBindingSet s) { - - // get binding for "x". - final IConstant x = s.get(this.x); - - if (x == null) - return true; // not yet bound. - - final IV iv = (IV) x.get(); - - return iv.isLiteral(); - - } - - public IVariable[] getVariables() { - - return new IVariable[] { x }; - - } - -} +/* + +Copyright (C) SYSTAP, LLC 2006-2007. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +*/ +package com.bigdata.rdf.internal.constraints; + +import com.bigdata.rdf.internal.IV; +import com.bigdata.relation.rule.IBindingSet; +import com.bigdata.relation.rule.IConstant; +import com.bigdata.relation.rule.IConstraint; +import com.bigdata.relation.rule.IVariable; + +/** + * Imposes the constraint <code>isLiteral(x)</code>. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id: NE.java 2889 2010-05-20 16:11:35Z mrpersonick $ + * + * FIXME Write unit tests for this. + */ +public class IsLiteral implements IConstraint { + + /** + * + */ + private static final long serialVersionUID = 3125106876006900339L; + + public final IVariable x; + + private final boolean stringsOnly; + + public IsLiteral(final IVariable x) { + this(x, false); + } + + public IsLiteral(final IVariable x, final boolean stringsOnly) { + + if (x == null) + throw new IllegalArgumentException(); + + this.x = x; + this.stringsOnly = stringsOnly; + + } + + public boolean accept(IBindingSet s) { + + // get binding for "x". + final IConstant x = s.get(this.x); + + if (x == null) + return true; // not yet bound. + + IV iv = (IV) x.get(); + if (stringsOnly) + return iv.isLiteral() && !iv.isInline(); + else + return iv.isLiteral(); + + } + + public IVariable[] getVariables() { + + return new IVariable[] { x }; + + } + +}
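A minimal sketch of how the revised constraint might be instantiated, assuming the Var factory from com.bigdata.relation.rule (the setup below is illustrative and not part of the commit):

    final IVariable x = Var.var("x");

    // accept any literal bound to ?x (an unbound ?x is also accepted):
    final IConstraint anyLiteral = new IsLiteral(x);

    // accept only literals that are NOT inlined into the statement
    // indices, i.e. those still resolved through the lexicon:
    final IConstraint lexiconOnly = new IsLiteral(x, true/* stringsOnly */);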
From: <tho...@us...> - 2010-12-19 21:52:23
Revision: 4021 http://bigdata.svn.sourceforge.net/bigdata/?rev=4021&view=rev Author: thompsonbry Date: 2010-12-19 21:52:17 +0000 (Sun, 19 Dec 2010) Log Message: ----------- Added constraints to detect inline values (IVs) and literals when examining an IV in an SPO. Added Paths: ----------- branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/constraints/IsInline.java branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/constraints/IsLiteral.java Added: branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/constraints/IsInline.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/constraints/IsInline.java (rev 0) +++ branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/constraints/IsInline.java 2010-12-19 21:52:17 UTC (rev 4021) @@ -0,0 +1,84 @@ +/* + +Copyright (C) SYSTAP, LLC 2006-2007. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +*/ +package com.bigdata.rdf.internal.constraints; + +import com.bigdata.rdf.internal.IV; +import com.bigdata.relation.rule.IBindingSet; +import com.bigdata.relation.rule.IConstant; +import com.bigdata.relation.rule.IConstraint; +import com.bigdata.relation.rule.IVariable; + +/** + * Imposes the constraint <code>isInline(x)</code>. + */ +public class IsInline implements IConstraint { + + /** + * + */ + private static final long serialVersionUID = 3125106876006900339L; + + private final IVariable x; + + /** + * If true, only accept variable bindings for {@link #x} that have an + * inline internal value {@link IV}. Otherwise only accept variable bindings + * that are not inline in the statement indices. + * <p> + * @see IV#isInline() + */ + private final boolean inline; + + public IsInline(final IVariable x, final boolean inline) { + + if (x == null) + throw new IllegalArgumentException(); + + this.x = x; + this.inline = inline; + + } + + public boolean accept(IBindingSet s) { + + // get binding for "x". + final IConstant x = s.get(this.x); + + if (x == null) + return true; // not yet bound. + + final IV iv = (IV) x.get(); + + return iv.isInline() == inline; + + } + + public IVariable[] getVariables() { + + return new IVariable[] { x }; + + } + +} Added: branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/constraints/IsLiteral.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/constraints/IsLiteral.java (rev 0) +++ branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/constraints/IsLiteral.java 2010-12-19 21:52:17 UTC (rev 4021) @@ -0,0 +1,74 @@ +/* + +Copyright (C) SYSTAP, LLC 2006-2007. All rights reserved. 
+ +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +*/ +package com.bigdata.rdf.internal.constraints; + +import com.bigdata.rdf.internal.IV; +import com.bigdata.relation.rule.IBindingSet; +import com.bigdata.relation.rule.IConstant; +import com.bigdata.relation.rule.IConstraint; +import com.bigdata.relation.rule.IVariable; + +/** + * Imposes the constraint <code>isLiteral(x)</code>. + */ +public class IsLiteral implements IConstraint { + + /** + * + */ + private static final long serialVersionUID = 3125106876006900339L; + + private final IVariable x; + + public IsLiteral(final IVariable x) { + + if (x == null) + throw new IllegalArgumentException(); + + this.x = x; + + } + + public boolean accept(IBindingSet s) { + + // get binding for "x". + final IConstant x = s.get(this.x); + + if (x == null) + return true; // not yet bound. + + final IV iv = (IV) x.get(); + + return iv.isLiteral(); + + } + + public IVariable[] getVariables() { + + return new IVariable[] { x }; + + } + +}
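Since accept() returns true while the variable is unbound, the constraint only filters once a binding exists; the boolean argument selects either polarity. A brief illustrative sketch, reusing a variable x declared as in the IsLiteral example above:

    // require ?x to be bound to an inline IV (value encoded directly in
    // the statement indices):
    final IConstraint mustBeInline = new IsInline(x, true/* inline */);

    // require ?x to be represented via a term identifier instead:
    final IConstraint mustNotBeInline = new IsInline(x, false/* inline */);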
From: <tho...@us...> - 2010-12-19 21:51:51
Revision: 4020 http://bigdata.svn.sourceforge.net/bigdata/?rev=4020&view=rev Author: thompsonbry Date: 2010-12-19 21:51:44 +0000 (Sun, 19 Dec 2010) Log Message: ----------- Added support for inlining xsd:dateTime literals. This is disabled by default for backward compatibility. We should consider enabling it by default at the next release and providing a migration path for older KB instances. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/DefaultExtensionFactory.java branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/IExtensionFactory.java branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/LexiconConfiguration.java branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/XSD.java branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/lexicon/LexiconRelation.java branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/internal/SampleExtensionFactory.java branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/internal/TestEncodeDecodeKeys.java Added Paths: ----------- branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/DateTimeExtension.java Added: branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/DateTimeExtension.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/DateTimeExtension.java (rev 0) +++ branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/DateTimeExtension.java 2010-12-19 21:51:44 UTC (rev 4020) @@ -0,0 +1,131 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +package com.bigdata.rdf.internal; + +import java.util.GregorianCalendar; +import java.util.TimeZone; + +import javax.xml.datatype.DatatypeConfigurationException; +import javax.xml.datatype.DatatypeFactory; +import javax.xml.datatype.XMLGregorianCalendar; + +import org.openrdf.model.Literal; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.datatypes.XMLDatatypeUtil; + +import com.bigdata.rdf.model.BigdataURI; +import com.bigdata.rdf.model.BigdataValue; +import com.bigdata.rdf.model.BigdataValueFactory; + +/** + * This implementation of {@link IExtension} implements inlining for literals + * that represent xsd:dateTime literals. These literals will be stored as time + * in milliseconds since the epoch. The milliseconds are encoded as an inline + * long. 
+ */ +public class DateTimeExtension<V extends BigdataValue> implements IExtension<V> { + + private final BigdataURI dateTime; + + public DateTimeExtension(final IDatatypeURIResolver resolver) { + + this.dateTime = resolver.resolve(XSD.DATETIME); + + } + + public BigdataURI getDatatype() { + + return dateTime; + + } + + /** + * Attempts to convert the supplied value into an epoch representation. + * Tests for a literal value with the correct datatype that can be converted + * to a positive long integer. Encodes the long in a delegate + * {@link XSDLongIV}, and returns an {@link ExtensionIV} to wrap the native + * type. + */ + public ExtensionIV createIV(final Value value) { + + if (value instanceof Literal == false) + throw new IllegalArgumentException(); + + final Literal lit = (Literal) value; + + final URI dt = lit.getDatatype(); + + if (dt == null || !XSD.DATETIME.stringValue().equals(dt.stringValue())) + throw new IllegalArgumentException(); + + final String s = value.stringValue(); + + final XMLGregorianCalendar c = XMLDatatypeUtil.parseCalendar(s); + + /* + * Returns the current time as UTC milliseconds from the epoch + */ + final long l = c.toGregorianCalendar().getTimeInMillis(); + + final AbstractLiteralIV delegate = new XSDLongIV(l); + + return new ExtensionIV(delegate, (TermId) getDatatype().getIV()); + + } + + /** + * Use the long value of the {@link XSDLongIV} delegate (which represents + * milliseconds since the epoch) to create a an XMLGregorianCalendar + * object (GMT timezone). Use the XMLGregorianCalendar to create a datatype + * literal value with the xsd:dateTime datatype. + */ + public V asValue(final ExtensionIV iv, final BigdataValueFactory vf) { + + /* + * Milliseconds since the epoch. + */ + final long l = iv.getDelegate().longValue(); + + final GregorianCalendar c = + new GregorianCalendar(TimeZone.getTimeZone("GMT")); + c.setTimeInMillis(l); + + try { + + final XMLGregorianCalendar xmlGC = + DatatypeFactory.newInstance().newXMLGregorianCalendar(c); + + return (V) vf.createLiteral(xmlGC); + + } catch (DatatypeConfigurationException ex) { + + throw new IllegalArgumentException("bad iv: " + iv, ex); + + } + + } + +} Modified: branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/DefaultExtensionFactory.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/DefaultExtensionFactory.java 2010-12-19 21:49:22 UTC (rev 4019) +++ branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/DefaultExtensionFactory.java 2010-12-19 21:51:44 UTC (rev 4020) @@ -1,26 +1,36 @@ -package com.bigdata.rdf.internal; - -/** - * Empty {@link IExtensionFactory}. - */ -public class DefaultExtensionFactory implements IExtensionFactory { - - private final IExtension[] extensions; - - public DefaultExtensionFactory() { - - extensions = new IExtension[0]; - - } - - public void init(final IDatatypeURIResolver resolver) { - - } - - public IExtension[] getExtensions() { - - return extensions; - - } - -} +package com.bigdata.rdf.internal; + +import java.util.Collection; +import java.util.LinkedList; + +/** + * Empty {@link IExtensionFactory}. 
+ */ +public class DefaultExtensionFactory implements IExtensionFactory { + + private final Collection<IExtension> extensions; + + private volatile IExtension[] extensionsArray; + + public DefaultExtensionFactory() { + + extensions = new LinkedList<IExtension>(); + + } + + public void init(final IDatatypeURIResolver resolver, + final boolean inlineDateTimes) { + + if (inlineDateTimes) + extensions.add(new DateTimeExtension(resolver)); + extensionsArray = extensions.toArray(new IExtension[extensions.size()]); + + } + + public IExtension[] getExtensions() { + + return extensionsArray; + + } + +} Modified: branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/IExtensionFactory.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/IExtensionFactory.java 2010-12-19 21:49:22 UTC (rev 4019) +++ branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/IExtensionFactory.java 2010-12-19 21:51:44 UTC (rev 4020) @@ -1,52 +1,56 @@ -/** - -Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -*/ - -package com.bigdata.rdf.internal; - -/** - * IExtensionFactories are responsible for enumerating what extensions are - * supported for a particular database configuration. Bigdata comes packaged - * with a {@link SampleExtensionFactory} that supplies two starter extensions - - * the {@link EpochExtension} (for representing time since the epoch as a long - * integer) and the {@link ColorsEnumExtension} (a sample extension for how to - * represent an enumeration via inline literals). - */ -public interface IExtensionFactory { - - /** - * This will be called very early in the IExtensionFactory lifecycle so that - * the {@link TermId}s for the {@link IExtension}'s datatype URIs will be on - * hand when needed. - * - * @param resolver - * the datatype URI resolver - */ - void init(final IDatatypeURIResolver resolver); - - /** - * Return the supported extensions. - */ - IExtension[] getExtensions(); - -} +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +package com.bigdata.rdf.internal; + +/** + * IExtensionFactories are responsible for enumerating what extensions are + * supported for a particular database configuration. Bigdata comes packaged + * with a {@link SampleExtensionFactory} that supplies two starter extensions - + * the {@link EpochExtension} (for representing time since the epoch as a long + * integer) and the {@link ColorsEnumExtension} (a sample extension for how to + * represent an enumeration via inline literals). + */ +public interface IExtensionFactory { + + /** + * This will be called very early in the IExtensionFactory lifecycle so that + * the {@link TermId}s for the {@link IExtension}'s datatype URIs will be on + * hand when needed. + * + * @param resolver + * the datatype URI resolver + * @param inlineDateTimes + * if true, inine the xsd:dateTime datatype using the + * {@link DateTimeExtension} class. + */ + void init(final IDatatypeURIResolver resolver, + final boolean inlineDateTimes); + + /** + * Return the supported extensions. + */ + IExtension[] getExtensions(); + +} Modified: branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/LexiconConfiguration.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/LexiconConfiguration.java 2010-12-19 21:49:22 UTC (rev 4019) +++ branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/LexiconConfiguration.java 2010-12-19 21:51:44 UTC (rev 4020) @@ -1,313 +1,315 @@ -/** - -Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - */ -/* - * Created July 10, 2010 - */ - -package com.bigdata.rdf.internal; - -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; -import org.apache.log4j.Logger; -import org.openrdf.model.BNode; -import org.openrdf.model.Literal; -import org.openrdf.model.URI; -import org.openrdf.model.Value; -import org.openrdf.model.datatypes.XMLDatatypeUtil; -import com.bigdata.rdf.model.BigdataURI; -import com.bigdata.rdf.model.BigdataValue; -import com.bigdata.rdf.model.BigdataValueFactory; - -/** - * An object which describes which kinds of RDF Values are inlined into the - * statement indices and how other RDF Values are coded into the lexicon. 
- * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ - */ -public class LexiconConfiguration<V extends BigdataValue> - implements ILexiconConfiguration<V> { - - protected static final Logger log = - Logger.getLogger(LexiconConfiguration.class); - - private final boolean inlineLiterals, inlineBNodes; - - private final IExtensionFactory xFactory; - - private final Map<TermId, IExtension> termIds; - - private final Map<String, IExtension> datatypes; - - public LexiconConfiguration(final boolean inlineLiterals, - final boolean inlineBNodes, final IExtensionFactory xFactory) { - - this.inlineLiterals = inlineLiterals; - this.inlineBNodes = inlineBNodes; - this.xFactory = xFactory; - - termIds = new HashMap<TermId, IExtension>(); - datatypes = new HashMap<String, IExtension>(); - - } - - public void initExtensions(final IDatatypeURIResolver resolver) { - - xFactory.init(resolver); - - for (IExtension extension : xFactory.getExtensions()) { - BigdataURI datatype = extension.getDatatype(); - if (datatype == null) - continue; - termIds.put((TermId) datatype.getIV(), extension); - datatypes.put(datatype.stringValue(), extension); - } - - } - - public V asValue(final ExtensionIV iv, final BigdataValueFactory vf) { - final TermId datatype = iv.getExtensionDatatype(); - return (V) termIds.get(datatype).asValue(iv, vf); - } - - public IV createInlineIV(final Value value) { - - // we know right away we can't handle URIs - if (value instanceof URI) - return null; - - if (value instanceof Literal) { - - final Literal l = (Literal) value; - - final URI datatype = l.getDatatype(); - - // not a datatyped literal - if (datatype == null) - return null; - - if (datatypes.containsKey(datatype.stringValue())) { - - final IExtension xFactory = - datatypes.get(datatype.stringValue()); - - try { - - final IV iv = xFactory.createIV(value); - - if (iv != null && value instanceof BigdataValue) - ((BigdataValue) value).setIV(iv); - - return iv; - - } catch (Exception ex) { - - log.warn("problem creating inline internal value for " + - "extension datatype: " + value.stringValue()); - - /* - * Some sort of parse error in the literal value most - * likely. Resort to term identifiers. 
- */ - return null; - - } - - } - - // get the native DTE - final DTE dte = DTE.valueOf(datatype); - - // no native DTE for this datatype - if (dte == null) - return null; - - // check to see if we are inlining literals of this type - if (!isInline(VTE.LITERAL, dte)) - return null; - - final String v = value.stringValue(); - - IV iv = null; - - try { - - switch (dte) { - case XSDBoolean: - iv = new XSDBooleanIV(XMLDatatypeUtil.parseBoolean(v)); - break; - case XSDByte: - iv = new XSDByteIV(XMLDatatypeUtil.parseByte(v)); - break; - case XSDShort: - iv = new XSDShortIV(XMLDatatypeUtil.parseShort(v)); - break; - case XSDInt: - iv = new XSDIntIV(XMLDatatypeUtil.parseInt(v)); - break; - case XSDLong: - iv = new XSDLongIV(XMLDatatypeUtil.parseLong(v)); - break; - case XSDFloat: - iv = new XSDFloatIV(XMLDatatypeUtil.parseFloat(v)); - break; - case XSDDouble: - iv = new XSDDoubleIV(XMLDatatypeUtil.parseDouble(v)); - break; - case XSDInteger: - iv = new XSDIntegerIV(XMLDatatypeUtil.parseInteger(v)); - break; - case XSDDecimal: - iv = new XSDDecimalIV(XMLDatatypeUtil.parseDecimal(v)); - break; - case UUID: - iv = new UUIDLiteralIV(UUID.fromString(v)); - break; - default: - iv = null; - } - - } catch (NumberFormatException ex) { - - // some dummy doesn't know how to format a number - // default to term identifier for this term - - log.warn("number format exception: " + v); - - } - - if (iv != null && value instanceof BigdataValue) - ((BigdataValue) value).setIV(iv); - - return iv; - - } else if (value instanceof BNode) { - - final BNode b = (BNode) value; - - final String id = b.getID(); - - final char c = id.charAt(0); - - if (c == 'u') { - - try { - - final UUID uuid = UUID.fromString(id.substring(1)); - - if (!uuid.toString().equals(id.substring(1))) - return null; - - if (!isInline(VTE.BNODE, DTE.UUID)) - return null; - - final IV iv = new UUIDBNodeIV(uuid); - - if (value instanceof BigdataValue) - ((BigdataValue) value).setIV(iv); - - return iv; - - } catch (Exception ex) { - - // string id could not be converted to a UUID - - } - - } else if (c == 'i') { - - try { - - final Integer i = Integer.valueOf(id.substring(1)); - - // cannot normalize id, needs to remain syntactically identical - if (!i.toString().equals(id.substring(1))) - return null; - - if (!isInline(VTE.BNODE, DTE.XSDInt)) - return null; - - final IV iv = new NumericBNodeIV(i); - - if (value instanceof BigdataValue) - ((BigdataValue) value).setIV(iv); - - return iv; - - } catch (Exception ex) { - - // string id could not be converted to an Integer - - } - - } - - } - - return null; - - } - - /** - * See {@link ILexiconConfiguration#isInline(VTE, DTE)}. - */ - public boolean isInline(final VTE vte, final DTE dte) { - - switch (vte) { - case BNODE: - return inlineBNodes && isSupported(dte); - case LITERAL: - return inlineLiterals && isSupported(dte); - default: - return false; - } - - } - - private boolean isSupported(final DTE dte) { - - switch (dte) { - case XSDBoolean: - case XSDByte: - case XSDShort: - case XSDInt: - case XSDLong: - case XSDFloat: - case XSDDouble: - case XSDInteger: - case XSDDecimal: - case UUID: - return true; - case XSDUnsignedByte: // none of the unsigneds are tested yet - case XSDUnsignedShort: // none of the unsigneds are tested yet - case XSDUnsignedInt: // none of the unsigneds are tested yet - case XSDUnsignedLong: // none of the unsigneds are tested yet - default: - return false; - } - - } - -} +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. 
+ +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ +/* + * Created July 10, 2010 + */ + +package com.bigdata.rdf.internal; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import org.apache.log4j.Logger; +import org.openrdf.model.BNode; +import org.openrdf.model.Literal; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.datatypes.XMLDatatypeUtil; +import com.bigdata.rdf.model.BigdataURI; +import com.bigdata.rdf.model.BigdataValue; +import com.bigdata.rdf.model.BigdataValueFactory; + +/** + * An object which describes which kinds of RDF Values are inlined into the + * statement indices and how other RDF Values are coded into the lexicon. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class LexiconConfiguration<V extends BigdataValue> + implements ILexiconConfiguration<V> { + + protected static final Logger log = + Logger.getLogger(LexiconConfiguration.class); + + private final boolean inlineLiterals, inlineBNodes, inlineDateTimes; + + private final IExtensionFactory xFactory; + + private final Map<TermId, IExtension> termIds; + + private final Map<String, IExtension> datatypes; + + public LexiconConfiguration(final boolean inlineLiterals, + final boolean inlineBNodes, final boolean inlineDateTimes, + final IExtensionFactory xFactory) { + + this.inlineLiterals = inlineLiterals; + this.inlineBNodes = inlineBNodes; + this.inlineDateTimes = inlineDateTimes; + this.xFactory = xFactory; + + termIds = new HashMap<TermId, IExtension>(); + datatypes = new HashMap<String, IExtension>(); + + } + + public void initExtensions(final IDatatypeURIResolver resolver) { + + xFactory.init(resolver, inlineDateTimes); + + for (IExtension extension : xFactory.getExtensions()) { + BigdataURI datatype = extension.getDatatype(); + if (datatype == null) + continue; + termIds.put((TermId) datatype.getIV(), extension); + datatypes.put(datatype.stringValue(), extension); + } + + } + + public V asValue(final ExtensionIV iv, final BigdataValueFactory vf) { + final TermId datatype = iv.getExtensionDatatype(); + return (V) termIds.get(datatype).asValue(iv, vf); + } + + public IV createInlineIV(final Value value) { + + // we know right away we can't handle URIs + if (value instanceof URI) + return null; + + if (value instanceof Literal) { + + final Literal l = (Literal) value; + + final URI datatype = l.getDatatype(); + + // not a datatyped literal + if (datatype == null) + return null; + + if (datatypes.containsKey(datatype.stringValue())) { + + final IExtension xFactory = + datatypes.get(datatype.stringValue()); + + try { + + final IV iv = xFactory.createIV(value); + + if (iv != null && value instanceof BigdataValue) + ((BigdataValue) value).setIV(iv); + + return iv; + + } catch (Exception ex) { + + log.warn("problem creating inline internal 
value for " + + "extension datatype: " + value.stringValue()); + + /* + * Some sort of parse error in the literal value most + * likely. Resort to term identifiers. + */ + return null; + + } + + } + + // get the native DTE + final DTE dte = DTE.valueOf(datatype); + + // no native DTE for this datatype + if (dte == null) + return null; + + // check to see if we are inlining literals of this type + if (!isInline(VTE.LITERAL, dte)) + return null; + + final String v = value.stringValue(); + + IV iv = null; + + try { + + switch (dte) { + case XSDBoolean: + iv = new XSDBooleanIV(XMLDatatypeUtil.parseBoolean(v)); + break; + case XSDByte: + iv = new XSDByteIV(XMLDatatypeUtil.parseByte(v)); + break; + case XSDShort: + iv = new XSDShortIV(XMLDatatypeUtil.parseShort(v)); + break; + case XSDInt: + iv = new XSDIntIV(XMLDatatypeUtil.parseInt(v)); + break; + case XSDLong: + iv = new XSDLongIV(XMLDatatypeUtil.parseLong(v)); + break; + case XSDFloat: + iv = new XSDFloatIV(XMLDatatypeUtil.parseFloat(v)); + break; + case XSDDouble: + iv = new XSDDoubleIV(XMLDatatypeUtil.parseDouble(v)); + break; + case XSDInteger: + iv = new XSDIntegerIV(XMLDatatypeUtil.parseInteger(v)); + break; + case XSDDecimal: + iv = new XSDDecimalIV(XMLDatatypeUtil.parseDecimal(v)); + break; + case UUID: + iv = new UUIDLiteralIV(UUID.fromString(v)); + break; + default: + iv = null; + } + + } catch (NumberFormatException ex) { + + // some dummy doesn't know how to format a number + // default to term identifier for this term + + log.warn("number format exception: " + v); + + } + + if (iv != null && value instanceof BigdataValue) + ((BigdataValue) value).setIV(iv); + + return iv; + + } else if (value instanceof BNode) { + + final BNode b = (BNode) value; + + final String id = b.getID(); + + final char c = id.charAt(0); + + if (c == 'u') { + + try { + + final UUID uuid = UUID.fromString(id.substring(1)); + + if (!uuid.toString().equals(id.substring(1))) + return null; + + if (!isInline(VTE.BNODE, DTE.UUID)) + return null; + + final IV iv = new UUIDBNodeIV(uuid); + + if (value instanceof BigdataValue) + ((BigdataValue) value).setIV(iv); + + return iv; + + } catch (Exception ex) { + + // string id could not be converted to a UUID + + } + + } else if (c == 'i') { + + try { + + final Integer i = Integer.valueOf(id.substring(1)); + + // cannot normalize id, needs to remain syntactically identical + if (!i.toString().equals(id.substring(1))) + return null; + + if (!isInline(VTE.BNODE, DTE.XSDInt)) + return null; + + final IV iv = new NumericBNodeIV(i); + + if (value instanceof BigdataValue) + ((BigdataValue) value).setIV(iv); + + return iv; + + } catch (Exception ex) { + + // string id could not be converted to an Integer + + } + + } + + } + + return null; + + } + + /** + * See {@link ILexiconConfiguration#isInline(VTE, DTE)}. 
+ */ + public boolean isInline(final VTE vte, final DTE dte) { + + switch (vte) { + case BNODE: + return inlineBNodes && isSupported(dte); + case LITERAL: + return inlineLiterals && isSupported(dte); + default: + return false; + } + + } + + private boolean isSupported(final DTE dte) { + + switch (dte) { + case XSDBoolean: + case XSDByte: + case XSDShort: + case XSDInt: + case XSDLong: + case XSDFloat: + case XSDDouble: + case XSDInteger: + case XSDDecimal: + case UUID: + return true; + case XSDUnsignedByte: // none of the unsigneds are tested yet + case XSDUnsignedShort: // none of the unsigneds are tested yet + case XSDUnsignedInt: // none of the unsigneds are tested yet + case XSDUnsignedLong: // none of the unsigneds are tested yet + default: + return false; + } + + } + +} Modified: branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/XSD.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/XSD.java 2010-12-19 21:49:22 UTC (rev 4019) +++ branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/XSD.java 2010-12-19 21:51:44 UTC (rev 4020) @@ -1,83 +1,86 @@ -/** - -Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -*/ - -package com.bigdata.rdf.internal; - -import org.openrdf.model.URI; -import org.openrdf.model.impl.URIImpl; - -/** - * Collects various XSD URIs as constants. - * - * @author <a href="mailto:mrp...@us...">Mike Personick</a> - * @version $Id$ - */ -public interface XSD { - - String NAMESPACE = "http://www.w3.org/2001/XMLSchema#"; - - URI BOOLEAN = new URIImpl(NAMESPACE - + "boolean"); - - URI BYTE = new URIImpl(NAMESPACE - + "byte"); - - URI SHORT = new URIImpl(NAMESPACE - + "short"); - - URI INT = new URIImpl(NAMESPACE - + "int"); - - URI LONG = new URIImpl(NAMESPACE - + "long"); - - URI UNSIGNED_BYTE = new URIImpl(NAMESPACE - + "unsignedByte"); - - URI UNSIGNED_SHORT = new URIImpl(NAMESPACE - + "unsignedShort"); - - URI UNSIGNED_INT = new URIImpl(NAMESPACE - + "unsignedInt"); - - URI UNSIGNED_LONG = new URIImpl(NAMESPACE - + "unsignedLong"); - - URI FLOAT = new URIImpl(NAMESPACE - + "float"); - - URI DOUBLE = new URIImpl(NAMESPACE - + "double"); - - URI INTEGER = new URIImpl(NAMESPACE - + "integer"); - - URI DECIMAL = new URIImpl(NAMESPACE - + "decimal"); - - URI UUID = new URIImpl(NAMESPACE - + "uuid"); - - -} +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. 
+ +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +package com.bigdata.rdf.internal; + +import org.openrdf.model.URI; +import org.openrdf.model.impl.URIImpl; + +/** + * Collects various XSD URIs as constants. + * + * @author <a href="mailto:mrp...@us...">Mike Personick</a> + * @version $Id$ + */ +public interface XSD { + + String NAMESPACE = "http://www.w3.org/2001/XMLSchema#"; + + URI BOOLEAN = new URIImpl(NAMESPACE + + "boolean"); + + URI BYTE = new URIImpl(NAMESPACE + + "byte"); + + URI SHORT = new URIImpl(NAMESPACE + + "short"); + + URI INT = new URIImpl(NAMESPACE + + "int"); + + URI LONG = new URIImpl(NAMESPACE + + "long"); + + URI UNSIGNED_BYTE = new URIImpl(NAMESPACE + + "unsignedByte"); + + URI UNSIGNED_SHORT = new URIImpl(NAMESPACE + + "unsignedShort"); + + URI UNSIGNED_INT = new URIImpl(NAMESPACE + + "unsignedInt"); + + URI UNSIGNED_LONG = new URIImpl(NAMESPACE + + "unsignedLong"); + + URI FLOAT = new URIImpl(NAMESPACE + + "float"); + + URI DOUBLE = new URIImpl(NAMESPACE + + "double"); + + URI INTEGER = new URIImpl(NAMESPACE + + "integer"); + + URI DECIMAL = new URIImpl(NAMESPACE + + "decimal"); + + URI UUID = new URIImpl(NAMESPACE + + "uuid"); + + URI DATETIME = new URIImpl(NAMESPACE + + "dateTime"); + + +} Modified: branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/lexicon/LexiconRelation.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/lexicon/LexiconRelation.java 2010-12-19 21:49:22 UTC (rev 4019) +++ branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/lexicon/LexiconRelation.java 2010-12-19 21:51:44 UTC (rev 4020) @@ -384,7 +384,7 @@ } { - + inlineLiterals = Boolean.parseBoolean(getProperty( AbstractTripleStore.Options.INLINE_LITERALS, AbstractTripleStore.Options.DEFAULT_INLINE_LITERALS)); @@ -392,26 +392,35 @@ inlineBNodes = storeBlankNodes && Boolean.parseBoolean(getProperty( AbstractTripleStore.Options.INLINE_BNODES, AbstractTripleStore.Options.DEFAULT_INLINE_BNODES)); - + + inlineDateTimes = Boolean.parseBoolean(getProperty( + AbstractTripleStore.Options.INLINE_DATE_TIMES, + AbstractTripleStore.Options.DEFAULT_INLINE_DATE_TIMES)); + try { - + final Class<IExtensionFactory> xfc = determineExtensionFactoryClass(); + final IExtensionFactory xFactory = xfc.newInstance(); - + lexiconConfiguration = new LexiconConfiguration( - inlineLiterals, inlineBNodes, xFactory); - + inlineLiterals, inlineBNodes, inlineDateTimes, xFactory); + } catch (InstantiationException e) { + throw new IllegalArgumentException( AbstractTripleStore.Options.EXTENSION_FACTORY_CLASS, e); + } catch (IllegalAccessException e) { + throw new IllegalArgumentException( AbstractTripleStore.Options.EXTENSION_FACTORY_CLASS, e); + } - + } - + } /** @@ -580,7 +589,13 @@ */ final private boolean inlineBNodes; - + /** + * Are xsd:dateTime litersls being inlined into the statement indices. + * + * {@link AbstractTripleStore.Options#INLINE_DATE_TIMES} + */ + final private boolean inlineDateTimes; + /** * Return <code>true</code> if datatype literals are being inlined into * the statement indices. 
Modified: branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java 2010-12-19 21:49:22 UTC (rev 4019) +++ branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java 2010-12-19 21:51:44 UTC (rev 4020) @@ -880,15 +880,28 @@ String DEFAULT_INLINE_BNODES = "false"; - /** - * The name of the {@link IExtensionFactory} class. The implementation - * MUST declare a constructor that accepts an - * {@link IDatatypeURIResolver} as its only argument. The - * {@link IExtension}s constructed by the factory need a resolver to - * resolve datatype URIs to term identifiers in the database. - * - * @see #DEFAULT_EXTENSION_FACTORY_CLASS - */ + /** + * Set up database to inline date/times directly into the statement + * indices rather than using the lexicon to map them to term identifiers + * and back. Date times will be converted to UTC, then stored as + * milliseconds since the epoch. Thus if you inline date/times you will + * lose the canonical representation of the date/time, and you will not + * be able to recover the original time zone of the date/time. + */ + String INLINE_DATE_TIMES = AbstractTripleStore.class.getName() + + ".inlineDateTimes"; + + String DEFAULT_INLINE_DATE_TIMES = "false"; + + /** + * The name of the {@link IExtensionFactory} class. The implementation + * MUST declare a constructor that accepts an + * {@link IDatatypeURIResolver} as its only argument. The + * {@link IExtension}s constructed by the factory need a resolver to + * resolve datatype URIs to term identifiers in the database. + * + * @see #DEFAULT_EXTENSION_FACTORY_CLASS + */ String EXTENSION_FACTORY_CLASS = AbstractTripleStore.class.getName() + ".extensionFactoryClass"; Modified: branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/internal/SampleExtensionFactory.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/internal/SampleExtensionFactory.java 2010-12-19 21:49:22 UTC (rev 4019) +++ branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/internal/SampleExtensionFactory.java 2010-12-19 21:51:44 UTC (rev 4020) @@ -20,8 +20,10 @@ } - public void init(final IDatatypeURIResolver resolver) { + public void init(final IDatatypeURIResolver resolver,boolean inlineDateTimes) { + if (inlineDateTimes) + extensions.add(new DateTimeExtension(resolver)); extensions.add(new EpochExtension(resolver)); extensions.add(new ColorsEnumExtension(resolver)); extensionsArray = extensions.toArray(new IExtension[2]); Modified: branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/internal/TestEncodeDecodeKeys.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/internal/TestEncodeDecodeKeys.java 2010-12-19 21:49:22 UTC (rev 4019) +++ branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/internal/TestEncodeDecodeKeys.java 2010-12-19 21:51:44 UTC (rev 4020) @@ -31,11 +31,14 @@ import java.math.BigInteger; import java.util.Random; import java.util.UUID; + +import javax.xml.datatype.DatatypeFactory; + +import junit.framework.TestCase2; + import org.openrdf.model.URI; import org.openrdf.model.impl.LiteralImpl; -import junit.framework.TestCase2; - import com.bigdata.btree.keys.IKeyBuilder; import 
com.bigdata.btree.keys.KeyBuilder; import com.bigdata.rdf.model.BigdataBNode; @@ -335,7 +338,7 @@ * @param e * The array of the expected values. */ - protected void doEncodeDecodeTest(final IV<?, ?>[] e) { + protected IV<?, ?>[] doEncodeDecodeTest(final IV<?, ?>[] e) { /* * Encode. @@ -370,6 +373,8 @@ } + return a; + } } @@ -762,4 +767,61 @@ } + public void test_SPO_encodeDecodeDateTime() throws Exception { + + final BigdataValueFactory vf = BigdataValueFactoryImpl.getInstance("test"); + + final DatatypeFactory df = DatatypeFactory.newInstance(); + + final DateTimeExtension<BigdataValue> ext = + new DateTimeExtension<BigdataValue>(new IDatatypeURIResolver() { + public BigdataURI resolve(URI uri) { + BigdataURI buri = vf.createURI(uri.stringValue()); + buri.setIV(new TermId(VTE.URI, 1024)); + return buri; + } + }); + + final BigdataLiteral[] dt = { + vf.createLiteral( + df.newXMLGregorianCalendar("2001-10-26T21:32:52")), + vf.createLiteral( + df.newXMLGregorianCalendar("2001-10-26T21:32:52+02:00")), + vf.createLiteral( + df.newXMLGregorianCalendar("2001-10-26T19:32:52Z")), + vf.createLiteral( + df.newXMLGregorianCalendar("2001-10-26T19:32:52+00:00")), + vf.createLiteral( + df.newXMLGregorianCalendar("-2001-10-26T21:32:52")), + vf.createLiteral( + df.newXMLGregorianCalendar("2001-10-26T21:32:52.12679")), + vf.createLiteral( + df.newXMLGregorianCalendar("1901-10-26T21:32:52")), + }; + + final IV<?, ?>[] e = {// + ext.createIV(dt[0]), + ext.createIV(dt[1]), + ext.createIV(dt[2]), + ext.createIV(dt[3]), + ext.createIV(dt[4]), + ext.createIV(dt[5]), + ext.createIV(dt[6]), +// ext.createIV(dt[7]), + }; + + final IV<?, ?>[] a = doEncodeDecodeTest(e); + + if (log.isInfoEnabled()) { + for (int i = 0; i < e.length; i++) { + log.info(dt[i]); + log.info(ext.asValue((ExtensionIV) e[i], vf)); + log.info(ext.asValue((ExtensionIV) a[i], vf)); + log.info(""); + } + } + + + }
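The encode/decode path implemented by DateTimeExtension can be tried with the JDK datatype API alone; the following standalone sketch (class name illustrative) shows the round trip and why the original timezone cannot be recovered:

    import java.util.GregorianCalendar;
    import java.util.TimeZone;
    import javax.xml.datatype.DatatypeFactory;
    import javax.xml.datatype.XMLGregorianCalendar;

    public class DateTimeRoundTrip {
        public static void main(String[] args) throws Exception {
            final DatatypeFactory df = DatatypeFactory.newInstance();

            // encode: lexical xsd:dateTime -> UTC milliseconds (the inlined long)
            final XMLGregorianCalendar in =
                    df.newXMLGregorianCalendar("2001-10-26T21:32:52+02:00");
            final long millis = in.toGregorianCalendar().getTimeInMillis();

            // decode: milliseconds -> GMT calendar -> xsd:dateTime literal
            final GregorianCalendar c =
                    new GregorianCalendar(TimeZone.getTimeZone("GMT"));
            c.setTimeInMillis(millis);
            final XMLGregorianCalendar out = df.newXMLGregorianCalendar(c);

            // the instant is preserved, but the +02:00 zone of the input is
            // now rendered in GMT: the caveat documented on
            // AbstractTripleStore.Options.INLINE_DATE_TIMES.
            System.out.println(in + " -> " + millis + " -> " + out);
        }
    }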
From: <tho...@us...> - 2010-12-19 21:49:29
Revision: 4019 http://bigdata.svn.sourceforge.net/bigdata/?rev=4019&view=rev Author: thompsonbry Date: 2010-12-19 21:49:22 +0000 (Sun, 19 Dec 2010) Log Message: ----------- Added a utility class which can be used to dump _small_ databases. Added Paths: ----------- branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/DumpStore.java Added: branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/DumpStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/DumpStore.java (rev 0) +++ branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/DumpStore.java 2010-12-19 21:49:22 UTC (rev 4019) @@ -0,0 +1,166 @@ +package com.bigdata.rdf.store; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.util.Properties; + +import com.bigdata.journal.ITx; +import com.bigdata.journal.Journal; + +/** + * Utility class may be used to dump out a <em>small</em> database. + */ +public class DumpStore { + + /** + * @param args + */ + public static void main(String[] args) throws IOException { + + // default namespace. + String namespace = "kb"; + boolean explicit = false; + boolean inferred = false; + boolean axioms = false; + boolean justifications = false; + + int i = 0; + + while (i < args.length) { + + final String arg = args[i]; + + if (arg.startsWith("-")) { + + if (arg.equals("-namespace")) { + + namespace = args[++i]; + + } else if (arg.equals("-explicit")) { + + explicit = true; + + } else if (arg.equals("-inferred")) { + + inferred = true; + + } else if (arg.equals("-axioms")) { + + axioms = true; + + } else if (arg.equals("-justifications")) { + + justifications = true; + +// } else { +// +// System.err.println("Unknown argument: " + arg); +// +// usage(); + + } + + } else { + + break; + + } + + i++; + + } + +// final int remaining = args.length - i; +// +// if (remaining < 1/*allow run w/o any named files or directories*/) { +// +// System.err.println("Not enough arguments."); +// +// usage(); +// +// } + + final File propertyFile = new File(args[i++]); + + if (!propertyFile.exists()) { + + throw new FileNotFoundException(propertyFile.toString()); + + } + + final Properties properties = new Properties(); + { + System.out.println("Reading properties: "+propertyFile); + final InputStream is = new FileInputStream(propertyFile); + try { + properties.load(is); + } finally { + if (is != null) { + is.close(); + } + } + } + + /* + * Allow override of select options. + */ + { + final String[] overrides = new String[] { + // Journal options. + com.bigdata.journal.Options.FILE, +// // RDFParserOptions. +// RDFParserOptions.Options.DATATYPE_HANDLING, +// RDFParserOptions.Options.PRESERVE_BNODE_IDS, +// RDFParserOptions.Options.STOP_AT_FIRST_ERROR, +// RDFParserOptions.Options.VERIFY_DATA, +// // DataLoader options. +// DataLoader.Options.BUFFER_CAPACITY, +// DataLoader.Options.CLOSURE, +// DataLoader.Options.COMMIT, +// DataLoader.Options.FLUSH, + }; + for (String s : overrides) { + if (System.getProperty(s) != null) { + // Override/set from the environment. 
+ final String v = System.getProperty(s); + System.out.println("Using: " + s + "=" + v); + properties.setProperty(s, v); + } + } + } + + Journal jnl = null; + try { + + jnl = new Journal(properties); + + System.out.println("Journal file: "+jnl.getFile()); + + AbstractTripleStore kb = (AbstractTripleStore) jnl + .getResourceLocator().locate(namespace, ITx.UNISOLATED); + + if (kb == null) { + + throw new RuntimeException("No such namespace: "+namespace); + + } + + System.out.println(kb.dumpStore(kb/* resolveTerms */, explicit, + inferred, axioms, justifications)); + + } finally { + + if (jnl != null) { + + jnl.close(); + + } + + } + + } + +}
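Usage follows from the argument parsing above: options first, then the journal's property file. For example (jar name and file names illustrative):

    java -cp bigdata.jar com.bigdata.rdf.store.DumpStore \
        -namespace kb -explicit -inferred kb.properties

The -explicit, -inferred, -axioms and -justifications flags select which statement types are dumped; the journal file is taken from the property file, or may be overridden by supplying com.bigdata.journal.Options.FILE as a system property.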
From: <tho...@us...> - 2010-12-19 21:15:10
Revision: 4018 http://bigdata.svn.sourceforge.net/bigdata/?rev=4018&view=rev Author: thompsonbry Date: 2010-12-19 21:15:04 +0000 (Sun, 19 Dec 2010) Log Message: ----------- Was checking the wrong variable for divide by zero (store vs size). Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/StorageStats.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/StorageStats.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/StorageStats.java 2010-12-19 19:11:38 UTC (rev 4017) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/StorageStats.java 2010-12-19 21:15:04 UTC (rev 4018) @@ -203,7 +203,7 @@ BigDecimal size = new BigDecimal(reservedStore()); BigDecimal store = new BigDecimal(100 * (reservedStore() - usedStore())); - if(store.signum()==0) return 0f; + if(size.signum()==0) return 0f; return store.divide(size, 2, RoundingMode.HALF_UP).floatValue(); } public float totalWaste(long total) {
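The fix is easiest to see in isolation: BigDecimal.divide throws an ArithmeticException when the divisor is zero, so the guard must test size (the divisor) rather than store (the numerator), which only masked the case where both happened to be zero. A standalone sketch of the corrected pattern (method and parameter names illustrative):

    import java.math.BigDecimal;
    import java.math.RoundingMode;

    static float wastePercent(final long reserved, final long used) {
        final BigDecimal size = new BigDecimal(reserved);                 // divisor
        final BigDecimal waste = new BigDecimal(100 * (reserved - used)); // numerator
        if (size.signum() == 0)
            return 0f; // nothing reserved yet; dividing would throw
        return waste.divide(size, 2, RoundingMode.HALF_UP).floatValue();
    }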
From: <tho...@us...> - 2010-12-19 19:11:44
Revision: 4017 http://bigdata.svn.sourceforge.net/bigdata/?rev=4017&view=rev Author: thompsonbry Date: 2010-12-19 19:11:38 +0000 (Sun, 19 Dec 2010) Log Message: ----------- Made the logger private. Added conditional logging of the query. Formatting. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/NanoSparqlServer.java Modified: branches/JOURNAL_HA_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/NanoSparqlServer.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/NanoSparqlServer.java 2010-12-19 19:09:55 UTC (rev 4016) +++ branches/JOURNAL_HA_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/NanoSparqlServer.java 2010-12-19 19:11:38 UTC (rev 4017) @@ -64,6 +64,7 @@ import org.openrdf.query.MalformedQueryException; import org.openrdf.query.QueryLanguage; import org.openrdf.query.TupleQuery; +import org.openrdf.query.parser.ParsedQuery; import org.openrdf.query.parser.QueryParser; import org.openrdf.query.parser.sparql.SPARQLParserFactory; import org.openrdf.query.resultio.sparqlxml.SPARQLResultsXMLWriter; @@ -83,17 +84,18 @@ import com.bigdata.journal.Journal; import com.bigdata.journal.RWStrategy; import com.bigdata.journal.TimestampUtility; +import com.bigdata.rawstore.Bytes; import com.bigdata.rdf.sail.BigdataSail; import com.bigdata.rdf.sail.BigdataSailGraphQuery; import com.bigdata.rdf.sail.BigdataSailRepository; import com.bigdata.rdf.sail.BigdataSailRepositoryConnection; import com.bigdata.rdf.sail.bench.NanoSparqlClient.QueryType; +import com.bigdata.rdf.store.AbstractLocalTripleStore; import com.bigdata.rdf.store.AbstractTripleStore; import com.bigdata.relation.AbstractResource; import com.bigdata.relation.RelationSchema; import com.bigdata.rwstore.RWStore; import com.bigdata.service.AbstractDistributedFederation; -import com.bigdata.service.AbstractFederation; import com.bigdata.service.IBigdataFederation; import com.bigdata.service.jini.JiniClient; import com.bigdata.sparse.ITPS; @@ -126,7 +128,7 @@ * The logger for the concrete {@link NanoSparqlServer} class. The {@link NanoHTTPD} * class has its own logger. */ - static protected final Logger log = Logger.getLogger(NanoSparqlServer.class); + static private final Logger log = Logger.getLogger(NanoSparqlServer.class); /** * A SPARQL results set in XML. @@ -366,16 +368,21 @@ } // sb.append(tripleStore.predicateUsage()); - + if (tripleStore.getIndexManager() instanceof Journal) { - Journal journal = (Journal) tripleStore.getIndexManager(); - IBufferStrategy strategy = journal.getBufferStrategy(); + + final Journal journal = (Journal) tripleStore.getIndexManager(); + + final IBufferStrategy strategy = journal.getBufferStrategy(); + if (strategy instanceof RWStrategy) { - RWStore store = ((RWStrategy) strategy).getRWStore(); + + final RWStore store = ((RWStrategy) strategy).getRWStore(); store.showAllocators(sb); } + } } catch (Throwable t) { @@ -800,7 +807,7 @@ * rather than running on in the background with a disconnected client. */ final PipedOutputStream os = new PipedOutputStream(); - final InputStream is = new PipedInputStream(os);//Bytes.kilobyte32*8/*pipeSize*/); + final InputStream is = new PipedInputStream(os,Bytes.kilobyte32*1); // note: default is 1k. 
final FutureTask<Void> ft = new FutureTask<Void>(getQueryTask( namespace, timestamp, queryStr, os)); try { @@ -910,7 +917,10 @@ * position of having to parse the query here and then again when it is * executed. */ - /*final ParsedQuery q =*/ engine.parseQuery(queryStr, null/*baseURI*/); + final ParsedQuery q = engine.parseQuery(queryStr, null/*baseURI*/); + + if(log.isInfoEnabled()) + log.info(q.toString()); final NanoSparqlClient.QueryType queryType = NanoSparqlClient.QueryType .fromQuery(queryStr);
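The pipe wiring above, reduced to its essentials (a generic sketch, not the server code): the second constructor argument bounds the pipe's internal buffer, so a stalled or disconnected client applies back-pressure to the producer thread rather than letting the query run on in the background.

    import java.io.PipedInputStream;
    import java.io.PipedOutputStream;

    final PipedOutputStream os = new PipedOutputStream();
    // 1024 bytes is also the JDK default; passing it explicitly documents
    // the intent that the producer block quickly once the consumer stops
    // reading.
    final PipedInputStream is = new PipedInputStream(os, 1024);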
From: <tho...@us...> - 2010-12-19 19:10:01
Revision: 4016 http://bigdata.svn.sourceforge.net/bigdata/?rev=4016&view=rev Author: thompsonbry Date: 2010-12-19 19:09:55 +0000 (Sun, 19 Dec 2010) Log Message: ----------- Bug fix for divide by zero error in statistics reported out by the DataLoader. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractLocalTripleStore.java Modified: branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractLocalTripleStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractLocalTripleStore.java 2010-12-19 19:09:28 UTC (rev 4015) +++ branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractLocalTripleStore.java 2010-12-19 19:09:55 UTC (rev 4016) @@ -116,7 +116,9 @@ final long nodesWritten = btreeCounters.getNodesWritten(); final long leavesWritten = btreeCounters.getLeavesWritten(); final long bytesWritten = btreeCounters.getBytesWritten(); - final long bytesPerRecord = bytesWritten/(nodesWritten+leavesWritten); + final long totalWritten = (nodesWritten + leavesWritten); + final long bytesPerRecord = totalWritten == 0 ? 0 : bytesWritten + / (nodesWritten + leavesWritten); sb.append((first ? "" : ", ") + fqn + "{nodes=" + nodesWritten + ",leaves=" + leavesWritten + ", bytes=" + bytesWritten
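The pattern behind the fix is a simple guarded division. A self-contained sketch (hypothetical class name) showing the failure mode and the guard:

public class SafeAverage {

    // Mirrors the fix above: guard the divisor before computing an average,
    // since a B+Tree which has written no nodes or leaves yet would make
    // bytesWritten / (nodesWritten + leavesWritten) throw ArithmeticException.
    static long bytesPerRecord(long bytesWritten, long nodesWritten, long leavesWritten) {
        final long totalWritten = nodesWritten + leavesWritten;
        return totalWritten == 0 ? 0 : bytesWritten / totalWritten;
    }

    public static void main(String[] args) {
        System.out.println(bytesPerRecord(0, 0, 0));     // 0 rather than an exception
        System.out.println(bytesPerRecord(4096, 3, 13)); // 256
    }
}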
From: <tho...@us...> - 2010-12-19 19:09:34
Revision: 4015 http://bigdata.svn.sourceforge.net/bigdata/?rev=4015&view=rev Author: thompsonbry Date: 2010-12-19 19:09:28 +0000 (Sun, 19 Dec 2010) Log Message: ----------- Modified the DataLoader to pass through the overridden properties with new DataLoader(properties,kb) rather than using AbstractTripleStore.getDataLoader(), which uses the defaults from the properties associated with the kb. Bug fix to incremental TM option. Modified to permit operations when no source files are specified (e.g., just do the closure). Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/DataLoader.java Modified: branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/DataLoader.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/DataLoader.java 2010-12-19 19:05:23 UTC (rev 4014) +++ branches/JOURNAL_HA_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/DataLoader.java 2010-12-19 19:09:28 UTC (rev 4015) @@ -860,7 +860,7 @@ // rdfFormat); loadFiles(totals, depth + 1, f, baseURI, rdfFormat, defaultGraph, filter, - (depth == 0 && i < files.length ? false : endOfBatch)); + (depth == 0 && i < (files.length-1) ? false : endOfBatch)); } @@ -1128,12 +1128,6 @@ */ public ClosureStats doClosure() { - if (buffer == null) - throw new IllegalStateException(); - - // flush anything in the buffer. - buffer.flush(); - final ClosureStats stats; switch (closureEnum) { @@ -1145,6 +1139,12 @@ * Incremental truth maintenance. */ + if (buffer == null) + throw new IllegalStateException(); + + // flush anything in the buffer. + buffer.flush(); + stats = new TruthMaintenance(inferenceEngine) .assertAll((TempTripleStore) buffer.getStatementStore()); @@ -1252,7 +1252,7 @@ final int remaining = args.length - i; - if (remaining < 2) { + if (remaining < 1/*allow run w/o any named files or directories*/) { System.err.println("Not enough arguments."); @@ -1360,7 +1360,8 @@ } final LoadStats totals = new LoadStats(); - final DataLoader dataLoader = kb.getDataLoader(); + final DataLoader dataLoader = //kb.getDataLoader(); + new DataLoader(properties,kb); // use the override properties. for (File fileOrDir : files) {
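For callers of the DataLoader, the practical difference is where the configuration comes from. A minimal sketch of the calling pattern, assuming only the two-argument constructor used in the diff and an already-open kb:

import java.util.Properties;

import com.bigdata.rdf.store.AbstractTripleStore;
import com.bigdata.rdf.store.DataLoader;

public class DataLoaderOverrideExample {

    static DataLoader newLoader(final Properties overrides, final AbstractTripleStore kb) {
        // kb.getDataLoader() would configure the loader from the properties
        // the kb was created with; constructing it directly applies the
        // caller's overrides (e.g., a different closure or commit policy).
        return new DataLoader(overrides, kb);
    }
}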
From: <tho...@us...> - 2010-12-19 19:05:29
Revision: 4014 http://bigdata.svn.sourceforge.net/bigdata/?rev=4014&view=rev Author: thompsonbry Date: 2010-12-19 19:05:23 +0000 (Sun, 19 Dec 2010) Log Message: ----------- Disabled the session optimization in RWStore per https://sourceforge.net/apps/trac/bigdata/ticket/214. Martyn is looking into this issue. Modified TemporaryStore per https://sourceforge.net/apps/trac/bigdata/ticket/215 to support incremental truth maintenance in combination with the RWStore. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/TemporaryStore.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/TemporaryStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/TemporaryStore.java 2010-12-18 14:31:23 UTC (rev 4013) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/TemporaryStore.java 2010-12-19 19:05:23 UTC (rev 4014) @@ -350,10 +350,21 @@ } - throw new UnsupportedOperationException( "Not supported: timestamp=" + TimestampUtility.toString(timestamp)); + /* + * FIXME The RWStore uses a read-only transaction to protect against + * recycling of the B+Tree revisions associated with the commit point + * on which it is reading. The temporary store only supports unisolated + * reads, so this is just ignoring the tx specified by the mutation rule + * for reading on the temporary store and going with the unisolated index + * anyway. See https://sourceforge.net/apps/trac/bigdata/ticket/215. + */ +// throw new UnsupportedOperationException( +// "Not supported: timestamp=" +// + TimestampUtility.toString(timestamp)); + + return getIndex(name); + } public SparseRowStore getGlobalRowStore() { Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-12-18 14:31:23 UTC (rev 4013) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-12-19 19:05:23 UTC (rev 4014) @@ -1589,7 +1589,7 @@ * FIXME We need unit test when MIN_RELEASE_AGE is ZERO AND * there are open read-only transactions. */ - if (m_minReleaseAge == 0) { + if (false&&m_minReleaseAge == 0) { /* * The session protection is complicated by the mix of * transaction protection and isolated AllocationContexts.
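The TemporaryStore change is easiest to read as a contract decision: a store with no historical commit points can safely map every read timestamp onto its unisolated (live) index instead of rejecting the request. A toy model of that decision (hypothetical class, not the bigdata API):

import java.util.HashMap;
import java.util.Map;

class UnisolatedOnlyStore {

    private final Map<String, Object> indices = new HashMap<>();

    void register(String name, Object index) {
        indices.put(name, index);
    }

    // The unisolated (live) view.
    Object getIndex(String name) {
        return indices.get(name);
    }

    // A read-only tx id (as used by the RWStore to protect B+Tree revisions)
    // is simply ignored: there are no historical revisions to protect here,
    // so every read resolves to the live index.
    Object getIndex(String name, long timestamp) {
        return getIndex(name);
    }

    public static void main(String[] args) {
        final UnisolatedOnlyStore store = new UnisolatedOnlyStore();
        store.register("kb.spo", new Object());
        System.out.println(store.getIndex("kb.spo") == store.getIndex("kb.spo", -42L)); // true
    }
}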
From: <tho...@us...> - 2010-12-18 14:31:30
Revision: 4013 http://bigdata.svn.sourceforge.net/bigdata/?rev=4013&view=rev Author: thompsonbry Date: 2010-12-18 14:31:23 +0000 (Sat, 18 Dec 2010) Log Message: ----------- Some work on optional join group support, primarily declaring the new annotations. I need to apply the annotations to the unit tests and modify the QueryEngine logic for setting up the altSink to handle the push/pop(bool:save) as indicated by the annotations. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngineOptionalJoins.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2010-12-17 15:05:06 UTC (rev 4012) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2010-12-18 14:31:23 UTC (rev 4013) @@ -30,12 +30,9 @@ import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; -import java.util.concurrent.TimeUnit; import com.bigdata.bop.engine.BOpStats; import com.bigdata.bop.engine.QueryEngine; -import com.bigdata.relation.accesspath.BlockingBuffer; -import com.bigdata.relation.accesspath.IBlockingBuffer; /** * Abstract base class for pipeline operators where the data moving along the @@ -85,6 +82,48 @@ boolean DEFAULT_SHARED_STATE = false; + /** + * Annotation used to mark the set of non-optional joins which may be + * input to either the static or runtime query optimizer. Joins within a + * join graph may be freely reordered by the query optimizer in order to + * minimize the amount of work required to compute the solutions. + * <p> + * Note: Optional joins MAY NOT appear within a join graph. Optional + * joins SHOULD be evaluated as part of the "tail plan" following the + * join graph, but before operations such as SORT, DISTINCT, etc. + * + * @todo We should be able to automatically apply the static or runtime + * query optimizers to an operator tree using this annotation to + * identify the join graphs. + */ + String JOIN_GRAPH = PipelineOp.class.getName() + ".joinGraph"; + + /** + * Annotation marks a high-level join group, which may include optional + * joins. Join groups are marked in order to decide the re-entry point + * in the query plan when a join within an optional join group fails. + * Also, the top-level join group is not marked -- only nested join + * groups are marked. This is used by the decision rule governing when to + * do an {@link IBindingSet#push()} on entering a nested join group. + * <p> + * This is different from a {@link #JOIN_GRAPH} primarily in that the + * latter may not include optional joins. + */ + String JOIN_GROUP = PipelineOp.class.getName() + ".joinGroup"; + + /** + * Annotation is used to designate the target when a join within an + * optional join group fails. The value of this annotation must be the + * {@link #JOIN_GROUP} identifier corresponding to the next join group + * in the query plan. The target join group identifier is specified + * (rather than the bopId of the target join) since the joins in the + * target join group may be reordered by the query optimizer.
The entry + * point for solutions redirected to the {@link #ALT_SINK_GROUP} is + * therefore the first operator in the target {@link #JOIN_GROUP}. This + * decouples the routing decisions from the join ordering decisions. + */ + String ALT_SINK_GROUP = PipelineOp.class.getName() + ".altSinkGroup"; + } /** Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestAll.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestAll.java 2010-12-17 15:05:06 UTC (rev 4012) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestAll.java 2010-12-18 14:31:23 UTC (rev 4013) @@ -75,6 +75,9 @@ // stress test for SliceOp. suite.addTestSuite(TestQueryEngine_Slice.class); + // test suite for optional join groups. + suite.addTestSuite(TestQueryEngineOptionalJoins.class); + // @todo test suite for query evaluation (DISTINCT, ORDER BY, GROUP BY). // suite.addTestSuite(TestQueryEngine2.class); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngineOptionalJoins.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngineOptionalJoins.java 2010-12-17 15:05:06 UTC (rev 4012) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngineOptionalJoins.java 2010-12-18 14:31:23 UTC (rev 4013) @@ -28,17 +28,9 @@ package com.bigdata.bop.engine; import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; import java.util.Map; import java.util.Properties; import java.util.UUID; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; -import java.util.concurrent.FutureTask; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import junit.framework.TestCase2; @@ -62,14 +54,10 @@ import com.bigdata.bop.bindingSet.HashBindingSet; import com.bigdata.bop.bset.ConditionalRoutingOp; import com.bigdata.bop.bset.StartOp; -import com.bigdata.bop.constraint.EQ; import com.bigdata.bop.constraint.EQConstant; import com.bigdata.bop.constraint.NEConstant; -import com.bigdata.bop.fed.TestFederatedQueryEngine; import com.bigdata.bop.join.PipelineJoin; import com.bigdata.bop.solutions.SliceOp; -import com.bigdata.bop.solutions.SliceOp.SliceStats; -import com.bigdata.io.DirectBufferPoolAllocator.IAllocationContext; import com.bigdata.journal.BufferMode; import com.bigdata.journal.ITx; import com.bigdata.journal.Journal; @@ -78,16 +66,16 @@ import com.bigdata.striterator.ChunkedArrayIterator; import com.bigdata.striterator.Dechunkerator; import com.bigdata.striterator.ICloseableIterator; -import com.bigdata.util.InnerCause; -import com.bigdata.util.concurrent.LatchedExecutor; -import com.ibm.icu.impl.ByteBuffer; /** - * Test suite for the {@link QueryEngine} against a local database instance. - * <p> - * Note: The {@link BOp}s are unit tested separately. This test suite is focused - * on interactions when {@link BOp}s are chained together in a query, such as a - * sequence of pipeline joins, a slice applied to a query, etc. + * Test suite for handling of optional join groups during query evaluation + * against a local database instance. 
Optional join groups are handled using + {@link IBindingSet#push()} when entering the join group and + {@link IBindingSet#pop(boolean)} when exiting the join group. If the join + group was successful for a given binding set, then <code>save:=true</code> is + specified for {@link IBindingSet#pop(boolean)} and the applied bindings will + be visible to the downstream consumer. Otherwise the bindings applied during + the join group are simply discarded. * * <pre> * -Dlog4j.configuration=bigdata/src/resources/logging/log4j.properties * </pre> * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id: TestQueryEngine.java 3950 2010-11-17 02:14:08Z thompsonbry $ - * - * @see TestFederatedQueryEngine - * - * @todo write a unit and stress tests for deadlines. */ public class TestQueryEngineOptionalJoins extends TestCase2 { @@ -270,14 +254,14 @@ */ public void test_query_join2_optionals() throws Exception { - final int startId = 1; - final int joinId1 = 2; - final int predId1 = 3; - final int joinId2 = 4; - final int predId2 = 5; - final int joinId3 = 6; - final int predId3 = 7; - final int sliceId = 8; + final int startId = 1; // + final int joinId1 = 2; // : base join group. + final int predId1 = 3; // (a b) + final int joinId2 = 4; // : joinGroup1 + final int predId2 = 5; // (b c) + final int joinId3 = 6; // : joinGroup1 + final int predId3 = 7; // (c d) + final int sliceId = 8; // final IVariable<?> a = Var.var("a"); final IVariable<?> b = Var.var("b"); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java 2010-12-17 15:05:06 UTC (rev 4012) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java 2010-12-18 14:31:23 UTC (rev 4013) @@ -535,16 +535,16 @@ } - /** - * Unit tests for optional joins. For an optional join, an alternative sink - * may be specified in the {@link BOpContext}. When specified, it is used if - * the join fails (if not specified, the binding sets which do not join are - * forwarded to the primary sink). Binding sets which join go to the primary - * sink regardless. - * - * @throws ExecutionException - * @throws InterruptedException - */ + /** + * Unit tests for optional joins. For an optional join, an alternative sink + * may be specified for the join. When specified, it is used if the join + * fails (if not specified, the binding sets which do not join are forwarded + * to the primary sink). Binding sets which join go to the primary sink + * regardless. + * + * @throws ExecutionException + * @throws InterruptedException + */ public void test_optionalJoin() throws InterruptedException, ExecutionException { final Var<?> x = Var.var("x");
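The push()/pop(save) contract the new annotations are designed around can be modelled with a simple stack of binding frames. A toy model (deliberately not the bigdata IBindingSet API) showing that bindings made inside a failed optional group never become visible downstream:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class OptionalGroupBindings {

    private final Deque<Map<String, Object>> stack = new ArrayDeque<>();

    OptionalGroupBindings() {
        stack.push(new HashMap<>()); // base frame
    }

    void bind(String var, Object val) {
        stack.peek().put(var, val);
    }

    Object get(String var) {
        for (Map<String, Object> frame : stack) // top frame first
            if (frame.containsKey(var))
                return frame.get(var);
        return null;
    }

    void push() { // entering an optional join group
        stack.push(new HashMap<>());
    }

    void pop(boolean save) { // exiting the group
        final Map<String, Object> frame = stack.pop();
        if (save)
            stack.peek().putAll(frame); // group joined: keep its bindings
        // otherwise the frame is simply discarded
    }

    public static void main(String[] args) {
        final OptionalGroupBindings bs = new OptionalGroupBindings();
        bs.bind("a", 1);
        bs.push();          // enter optional group
        bs.bind("b", 2);
        bs.pop(false);      // the group failed: discard its bindings
        System.out.println(bs.get("a") + " " + bs.get("b")); // prints: 1 null
    }
}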
From: <mar...@us...> - 2010-12-17 15:05:12
Revision: 4012 http://bigdata.svn.sourceforge.net/bigdata/?rev=4012&view=rev Author: martyncutcher Date: 2010-12-17 15:05:06 +0000 (Fri, 17 Dec 2010) Log Message: ----------- Relax session address checking to allow read on any transient bit - either session protected or committed. Resolves a problem reading committed CommitRecords from the CommitRecordIndex. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-12-16 14:28:33 UTC (rev 4011) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-12-17 15:05:06 UTC (rev 4012) @@ -113,9 +113,14 @@ final int bit = offset % allocBlockRange; -? if (RWStore.tstBit(block.m_live, bit) - || (m_sessionActive && RWStore.tstBit(block.m_transients, bit))) - { +// if (RWStore.tstBit(block.m_live, bit) +// || (m_sessionActive && RWStore.tstBit(block.m_transients, bit))) + /* + * Just check transients since there are cases (e.g. the CommitRecordIndex) + * where committed data is accessed even if it has been marked as ready to + * be recycled after the next commit + */ + if (RWStore.tstBit(block.m_transients, bit)) { return RWStore.convertAddr(block.m_addr) + ((long) m_size * bit); } else { return 0L; Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-12-16 14:28:33 UTC (rev 4011) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-12-17 15:05:06 UTC (rev 4012) @@ -1253,9 +1253,9 @@ readLock.lock(); - assertOpen(); // check again after taking lock + try { + assertOpen(); // check again after taking lock - try { // length includes space for the checksum if (length > m_maxFixedAlloc) { try {
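The live/transient/commit distinction is just bit arithmetic over int[] words. A simplified sketch (illustrative only; the real allocator keeps these arrays per AllocBlock) of why testing the transient bits alone still resolves a slot that was freed after the last commit:

public class TransientBitDemo {

    static boolean tstBit(int[] bits, int bit) {
        return (bits[bit / 32] & (1 << (bit % 32))) != 0;
    }

    static void setBit(int[] bits, int bit) {
        bits[bit / 32] |= (1 << (bit % 32));
    }

    static void clrBit(int[] bits, int bit) {
        bits[bit / 32] &= ~(1 << (bit % 32));
    }

    public static void main(String[] args) {
        final int[] live = new int[1], commit = new int[1], transients = new int[1];

        // A slot that was allocated and committed: all three bits are set.
        setBit(live, 5);
        setBit(commit, 5);
        setBit(transients, 5);

        // Free it after the commit: the live bit clears, but the transient
        // bit stays set until the next commit recycles the slot...
        clrBit(live, 5);

        // ...so a reader (e.g., of the CommitRecordIndex) can still resolve it.
        System.out.println(tstBit(transients, 5)); // true
    }
}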
From: <mar...@us...> - 2010-12-16 14:28:39
Revision: 4011 http://bigdata.svn.sourceforge.net/bigdata/?rev=4011&view=rev Author: martyncutcher Date: 2010-12-16 14:28:33 +0000 (Thu, 16 Dec 2010) Log Message: ----------- Fix NOP test in BufferedWrite.flush Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java 2010-12-16 12:44:43 UTC (rev 4010) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java 2010-12-16 14:28:33 UTC (rev 4011) @@ -213,7 +213,7 @@ final ByteBuffer m_data = this.m_data.get(); - if (m_data.remaining() == 0) { + if (m_data.position() == 0) { // NOP. return 0; }
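The one-character change hides a real NIO subtlety. For a buffer that is written into (fill mode) and only flipped when it is flushed, remaining() is capacity minus position, so it is zero when the buffer is completely full, not when it is empty. A JDK-only demonstration:

import java.nio.ByteBuffer;

public class BufferEmptyTest {

    public static void main(String[] args) {
        final ByteBuffer buf = ByteBuffer.allocate(8192);

        // Freshly allocated (empty) buffer in fill mode:
        System.out.println(buf.remaining() == 0); // false: 8192 bytes remaining
        System.out.println(buf.position() == 0);  // true: the correct "nothing to flush" test

        buf.put(new byte[16]); // buffer some writes
        System.out.println(buf.position() == 0);  // false: there is data to flush
    }
}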
From: <mar...@us...> - 2010-12-16 13:05:00
Revision: 4010 http://bigdata.svn.sourceforge.net/bigdata/?rev=4010&view=rev Author: martyncutcher Date: 2010-12-16 12:44:43 +0000 (Thu, 16 Dec 2010) Log Message: ----------- 1) Fixes problem on RWStore open where a FixedAllocator can be associated with an incorrect free list. 2) Fixes issue with session protection interaction with WriteCacheService. ReleaseSession now clears transient writes from cache. 3) Fixes problem with session protection with FixedAllocator freebit count not kept in sync. Clears reported exceptions from concurrent stress tests. 4) Added extra validation to RWStore allocation access, validating IO requests against slot sizes. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/BufferMode.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/DiskOnlyStrategy.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/FileMetadata.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/WORMStrategy.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/IStore.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/StorageStats.java branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/StressTestConcurrentUnisolatedIndices.java branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/TestConcurrentJournal.java branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/rwstore/TestRWJournal.java Added Paths: ----------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/DirectFixedAllocator.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2010-12-14 17:15:11 UTC (rev 4009) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2010-12-16 12:44:43 UTC (rev 4010) @@ -992,6 +992,20 @@ } + case TemporaryRW: { + + /* + * Setup the buffer strategy. + */ + + _bufferStrategy = new RWStrategy(fileMetadata, quorum); + + this._rootBlock = fileMetadata.rootBlock; + + break; + + } + case Temporary: { /* @@ -2354,6 +2368,14 @@ * the store. */ final long commitRecordIndexAddr = _commitRecordIndex.writeCheckpoint(); + + /* + * DEBUG: The commitRecordIndexAddr should not be deleted, the + * call to lockAddress forces a runtime check protecting the address + */ + if (_bufferStrategy instanceof RWStrategy) { + ((RWStrategy) _bufferStrategy).lockAddress(commitRecordIndexAddr); + } if (quorum != null) { /* Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/BufferMode.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/BufferMode.java 2010-12-14 17:15:11 UTC (rev 4009) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/BufferMode.java 2010-12-16 12:44:43 UTC (rev 4010) @@ -139,6 +139,18 @@ /** * <p> + * A variant on the DiskRW backed by a temporary file. 
Options enable + * part of the store to be held with Direct ByteBuffers. A significant + * use case would be an in-memory store but with disk overflow if + * required. + * </p> + * + * @see RWStrategy + */ + TemporaryRW(false/* stable */, false/* fullyBuffered */,StoreTypeEnum.RW), + + /** + * <p> * A variant on the {@link #Disk} mode that is not restart-safe. This mode * is useful for all manners of temporary data with full concurrency control * and scales-up to very large temporary files. The backing file (if any) is Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/DiskOnlyStrategy.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/DiskOnlyStrategy.java 2010-12-14 17:15:11 UTC (rev 4009) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/DiskOnlyStrategy.java 2010-12-16 12:44:43 UTC (rev 4010) @@ -1262,14 +1262,14 @@ DiskOnlyStrategy(final long maximumExtent, final FileMetadata fileMetadata) { super(fileMetadata.extent, maximumExtent, fileMetadata.offsetBits, - fileMetadata.nextOffset, fileMetadata.bufferMode, + fileMetadata.nextOffset, fileMetadata.getBufferMode(), fileMetadata.readOnly); this.file = fileMetadata.file; this.fileMode = fileMetadata.fileMode; - this.temporaryStore = (fileMetadata.bufferMode==BufferMode.Temporary); + this.temporaryStore = (fileMetadata.getBufferMode()==BufferMode.Temporary); this.raf = fileMetadata.raf; Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/FileMetadata.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/FileMetadata.java 2010-12-14 17:15:11 UTC (rev 4009) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/FileMetadata.java 2010-12-16 12:44:43 UTC (rev 4010) @@ -802,6 +802,7 @@ log.info("Mapping file=" + file); buffer = opener.reopenChannel().map(FileChannel.MapMode.READ_WRITE, headerSize0, userExtent); break; + case TemporaryRW: case DiskRW: buffer = null; break; @@ -1514,4 +1515,8 @@ return getProperty(properties, name, defaultValue); } + public BufferMode getBufferMode() { + return bufferMode; + } + } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java 2010-12-14 17:15:11 UTC (rev 4009) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java 2010-12-16 12:44:43 UTC (rev 4010) @@ -633,4 +633,14 @@ } + /** + * An assert oriented method that allows a finite number of addresses + * to be monitored to ensure it is not freed. 
+ * + * @param addr - address to be locked + */ + public void lockAddress(final long addr) { + m_store.lockAddress(decodeAddr(addr)); + } + } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/WORMStrategy.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/WORMStrategy.java 2010-12-14 17:15:11 UTC (rev 4009) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/WORMStrategy.java 2010-12-16 12:44:43 UTC (rev 4010) @@ -822,14 +822,14 @@ final Quorum<?, ?> quorum) { super(fileMetadata.extent, maximumExtent, fileMetadata.offsetBits, - fileMetadata.nextOffset, fileMetadata.bufferMode, + fileMetadata.nextOffset, fileMetadata.getBufferMode(), fileMetadata.readOnly); this.file = fileMetadata.file; this.fileMode = fileMetadata.fileMode; - this.temporaryStore = (fileMetadata.bufferMode==BufferMode.Temporary); + this.temporaryStore = (fileMetadata.getBufferMode()==BufferMode.Temporary); this.raf = fileMetadata.raf; Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java 2010-12-14 17:15:11 UTC (rev 4009) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/AllocBlock.java 2010-12-16 12:44:43 UTC (rev 4010) @@ -26,6 +26,8 @@ import java.util.ArrayList; +import org.apache.log4j.Logger; + import com.bigdata.rwstore.RWStore.AllocationStats; /** @@ -43,6 +45,15 @@ * @todo change to use long[]s. */ public class AllocBlock { + + private static final Logger log = Logger.getLogger(AllocBlock.class); + + /** + * The FixedAllocator owning this block. The callback reference is needed + * to allow the AllocBlock to determine the session state and whether to + * clear the transient bits. + */ + final FixedAllocator m_allocator; /** * The address of the {@link AllocBlock} -or- ZERO (0) if {@link AllocBlock} * has not yet been allocated on the persistent heap. Note that the space @@ -82,8 +93,9 @@ // */ // private final RWWriteCacheService m_writeCache; - AllocBlock(final int addrIsUnused, final int bitSize) {//, final RWWriteCacheService cache) { + AllocBlock(final int addrIsUnused, final int bitSize, final FixedAllocator allocator) {//, final RWWriteCacheService cache) { // m_writeCache = cache; + m_allocator = allocator; m_ints = bitSize; m_commit = new int[bitSize]; m_live = new int[bitSize]; @@ -116,6 +128,16 @@ } public boolean freeBit(final int bit) { + // by default do NOT session protect, the 2-argument call is made + // directly from the RWStore that has access to session and transaction + // state + return freeBit(bit, false); + } + + /* + * + */ + public boolean freeBit(final int bit, final boolean sessionProtect) { if (!RWStore.tstBit(m_live, bit)) { throw new IllegalArgumentException("Freeing bit not set"); } @@ -128,17 +150,35 @@ * Note that with buffered IO there is also an opportunity to avoid * output to the file by removing any pending write to the now freed * address. On large transaction scopes this may be significant. + * + * The sessionProtect parameter indicates whether we really should + * continue to protect this allocation by leaving the transient bit + * set.
For general session protection we should, BUT if allocation + contexts have been used we can allow immediate recycling and this + is set up by the caller */ RWStore.clrBit(m_live, bit); + + if (log.isTraceEnabled()) { + log.trace("Freeing " + bitPhysicalAddress(bit) + " sessionProtect: " + sessionProtect); + } - if (!RWStore.tstBit(m_commit, bit)) { - RWStore.clrBit(m_transients, bit); - - return true; + if (!sessionProtect) { + if (!RWStore.tstBit(m_commit, bit)) { + RWStore.clrBit(m_transients, bit); + + return true; + } else { + return false; + } } else { return false; } } + + private long bitPhysicalAddress(int bit) { + return RWStore.convertAddr(m_addr) + ((long) m_allocator.m_size * bit); + } /** * The shadow, if non-null defines the context for this request. @@ -266,14 +306,45 @@ * of the committed bits and the live bits, but rather an ORing of the live * with all the committed bits since the start of the session. * When the session is released, the state is restored to an ORing of the - * live and the committed, thus releasing slots for re-allocation. + * live and the committed, thus releasing slots for re-allocation. + * + * For each transient bit, check if it has been cleared and, if so, ensure + * any write is removed from the write cache. A bit needs clearing where it + * is set in the session-protected transients but not in the recalculated + * transients; this is tested with new &= ~old; + * + * @param cache */ - public void releaseSession() { + public void releaseSession(RWWriteCacheService cache) { if (m_addr != 0) { // check active! for (int i = 0; i < m_live.length; i++) { + int chkbits = m_transients[i]; m_transients[i] = m_live[i] | m_commit[i]; + chkbits &= ~m_transients[i]; + + if (chkbits != 0) { + // there are writes to clear + for (int b = 0; b < 32; b++) { + if ((chkbits & (1 << b)) != 0) { + long clr = RWStore.convertAddr(m_addr) + ((long) m_allocator.m_size * b); + + if (log.isTraceEnabled()) + log.trace("releasing address: " + clr); + + cache.clearWrite(clr); + } + } + } } } } + public String show() { + StringBuilder sb = new StringBuilder(); + sb.append("AllocBlock, baseAddress: " + RWStore.convertAddr(m_addr) + " bits: "); + for (int b: m_transients) + sb.append(b + " "); + + return sb.toString(); + } + } Added: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/DirectFixedAllocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/DirectFixedAllocator.java (rev 0) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/DirectFixedAllocator.java 2010-12-16 12:44:43 UTC (rev 4010) @@ -0,0 +1,18 @@ +package com.bigdata.rwstore; + +/** + * The DirectFixedAllocator is used to manage in-memory Direct ByteBuffer + * allocated memory.
+ * + */ +public class DirectFixedAllocator extends FixedAllocator { + + DirectFixedAllocator(RWStore store, int size) { + super(store, size); + } + + protected int grabAllocation(RWStore store, int blockSize) { + return store.allocateDirect(blockSize); + } + +} Property changes on: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/DirectFixedAllocator.java ___________________________________________________________________ Added: svn:mime-type + text/plain Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-12-14 17:15:11 UTC (rev 4009) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2010-12-16 12:44:43 UTC (rev 4010) @@ -114,7 +114,7 @@ final int bit = offset % allocBlockRange; if (RWStore.tstBit(block.m_live, bit) - || (this.m_sessionActive && RWStore.tstBit(block.m_transients, bit))) + || (m_sessionActive && RWStore.tstBit(block.m_transients, bit))) { return RWStore.convertAddr(block.m_addr) + ((long) m_size * bit); } else { @@ -155,7 +155,7 @@ * store from re-allocating allocations reachable from read-only * requests and concurrent transactions. */ - private boolean m_sessionActive; + boolean m_sessionActive; public void setAllocationContext(final IAllocationContext context) { if (context == null && m_context != null) { @@ -196,13 +196,11 @@ public byte[] write() { try { final AllocBlock fb = m_allocBlocks.get(0); - if (log.isDebugEnabled()) - log.debug("writing allocator " + m_index + " for " + getStartAddr() + " with " + fb.m_live[0]); + if (log.isTraceEnabled()) + log.trace("writing allocator " + m_index + " for " + getStartAddr() + " with " + fb.m_live[0]); final byte[] buf = new byte[1024]; final DataOutputStream str = new DataOutputStream(new FixedOutputStream(buf)); try { - m_sessionActive = m_store.isSessionProtected(); - str.writeInt(m_size); final Iterator<AllocBlock> iter = m_allocBlocks.iterator(); @@ -240,19 +238,19 @@ str.close(); } -// if (!m_store.isSessionPreserved()) { - m_freeBits += m_freeTransients; - - // Handle re-addition to free list once transient frees are - // added back - if ((m_freeTransients == m_freeBits) && (m_freeTransients != 0)) { - m_freeList.add(this); - m_freeWaiting = false; + if (!this.m_sessionActive) { + m_freeBits += m_freeTransients; + + // Handle re-addition to free list once transient frees are + // added back + if ((m_freeTransients == m_freeBits) && (m_freeTransients != 0)) { + m_freeList.add(this); + m_freeWaiting = false; + } + + m_freeTransients = 0; } - m_freeTransients = 0; -// } - return buf; } catch (IOException e) { throw new StorageTerminalError("Error on write", e); @@ -309,7 +307,7 @@ } /** The size of the allocation slots in bytes. 
*/ - private final int m_size; + final int m_size; private int m_startAddr = 0; private int m_endAddr = 0; @@ -343,11 +341,13 @@ m_size = size; - m_bitSize = calcBitSize(true, size, cMinAllocation, cModAllocation); + // By default, disk-based allocators should optimise for density + m_bitSize = calcBitSize(true /* optDensity */, size, cMinAllocation, cModAllocation); -// m_writeCache = cache; - // number of blocks in this allocator, bitSize plus 1 for start address + // The 1K allocator is 256 ints, one is used to record the slot size and + // another for the checksum; leaving 254 to be used to store the + // AllocBlocks. final int numBlocks = 254 / (m_bitSize + 1); /* @@ -357,7 +357,7 @@ */ m_allocBlocks = new ArrayList<AllocBlock>(numBlocks); for (int i = 0; i < numBlocks; i++) { - m_allocBlocks.add(new AllocBlock(0, m_bitSize));//, cache)); + m_allocBlocks.add(new AllocBlock(0, m_bitSize, this));//, cache)); } m_freeTransients = 0; @@ -415,7 +415,7 @@ * content and 1 more for the header). A variation on the current Blob * implementation could include the header in the first allocation, thus * reducing the minimum Blob allocations from 3 to 2, but the point still - * holds that too small a max fixed allocation could rmatically reduce the + * holds that too small a max fixed allocation could dramatically reduce the * number of allocations that could be made. * * @param alloc the slot size to be managed @@ -435,8 +435,6 @@ while ((nints * intAllocation) % modAllocation != 0) nints++; -// System.out.println("calcBitSize for " + alloc + " returns " + nints); - return nints; } @@ -498,6 +496,10 @@ private boolean m_freeWaiting = true; public boolean free(final int addr, final int size) { + return free(addr, size, false); + } + + public boolean free(final int addr, final int size, final boolean overideSession) { if (addr < 0) { final int offset = ((-addr) & RWStore.OFFSET_BITS_MASK) - 3; // bit adjust @@ -505,15 +507,24 @@ final int block = offset/nbits; + m_sessionActive = m_store.isSessionProtected(); + if (((AllocBlock) m_allocBlocks.get(block)) - .freeBit(offset % nbits)) { // bit adjust + .freeBit(offset % nbits, m_sessionActive && !overideSession)) { // bit adjust - // Only add back to the free list if at least 3000 bits avail - if (m_freeBits++ == 0 && false) { + // Only add back to the free list this is a DirectFixedAllocator + // or the freeBits exceed the cDefaultFreeBitsThreshold + // If a DirectFixedAllocator then also ensure it is added to the + // front of the free list + if (m_freeBits++ == 0 && this instanceof DirectFixedAllocator) { m_freeWaiting = false; - m_freeList.add(this); + m_freeList.add(0, this); } else if (m_freeWaiting && m_freeBits == m_store.cDefaultFreeBitsThreshold) { m_freeWaiting = false; + + if (log.isDebugEnabled()) + log.debug("Returning Allocator to FreeList - " + m_size); + m_freeList.add(this); } } else { @@ -554,6 +565,11 @@ throw new IllegalArgumentException( "Allocate requires positive size, got: " + size); + if (size > m_size) + throw new IllegalArgumentException( + "FixedAllocator with slots of " + m_size + + " bytes requested allocation for "+ size + " bytes"); + int addr = -1; final Iterator<AllocBlock> iter = m_allocBlocks.iterator(); @@ -570,9 +586,9 @@ blockSize *= m_size; blockSize >>= RWStore.ALLOCATION_SCALEUP; - block.m_addr = store.allocBlock(blockSize); - if (log.isInfoEnabled()) - log.info("Allocation block at " + block.m_addr + " of " + (blockSize << 16) + " bytes"); + block.m_addr = grabAllocation(store, blockSize); + if 
(log.isDebugEnabled()) + log.debug("Allocation block at " + block.m_addr + " of " + (blockSize << 16) + " bytes"); if (m_startAddr == 0) { m_startAddr = block.m_addr; @@ -583,8 +599,9 @@ } if (addr != -1) { - addr += 3; // Tweak to ensure non-zero address for offset 0 + addr += 3; // Tweak to ensure non-zero address for offset 0 + if (--m_freeBits == 0) { if (log.isTraceEnabled()) log.trace("Remove from free list"); @@ -593,9 +610,10 @@ // Should have been first on list, now check for first if (m_freeList.size() > 0) { - final FixedAllocator nxt = (FixedAllocator) m_freeList.get(0); - if (log.isInfoEnabled()) - log.info("Freelist head: " + nxt.getSummaryStats()); + if (log.isDebugEnabled()) { + final FixedAllocator nxt = (FixedAllocator) m_freeList.get(0); + log.debug("Freelist head: " + nxt.getSummaryStats()); + } } } @@ -606,16 +624,28 @@ if (m_statsBucket != null) { m_statsBucket.allocate(size); } - + return value; } else { - if (log.isTraceEnabled()) - log.trace("FixedAllocator returning null address"); - + if (log.isDebugEnabled()) { + StringBuilder sb = new StringBuilder(); + sb.append("FixedAllocator returning null address, with freeBits: " + m_freeBits + "\n"); + + for (AllocBlock ab: m_allocBlocks) { + sb.append(ab.show() + "\n"); + } + + log.debug(sb); + } + return 0; } } + protected int grabAllocation(RWStore store, int blockSize) { + return store.allocBlock(blockSize); + } + public boolean hasFree() { return m_freeBits > 0; } @@ -764,12 +794,12 @@ m_statsBucket = b; } - public void releaseSession() { + public void releaseSession(RWWriteCacheService cache) { if (this.m_sessionActive) { if (log.isTraceEnabled()) log.trace("Allocator: #" + m_index + " releasing session protection"); for (AllocBlock ab : m_allocBlocks) { - ab.releaseSession(); + ab.releaseSession(cache); } } } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/IStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/IStore.java 2010-12-14 17:15:11 UTC (rev 4009) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/IStore.java 2010-12-16 12:44:43 UTC (rev 4010) @@ -77,6 +77,12 @@ */ public void getData(long l, byte buf[]); + /************************************************************** + * @param addr - the address + * @return the size of the slot associated + */ + public int getAssociatedSlotSize(int addr); + // /************************************************************** // * Given a physical address (byte offset on the store), return true // * if that address could be managed by an allocated block. 
Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-12-14 17:15:11 UTC (rev 4009) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-12-16 12:44:43 UTC (rev 4010) @@ -37,6 +37,8 @@ import java.util.Collections; import java.util.Iterator; import java.util.Map; +import java.util.TreeMap; +import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; @@ -54,6 +56,7 @@ import com.bigdata.counters.CounterSet; import com.bigdata.counters.Instrument; import com.bigdata.counters.striped.StripedCounters; +import com.bigdata.io.DirectBufferPool; import com.bigdata.io.FileChannelUtility; import com.bigdata.io.IReopenChannel; import com.bigdata.io.writecache.BufferedWrite; @@ -61,6 +64,7 @@ import com.bigdata.io.writecache.WriteCacheService; import com.bigdata.journal.AbstractBufferStrategy; import com.bigdata.journal.AbstractJournal; +import com.bigdata.journal.BufferMode; import com.bigdata.journal.CommitRecordIndex; import com.bigdata.journal.CommitRecordSerializer; import com.bigdata.journal.FileMetadata; @@ -194,9 +198,9 @@ * <p> * Add metabits header record checksum field and verify on read back. * <p> - * Checksum fixed allocators (needs to be tested on read back). + * Done. Checksum fixed allocators (needs to be tested on read back). * <p> - * Add version field to the fixed allocator. + * Done. Add version field to the fixed allocator. * <p> * Done. Checksum delete blocks / blob records. * <p> @@ -207,7 +211,7 @@ * Modify FixedAllocator to use arrayCopy() rather than clone and * declare more fields to be final. See notes on {@link AllocBlock}. * <p> - * Implement logic to "abort" a shadow allocation context. + * Done. Implement logic to "abort" a shadow allocation context. * <p> * Unit test to verify that we do not recycle allocations from the last * commit point even when the retention time is zero such that it is @@ -335,6 +339,12 @@ // m_commitCallback = callback; // } + // If required, then allocate 1M direct buffers + private static final int cDirectBufferCapacity = 1024 * 1024; + + private int cMaxDirectBuffers = 20; // 20M of direct buffers + static final int cDirectAllocationOffset = 64 * 1024; + // /////////////////////////////////////////////////////////////////////////////////////// // RWStore Data // /////////////////////////////////////////////////////////////////////////////////////// @@ -483,11 +493,26 @@ private StorageStats m_storageStats; private long m_storageStatsAddr = 0; + /** + * Direct ByteBuffer allocations. + * + * TODO: Support different scaleups for disk and direct allocation to + * allow for finer granularity of allocation. For example, a 1K + * scaleup would allow 32bit slot allocations for all slot sizes. + */ + private int m_directSpaceAvailable = 0; + private int m_nextDirectAllocation = cDirectAllocationOffset; + private ArrayList<ByteBuffer> m_directBuffers = null; + + private final boolean m_enableDirectBuffer; + /** * <code>true</code> iff the backing store is open. 
*/ private volatile boolean m_open = true; + private TreeMap<Integer, Integer> m_lockAddresses = null; + class WriteCacheImpl extends WriteCache.FileChannelScatteredWriteCache { public WriteCacheImpl(final ByteBuffer buf, final boolean useChecksum, @@ -549,13 +574,23 @@ if (fileMetadata == null) throw new IllegalArgumentException(); - this.m_minReleaseAge = LongValidator.GTE_ZERO.parse( + this.m_minReleaseAge = Long.valueOf(fileMetadata.getProperty( AbstractTransactionService.Options.MIN_RELEASE_AGE, - AbstractTransactionService.Options.DEFAULT_MIN_RELEASE_AGE); + AbstractTransactionService.Options.DEFAULT_MIN_RELEASE_AGE)); if (log.isInfoEnabled()) log.info(AbstractTransactionService.Options.MIN_RELEASE_AGE + "=" + m_minReleaseAge); + /* + * Disable TemporaryRW option for now + */ + // m_enableDirectBuffer = fileMetadata.getBufferMode() == BufferMode.TemporaryRW; + m_enableDirectBuffer = false; + + if (m_enableDirectBuffer) { + m_directBuffers = new ArrayList<ByteBuffer>(); + addDirectBuffer(); + } cDefaultMetaBitsSize = Integer.valueOf(fileMetadata.getProperty( Options.META_BITS_SIZE, @@ -673,6 +708,8 @@ for (FixedAllocator fa: m_allocs) { m_storageStats.register(fa); } + } else { + m_storageStats = new StorageStats(m_allocSizes); } } @@ -694,7 +731,15 @@ } } - private void setAllocations(final FileMetadata fileMetadata) + private void addDirectBuffer() { + if (cMaxDirectBuffers > m_directBuffers.size()) { + ByteBuffer bbuf = ByteBuffer.allocateDirect(cDirectBufferCapacity); + m_directBuffers.add(bbuf); + m_directSpaceAvailable += cDirectBufferCapacity; + } + } + + private void setAllocations(final FileMetadata fileMetadata) throws IOException { final String buckets = fileMetadata.getProperty( @@ -1017,11 +1062,20 @@ final ArrayList<? extends Allocator> freeList; assert allocSize > 0; + // m_minFixedAlloc and m_maxFixedAlloc may not be set since + // as finals they must be set in the constructor. 
Therefore + // recalculate for local load + final int minFixedAlloc = 64 * m_allocSizes[0]; + final int maxFixedAlloc = 64 * m_allocSizes[m_allocSizes.length-1]; int index = 0; - int fixedSize = m_minFixedAlloc; - while (fixedSize < allocSize) + int fixedSize = minFixedAlloc; + while (fixedSize < allocSize && fixedSize < maxFixedAlloc) fixedSize = 64 * m_allocSizes[++index]; + if (allocSize != fixedSize) { + throw new IllegalStateException("Unexpected allocator size: " + + allocSize + " != " + fixedSize); + } allocator = new FixedAllocator(this, allocSize);//, m_writeCache); freeList = m_freeFixed[index]; @@ -1056,13 +1110,6 @@ for (int index = 0; index < m_allocs.size(); index++) { ((Allocator) m_allocs.get(index)).setIndex(index); } - - if (false) { - StringBuilder tmp = new StringBuilder(); - showAllocators(tmp); - - System.out.println("Allocators: " + tmp.toString()); - } } /** @@ -1206,6 +1253,8 @@ readLock.lock(); + assertOpen(); // check again after taking lock + try { // length includes space for the checksum if (length > m_maxFixedAlloc) { @@ -1277,6 +1326,10 @@ } try { + + if (getBlock((int) addr).getBlockSize() < length) { + throw new IllegalStateException("Bad Address: length requested greater than allocated slot"); + } final long paddr = physicalAddress((int) addr); @@ -1287,6 +1340,12 @@ throw new PhysicalAddressResolutionException(addr); } + + if (paddr < 0) { // read from Direct ByteBuffer + directRead(paddr, buf, offset, length); + + return; + } /** * Check WriteCache first @@ -1382,6 +1441,69 @@ } } + /** + * Retrieves data from the direct byte buffers, must handle transfers across + * multiple buffers + */ + private void directRead(final long paddr, final byte[] buf, final int offset, final int length) { + assert paddr < 0; + assert m_directBuffers != null; + + final int baddr = (int) (-paddr) - cDirectAllocationOffset; // buffer address + int bufIndex = baddr / cDirectBufferCapacity; + int bufOffset = baddr % cDirectBufferCapacity; + + int transfer = 0; + int curOut = offset; + + while (transfer < length) { + ByteBuffer direct = m_directBuffers.get(bufIndex); + direct.position(bufOffset); + int avail = cDirectBufferCapacity - bufOffset; + int req = length - transfer; + int tlen = avail < req ? avail : req; + + direct.get(buf, curOut, tlen); + + transfer += tlen; + curOut += tlen; + + bufIndex++; + bufOffset = 0; + } + } + + /** + * Writes to direct buffers, transferring across boundaries as required + */ + private void directWrite(final long pa, final byte[] buf, final int offset, final int length, final int chk) { + assert pa < 0; + assert m_directBuffers != null; + + final int baddr = (int) (-pa) - cDirectAllocationOffset; // buffer address + int bufIndex = baddr / cDirectBufferCapacity; + int bufOffset = baddr % cDirectBufferCapacity; + + int transfer = 0; + int curIn = offset; + + while (transfer < length) { + ByteBuffer direct = m_directBuffers.get(bufIndex); + direct.position(bufOffset); + int avail = cDirectBufferCapacity - bufOffset; + int req = length - transfer; + int tlen = avail < req ? 
avail : req; + + direct.put(buf, curIn, tlen); + + transfer += tlen; + curIn += tlen; + + bufIndex++; + bufOffset = 0; + } + } + private void assertAllocators() { for (int i = 0; i < m_allocs.size(); i++) { if (m_allocs.get(i).getIndex() != i) { @@ -1434,7 +1556,7 @@ public void free(final long laddr, final int sze, final IAllocationContext context) { assertOpen(); final int addr = (int) laddr; - + switch (addr) { case 0: case -1: @@ -1443,6 +1565,9 @@ } m_allocationLock.lock(); try { + if (m_lockAddresses != null && m_lockAddresses.containsKey((int)laddr)) + throw new IllegalStateException("address locked: " + laddr); + if (sze > m_maxFixedAlloc-4) { freeBlob(addr, sze, context); } else { @@ -1464,35 +1589,32 @@ * FIXME We need unit test when MIN_RELEASE_AGE is ZERO AND * there are open read-only transactions. */ - boolean alwaysDefer = m_minReleaseAge > 0L - || m_activeTxCount > 0; - if (!alwaysDefer) - alwaysDefer = context == null && !m_contexts.isEmpty(); - if (alwaysDefer) - if (log.isDebugEnabled()) - log.debug("Should defer " + addr + " real: " - + physicalAddress(addr)); - if (alwaysDefer - || !alloc.canImmediatelyFree(addr, sze, context)) { - deferFree(addr, sze); + if (m_minReleaseAge == 0) { + /* + * The session protection is complicated by the mix of + * transaction protection and isolated AllocationContexts. + */ + if (this.isSessionProtected()) { + + immediateFree(addr, sze, context != null && alloc.canImmediatelyFree(addr, sze, context)); + } else { + immediateFree(addr, sze); + } } else { - immediateFree(addr, sze); + boolean alwaysDefer = m_activeTxCount > 0; + + if (!alwaysDefer) + alwaysDefer = context == null && !m_contexts.isEmpty(); + + if (alwaysDefer) + if (log.isDebugEnabled()) + log.debug("Should defer " + addr + " real: " + physicalAddress(addr)); + if (alwaysDefer || !alloc.canImmediatelyFree(addr, sze, context)) { + deferFree(addr, sze); + } else { + immediateFree(addr, sze); + } } -// if (m_minReleaseAge == 0) { -// immediateFree(addr, sze); -// } else { -// boolean alwaysDefer = m_activeTxCount > 0; -// if (!alwaysDefer) -// alwaysDefer = context == null && !m_contexts.isEmpty(); -// if (alwaysDefer) -// if (log.isDebugEnabled()) -// log.debug("Should defer " + addr + " real: " + physicalAddress(addr)); -// if (alwaysDefer || !alloc.canImmediatelyFree(addr, sze, context)) { -// deferFree(addr, sze); -// } else { -// immediateFree(addr, sze); -// } -// } } } finally { m_allocationLock.unlock(); @@ -1504,6 +1626,19 @@ return m_minReleaseAge; } + /** + * Session protection can only be used in preference to deferred frees when + * the minReleaseAge is zero. If so then two protection states are checked: + * either a positive activeTxCount incremented by the TransactionManager + * or if there are active AllocationContexts. + * + * The activeTxCount esentially protects read-only transactions while the + * AllocationContexts enable concurrent store allocations, whilst also + * supporting immediate re-cycling of localized allocations (those made + * and released within the same AllocationContext). + * + * @return whether there is a logical active session + */ boolean isSessionProtected() { return m_minReleaseAge == 0 && (m_activeTxCount > 0 || !m_contexts.isEmpty()); } @@ -1515,11 +1650,16 @@ * * When called, will call through to the Allocators to re-sync the * transient bits with the committed and live. + * + * The writeCache is passed into the allocator to enable any "now free" + * allocations to be cleared from the cache. 
Until the session is released + * the writeCache must be maintained to support readers of uncommitted and + * unwritten allocations. */ void releaseSessions() { if (m_minReleaseAge == 0) { for (FixedAllocator fa : m_allocs) { - fa.releaseSession(); + fa.releaseSession(m_writeCache); } } } @@ -1559,6 +1699,10 @@ // private long immediateFreeCount = 0; private void immediateFree(final int addr, final int sze) { + immediateFree(addr, sze, false); + } + + private void immediateFree(final int addr, final int sze, final boolean overrideSession) { switch (addr) { case 0: @@ -1575,14 +1719,18 @@ throw new IllegalArgumentException("Invalid address provided to immediateFree: " + addr + ", size: " + sze); } final long pa = alloc.getPhysicalAddress(addrOffset); + if (log.isTraceEnabled()) log.trace("Freeing allocation at " + addr + ", physical address: " + pa); - alloc.free(addr, sze); + alloc.free(addr, sze, overrideSession); // must clear after free in case is a blobHdr that requires reading! // the allocation lock protects against a concurrent re-allocation // of the address before the cache has been cleared assert pa != 0; - m_writeCache.clearWrite(pa); + // only clear any existing write to cache if no active session + if (overrideSession || !this.isSessionProtected()) { + m_writeCache.clearWrite(pa); + } m_frees++; if (alloc.isAllocated(addrOffset)) throw new IllegalStateException("Reallocation problem with WriteCache"); @@ -1649,7 +1797,12 @@ final ArrayList<FixedAllocator> list = m_freeFixed[i]; if (list.size() == 0) { - allocator = new FixedAllocator(this, block);//, m_writeCache); + if (canAllocateDirect()) { + allocator = new DirectFixedAllocator(this, block); + } else { + allocator = new FixedAllocator(this, block); + } + allocator.setFreeList(list); allocator.setIndex(m_allocs.size()); @@ -1707,6 +1860,13 @@ } } + /** + * @return true if we have spare directBuffers. + */ + private boolean canAllocateDirect() { + return m_directBuffers != null && m_directBuffers.size() < cMaxDirectBuffers; + } + private int fixedAllocatorIndex(final int size) { int i = 0; @@ -1788,13 +1948,23 @@ } final int newAddr = alloc(size + 4, context); // allow size for checksum + + if (newAddr == 0) + throw new IllegalStateException("NULL address allocated"); final int chk = ChecksumUtility.getCHK().checksum(buf, size); + + final long pa = physicalAddress(newAddr); - try { - m_writeCache.write(physicalAddress(newAddr), ByteBuffer.wrap(buf, 0, size), chk); - } catch (InterruptedException e) { - throw new RuntimeException("Closed Store?", e); + // if from DirectFixedAllocator then physical address will be negative + if (pa < 0) { + directWrite(pa, buf, 0, size, chk); + } else { + try { + m_writeCache.write(pa, ByteBuffer.wrap(buf, 0, size), chk); + } catch (InterruptedException e) { + throw new RuntimeException("Closed Store?", e); + } } // Update counters. @@ -1875,12 +2045,19 @@ // } // } - /** + /** * Toss away all buffered writes and then reload from the current root * block. + * + * If the store is using DirectFixedAllocators then an IllegalStateException + * is thrown */ public void reset() { assertOpen(); + + if (m_directBuffers != null) + throw new IllegalStateException("Reset is not supported with direct buffers"); + if (log.isInfoEnabled()) { log.info("RWStore Reset"); } @@ -1915,7 +2092,7 @@ // notify of current file length. 
m_writeCache.setExtent(convertAddr(m_fileSize)); } catch (Exception e) { - throw new IllegalStateException("Unable reset the store", e); + throw new IllegalStateException("Unable to reset the store", e); } finally { m_allocationLock.unlock(); } @@ -1971,10 +2148,14 @@ if (addr == 0) { throw new IllegalStateException("Invalid metabits address: " + m_metaBitsAddr); } - try { - m_writeCache.write(addr, ByteBuffer.wrap(buf), 0, false); - } catch (InterruptedException e) { - throw new RuntimeException(e); + if (addr < 0) { + directWrite(addr, buf, 0, buf.length, 0); + } else { + try { + m_writeCache.write(addr, ByteBuffer.wrap(buf), 0, false); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } } @@ -1993,8 +2174,11 @@ try { - checkDeferredFrees(true, journal); // free now if possible + final int totalFreed = checkDeferredFrees(true, journal); // free now if possible + if (totalFreed > 0 && log.isInfoEnabled()) { + log.info("Freed " + totalFreed + " deferralls on commit"); + } // free old storageStatsAddr if (m_storageStatsAddr != 0) { int len = (int) (m_storageStatsAddr & 0xFFFF); @@ -2017,14 +2201,14 @@ throw new IllegalStateException("Returned MetaBits Address not valid!"); } - // TODO: assert that m_deferredFreeOut is empty! - assert m_deferredFreeOut.getBytesWritten() == 0; - // Call immediateFree - no need to defer freeof metaBits, this // has to stop somewhere! // No more allocations must be made immediateFree((int) oldMetaBits, oldMetaBitsSize); + // There must be no buffered deferred frees + assert m_deferredFreeOut.getBytesWritten() == 0; + // save allocation headers final Iterator<Allocator> iter = m_commitList.iterator(); while (iter.hasNext()) { @@ -2097,8 +2281,10 @@ * <p> * Note: This method is package private in order to expose it to the unit * tests. + * + * returns number of addresses freed */ - /* public */void checkDeferredFrees(final boolean freeNow, + /* public */int checkDeferredFrees(final boolean freeNow, final Journal journal) { // Note: Invoked from unit test w/o the lock... @@ -2140,9 +2326,11 @@ * Note: This adds one to the lastDeferredReleaseTime to give * exclusive lower bound semantics. 
*/ - freeDeferrals(journal, m_lastDeferredReleaseTime + 1, + return freeDeferrals(journal, m_lastDeferredReleaseTime + 1, latestReleasableTime); + } else { + return 0; } } @@ -2397,31 +2585,6 @@ return ret; } -// /* -// * clear -// * -// * reset the file size commit the root blocks -// */ -// public void clear() { -// try { -// baseInit(); -// -// m_fileSize = -4; -// m_metaStartAddr = m_fileSize; -// m_nextAllocation = -1; // keep on a 8K boundary (8K minimum -// // allocation) -// m_raf.setLength(convertAddr(m_fileSize)); -// -// m_curHdrAddr = 0; -// m_rootAddr = 0; -// -// startTransaction(); -// commitTransaction(); -// } catch (Exception e) { -// throw new StorageTerminalError("Unable to clear store", e); -// } -// } - public static long convertAddr(final int addr) { final long laddr = addr; if (laddr < 0) { @@ -2587,36 +2750,6 @@ return -1; } - -// // -------------------------------------------------------------------------------------- -// private String allocListStats(final List<Allocator> list, final AtomicLong counter) { -// final StringBuffer stats = new StringBuffer(); -// final Iterator<Allocator> iter = list.iterator(); -// while (iter.hasNext()) { -// stats.append(iter.next().getStats(counter)); -// } -// -// return stats.toString(); -// } -// -// public String getStats(final boolean full) { -// -// final AtomicLong counter = new AtomicLong(); -// -// final StringBuilder sb = new StringBuilder("FileSize : " + m_fileSize -// + " allocated : " + m_nextAllocation + "\r\n"); -// -// if (full) { -// -// sb.append(allocListStats(m_allocs, counter)); -// -// sb.append("Allocated : " + counter); -// -// } -// -// return sb.toString(); -// -// } public static class AllocationStats { public AllocationStats(final int i) { @@ -2626,11 +2759,29 @@ long m_reservedSlots; long m_filledSlots; } - /** - * Collected statistics are against each Allocation Block size. See - * {@link StorageStats#showStats(StringBuilder)} for details on the - * generated report. 
+ * Utility debug outputting the allocator array, showing index, start + * address and alloc type/size + * + * Collected statistics are against each Allocation Block size: + * total number of slots | store size + * number of filled slots | store used + * <dl> + * <dt>AllocatorSize</dt><dd>The #of bytes in the allocated slots issued by this allocator.</dd> + * <dt>AllocatorCount</dt><dd>The #of fixed allocators for that slot size.</dd> + * <dt>SlotsInUse</dt><dd>The difference between the two previous columns (net slots in use for this slot size).</dd> + * <dt>SlotsReserved</dt><dd>The #of slots in this slot size which have had storage reserved for them.</dd> + * <dt>SlotsAllocated</dt><dd>Cumulative allocation of slots to date in this slot size (regardless of the transaction outcome).</dd> + * <dt>SlotsRecycled</dt><dd>Cumulative recycled slots to date in this slot size (regardless of the transaction outcome).</dd> + * <dt>SlotsChurn</dt><dd>How frequently slots of this size are re-allocated (SlotsInUse/SlotsAllocated).</dd> + * <dt>%SlotsUnused</dt><dd>The percentage of slots of this size which are not in use (1-(SlotsInUse/SlotsReserved)).</dd> + * <dt>BytesReserved</dt><dd>The space reserved on the backing file for those allocation slots</dd> + * <dt>BytesAppData</dt><dd>The #of bytes in the allocated slots which are used by application data (including the record checksum).</dd> + * <dt>%SlotWaste</dt><dd>How well the application data fits in the slots (BytesAppData/(SlotsInUse*AllocatorSize)).</dd> + * <dt>%AppData</dt><dd>How much of your data is stored by each allocator (BytesAppData/Sum(BytesAppData)).</dd> + * <dt>%StoreFile</dt><dd>How much of the backing file is reserved for each allocator (BytesReserved/Sum(BytesReserved)).</dd> + * <dt>%StoreWaste</dt><dd>How much of the total waste on the store is waste for this allocator size ((BytesReserved-BytesAppData)/(Sum(BytesReserved)-Sum(BytesAppData))).</dd> + * </dl> */ public void showAllocators(final StringBuilder str) { m_storageStats.showStats(str); @@ -2761,8 +2912,8 @@ final FixedAllocator allocator = getBlock(addr); final int offset = getOffset(addr); final long laddr = allocator.getPhysicalAddress(offset); - - return laddr; + + return allocator instanceof DirectFixedAllocator ? -laddr : laddr; } } @@ -2790,10 +2941,6 @@ return alloc; } -// private int blockIndex(int addr) { -// return (-addr) >>> OFFSET_BITS; -// } - private FixedAllocator getBlock(final int addr) { final int index = (-addr) >>> OFFSET_BITS; @@ -2804,24 +2951,6 @@ return (-addr) & OFFSET_BITS_MASK; // OFFSET_BITS } -// public int addr2Size(final int addr) { -// if (addr > 0) { -// int size = 0; -// -// final int index = ((int) addr) % 16; -// -// if (index == 15) { // blob -// throw new Error("FIX ME : legacy BLOB code being accessed somehow"); -// } else { -// size = m_minFixedAlloc * m_allocSizes[index]; -// } -// -// return size; -// } else { -// return getBlock(addr).getPhysicalSize(getOffset(addr)); -// } -// } - /** * The {@link RWStore} always generates negative address values.
* @@ -2831,150 +2960,10 @@ return addr <= 0; } -// /******************************************************************************* -// * called when used as a server, returns whether facility is enabled, this -// * is the whole point of the wormStore - so the answer is true -// **/ -// public boolean preserveSessionData() { -// m_preserveSession = true; -// -// return true; -// } -// -// /******************************************************************************* -// * called by allocation blocks to determine whether they can re-allocate -// * data within this session. -// **/ -// protected boolean isSessionPreserved() { -// return m_preserveSession || m_contexts.size() > 0; -// } - -// /********************************************************************* -// * create backup file, copy data to it, and close it. -// **/ -// synchronized public void backup(String filename) throws FileNotFoundException, IOException { -// File destFile = new File(filename); -// destFile.createNewFile(); -// -// RandomAccessFile dest = new RandomAccessFile(destFile, "rw"); -// -// int bufSize = 64 * 1024; -// byte[] buf = new byte[bufSize]; -// -// m_raf.seek(0); -// -// int rdSize = bufSize; -// while (rdSize == bufSize) { -// rdSize = m_raf.read(buf); -// if (rdSize > 0) { -// dest.write(buf, 0, rdSize); -// } -// } -// -// dest.close(); -// } -// -// /********************************************************************* -// * copy storefile to output stream. -// **/ -// synchronized public void backup(OutputStream outstr) throws IOException { -// int bufSize = 64 * 1024; -// byte[] buf = new byte[bufSize]; -// -// m_raf.seek(0); -// -// int rdSize = bufSize; -// while (rdSize == bufSize) { -// rdSize = m_raf.read(buf); -// if (rdSize > 0) { -// outstr.write(buf, 0, rdSize); -// } -// } -// } -// -// synchronized public void restore(InputStream instr) throws IOException { -// int bufSize = 64 * 1024; -// byte[] buf = new byte[bufSize]; -// -// m_raf.seek(0); -// -// int rdSize = bufSize; -// while (rdSize == bufSize) { -// rdSize = instr.read(buf); -// if (rdSize > 0) { -// m_raf.write(buf, 0, rdSize); -// } -// } -// } - -// /*************************************************************************************** -// * Needed by PSOutputStream for BLOB buffer chaining. -// **/ -// public void absoluteWriteInt(final int addr, final int offset, final int value) { -// try { -// // must check write cache!!, or the write may be overwritten - just -// // flush for now -// m_writes.flush(); -// -// m_raf.seek(physicalAddress(addr) + offset); -// m_raf.writeInt(value); -// } catch (IOException e) { -// throw new StorageTerminalError("Unable to write integer", e); -// } -// } - -// /*************************************************************************************** -// * Needed to free Blob chains. -// **/ -// public int absoluteReadInt(final int addr, final int offset) { -// try { -// m_raf.seek(physicalAddress(addr) + offset); -// return m_raf.readInt(); -// } catch (IOException e) { -// throw new StorageTerminalError("Unable to write integer", e); -// } -// } - -// /*************************************************************************************** -// * Needed by PSOutputStream for BLOB buffer chaining. 
-// **/ -// public int bufferChainOffset() { -// return m_maxFixedAlloc - 4; -// } - public File getStoreFile() { return m_fd; } -// public boolean isLongAddress() { -// // always ints -// return false; -// } - -// public int absoluteReadLong(long addr, int offset) { -// throw new UnsupportedOperationException(); -// } -// -// public void absoluteWriteLong(long addr, int threshold, long value) { -// throw new UnsupportedOperationException(); -// } - -// public void absoluteWriteAddress(long addr, int threshold, long addr2) { -// absoluteWriteInt((int) addr, threshold, (int) addr2); -// } - -// public int getAddressSize() { -// return 4; -// } - -// public RandomAccessFile getRandomAccessFile() { -// return m_raf; -// } - -// public FileChannel getChannel() { -// return m_raf.getChannel(); -// } - public boolean requiresCommit() { return m_recentAlloc; } @@ -3359,8 +3348,9 @@ /** * Provided with the address of a block of addresses to be freed * @param blockAddr + * @return the total number of addresses freed */ - private void freeDeferrals(final long blockAddr, final long lastReleaseTime) { + private int freeDeferrals(final long blockAddr, final long lastReleaseTime) { final int addr = (int) (blockAddr >> 32); final int sze = (int) blockAddr & 0xFFFFFF; @@ -3371,9 +3361,12 @@ getData(addr, buf); final DataInputStream strBuf = new DataInputStream(new ByteArrayInputStream(buf)); m_allocationLock.lock(); + int totalFreed = 0; try { int nxtAddr = strBuf.readInt(); + int cnt = 0; + while (nxtAddr != 0) { // while (false && addrs-- > 0) { if (nxtAddr > 0) { // Blob @@ -3386,6 +3379,8 @@ immediateFree(nxtAddr, 1); // size ignored for FixedAllocators } + totalFreed++; + nxtAddr = strBuf.readInt(); } m_lastDeferredReleaseTime = lastReleaseTime; @@ -3397,6 +3392,8 @@ } finally { m_allocationLock.unlock(); } + + return totalFreed; } /** @@ -3409,7 +3406,7 @@ * @param toTime * The exclusive upper bound. */ - private void freeDeferrals(final AbstractJournal journal, + private int freeDeferrals(final AbstractJournal journal, final long fromTime, final long toTime) { @@ -3438,6 +3435,8 @@ if(log.isTraceEnabled()) log.trace("fromTime=" + fromTime + ", toTime=" + toTime); + int totalFreed = 0; + while (commitRecords.hasNext()) { final ITuple<CommitRecordIndex.Entry> tuple = commitRecords.next(); @@ -3452,12 +3451,13 @@ if (blockAddr != 0) { - freeDeferrals(blockAddr, record.getTimestamp()); + totalFreed += freeDeferrals(blockAddr, record.getTimestamp()); } } + return totalFreed; } /** @@ -3465,6 +3465,7 @@ * and an overall list of allocators. When the context is detached, all * allocators must be released and any that has available capacity will be * assigned to the global free lists. + * See {@link AllocBlock #releaseSession} * * @param context * The context to be released from all FixedAllocators. @@ -3485,9 +3486,9 @@ /** * The ContextAllocation object manages a freeList of associated allocators - * and an overall list of allocators. When the context is detached, all - * allocators must be released and any that has available capacity will be - * assigned to the global free lists. + * and an overall list of allocators. When the context is aborted then + * allocations made by that context should be released. + * See {@link AllocBlock #abortShadow} * * @param context * The context to be released from all FixedAllocators. 
@@ -3499,7 +3500,7 @@ final ContextAllocation alloc = m_contexts.remove(context); if (alloc != null) { - alloc.release(); + alloc.abort(); } } finally { m_allocationLock.unlock(); @@ -4352,5 +4353,59 @@ m_allocationLock.unlock(); } } + + /** + * A request for a direct allocation from a Direct ByteBuffer + * + * @param blockSize the size requested + * @return the address of the direct allocation + */ + public int allocateDirect(final int blockSize) { + final int allocBytes = blockSize << this.ALLOCATION_SCALEUP; + if (m_directSpaceAvailable < allocBytes) { + // try and allocate a further buffer + addDirectBuffer(); + } + + if (m_directSpaceAvailable < allocBytes) { + return -1; + } else { + final int ret = m_nextDirectAllocation; + m_nextDirectAllocation += allocBytes; + m_directSpaceAvailable -= allocBytes; + + return ret; + } + } + + /** + * Returns the slot size associated with this address + */ + public int getAssociatedSlotSize(int addr) { + return getBlock(addr).getBlockSize(); + } + + /** + * lockAddress adds the address passed to a lock list. This is for + * debug only and is not intended to be used generally for the live system. + * + * @param addr - address to be locked + */ + public void lockAddress(int addr) { + m_allocationLock.lock(); + try { + if (m_lockAddresses == null) { + m_lockAddresses = new TreeMap<Integer, Integer>(); + } + + if (m_lockAddresses.containsKey(addr)) { + throw new IllegalStateException("address already locked " + addr); + } + + m_lockAddresses.put(addr, addr); + } finally { + m_allocationLock.unlock(); + } + } } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/StorageStats.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/StorageStats.java 2010-12-14 17:15:11 UTC (rev 4009) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/StorageStats.java 2010-12-16 12:44:43 UTC (rev 4010) @@ -207,7 +207,7 @@ return store.divide(size, 2, RoundingMode.HALF_UP).floatValue(); } public float totalWaste(long total) { - if (usedStore() == 0) + if (total == 0) return 0.0f; long slotWaste = reservedStore() - usedStore(); @@ -234,18 +234,30 @@ return allocs.divide(used, 2, RoundingMode.HALF_UP).floatValue(); } public float slotsUnused() { + if (m_totalSlots == 0) { + return 0.0f; + } + BigDecimal used = new BigDecimal(100 * (m_totalSlots-usedSlots())); BigDecimal total = new BigDecimal(m_totalSlots); if(total.signum()==0) return 0f; return used.divide(total, 2, RoundingMode.HALF_UP).floatValue(); } public float percentAllocations(long totalAllocations) { + if (totalAllocations == 0) { + return 0.0f; + } + BigDecimal used = new BigDecimal(100 * m_slotAllocations); BigDecimal total = new BigDecimal(totalAllocations); if(total.signum()==0) return 0f; return used.divide(total, 2, RoundingMode.HALF_UP).floatValue(); } public float percentSlotsInuse(long totalInuse) { + if (totalInuse == 0) { + return 0.0f; + } + BigDecimal used = new BigDecimal(100 * usedSlots()); BigDecimal total = new BigDecimal(totalInuse); if(total.signum()==0) return 0f; @@ -508,6 +520,9 @@ } private float dataPercent(long usedData, long totalData) { + if (totalData == 0) + return 0.0f; + BigDecimal used = new BigDecimal(100 * usedData); BigDecimal total = new BigDecimal(totalData); Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/StressTestConcurrentUnisolatedIndices.java =================================================================== --- 
branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/StressTestConcurrentUnisolatedIndices.java 2010-12-14 17:15:11 UTC (rev 4009) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/test/com/bigdata/journal/StressTestConcurrentUnisolatedIndices.java 2010-12-16 12:44:43 UTC (rev 4010) @@ -103,6 +103,11 @@ final Journal journal = new Journal(properties); + final IBufferStrategy bufferStrategy = journal.getBufferStrategy(); + if (bufferStrategy instanceof RWStrategy) { + ((RWStrategy)bufferStrategy).getRWStore().activateTx(); + } + try { // if(journal.getBufferStrategy() instanceof MappedBufferStrategy) { @@ -118,7 +123,7 @@ // } doConcurrentClientTest(journal,// - 10,// timeout + 30,// timeout 20,// nresources 1, // minLocks 3, // maxLocks @@ -129,6 +134,9 @@ ); } finally { + if (bufferStrategy instanceof RWStrategy) { + ... [truncated message content] |
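The convention running through the change set above is worth restating: physicalAddress() negates the address for slots issued by a DirectFixedAllocator, and callers such as alloc(...) and the metabits write then route on the sign alone (pa < 0 means directWrite, otherwise the write cache). The sketch below is illustrative only -- SignTaggedAddress and its two write methods are hypothetical stand-ins, not bigdata classes. Offset zero is reserved (note the "assert pa != 0" above), which is what keeps the sign unambiguous.

// Illustrative sketch, not a bigdata class: route a write by the sign of its
// physical address, mirroring the "pa < 0" tests introduced in this change set.
public class SignTaggedAddress {

    /** Mark an address as living in a direct ByteBuffer by negating it. */
    static long tagDirect(final long physicalAddr) {
        assert physicalAddr > 0; // offset zero is reserved, so the sign is unambiguous
        return -physicalAddr;
    }

    /** Dispatch on the sign, as alloc(...) and the metabits write do above. */
    static void dispatchWrite(final long addr, final byte[] buf) {
        if (addr < 0) {
            writeToDirectBuffer(-addr, buf); // slot backed by a direct buffer
        } else {
            writeToFile(addr, buf); // ordinary slot, goes through the write cache
        }
    }

    // Hypothetical stand-ins for directWrite(...) and m_writeCache.write(...).
    private static void writeToDirectBuffer(final long physicalAddr, final byte[] buf) { /* ... */ }

    private static void writeToFile(final long physicalAddr, final byte[] buf) { /* ... */ }
}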
From: <tho...@us...> - 2010-12-14 17:15:17
Revision: 4009 http://bigdata.svn.sourceforge.net/bigdata/?rev=4009&view=rev Author: thompsonbry Date: 2010-12-14 17:15:11 +0000 (Tue, 14 Dec 2010) Log Message: ----------- Added Option to RWStore to disable the double buffering of writes. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-12-14 17:06:17 UTC (rev 4008) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-12-14 17:15:11 UTC (rev 4009) @@ -285,6 +285,16 @@ String DEFAULT_FREE_BITS_THRESHOLD = "300"; + /** + * When <code>true</code>, scattered writes which are strictly ascending + * will be coalesced within a buffer and written out as a single IO + * (default {@value #DEFAULT_DOUBLE_BUFFER_WRITES}). This improves write + * performance for SATA, SAS, and even SSD. + */ + String DOUBLE_BUFFER_WRITES = RWStore.class.getName() + ".doubleBuffer"; + + String DEFAULT_DOUBLE_BUFFER_WRITES = "true"; + } /* @@ -594,10 +604,16 @@ } catch (IOException e1) { throw new RuntimeException(e1); } - - try { - m_bufferedWrite = new BufferedWrite(this); - } catch (InterruptedException e1) { + + if (Boolean.valueOf(fileMetadata.getProperty( + Options.DOUBLE_BUFFER_WRITES, + Options.DEFAULT_DOUBLE_BUFFER_WRITES))) { + try { + m_bufferedWrite = new BufferedWrite(this); + } catch (InterruptedException e1) { + m_bufferedWrite = null; + } + } else { m_bufferedWrite = null; } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
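For readers wanting to exercise the new option: the property key follows from the diff (RWStore.class.getName() + ".doubleBuffer", i.e. com.bigdata.rwstore.RWStore.doubleBuffer), and the Properties object is the same one handed to the Journal constructor in the tests earlier in this thread. A minimal, hedged configuration sketch:

import java.util.Properties;

public class DoubleBufferConfig {

    /** Build Journal properties with write coalescing disabled (sketch only). */
    public static Properties disableDoubleBuffering() {
        final Properties properties = new Properties();
        // Key derived from the diff: RWStore.class.getName() + ".doubleBuffer".
        properties.setProperty("com.bigdata.rwstore.RWStore.doubleBuffer", "false");
        // The properties are then handed to the Journal, as in the tests above:
        // final Journal journal = new Journal(properties);
        return properties;
    }
}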
From: <tho...@us...> - 2010-12-14 17:06:24
Revision: 4008 http://bigdata.svn.sourceforge.net/bigdata/?rev=4008&view=rev Author: thompsonbry Date: 2010-12-14 17:06:17 +0000 (Tue, 14 Dec 2010) Log Message: ----------- This is a re-do on a commit to fix a problem with RWStore where running with a ZERO (0) retention window would improperly recycle records while there was an open transaction. I thought that this change set was already committed, but clearly it was not. There is also a change to StorageStats and RWStore#showAllocators() to fix a DivideByZero problem (in the former) and to use the StorageStats in place of the older RWStore's self-reporting capabilities. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/StorageStats.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-12-14 16:56:43 UTC (rev 4007) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-12-14 17:06:17 UTC (rev 4008) @@ -1448,21 +1448,35 @@ * FIXME We need unit test when MIN_RELEASE_AGE is ZERO AND * there are open read-only transactions. */ - if (m_minReleaseAge == 0) { - immediateFree(addr, sze); + boolean alwaysDefer = m_minReleaseAge > 0L + || m_activeTxCount > 0; + if (!alwaysDefer) + alwaysDefer = context == null && !m_contexts.isEmpty(); + if (alwaysDefer) + if (log.isDebugEnabled()) + log.debug("Should defer " + addr + " real: " + + physicalAddress(addr)); + if (alwaysDefer + || !alloc.canImmediatelyFree(addr, sze, context)) { + deferFree(addr, sze); } else { - boolean alwaysDefer = m_activeTxCount > 0; - if (!alwaysDefer) - alwaysDefer = context == null && !m_contexts.isEmpty(); - if (alwaysDefer) - if (log.isDebugEnabled()) - log.debug("Should defer " + addr + " real: " + physicalAddress(addr)); - if (alwaysDefer || !alloc.canImmediatelyFree(addr, sze, context)) { - deferFree(addr, sze); - } else { - immediateFree(addr, sze); - } + immediateFree(addr, sze); } +// if (m_minReleaseAge == 0) { +// immediateFree(addr, sze); +// } else { +// boolean alwaysDefer = m_activeTxCount > 0; +// if (!alwaysDefer) +// alwaysDefer = context == null && !m_contexts.isEmpty(); +// if (alwaysDefer) +// if (log.isDebugEnabled()) +// log.debug("Should defer " + addr + " real: " + physicalAddress(addr)); +// if (alwaysDefer || !alloc.canImmediatelyFree(addr, sze, context)) { +// deferFree(addr, sze); +// } else { +// immediateFree(addr, sze); +// } +// } } } finally { m_allocationLock.unlock(); @@ -2596,85 +2610,68 @@ long m_reservedSlots; long m_filledSlots; } + /** - * Utility debug outputing the allocator array, showing index, start - * address and alloc type/size - * - * Collected statistics are against each Allocation Block size: - * total number of slots | store size - * number of filled slots | store used - * <dl> - * <dt>AllocatorSize</dt><dd>The #of bytes in the allocated slots issued by this allocator.</dd> - * <dt>AllocatorCount</dt><dd>The #of fixed allocators for that slot size.</dd> - * <dt>SlotsInUse</dt><dd>The difference between the two previous columns (net slots in use for this slot size).</dd> - * <dt>SlotsReserved</dt><dd>The #of slots in this slot size which have had storage reserved for them.</dd> - * <dt>SlotsAllocated</dt><dd>Cumulative allocation of slots to date in this slot size (regardless of the 
transaction outcome).</dd> - * <dt>SlotsRecycled</dt><dd>Cumulative recycled slots to date in this slot size (regardless of the transaction outcome).</dd> - * <dt>SlotsChurn</dt><dd>How frequently slots of this size are re-allocated (SlotsInUse/SlotsAllocated).</dd> - * <dt>%SlotsUnused</dt><dd>The percentage of slots of this size which are not in use (1-(SlotsInUse/SlotsReserved)).</dd> - * <dt>BytesReserved</dt><dd>The space reserved on the backing file for those allocation slots</dd> - * <dt>BytesAppData</dt><dd>The #of bytes in the allocated slots which are used by application data (including the record checksum).</dd> - * <dt>%SlotWaste</dt><dd>How well the application data fits in the slots (BytesAppData/(SlotsInUse*AllocatorSize)).</dd> - * <dt>%AppData</dt><dd>How much of your data is stored by each allocator (BytesAppData/Sum(BytesAppData)).</dd> - * <dt>%StoreFile</dt><dd>How much of the backing file is reserved for each allocator (BytesReserved/Sum(BytesReserved)).</dd> - * <dt>%StoreWaste</dt><dd>How much of the total waste on the store is waste for this allocator size ((BytesReserved-BytesAppData)/(Sum(BytesReserved)-Sum(BytesAppData))).</dd> - * </dl> + * Collected statistics are against each Allocation Block size. See + * {@link StorageStats#showStats(StringBuilder)} for details on the + * generated report. */ public void showAllocators(final StringBuilder str) { - final AllocationStats[] stats = new AllocationStats[m_allocSizes.length]; - for (int i = 0; i < stats.length; i++) { - stats[i] = new AllocationStats(m_allocSizes[i]*64); - } - - final Iterator<FixedAllocator> allocs = m_allocs.iterator(); - while (allocs.hasNext()) { - Allocator alloc = (Allocator) allocs.next(); - alloc.appendShortStats(str, stats); - } - - // Append Summary - str.append("\n-------------------------\n"); - str.append("RWStore Allocation Summary\n"); - str.append("-------------------------\n"); - str.append(padRight("Allocator", 10)); - str.append(padLeft("SlotsUsed", 12)); - str.append(padLeft("reserved", 12)); - str.append(padLeft("StoreUsed", 14)); - str.append(padLeft("reserved", 14)); - str.append(padLeft("Usage", 8)); - str.append(padLeft("Store", 8)); - str.append("\n"); - long treserved = 0; - long treservedSlots = 0; - long tfilled = 0; - long tfilledSlots = 0; - for (int i = 0; i < stats.length; i++) { - final long reserved = stats[i].m_reservedSlots * stats[i].m_blockSize; - treserved += reserved; - treservedSlots += stats[i].m_reservedSlots; - final long filled = stats[i].m_filledSlots * stats[i].m_blockSize; - tfilled += filled; - tfilledSlots += stats[i].m_filledSlots; - } - for (int i = 0; i < stats.length; i++) { - final long reserved = stats[i].m_reservedSlots * stats[i].m_blockSize; - final long filled = stats[i].m_filledSlots * stats[i].m_blockSize; - str.append(padRight("" + stats[i].m_blockSize, 10)); - str.append(padLeft("" + stats[i].m_filledSlots, 12) + padLeft("" + stats[i].m_reservedSlots, 12)); - str.append(padLeft("" + filled, 14) + padLeft("" + reserved, 14)); - str.append(padLeft("" + (reserved==0?0:(filled * 100 / reserved)) + "%", 8)); - str.append(padLeft("" + (treserved==0?0:(reserved * 100 / treserved)) + "%", 8)); - str.append("\n"); - } - str.append("\n"); - - str.append(padRight("Totals", 10)); - str.append(padLeft("" + tfilledSlots, 12)); - str.append(padLeft("" + treservedSlots, 12)); - str.append(padLeft("" + tfilled, 14)); - str.append(padLeft("" + treserved, 14)); - str.append(padLeft("" + (treserved==0?0:(tfilled * 100 / treserved)) + "%", 8)); - 
str.append("\nFile size: " + convertAddr(m_fileSize) + "bytes\n"); + m_storageStats.showStats(str); +// final AllocationStats[] stats = new AllocationStats[m_allocSizes.length]; +// for (int i = 0; i < stats.length; i++) { +// stats[i] = new AllocationStats(m_allocSizes[i]*64); +// } +// +// final Iterator<FixedAllocator> allocs = m_allocs.iterator(); +// while (allocs.hasNext()) { +// Allocator alloc = (Allocator) allocs.next(); +// alloc.appendShortStats(str, stats); +// } +// +// // Append Summary +// str.append("\n-------------------------\n"); +// str.append("RWStore Allocation Summary\n"); +// str.append("-------------------------\n"); +// str.append(padRight("Allocator", 10)); +// str.append(padLeft("SlotsUsed", 12)); +// str.append(padLeft("reserved", 12)); +// str.append(padLeft("StoreUsed", 14)); +// str.append(padLeft("reserved", 14)); +// str.append(padLeft("Usage", 8)); +// str.append(padLeft("Store", 8)); +// str.append("\n"); +// long treserved = 0; +// long treservedSlots = 0; +// long tfilled = 0; +// long tfilledSlots = 0; +// for (int i = 0; i < stats.length; i++) { +// final long reserved = stats[i].m_reservedSlots * stats[i].m_blockSize; +// treserved += reserved; +// treservedSlots += stats[i].m_reservedSlots; +// final long filled = stats[i].m_filledSlots * stats[i].m_blockSize; +// tfilled += filled; +// tfilledSlots += stats[i].m_filledSlots; +// } +// for (int i = 0; i < stats.length; i++) { +// final long reserved = stats[i].m_reservedSlots * stats[i].m_blockSize; +// final long filled = stats[i].m_filledSlots * stats[i].m_blockSize; +// str.append(padRight("" + stats[i].m_blockSize, 10)); +// str.append(padLeft("" + stats[i].m_filledSlots, 12) + padLeft("" + stats[i].m_reservedSlots, 12)); +// str.append(padLeft("" + filled, 14) + padLeft("" + reserved, 14)); +// str.append(padLeft("" + (reserved==0?0:(filled * 100 / reserved)) + "%", 8)); +// str.append(padLeft("" + (treserved==0?0:(reserved * 100 / treserved)) + "%", 8)); +// str.append("\n"); +// } +// str.append("\n"); +// +// str.append(padRight("Totals", 10)); +// str.append(padLeft("" + tfilledSlots, 12)); +// str.append(padLeft("" + treservedSlots, 12)); +// str.append(padLeft("" + tfilled, 14)); +// str.append(padLeft("" + treserved, 14)); +// str.append(padLeft("" + (treserved==0?0:(tfilled * 100 / treserved)) + "%", 8)); +// str.append("\nFile size: " + convertAddr(m_fileSize) + "bytes\n"); } private String padLeft(String str, int minlen) { Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/StorageStats.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/StorageStats.java 2010-12-14 16:56:43 UTC (rev 4007) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/StorageStats.java 2010-12-14 17:06:17 UTC (rev 4008) @@ -31,7 +31,6 @@ import java.math.BigDecimal; import java.math.RoundingMode; import java.util.ArrayList; -import java.util.Formatter; /** * Maintains stats on the RWStore allocations, useful for tuning Allocator @@ -204,7 +203,7 @@ BigDecimal size = new BigDecimal(reservedStore()); BigDecimal store = new BigDecimal(100 * (reservedStore() - usedStore())); - + if(store.signum()==0) return 0f; return store.divide(size, 2, RoundingMode.HALF_UP).floatValue(); } public float totalWaste(long total) { @@ -215,7 +214,7 @@ BigDecimal localWaste = new BigDecimal(100 * slotWaste); BigDecimal totalWaste = new BigDecimal(total); - + if(totalWaste.signum()==0) return 0f; return 
localWaste.divide(totalWaste, 2, RoundingMode.HALF_UP).floatValue(); } public long reservedStore() { @@ -231,25 +230,25 @@ BigDecimal allocs = new BigDecimal(m_slotAllocations); BigDecimal used = new BigDecimal(usedSlots()); - + if(used.signum()==0) return 0f; return allocs.divide(used, 2, RoundingMode.HALF_UP).floatValue(); } public float slotsUnused() { BigDecimal used = new BigDecimal(100 * (m_totalSlots-usedSlots())); BigDecimal total = new BigDecimal(m_totalSlots); - + if(total.signum()==0) return 0f; return used.divide(total, 2, RoundingMode.HALF_UP).floatValue(); } public float percentAllocations(long totalAllocations) { BigDecimal used = new BigDecimal(100 * m_slotAllocations); BigDecimal total = new BigDecimal(totalAllocations); - + if(total.signum()==0) return 0f; return used.divide(total, 2, RoundingMode.HALF_UP).floatValue(); } public float percentSlotsInuse(long totalInuse) { BigDecimal used = new BigDecimal(100 * usedSlots()); BigDecimal total = new BigDecimal(totalInuse); - + if(total.signum()==0) return 0f; return used.divide(total, 2, RoundingMode.HALF_UP).floatValue(); } public int meanAllocation() { @@ -384,7 +383,40 @@ public void register(FixedAllocator alloc) { register(alloc, false); } - + + /** + * Collected statistics are against each Allocation Block size: + * <dl> + * <dt>AllocatorSize</dt><dd>The #of bytes in the allocated slots issued by this allocator.</dd> + * <dt>AllocatorCount</dt><dd>The #of fixed allocators for that slot size.</dd> + * <dt>SlotsInUse</dt><dd>The difference between the two previous columns (net slots in use for this slot size).</dd> + * <dt>SlotsReserved</dt><dd>The #of slots in this slot size which have had storage reserved for them.</dd> + * <dt>SlotsAllocated</dt><dd>Cumulative allocation of slots to date in this slot size (regardless of the transaction outcome).</dd> + * <dt>SlotsRecycled</dt><dd>Cumulative recycled slots to date in this slot size (regardless of the transaction outcome).</dd> + * <dt>SlotsChurn</dt><dd>How frequently slots of this size are re-allocated (SlotsInUse/SlotsAllocated).</dd> + * <dt>%SlotsUnused</dt><dd>The percentage of slots of this size which are not in use (1-(SlotsInUse/SlotsReserved)).</dd> + * <dt>BytesReserved</dt><dd>The space reserved on the backing file for those allocation slots</dd> + * <dt>BytesAppData</dt><dd>The #of bytes in the allocated slots which are used by application data (including the record checksum).</dd> + * <dt>%SlotWaste</dt><dd>How well the application data fits in the slots (BytesAppData/(SlotsInUse*AllocatorSize)).</dd> + * <dt>%AppData</dt><dd>How much of your data is stored by each allocator (BytesAppData/Sum(BytesAppData)).</dd> + * <dt>%StoreFile</dt><dd>How much of the backing file is reserved for each allocator (BytesReserved/Sum(BytesReserved)).</dd> + * <dt>%StoreWaste</dt><dd>How much of the total waste on the store is waste for this allocator size ((BytesReserved-BytesAppData)/(Sum(BytesReserved)-Sum(BytesAppData))).</dd> + * </dl> + * + * @param str + * + * FIXME Javadoc edit - this has diverged from the comments above. 
Also, there + * is also a divideByZero which can appear (this has been fixed).<pre> + [java] Exception in thread "main" java.lang.ArithmeticException: / by zero + [java] at java.math.BigDecimal.divideAndRound(BigDecimal.java:1407) + [java] at java.math.BigDecimal.divide(BigDecimal.java:1381) + [java] at java.math.BigDecimal.divide(BigDecimal.java:1491) + [java] at com.bigdata.rwstore.StorageStats$Bucket.slotsUnused(StorageStats.java:240) + [java] at com.bigdata.rwstore.StorageStats.showStats(StorageStats.java:448) + [java] at com.bigdata.rwstore.RWStore.showAllocators(RWStore.java:2620) + [java] at com.bigdata.rdf.store.DataLoader.main(DataLoader.java:1415) + </pre> + */ public void showStats(StringBuilder str) { str.append("\n-------------------------\n"); str.append("RWStore Allocator Summary\n"); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
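The StorageStats changes in this revision all apply one guard pattern: test the denominator's signum before calling BigDecimal.divide, which otherwise throws the ArithmeticException shown in the stack trace above when the divisor is zero (empty store, no slots yet, and so on). A self-contained sketch of that pattern -- PercentUtil is illustrative, not a bigdata class:

import java.math.BigDecimal;
import java.math.RoundingMode;

final class PercentUtil {

    /** Percentage part/whole to two decimal places, returning 0 when whole == 0. */
    static float percent(final long part, final long whole) {
        final BigDecimal used = new BigDecimal(100 * part);
        final BigDecimal total = new BigDecimal(whole);
        if (total.signum() == 0) // the guard added throughout StorageStats
            return 0f;
        return used.divide(total, 2, RoundingMode.HALF_UP).floatValue();
    }
}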
From: <tho...@us...> - 2010-12-14 16:56:49
Revision: 4007 http://bigdata.svn.sourceforge.net/bigdata/?rev=4007&view=rev Author: thompsonbry Date: 2010-12-14 16:56:43 +0000 (Tue, 14 Dec 2010) Log Message: ----------- Javadoc on BufferedWrite, added explicit synchronization, safe release of the direct ByteBuffer and a variety of other things for the paranoid. Modified WriteCache to invoke BufferedWrite#reset() when non-null. Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java 2010-12-14 16:50:21 UTC (rev 4006) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java 2010-12-14 16:56:43 UTC (rev 4007) @@ -27,99 +27,242 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; +import java.util.concurrent.atomic.AtomicReference; +import com.bigdata.counters.CAT; +import com.bigdata.counters.CounterSet; import com.bigdata.io.DirectBufferPool; import com.bigdata.io.FileChannelUtility; import com.bigdata.io.IReopenChannel; import com.bigdata.rwstore.RWStore; /** - * The BufferedWrite merges/elides sorted scattered writes to minimise - * IO requests and maximise IO rates. + * The BufferedWrite merges/elides sorted scattered writes to minimize IO + * requests and maximize IO rates. This has a net positive effect on SAS, SATA, + * and SSD. * * @author Martyn Cutcher - * + * + * @todo unit tests (this is used by RWStore and so is in general tested as part + * of that class, but it does not have its own test suite and it should not + * be all that difficult to write one, especially if we factor out an API + * for reporting the slotSize and then use a mock object in place of the + * RWStore). + * + * @todo harmonize with {@link CounterSet} for reporting purposes. */ public class BufferedWrite { - final RWStore m_store; - final ByteBuffer m_data; - long m_startAddr = -1; - long m_endAddr = 0; + + /** + * Used to determine the size of the allocation slot onto which a record is + * being written. This is used to pad the size of the IO out to the size of + * the slot. This can improve the IO efficiency When the slots are sized so + * as to fall on multiples of sector boundaries. + */ + private final RWStore m_store; + + /** + * The direct {@link ByteBuffer} used to combine writes which are contiguous + * into a single IO. + */ +// private final ByteBuffer m_data; + private final AtomicReference<ByteBuffer> m_data = new AtomicReference<ByteBuffer>(); + + /** + * The offset on the backing channel at which the data in {@link #m_data} + * will be written when it is flushed to the backing channel. This is + * <code>-1</code> initially (and when reset) as a flag indicating that + * there is no data in {@link #m_data} and that the next record written by + * the caller on the buffer will assign the {@link #m_startAddr starting + * offset} of the data in the buffer. + * <p> + * Guarded by synchronized(this) (paranoia) + */ + private long m_startAddr = -1; + + /** + * The offset of the backing channel at which the next byte would be written + * if it were appended to the data already present in {@link #m_data}. 
+ * <p> + * Guarded by synchronized(this) (paranoia) + */ + private long m_endAddr = 0; - long m_dataBytes = 0; - long m_dataWrites = 0; - long m_fileWrites = 0; + /* + * Counters. + */ + private final CAT m_dataBytes = new CAT(); + private final CAT m_dataWrites = new CAT(); + private final CAT m_fileWrites = new CAT(); public BufferedWrite(final RWStore store) throws InterruptedException { + + if (store == null) + throw new IllegalArgumentException(); + m_store = store; - m_data = DirectBufferPool.INSTANCE.acquire(); + + m_data.set( DirectBufferPool.INSTANCE.acquire() ); + } - + + /** + * Release the direct buffer associated with this object. + * + * @throws InterruptedException + */ +// /* +// * Note: Consider adding synchronized(this) here to guard against the +// * possibility that the buffer could be released (and hence recycled) while +// * a write operation was occurring concurrently, However, this raises the +// * specter that a lock ordering problem could cause a deadlock. +// */ +// synchronized public void release() throws InterruptedException { - DirectBufferPool.INSTANCE.release(m_data); + + final ByteBuffer tmp = m_data.get(); + + if (tmp == null) { + + // Already closed. + return; + + } + + if (m_data.compareAndSet(tmp/* expected */, null/* update */)) { + + DirectBufferPool.INSTANCE.release(tmp); + + } + } - - public int write(final long offset, final ByteBuffer data, final IReopenChannel<FileChannel> opener) throws IOException { - int nwrites = 0; + + /** + * Buffer a write. + * + * @param offset + * The offset on the backing channel at which the data should be + * written. + * @param data + * The data. + * @param opener + * The object which knows how to re-open the backing channel. + * @return The #of write IOs performed during this method call. + * + * @throws IOException + */ + synchronized + public int write(final long offset, final ByteBuffer data, + final IReopenChannel<FileChannel> opener) throws IOException { - m_dataWrites++; + m_dataWrites.increment(); - int data_len = data.remaining(); - int slot_len = m_store.getSlotSize(data_len); + final int data_len = data.remaining(); + final int slot_len = m_store.getSlotSize(data_len); + int nwrites = 0; + final ByteBuffer m_data = this.m_data.get(); if (slot_len > m_data.remaining()) { + /* + * There is not enough room in [m_data] to absorb the caller's data + * record, so we have to flush first. + */ nwrites += flush(opener); } if (m_startAddr == -1) { + /* + * The buffer will begin to absorb data destined for the [offset] + * into the backing channel specified for the caller's data record. + */ m_startAddr = m_endAddr = offset; } else if (m_endAddr != offset) { - // if this is NOT a contiguous write then flush existing content + /* + * If this is NOT a contiguous write then flush existing content. + * After the flush, the buffer will begin to absorb data destined + * for the [offset] into the backing channel specified for the + * caller's data record. + */ nwrites += flush(opener); m_startAddr = m_endAddr = offset; } + // copy the caller's record into the buffer. m_data.put(data); + // update the file offset by the size of the allocation slot m_endAddr += slot_len; - long pos = m_endAddr - m_startAddr; + // update the buffer position by the size of the allocation slot. 
+ final long pos = m_endAddr - m_startAddr; m_data.position((int) pos); return nwrites; } - public int flush(final IReopenChannel<FileChannel> opener) throws IOException { - m_dataBytes += m_data.position(); + /** + * Flush buffered data to the backing channel. + * + * @param opener + * The object which knows how to re-open the backing channel. + * + * @return The #of write IOs performed during this method call. + * + * @throws IOException + */ + synchronized + public int flush(final IReopenChannel<FileChannel> opener) + throws IOException { + + final ByteBuffer m_data = this.m_data.get(); + + if (m_data.remaining() == 0) { + // NOP. + return 0; + } + // increment by the amount of data currently in the buffer. + m_dataBytes.add( m_data.position() ); + + // write out the data in the buffer onto the backing channel. m_data.flip(); final int nwrites = FileChannelUtility.writeAll(opener, m_data, m_startAddr); - m_fileWrites++; - + m_fileWrites.add(nwrites); + reset(); return nwrites; } + + /** + * Reset the buffer position and limit and clear the starting offset on the + * file to <code>-1</code>. + */ + synchronized + public void reset() { + + final ByteBuffer m_data = this.m_data.get(); + + // reset the buffer state. + m_data.position(0); + m_data.limit(m_data.capacity()); + + m_startAddr = -1; + m_endAddr = 0; + } - public String getStats(StringBuffer buf, boolean reset) { - String ret = "BufferedWrites, data: " + m_dataWrites + ", file: " + m_fileWrites + ", bytes: " + m_dataBytes; + public String getStats(final StringBuffer buf, final boolean reset) { + + final String ret = "BufferedWrites, data: " + m_dataWrites + ", file: " + m_fileWrites + ", bytes: " + m_dataBytes; if (buf != null) { buf.append(ret + "\n"); } if (reset) { - m_dataBytes = m_fileWrites = m_dataWrites = 0; + m_dataBytes.set(0L); + m_fileWrites.set(0L); + m_dataWrites.set(0L); } return ret; } - /** - * Caled by flush and also prior to use by the WriteCache. - */ - public void reset() { - m_data.position(0); - m_data.limit(m_data.capacity()); - - m_startAddr = -1; - m_endAddr = 0; - } } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2010-12-14 16:50:21 UTC (rev 4006) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2010-12-14 16:56:43 UTC (rev 4007) @@ -1645,7 +1645,7 @@ * * If there is a BufferedWrite then ensure it is reset. */ - if (m_bufferedWrite == null) { + if (m_bufferedWrite != null) { m_bufferedWrite.reset(); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
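To make the coalescing idea in BufferedWrite concrete without the RWStore plumbing: contiguous records accumulate in one direct buffer and go to disk as a single positioned write, while a gap in the offsets or a full buffer forces a flush first. CoalescingWriter below is a simplified, hypothetical sketch -- it omits the slot-size padding and the IReopenChannel indirection of the real class, and it assumes each record fits in the buffer:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

final class CoalescingWriter {

    private final ByteBuffer buf = ByteBuffer.allocateDirect(1 << 20); // 1MB run

    private long startOffset = -1; // -1 => buffer holds no data yet
    private long endOffset = 0;    // file offset one past the buffered run

    /** Buffer a record destined for [offset]; flush first if it is not contiguous. */
    synchronized int write(final long offset, final ByteBuffer data, final FileChannel ch)
            throws IOException {
        int nflushes = 0;
        if (data.remaining() > buf.remaining()
                || (startOffset != -1 && endOffset != offset)) {
            nflushes += flush(ch); // gap or full buffer: emit the current run
        }
        if (startOffset == -1) {
            startOffset = endOffset = offset; // start a new run at this offset
        }
        endOffset += data.remaining();
        buf.put(data); // assumes the record fits in the (now empty) buffer
        return nflushes;
    }

    /** Write the accumulated run to the channel as one positioned IO. */
    synchronized int flush(final FileChannel ch) throws IOException {
        if (buf.position() == 0)
            return 0; // nothing buffered
        buf.flip();
        long pos = startOffset;
        while (buf.hasRemaining())
            pos += ch.write(buf, pos);
        buf.clear();
        startOffset = -1;
        endOffset = 0;
        return 1;
    }
}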
From: <mar...@us...> - 2010-12-14 16:50:27
Revision: 4006 http://bigdata.svn.sourceforge.net/bigdata/?rev=4006&view=rev Author: martyncutcher Date: 2010-12-14 16:50:21 +0000 (Tue, 14 Dec 2010) Log Message: ----------- add reset to BufferedWrite Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java 2010-12-12 22:22:27 UTC (rev 4005) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java 2010-12-14 16:50:21 UTC (rev 4006) @@ -93,12 +93,8 @@ final int nwrites = FileChannelUtility.writeAll(opener, m_data, m_startAddr); m_fileWrites++; - m_data.position(0); - m_data.limit(m_data.capacity()); + reset(); - m_startAddr = -1; - m_endAddr = 0; - return nwrites; } @@ -115,4 +111,15 @@ return ret; } + + /** + * Caled by flush and also prior to use by the WriteCache. + */ + public void reset() { + m_data.position(0); + m_data.limit(m_data.capacity()); + + m_startAddr = -1; + m_endAddr = 0; + } } Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2010-12-12 22:22:27 UTC (rev 4005) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2010-12-14 16:50:21 UTC (rev 4006) @@ -1641,8 +1641,14 @@ /* * Retrieve the sorted write iterator and write each block to the - * file + * file. + * + * If there is a BufferedWrite then ensure it is reset. */ + if (m_bufferedWrite == null) { + m_bufferedWrite.reset(); + } + int nwrites = 0; final Iterator<Entry<Long, RecordMetadata>> entries = recordMap.entrySet().iterator(); while (entries.hasNext()) { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-12-12 23:22:57
Revision: 4005 http://bigdata.svn.sourceforge.net/bigdata/?rev=4005&view=rev Author: thompsonbry Date: 2010-12-12 22:22:27 +0000 (Sun, 12 Dec 2010) Log Message: ----------- Modified to use a different pattern to construct the binding sets in one of the unit tests (this relied on a constructor which is no longer public). Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java 2010-12-12 22:22:01 UTC (rev 4004) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java 2010-12-12 22:22:27 UTC (rev 4005) @@ -458,20 +458,31 @@ * * Note: We can't bind y in advance for the primary index! */ - final IBindingSet[] source = new IBindingSet[] {// - new HashBindingSet(new ArrayBindingSet(// - new IVariable[] { x },// - new IConstant[] { new Constant<String>("Paul") }// - )),// - new HashBindingSet(new ArrayBindingSet(// - new IVariable[] { x },// - new IConstant[] { new Constant<String>("Leon") }// - )), - new HashBindingSet(new ArrayBindingSet(// - new IVariable[] { x },// - new IConstant[] { new Constant<String>("Mary") }// - )), + final IBindingSet[] source; + { + final IBindingSet bset1 = new HashBindingSet(); + bset1.set(x, new Constant<String>("Paul")); + final IBindingSet bset2 = new HashBindingSet(); + bset2.set(x, new Constant<String>("Leon")); + final IBindingSet bset3 = new HashBindingSet(); + bset3.set(x, new Constant<String>("Mary")); + + source = new IBindingSet[] {// + bset1,bset2,bset3 +// new HashBindingSet(new ArrayBindingSet(// +// new IVariable[] { x },// +// new IConstant[] { new Constant<String>("Paul") }// +// )),// +// new HashBindingSet(new ArrayBindingSet(// +// new IVariable[] { x },// +// new IConstant[] { new Constant<String>("Leon") }// +// )), +// new HashBindingSet(new ArrayBindingSet(// +// new IVariable[] { x },// +// new IConstant[] { new Constant<String>("Mary") }// +// )), }; + } // Put each source binding set into a chunk by itself. final IBindingSet[][] sources = new IBindingSet[source.length][]; for (int i = 0; i < sources.length; i++) { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
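The replacement pattern in this commit, reduced to its essentials: allocate a mutable HashBindingSet and bind each variable through IBindingSet.set(...) rather than wrapping an ArrayBindingSet whose constructor is no longer public. In the sketch below, Var.var(...) is assumed to be the variable factory used elsewhere in this API; the rest mirrors the test code above:

import com.bigdata.bop.Constant;
import com.bigdata.bop.IBindingSet;
import com.bigdata.bop.Var;
import com.bigdata.bop.bindingSet.HashBindingSet;

public class BindingSetPattern {

    /** Build the binding {x -> "Paul"} without the non-public constructor. */
    public static IBindingSet paul() {
        final IBindingSet bset = new HashBindingSet();
        bset.set(Var.var("x"), new Constant<String>("Paul")); // Var.var is assumed
        return bset;
    }
}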
From: <tho...@us...> - 2010-12-12 22:22:09
Revision: 4004 http://bigdata.svn.sourceforge.net/bigdata/?rev=4004&view=rev Author: thompsonbry Date: 2010-12-12 22:22:01 +0000 (Sun, 12 Dec 2010) Log Message: ----------- Added the concept of stackable symbol tables to IBindingSet (push(),pop()) to support optional join groups. Added a new ListBindingSet implementation which should be more efficient than the HashBindingSet, which is our current mutable implementation of choice. Added Serialization tests to the IBindingSet implementations. Modified DistinctBindingSetOp, which used a constructor which is no longer public. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IBindingSet.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bindingSet/ArrayBindingSet.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bindingSet/EmptyBindingSet.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bindingSet/HashBindingSet.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/DistinctBindingSetOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bindingSet/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bindingSet/TestArrayBindingSet.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bindingSet/TestIBindingSet.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bindingSet/ListBindingSet.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bindingSet/TestListBindingSet.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IBindingSet.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IBindingSet.java 2010-12-11 16:04:20 UTC (rev 4003) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IBindingSet.java 2010-12-12 22:22:01 UTC (rev 4004) @@ -103,14 +103,22 @@ * The #of bound variables. */ public int size(); - - /** - * Visits the bindings. - */ + + /** + * Visits the bindings. + * + * @todo The unit tests verify that the implementations do not permit + * mutation using the iterator, but that is not actually specified by + * the API as forbidden. + */ public Iterator<Map.Entry<IVariable,IConstant>> iterator(); /** * Visits the bound variables. + * + * @todo The unit tests verify that the implementations do not permit + * mutation using the iterator, but that is not actually specified by + * the API as forbidden. */ public Iterator<IVariable> vars(); @@ -118,13 +126,17 @@ * Return a shallow copy of the binding set. */ public IBindingSet clone(); - - /** - * Return a shallow copy of the binding set, eliminating unnecessary - * variables. - */ + + /** + * Return a shallow copy of the binding set, eliminating unnecessary + * variables. + * + * @param variablesToKeep + * When non-<code>null</code>, only the listed variables are + * retained. + */ public IBindingSet copy(IVariable[] variablesToKeep); - + /** * True iff the variables and their bound values are the same * for the two binding sets. @@ -134,15 +146,49 @@ */ public boolean equals(Object o); - /** - * The hash code of a binding is defined as the bit-wise XOR of the hash - * codes of the {@link IConstant}s for its bound variables. Unbound - * variables are ignored when computing the hash code. Binding sets are - * unordered collections, therefore the calculated hash code intentionally - * does not dependent on the order in which the bindings are iterated over. 
- * The hash code reflects the current state of the bindings and must be - * recomputed if the bindings are changed. - */ + /** + * The hash code of a binding is defined as the bit-wise XOR of the hash + * codes of the {@link IConstant}s for its bound variables. Unbound + * variables are ignored when computing the hash code. Binding sets are + * unordered collections, therefore the calculated hash code intentionally + * does not depend on the order in which the bindings are visited. The hash + * code reflects the current state of the bindings and must be recomputed if + * the bindings are changed. + */ public int hashCode(); - + + /** + * Make a copy of the current symbol table (aka current variable bindings) + * and push it onto the stack. Variable bindings will be made against + * the current symbol table. The symbol table stack is propagated by + * {@link #clone()} and {@link #copy(IVariable[])}. Symbol tables may be + * used to propagate conditional bindings through a data flow until a + * decision point is reached, at which point they may be either discarded or + * committed. This mechanism may be used to support SPARQL style optional + * join groups. + * + * @throws UnsupportedOperationException + * if the {@link IBindingSet} is not mutable. + * + * @see #pop(boolean) + */ + public void push(); + + /** + * Pop the current symbol table off of the stack. + * + * @param save + * When <code>true</code>, the bindings on the current symbol + * table are copied to the parent symbol table before the current + * symbol table is popped off of the stack. If <code>false</code> + * , any bindings associated with that symbol table are + * discarded. + * + * @throws IllegalStateException + * if there is no nested symbol table. + * + * @see #push() + */ + public void pop(boolean save); + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bindingSet/ArrayBindingSet.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bindingSet/ArrayBindingSet.java 2010-12-11 16:04:20 UTC (rev 4003) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bindingSet/ArrayBindingSet.java 2010-12-12 22:22:01 UTC (rev 4004) @@ -28,14 +28,14 @@ package com.bigdata.bop.bindingSet; +import java.io.Serializable; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.Map; +import java.util.Stack; import java.util.Map.Entry; -import org.apache.log4j.Logger; - import com.bigdata.bop.IBindingSet; import com.bigdata.bop.IConstant; import com.bigdata.bop.IVariable; @@ -54,36 +54,389 @@ private static final long serialVersionUID = -6468905602211956490L; - private static final Logger log = Logger.getLogger(ArrayBindingSet.class); +// private static final Logger log = Logger.getLogger(ArrayBindingSet.class); /** - * A dense array of the bound variables. + * A symbol table implemented by two correlated arrays. */ - private final IVariable[] vars; + private static class ST implements Serializable { + + /** + * + */ + private static final long serialVersionUID = 1L; + + /** + * A dense array of the bound variables. + */ + private final IVariable[] vars; + + /** + * A dense array of the values bound to the variables (correlated with + * {@link #vars}). + */ + private final IConstant[] vals; + + /** + * The #of entries in the arrays which have defined values.
+ */ + private int nbound = 0; + + private ST(final int nbound,final IVariable[] vars, final IConstant[] vals) { + this.nbound = nbound; + this.vars = vars; + this.vals = vals; + } + + public IConstant get(final IVariable var) { + + if (var == null) + throw new IllegalArgumentException(); + + for (int i = 0; i < nbound; i++) { + + if (vars[i] == var) { + + return vals[i]; + + } + + } + + return null; + + } + + void set(final IVariable var, final IConstant val) { + + if (var == null) + throw new IllegalArgumentException(); + + if (val == null) + throw new IllegalArgumentException(); + + for (int i = 0; i < nbound; i++) { + + if (vars[i] == var) { + + vals[i] = val; + + return; + + } + + } + + vars[nbound] = var; + + vals[nbound] = val; + + nbound++; + + } + + void clearAll() { + + for (int i = nbound - 1; nbound > 0; i--, nbound--) { + + vars[i] = null; + + vals[i] = null; + + } + + assert nbound == 0; + + } + + /** + * Since the array is dense (no gaps), {@link #clear(IVariable)} + * requires that we copy down any remaining elements in the array by one + * position. + * + * @return <code>true</code> if the data structure was modified by the + * operation. + */ + boolean clear(final IVariable var) { + + if (var == null) + throw new IllegalArgumentException(); + + for (int i = 0; i < nbound; i++) { + + if (vars[i] == var) { + + final int nremaining = nbound-(i+1); + + if (nremaining >= 0) { + + // Copy down to close up the gap! + System.arraycopy(vars, i+1, vars, i, nremaining); + + System.arraycopy(vals, i+1, vals, i, nremaining); + + } else { + + // Just clear the reference. + + vars[i] = null; + + vals[i] = null; + + } + + nbound--; + + return true; + + } + + } + + return false; + + } + + } + + /** + * The stack of symbol tables. Each symbol table is a mapping from an + * {@link IVariable} onto its non-<code>null</code> bound {@link IConstant}. + * The stack is initialized with an empty symbol table. Symbol tables may be + * pushed onto the stack or popped off of the stack, but the stack MAY NOT + * become empty. + */ + private final Stack<ST> stack; + + /** + * Return the symbol table on the top of the stack. + */ + private ST current() { + + return stack.peek(); + + } + + public void push() { + + // The current symbol table. + final ST cur = current(); + + // Create a new symbol table. + final ST tmp = new ST(cur.nbound, cur.vars.clone(), cur.vals.clone()); + + // Push the new symbol table onto the stack. + stack.push(tmp); + + } + + public void pop(final boolean save) { + + if (stack.size() < 2) { + /* + * The stack may never become empty. Therefore there must be at + * least two symbol tables on the stack for a pop() request. + */ + throw new IllegalArgumentException(); + } + + // Pop the symbol table off of the top of the stack. + final ST old = stack.pop(); + + if (save) { + + // discard the current symbol table. + stack.pop(); + + // replacing it with the symbol table which we popped off the stack. + stack.push(old); + + } else { + + // clear the hash code. + hash = 0; + + } + + } + /** - * A dense array of the values bound to the variables (correlated with - * {@link #vars}). - */ - private final IConstant[] vals; + * Copy constructor (used by clone, copy). + * + * @param src + * The source to be copied. + * @param variablesToKeep + * The variables to be retained for the symbol table on the top + * of the stack (optional). 
+ */ + protected ArrayBindingSet(final ArrayBindingSet src, + final IVariable[] variablesToKeep) { - private int nbound = 0; + stack = new Stack<ST>(); - /** - * Copy constructor. - */ - protected ArrayBindingSet(final ArrayBindingSet bindingSet) { + final int stackSize = src.stack.size(); + + int depth = 1; + + for (ST srcLst : src.stack) { + + /* + * Copy the source bindings. + * + * Note: If a restriction exists on the variables to be copied, then + * it is applied onto the the top level of the stack. If the symbol + * table is saved when it is pop()'d, then the modified bindings + * will replace the parent symbol table on the stack. + */ + final ST tmp = copy(srcLst, + depth == stackSize ? variablesToKeep : null); + + // Push onto the stack. + stack.push(tmp); + + } + + } + + /** + * Return a copy of the source list. + * + * @param src + * The source list. + * @param variablesToKeep + * When non-<code>null</code>, only the bindings for the + * variables listed in this array will copied. + * + * @return The copy. + */ + private ST copy(final ST src, final IVariable[] variablesToKeep) { + + if (variablesToKeep == null) { + + return new ST(src.nbound, src.vars, src.vals); + + } + + final ST dst = new ST(0/* nbound */, new IVariable[src.vars.length], + new IConstant[src.vals.length]); + + // bitflag for the old binding set + final boolean[] keep = new boolean[src.nbound]; - if (bindingSet == null) - throw new IllegalArgumentException(); + // for each var in the old binding set, see if we need to keep it + for (int i = 0; i < src.nbound; i++) { + + final IVariable v = src.vars[i]; + + keep[i] = false; + for (IVariable k : variablesToKeep) { + if (v == k) { + keep[i] = true; + break; + } + } + + } - nbound = bindingSet.nbound; + // fill in the new binding set based on the keep bitflag + for (int i = 0; i < src.nbound; i++) { + if (keep[i]) { + dst.vars[dst.nbound] = src.vars[i]; + dst.vals[dst.nbound] = src.vals[i]; + dst.nbound++; + } + } - vars = bindingSet.vars.clone(); +// final Iterator<E> itr = src.iterator(); +// +// while (itr.hasNext()) { +// +// final E e = itr.next(); +// +// boolean keep = true; +// +// if (variablesToKeep != null) { +// +// keep = false; +// +// for (IVariable<?> x : variablesToKeep) { +// +// if (x == e.var) { +// +// keep = true; +// +// break; +// +// } +// +// } +// +// } +// +// if (keep) +// dst.add(new E(e.var, e.val)); +// +// } - vals = bindingSet.vals.clone(); + return dst; + + } + +// public ArrayBindingSet XXcopy(final IVariable[] variablesToKeep) { +// +// // bitflag for the old binding set +// final boolean[] keep = new boolean[nbound]; +// +// // for each var in the old binding set, see if we need to keep it +// for (int i = 0; i < nbound; i++) { +// +// final IVariable v = vars[i]; +// +// keep[i] = false; +// for (IVariable k : variablesToKeep) { +// if (v == k) { +// keep[i] = true; +// break; +// } +// } +// +// } +// +// // allocate the new vars +// final IVariable[] newVars = new IVariable[vars.length]; +// +// // allocate the new vals +// final IConstant[] newVals = new IConstant[vals.length]; +// +// // fill in the new binding set based on the keep bitflag +// int newbound = 0; +// for (int i = 0; i < nbound; i++) { +// if (keep[i]) { +// newVars[newbound] = vars[i]; +// newVals[newbound] = vals[i]; +// newbound++; +// } +// } +// +// ArrayBindingSet bs = new ArrayBindingSet(newVars, newVals); +// bs.nbound = newbound; +// +// return bs; +// +// } + + public ArrayBindingSet clone() { + + return new ArrayBindingSet(this, null/* 
variablesToKeep */); } - + + public ArrayBindingSet copy(final IVariable[] variablesToKeep) { + + return new ArrayBindingSet(this, variablesToKeep); + + } + /** * Initialized with the given bindings (assumes for efficiency that all * elements of bound arrays are non-<code>null</code> and that no @@ -105,21 +458,9 @@ if(vars.length != vals.length) throw new IllegalArgumentException(); - // for (int i = 0; i < vars.length; i++) { - // - // if (vars[i] == null) - // throw new IllegalArgumentException(); - // - // if (vals[i] == null) - // throw new IllegalArgumentException(); - // - // } - - this.vars = vars; - - this.vals = vals; + stack = new Stack<ST>(); - this.nbound = vars.length; + stack.push(new ST(vars.length, vars, vals)); } @@ -134,22 +475,32 @@ */ public ArrayBindingSet(final int capacity) { - if (capacity < 0) - throw new IllegalArgumentException(); + if (capacity < 0) + throw new IllegalArgumentException(); - vars = new IVariable[capacity]; + stack = new Stack<ST>(); - vals = new IConstant[capacity]; + stack.push(new ST(0/* nbound */, new IVariable[capacity], + new IConstant[capacity])); } - public Iterator<IVariable> vars() { + /** + * {@inheritDoc} + * <p> + * Iterator does not support either removal or concurrent modification of + * the binding set. + */ + public Iterator<IVariable> vars() { - return Collections.unmodifiableList(Arrays.asList(vars)).iterator(); - + return Collections.unmodifiableList(Arrays.asList(current().vars)) + .iterator(); + } /** + * {@inheritDoc} + * <p> * Iterator does not support either removal or concurrent modification of * the binding set. */ @@ -163,9 +514,11 @@ private int i = 0; + private ST cur = current(); + public boolean hasNext() { - return i < nbound; + return i < cur.nbound; } @@ -178,13 +531,13 @@ public IVariable getKey() { - return vars[index]; + return cur.vars[index]; } public IConstant getValue() { - return vals[index]; + return cur.vals[index]; } @@ -193,9 +546,9 @@ if (value == null) throw new IllegalArgumentException(); - final IConstant t = vals[index]; + final IConstant t = cur.vals[index]; - vals[index] = value; + cur.vals[index] = value; return t; @@ -215,89 +568,34 @@ public int size() { - return nbound; + return current().nbound; } public void clearAll() { - for (int i = nbound - 1; nbound > 0; i--, nbound--) { - - vars[i] = null; - - vals[i] = null; - - } - + current().clearAll(); + // clear the hash code. hash = 0; - assert nbound == 0; - } - /** - * Since the array is dense (no gaps), {@link #clear(IVariable)} requires - * that we copy down any remaining elements in the array by one position. - */ - public void clear(final IVariable var) { + public void clear(final IVariable var) { - if (var == null) - throw new IllegalArgumentException(); + if (current().clear(var)) { - for (int i = 0; i < nbound; i++) { + // clear the hash code. + hash = 0; - if (vars[i] == var) { + } - final int nremaining = nbound-(i+1); - - if (nremaining >= 0) { - - // Copy down to close up the gap! - System.arraycopy(vars, i+1, vars, i, nremaining); + } - System.arraycopy(vals, i+1, vals, i, nremaining); - - } else { - - // Just clear the reference. - - vars[i] = null; - - vals[i] = null; - - } - - // clear the hash code. 
- hash = 0; - - nbound--; - - break; - - } - - } - - } - public IConstant get(final IVariable var) { - if (var == null) - throw new IllegalArgumentException(); - - for (int i = 0; i < nbound; i++) { - - if (vars[i] == var) { + return current().get(var); - return vals[i]; - - } - - } - - return null; - } public boolean isBound(final IVariable var) { @@ -308,122 +606,40 @@ public void set(final IVariable var, final IConstant val) { - if (var == null) - throw new IllegalArgumentException(); + current().set(var, val); - if (val == null) - throw new IllegalArgumentException(); - - if (log.isTraceEnabled()) { - - log.trace("var=" + var + ", val=" + val + ", nbound=" + nbound - + ", capacity=" + vars.length); - - } - - for (int i = 0; i < nbound; i++) { - - if (vars[i] == var) { - - vals[i] = val; - - // clear the hash code. - hash = 0; - - return; - - } - - } - - vars[nbound] = var; - - vals[nbound] = val; - // clear the hash code. hash = 0; - nbound++; - } public String toString() { - final StringBuilder sb = new StringBuilder(); - - sb.append("{"); + final ST cur = current(); + + final StringBuilder sb = new StringBuilder(); - for(int i=0; i<nbound; i++) { - - if(i>0) sb.append(", "); - - sb.append(vars[i]); - - sb.append("="); - - sb.append(vals[i]); - - } - - sb.append("}"); - - return sb.toString(); - - } - - public ArrayBindingSet clone() { + sb.append("{"); - return new ArrayBindingSet(this); - - } + for (int i = 0; i < cur.nbound; i++) { - /** - * Return a shallow copy of the binding set, eliminating unecessary - * variables. - */ - public ArrayBindingSet copy(final IVariable[] variablesToKeep) { + if (i > 0) + sb.append(", "); - // bitflag for the old binding set - final boolean[] keep = new boolean[nbound]; - - // for each var in the old binding set, see if we need to keep it - for (int i = 0; i < nbound; i++) { - - final IVariable v = vars[i]; + sb.append(cur.vars[i]); - keep[i] = false; - for (IVariable k : variablesToKeep) { - if (v == k) { - keep[i] = true; - break; - } - } - + sb.append("="); + + sb.append(cur.vals[i]); + } - // allocate the new vars - final IVariable[] newVars = new IVariable[vars.length]; + sb.append("}"); - // allocate the new vals - final IConstant[] newVals = new IConstant[vals.length]; + return sb.toString(); - // fill in the new binding set based on the keep bitflag - int newbound = 0; - for (int i = 0; i < nbound; i++) { - if (keep[i]) { - newVars[newbound] = vars[i]; - newVals[newbound] = vals[i]; - newbound++; - } - } - - ArrayBindingSet bs = new ArrayBindingSet(newVars, newVals); - bs.nbound = newbound; - - return bs; - } - + public boolean equals(final Object t) { if (this == t) @@ -433,14 +649,16 @@ return false; final IBindingSet o = (IBindingSet)t; - - if (nbound != o.size()) + + final ST cur = current(); + + if (cur.nbound != o.size()) return false; - for(int i=0; i<nbound; i++) { + for(int i=0; i<cur.nbound; i++) { - IConstant<?> o_val = o.get ( vars [ i ] ) ; - if ( null == o_val || !vals[i].equals( o_val )) + final IConstant<?> o_val = o.get ( cur.vars [ i ] ) ; + if ( null == o_val || !cur.vals[i].equals( o_val )) return false; } @@ -455,12 +673,14 @@ int result = 0; - for (int i = 0; i < nbound; i++) { + final ST cur = current(); + + for (int i = 0; i < cur.nbound; i++) { - if (vals[i] == null) + if (cur.vals[i] == null) continue; - result ^= vals[i].hashCode(); + result ^= cur.vals[i].hashCode(); } @@ -471,5 +691,5 @@ } private int hash; - + } Modified: 
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bindingSet/EmptyBindingSet.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bindingSet/EmptyBindingSet.java 2010-12-11 16:04:20 UTC (rev 4003) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bindingSet/EmptyBindingSet.java 2010-12-12 22:22:01 UTC (rev 4004) @@ -44,6 +44,8 @@ * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ + * + * @todo test suite? */ final public class EmptyBindingSet implements IBindingSet, Serializable { @@ -158,5 +160,13 @@ return EmptyIterator.DEFAULT; } + + public void push() { + throw new IllegalStateException(); + } + public void pop(boolean save) { + throw new UnsupportedOperationException(); + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bindingSet/HashBindingSet.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bindingSet/HashBindingSet.java 2010-12-11 16:04:20 UTC (rev 4003) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bindingSet/HashBindingSet.java 2010-12-12 22:22:01 UTC (rev 4004) @@ -29,82 +29,242 @@ package com.bigdata.bop.bindingSet; import java.util.Collections; -import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; -import java.util.LinkedList; import java.util.Map; +import java.util.Stack; import java.util.Map.Entry; import com.bigdata.bop.IBindingSet; import com.bigdata.bop.IConstant; import com.bigdata.bop.IVariable; -import com.bigdata.bop.Var; /** - * {@link IBindingSet} backed by a {@link HashMap}. + * {@link IBindingSet} backed by a {@link LinkedHashMap}. + * <p> + * Note: A {@link LinkedHashMap} provides a fast iterator, which we use a bunch. + * However, {@link IBindingSet}s are inherently unordered collections of + * bindings so the order preservation aspect of the {@link LinkedHashMap} is not + * relied upon. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ - * - * @todo Since {@link Var}s allow reference testing, a faster implementation - * could be written based on a {@link LinkedList}. Just scan the list - * until the entry is found with the desired {@link Var} reference and - * then return it. */ public class HashBindingSet implements IBindingSet { private static final long serialVersionUID = -2989802566387532422L; - /** - * Note: A {@link LinkedHashMap} provides a fast iterator, which we use a - * bunch. - */ - private LinkedHashMap<IVariable, IConstant> map; +// /** +// * Note: A {@link LinkedHashMap} provides a fast iterator, which we use a +// * bunch. +// */ +// private final LinkedHashMap<IVariable, IConstant> map; + /** + * The stack of symbol tables. Each symbol table is a mapping from an + * {@link IVariable} onto its non-<code>null</code> bound {@link IConstant}. + * The stack is initialized with an empty symbol table. Symbol tables may be + * pushed onto the stack or popped off of the stack, but the stack MAY NOT + * become empty. + */ + private final Stack<LinkedHashMap<IVariable, IConstant>> stack; + + /** + * Return the symbol table on the top of the stack. + */ + private LinkedHashMap<IVariable, IConstant> current() { + + return stack.peek(); + + } + + public void push() { + + // The current symbol table. + final LinkedHashMap<IVariable, IConstant> cur = current(); + + // Create a new symbol table. 
+ final LinkedHashMap<IVariable, IConstant> tmp = new LinkedHashMap<IVariable, IConstant>( + cur.size()); + + // Push the new symbol table onto the stack. + stack.push(tmp); + + /* + * Make a copy of each entry in the symbol table which was on the top of + * the stack when we entered this method, inserting the entries into the + * new symbol table as we go. This avoids side effects of mutation on + * the nested symbol tables and also ensures that we do not need to read + * through to the nested symbol tables when answering a query about the + * current symbol table. The only down side of this is that naive + * serialization is that much less compact. + */ + for (Map.Entry<IVariable, IConstant> e : cur.entrySet()) { + + tmp.put(e.getKey(), e.getValue()); + + } + + } + + public void pop(final boolean save) { + + if (stack.size() < 2) { + /* + * The stack may never become empty. Therefore there must be at + * least two symbol tables on the stack for a pop() request. + */ + throw new IllegalArgumentException(); + } + + // Pop the symbol table off of the top of the stack. + final LinkedHashMap<IVariable,IConstant> old = stack.pop(); + + if (save) { + + // discard the current symbol table. + stack.pop(); + + // replacing it with the symbol table which we popped off the stack. + stack.push(old); + + } else { + + // clear the hash code. + hash = 0; + + } + + } + /** * New empty binding set. */ public HashBindingSet() { + + stack = new Stack<LinkedHashMap<IVariable, IConstant>>(); + + stack.push(new LinkedHashMap<IVariable, IConstant>()); - map = new LinkedHashMap<IVariable, IConstant>(); - } /** - * Copy constructor. + * Copy constructor (used by clone, copy). * * @param src */ - protected HashBindingSet(final HashBindingSet src) { + protected HashBindingSet(final HashBindingSet src, final IVariable[] variablesToKeep) { - map = new LinkedHashMap<IVariable, IConstant>(src.map); - - } + stack = new Stack<LinkedHashMap<IVariable,IConstant>>(); + final int stackSize = src.stack.size(); + + int depth = 1; + + for (LinkedHashMap<IVariable, IConstant> srcLst : src.stack) { + + /* + * Copy the source bindings. + * + * Note: If a restriction exists on the variables to be copied, then + * it is applied onto the the top level of the stack. If the symbol + * table is saved when it is pop()'d, then the modified bindings + * will replace the parent symbol table on the stack. + */ + final LinkedHashMap<IVariable,IConstant> tmp = copy(srcLst, + depth == stackSize ? variablesToKeep : null); + + // Push onto the stack. + stack.push(tmp); + + } + + } + + /** + * Return a copy of the source list. + * + * @param src + * The source list. + * @param variablesToKeep + * When non-<code>null</code>, only the bindings for the + * variables listed in this array will copied. + * + * @return The copy. + */ + private LinkedHashMap<IVariable, IConstant> copy( + final LinkedHashMap<IVariable, IConstant> src, + final IVariable[] variablesToKeep) { + + final LinkedHashMap<IVariable, IConstant> dst = new LinkedHashMap<IVariable, IConstant>( + variablesToKeep != null ? 
variablesToKeep.length : src.size()); + + final Iterator<Map.Entry<IVariable, IConstant>> itr = src.entrySet() + .iterator(); + + while (itr.hasNext()) { + + final Map.Entry<IVariable, IConstant> e = itr.next(); + + boolean keep = true; + + if (variablesToKeep != null) { + + keep = false; + + for (IVariable<?> x : variablesToKeep) { + + if (x == e.getKey()) { + + keep = true; + + break; + + } + + } + + } + + if (keep) + dst.put(e.getKey(), e.getValue()); + + } + + return dst; + + } + /** - * Copy constructor. + * Package private constructor used by the unit tests. * * @param src */ - public HashBindingSet(final IBindingSet src) { + HashBindingSet(final IBindingSet src) { - map = new LinkedHashMap<IVariable, IConstant>(src.size()); - + this(); + final Iterator<Map.Entry<IVariable, IConstant>> itr = src.iterator(); while (itr.hasNext()) { final Map.Entry<IVariable, IConstant> e = itr.next(); - map.put(e.getKey(), e.getValue()); + set(e.getKey(), e.getValue()); } } - public HashBindingSet(final IVariable[] vars, final IConstant[] vals) { + /** + * Package private constructor used by the unit tests. + * @param vars + * @param vals + */ + HashBindingSet(final IVariable[] vars, final IConstant[] vals) { + this(); + if (vars == null) throw new IllegalArgumentException(); @@ -114,22 +274,32 @@ if (vars.length != vals.length) throw new IllegalArgumentException(); - map = new LinkedHashMap<IVariable, IConstant>(vars.length); - for (int i = 0; i < vars.length; i++) { - map.put(vars[i], vals[i]); + set(vars[i], vals[i]); } } + public HashBindingSet clone() { + + return new HashBindingSet(this, null /* variablesToKeep */); + + } + + public HashBindingSet copy(final IVariable[] variablesToKeep) { + + return new HashBindingSet(this/* src */, variablesToKeep); + + } + public boolean isBound(final IVariable var) { if (var == null) throw new IllegalArgumentException(); - return map.containsKey(var); + return current().containsKey(var); } @@ -138,7 +308,7 @@ if (var == null) throw new IllegalArgumentException(); - return map.get(var); + return current().get(var); } @@ -150,7 +320,7 @@ if (val == null) throw new IllegalArgumentException(); - map.put(var,val); + current().put(var,val); // clear the hash code. hash = 0; @@ -162,7 +332,7 @@ if (var == null) throw new IllegalArgumentException(); - map.remove(var); + current().remove(var); // clear the hash code. hash = 0; @@ -171,7 +341,7 @@ public void clearAll() { - map.clear(); + current().clear(); // clear the hash code. hash = 0; @@ -186,7 +356,7 @@ int i = 0; - final Iterator<Map.Entry<IVariable, IConstant>> itr = map.entrySet() + final Iterator<Map.Entry<IVariable, IConstant>> itr = current().entrySet() .iterator(); while (itr.hasNext()) { @@ -217,52 +387,22 @@ */ public Iterator<Entry<IVariable, IConstant>> iterator() { - return Collections.unmodifiableMap(map).entrySet().iterator(); + return Collections.unmodifiableMap(current()).entrySet().iterator(); } public Iterator<IVariable> vars() { - return Collections.unmodifiableSet(map.keySet()).iterator(); + return Collections.unmodifiableSet(current().keySet()).iterator(); } public int size() { - return map.size(); + return current().size(); } - public HashBindingSet clone() { - - return new HashBindingSet( this ); - - } - - /** - * Return a shallow copy of the binding set, eliminating unecessary - * variables. 
- */ - public HashBindingSet copy(final IVariable[] variablesToKeep) { - - final HashBindingSet bs = new HashBindingSet(); - - for (IVariable<?> var : variablesToKeep) { - - final IConstant<?> val = map.get(var); - - if (val != null) { - - bs.map.put(var, val); - - } - - } - - return bs; - - } - public boolean equals(final Object t) { if (this == t) @@ -276,7 +416,7 @@ if (size() != o.size()) return false; - final Iterator<Map.Entry<IVariable,IConstant>> itr = map.entrySet().iterator(); + final Iterator<Map.Entry<IVariable,IConstant>> itr = current().entrySet().iterator(); while(itr.hasNext()) { @@ -288,7 +428,7 @@ // if (!o.isBound(vars[i])) // return false; - IConstant<?> o_val = o.get ( var ) ; + final IConstant<?> o_val = o.get ( var ) ; if (null == o_val || !val.equals(o_val)) return false; @@ -304,7 +444,7 @@ int result = 0; - for(IConstant<?> c : map.values()) { + for(IConstant<?> c : current().values()) { if (c == null) continue; Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bindingSet/ListBindingSet.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bindingSet/ListBindingSet.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bindingSet/ListBindingSet.java 2010-12-12 22:22:01 UTC (rev 4004) @@ -0,0 +1,527 @@ +package com.bigdata.bop.bindingSet; + +import java.io.Serializable; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Stack; + +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IConstant; +import com.bigdata.bop.IVariable; +import com.bigdata.bop.Var; + +import cutthecrap.utils.striterators.Resolver; +import cutthecrap.utils.striterators.Striterator; + +/** + * <p>An {@link IBindingSet} based on a {@link LinkedList}. Since {@link Var}s may + * be compared using <code>==</code> this should be faster than a hash map for + * most operations unless the binding set has a large number of entries. + * </p><p> + * Note: {@link #push()} and {@link #pop(boolean)} are implemented by making a + * copy of the current symbol table with distinct {@link Map.Entry} objects. If + * the symbol table is saved when it is {@link #pop(boolean) popped), then it + * simply replaces the pre-existing symbol table which was uncovered when it + * was popped off of the stack. This design has several advantages, including: + * <ul> + * <li>Methods such as {@link #get(IVariable)}, {@link #set(IVariable, IConstant)}, + * and {@link #size()} can be written solely in terms of the current symbol table.</li> + * <li>{@link #clear(IVariable)} removes the {@link Map.Entry} from the + * current symbol table rather than introducing <code>null</code> values or + * delete markers.</li> + * </ul> + * </p> + * The only down side to this approach is that the serialized representation of + * the {@link IBindingSet} is more complex. However, java default serialization + * will do a good job by providing back references for the object graph. + * + * @version $Id: HashBindingSet.java 3836 2010-10-22 11:59:15Z thompsonbry $ + */ +public class ListBindingSet implements IBindingSet { + + private static final long serialVersionUID = 1L; + + /** + * A (var,val) entry. 
+ */ + private static class E implements Map.Entry<IVariable<?>, IConstant<?>>, Serializable { + + /** + * + */ + private static final long serialVersionUID = 1L; + + private final IVariable<?> var; + + private IConstant<?> val; + + E(final IVariable<?> var, final IConstant<?> val) { + this.var = var; + this.val = val; + } + + public IVariable<?> getKey() { + return var; + } + + public IConstant<?> getValue() { + return val; + } + + public IConstant<?> setValue(final IConstant<?> value) { + if (value == null) { + // Null bindings are not permitted. + throw new IllegalArgumentException(); + } + final IConstant<?> tmp = this.val; + this.val = value; + return tmp; + } + }; + + /** + * The stack of symbol tables. Each symbol table is a mapping from an + * {@link IVariable} onto its non-<code>null</code> bound {@link IConstant}. + * The stack is initialized with an empty symbol table. Symbol tables may be + * pushed onto the stack or popped off of the stack, but the stack MAY NOT + * become empty. + */ + private final Stack<List<E>> stack; + + /** + * Return the symbol table on the top of the stack. + */ + private List<E> current() { + + return stack.peek(); + + } + + public void push() { + + // The current symbol table. + final List<E> cur = current(); + + // Create a new symbol table. + final List<E> tmp = new LinkedList<E>(); + + // Push the new symbol table onto the stack. + stack.push(tmp); + + /* + * Make a copy of each entry in the symbol table which was on the top of + * the stack when we entered this method, inserting the entries into the + * new symbol table as we go. This avoids side effects of mutation on + * the nested symbol tables and also ensures that we do not need to read + * through to the nested symbol tables when answering a query about the + * current symbol table. The only down side of this is that naive + * serialization is that much less compact. + */ + for (E e : cur) { + + tmp.add(new E(e.var, e.val)); + + } + + } + + public void pop(final boolean save) { + + if (stack.size() < 2) { + /* + * The stack may never become empty. Therefore there must be at + * least two symbol tables on the stack for a pop() request. + */ + throw new IllegalArgumentException(); + } + + // Pop the symbol table off of the top of the stack. + final List<E> old = stack.pop(); + + if (save) { + + // discard the current symbol table. + stack.pop(); + + // replacing it with the symbol table which we popped off the stack. + stack.push(old); + + } else { + + // clear the hash code. + hash = 0; + + } + + } + + /** + * Create an empty binding set. + */ + public ListBindingSet() { + + stack = new Stack<List<E>>(); + + stack.push(new LinkedList<E>()); + + } + + /** + * Package private constructor used by the unit tests. + * @param vars + * @param vals + */ + ListBindingSet(final IVariable[] vars, final IConstant[] vals) { + + this(); + + if (vars == null) + throw new IllegalArgumentException(); + + if (vals == null) + throw new IllegalArgumentException(); + + if (vars.length != vals.length) + throw new IllegalArgumentException(); + + for (int i = 0; i < vars.length; i++) { + + set(vars[i], vals[i]); + + } + + } + + /** + * Copy constructor (used by clone, copy). + * + * @param src + * The source to be copied. + * @param variablesToKeep + * The variables to be retained for the symbol table on the top + * of the stack (optional). 
+ */ + protected ListBindingSet(final ListBindingSet src, + final IVariable[] variablesToKeep) { + + stack = new Stack<List<E>>(); + + final int stackSize = src.stack.size(); + + int depth = 1; + + for (List<E> srcLst : src.stack) { + + /* + * Copy the source bindings. + * + * Note: If a restriction exists on the variables to be copied, then + * it is applied onto the the top level of the stack. If the symbol + * table is saved when it is pop()'d, then the modified bindings + * will replace the parent symbol table on the stack. + */ + final List<E> tmp = copy(srcLst, + depth == stackSize ? variablesToKeep : null); + + // Push onto the stack. + stack.push(tmp); + + } + + } + + /** + * Return a copy of the source list. The copy will use new {@link E}s to + * represent the bindings so changes to the copy will not effect the source. + * + * @param src + * The source list. + * @param variablesToKeep + * When non-<code>null</code>, only the bindings for the + * variables listed in this array will copied. + * + * @return The copy. + */ + private List<E> copy(final List<E> src, final IVariable[] variablesToKeep) { + + final List<E> dst = new LinkedList<E>(); + + final Iterator<E> itr = src.iterator(); + + while (itr.hasNext()) { + + final E e = itr.next(); + + boolean keep = true; + + if (variablesToKeep != null) { + + keep = false; + + for (IVariable<?> x : variablesToKeep) { + + if (x == e.var) { + + keep = true; + + break; + + } + + } + + } + + if (keep) + dst.add(new E(e.var, e.val)); + + } + + return dst; + + } + + public ListBindingSet clone() { + + return new ListBindingSet(this, null /* variablesToKeep */); + + } + + public IBindingSet copy(final IVariable[] variablesToKeep) { + + return new ListBindingSet(this/*src*/, variablesToKeep); + + } + + public void clear(final IVariable var) { + + if (var == null) + throw new IllegalArgumentException(); + + final List<E> cur = current(); + + for(E e : cur) { + + if(e.var == var) { + + cur.remove(e); + + // clear the hash code. + hash = 0; + + return; + + } + + } + + } + + public void clearAll() { + + current().clear(); + + // clear the hash code. + hash = 0; + + } + + public IConstant get(final IVariable var) { + + if (var == null) + throw new IllegalArgumentException(); + + final List<E> cur = current(); + + for(E e : cur) { + + if(e.var == var) { + + return e.val; + + } + + } + + return null; + + } + + public boolean isBound(IVariable var) { + + if (var == null) + throw new IllegalArgumentException(); + + final List<E> cur = current(); + + for(E e : cur) { + + if(e.var == var) { + + return true; + + } + + } + + return false; + + } + + @SuppressWarnings("unchecked") + public Iterator<Map.Entry<IVariable, IConstant>> iterator() { + + return (Iterator<Map.Entry<IVariable, IConstant>>) ((List) Collections + .unmodifiableList(current())).iterator(); + + } + + public void set(final IVariable var, final IConstant val) { + + if (var == null) + throw new IllegalArgumentException(); + + if (val == null) + throw new IllegalArgumentException(); + + final List<E> cur = current(); + + for (E e : cur) { + + if (e.var == var) { + + e.val = val; + + // clear the hash code. + hash = 0; + + return; + + } + + } + + cur.add(new E(var, val)); + + // clear the hash code. 
+ hash = 0; + + } + + public int size() { + + return current().size(); + + } + + @SuppressWarnings("unchecked") + public Iterator<IVariable> vars() { + return (Iterator<IVariable>) new Striterator(Collections + .unmodifiableList(current()).iterator()) + .addFilter(new Resolver() { + private static final long serialVersionUID = 1L; + + @Override + protected Object resolve(Object obj) { + return ((E) obj).var; + } + }); + } + + public String toString() { + + final StringBuilder sb = new StringBuilder(); + + sb.append("{ "); + + int i = 0; + + final Iterator<E> itr = current().iterator(); + + while (itr.hasNext()) { + + if (i > 0) + sb.append(", "); + + final E entry = itr.next(); + + sb.append(entry.getKey()); + + sb.append("="); + + sb.append(entry.getValue()); + + i++; + + } + + sb.append(" }"); + + return sb.toString(); + + } + + public boolean equals(final Object t) { + + if (this == t) + return true; + + if(!(t instanceof IBindingSet)) + return false; + + final IBindingSet o = (IBindingSet) t; + + if (size() != o.size()) + return false; + + final Iterator<E> itr = current().iterator(); + + while(itr.hasNext()) { + + final E entry = itr.next(); + + final IVariable<?> var = entry.getKey(); + + final IConstant<?> val = entry.getValue(); + +// if (!o.isBound(vars[i])) +// return false; + final IConstant<?> o_val = o.get ( var ) ; + if (null == o_val || !val.equals(o_val)) + return false; + + } + + return true; + + } + + public int hashCode() { + + if (hash == 0) { + + int result = 0; + + final List<E> cur = current(); + + for(E e : cur) { + + if (e.val == null) + continue; + + result ^= e.val.hashCode(); + + } + + hash = result; + + } + return hash; + + } + private int hash; + +} Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/DistinctBindingSetOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/DistinctBindingSetOp.java 2010-12-11 16:04:20 UTC (rev 4003) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/DistinctBindingSetOp.java 2010-12-12 22:22:01 UTC (rev 4004) @@ -250,8 +250,16 @@ // System.err.println("accepted: " // + Arrays.toString(vals)); - accepted.add(new HashBindingSet(vars, vals)); + final HashBindingSet tmp = new HashBindingSet(); + + for (int i = 0; i < vars.length; i++) { + tmp.set(vars[i], vals[i]); + + } + + accepted.add(tmp); + naccepted++; } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bindingSet/TestAll.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bindingSet/TestAll.java 2010-12-11 16:04:20 UTC (rev 4003) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bindingSet/TestAll.java 2010-12-12 22:22:01 UTC (rev 4004) @@ -60,9 +60,12 @@ final TestSuite suite = new TestSuite("binding sets"); + // @todo test EmptyBindingSet + // test binding set impls. 
suite.addTestSuite(TestArrayBindingSet.class); suite.addTestSuite(TestHashBindingSet.class); + suite.addTestSuite(TestListBindingSet.class); return suite; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bindingSet/TestArrayBindingSet.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bindingSet/TestArrayBindingSet.java 2010-12-11 16:04:20 UTC (rev 4003) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bindingSet/TestArrayBindingSet.java 2010-12-12 22:22:01 UTC (rev 4004) @@ -59,24 +59,24 @@ */ public TestArrayBindingSet ( String name ) { super ( name ) ; } - /** - * Unit test for {@link ArrayBindingSet#ArrayBindingSet(ArrayBindingSet)} - */ - public void testConstructorArrayBindingSet () - { - try { assertTrue ( null != new ArrayBindingSet ( null ) ) ; fail ( "IllegalArgumentException expected, copy from was null" ) ; } - catch ( IllegalArgumentException e ) {} +// /** +// * Unit test for {@link ArrayBindingSet#ArrayBindingSet(ArrayBindingSet)} +// */ +// public void testConstructorArrayBindingSet () +// { +// try { assertTrue ( null != new ArrayBindingSet ( null ) ) ; fail ( "IllegalArgumentException expected, copy from was null" ) ; } +// catch ( IllegalArgumentException e ) {} +// +// Var<?> var1 = Var.var ( "a" ) ; +// Var<?> var2 = Var.var ( "b" ) ; +// Constant<Integer> val1 = new Constant<Integer> ( 1 ) ; +// Constant<Integer> val2 = new Constant<Integer> ( 2 ) ; +// IVariable<?> vars [] = new IVariable [] { var1, var2 } ; +// IConstant<?> vals [] = new IConstant [] { val1, val2 } ; +// +// assertEqual ( new ArrayBindingSet ( new ArrayBindingSet ( vars, vals ) ), vars, vals ) ; +// } - Var<?> var1 = Var.var ( "a" ) ; - Var<?> var2 = Var.var ( "b" ) ; - Constant<Integer> val1 = new Constant<Integer> ( 1 ) ; - Constant<Integer> val2 = new Constant<Integer> ( 2 ) ; - IVariable<?> vars [] = new IVariable [] { var1, var2 } ; - IConstant<?> vals [] = new IConstant [] { val1, val2 } ; - - assertEqual ( new ArrayBindingSet ( new ArrayBindingSet ( vars, vals ) ), vars, vals ) ; - } - /** * Unit test for {@link ArrayBindingSet#ArrayBindingSet(IVariable[],IConstant[])} */ Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bindingSet/TestIBindingSet.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bindingSet/TestIBindingSet.java 2010-12-11 16:04:20 UTC (rev 4003) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bindingSet/TestIBindingSet.java 2010-12-12 22:22:01 UTC (rev 4004) @@ -31,23 +31,29 @@ import java.util.Iterator; import java.util.Map; +import junit.framework.TestCase2; + import com.bigdata.bop.Constant; import com.bigdata.bop.IBindingSet; import com.bigdata.bop.IConstant; import com.bigdata.bop.IVariable; import com.bigdata.bop.Var; +import com.bigdata.io.SerializerUtil; -import junit.framework.TestCase2; - /** * Unit tests for {@link IBindingSet}. - * + * <p> * Note: - * a) these tests assume that the values held for a given key are not cloned, - * i.e. comparison is done by '==' and not '.equals' - * b) keys with the same 'name' are a unique object. + * <ul> + * <li>a) these tests assume that the values held for a given key are not + * cloned, i.e. 
comparison is done by '==' and not '.equals' (this is true + * except for the Serializatoin tests, where the {@link Var} references will be + * preserved but the {@link IConstant}s will be distinct).</li> + * <li>b) keys with the same 'name' are a unique object.</li> + * </ul> * * @author <a href="mailto:dm...@us...">David MacMillan</a> + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public abstract class TestIBindingSet extends TestCase2 { @@ -259,7 +265,13 @@ IBindingSet bs = newBindingSet ( new IVariable [] { var1, var2, var3, var4, var5 } , new IConstant [] { val1, val2, val3, val4, val5 } ) ; - + + assertEqual( + bs.copy(null/* variablesToKeep */), // + new IVariable[] { var1, var2, var3, var4, var5 }, + new IConstant[] { val1, val2, val3, val4, val5 }// + ); + IBindingSet bs2 = bs.copy ( new IVariable [] { var1, var3, var5 } ) ; assertTrue ( 3 == bs2.size () ) ; @@ -321,13 +333,141 @@ assertTrue ( "expected equal: same bindings after mutation", bs1.hashCode () == bs4.hashCode () ) ; } + /* + * push()/pop() tests. + * + * Note: In addition to testing push() and pop(save:boolean), we have to + * test that copy() and clone() operate correctly in the presence of nested + * symbol tables, and that the visitation patterns for the bindings operate + * correctly when there are nested symbol tables. For example, if there "y" + * is bound at level zero, a push() is executed, and then "x" is bound at + * level one. The visitation pattern must visit both "x" and "y". + */ + + public void test_nestedSymbolTables() { + + final Var<?> var1 = Var.var ( "a" ) ; + final Var<?> var2 = Var.var ( "b" ) ; + final Constant<Integer> val1 = new Constant<Integer> ( 1 ) ; + final Constant<Integer> val2 = new Constant<Integer> ( 2 ) ; + + final IBindingSet bs1 = newBindingSet(2/* size */); + + bs1.set(var1,val1); + + /* + * push a symbol table onto the stack + */ + bs1.push(); + + bs1.set(var2, val2); + + bs1.pop(false/* save */); + + // verify the modified bindings were discarded. + assertEqual(bs1, new IVariable[] { var1 }, new IConstant[] { val1 }); + + /* + * push a symbol table onto the stack + */ + bs1.push(); + + bs1.set(var2, val2); + + bs1.pop(true/* save */); + + // verify the modified bindings were saved. + assertEqual(bs1, new IVariable[] { var1, var2 }, new IConstant[] { + val1, val2 }); + } + + public void test_serialization() { + + final Var<?> var1 = Var.var ( "a" ) ; + final Var<?> var2 = Var.var ( "b" ) ; + final Constant<Integer> val1 = new Constant<Integer> ( 1 ) ; + final Constant<Integer> val2 = new Constant<Integer> ( 2 ) ; + + final IBindingSet bs1 = newBindingSet(2/* size */); + + bs1.set(var1, val1); + + bs1.set(var2, val2); + + assertEqual(bs1, new IVariable[] { var1, var2 }, new IConstant[] { + val1, val2 }); + + final IBindingSet bs2 = (IBindingSet) SerializerUtil + .deserialize(SerializerUtil.serialize(bs1)); + + assertEquals(bs1, bs1); + + } + + /* + * Hooks for testing specific implementations. + */ + protected abstract IBindingSet newBindingSet ( IVariable<?> vars [], IConstant<?> vals [] ) ; protected abstract IBindingSet newBindingSet ( int size ) ; + /** + * Compare actual and expected, where the latter is expressed using + * (vars,vals). + * <p> + * Note: This does not follow the junit pattern for asserts, which puts the + * expected data first. 
+ * + * @param actual + * @param vars + * @param vals + */ protected void assertEqual ( IBindingSet actual, IVariable<?> vars [], IConstant<?> vals [] ) { assertTrue ( "wrong size", actual.size () == vars.length ) ; for ( int i = 0; i < vars.length; i++ ) assertTrue ( "wrong value", vals [ i ] == actual.get ( vars [ i ] ) ) ; } -} \ No newline at end of file + + protected void assertEquals(IBindingSet expected, IBindingSet actual) { + + // expected variables in some order. + final Iterator<IVariable> evars = expected.vars(); + + // actual variables in some order (the order MAY be different). + final Iterator<IVariable> avars = actual.vars(); + + while(evars.hasNext()) { + + // Some variable for which we expect a binding. + f... [truncated message content] |
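A minimal standalone sketch may help readers follow the push()/pop(save) contract introduced above for SPARQL style optional join groups. The BindingStack class below, with plain String variables and Integer constants, is a hypothetical stand-in for IBindingSet/IVariable/IConstant, not the bigdata implementation; only the stack discipline is taken from the diff: the stack may never become empty, push() copies the current symbol table, pop(true) commits the popped table over its parent, and pop(false) discards it. Note that the interface javadoc documents IllegalStateException when pop() finds no nested symbol table, while the committed implementations throw IllegalArgumentException; the sketch follows the javadoc.

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.LinkedHashMap;
    import java.util.Map;

    /**
     * Hypothetical stand-in for the IBindingSet symbol-table stack.
     * Variables are Strings and constants are Integers here; the real code
     * uses IVariable/IConstant, but the push()/pop(save) contract is the
     * same.
     */
    public class BindingStack {

        private final Deque<Map<String, Integer>> stack = new ArrayDeque<>();

        public BindingStack() {
            stack.push(new LinkedHashMap<>()); // the stack MAY NOT become empty
        }

        public void set(String var, Integer val) {
            stack.peek().put(var, val);
        }

        public Integer get(String var) {
            return stack.peek().get(var);
        }

        /** Copy the current symbol table and push the copy onto the stack. */
        public void push() {
            stack.push(new LinkedHashMap<>(stack.peek()));
        }

        /**
         * Pop the current symbol table. When save is true, the popped table
         * replaces its parent (the conditional bindings are committed);
         * otherwise the popped table is simply discarded.
         */
        public void pop(boolean save) {
            if (stack.size() < 2)
                throw new IllegalStateException("no nested symbol table");
            final Map<String, Integer> popped = stack.pop();
            if (save) {
                stack.pop();        // discard the uncovered parent table.
                stack.push(popped); // commit the popped table in its place.
            }
        }

        public static void main(String[] args) {
            final BindingStack bs = new BindingStack();
            bs.set("a", 1);

            bs.push();                // enter an optional join group.
            bs.set("b", 2);
            bs.pop(false /* save */); // group failed: ?b is discarded.
            System.out.println("after discard: b=" + bs.get("b")); // b=null

            bs.push();                // try the group again.
            bs.set("b", 2);
            bs.pop(true /* save */);  // group succeeded: ?b is committed.
            System.out.println("after save: b=" + bs.get("b"));    // b=2
        }
    }

The copy-on-push design shown here is the same trade-off the commit makes for HashBindingSet and ListBindingSet: queries against the current symbol table never read through to nested tables, at the cost of a less compact serialized form.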
From: <tho...@us...> - 2010-12-11 16:04:26
Revision: 4003
http://bigdata.svn.sourceforge.net/bigdata/?rev=4003&view=rev
Author: thompsonbry
Date: 2010-12-11 16:04:20 +0000 (Sat, 11 Dec 2010)

Log Message:
-----------
Restored the previous logic which correctly handled cases where there were open transactions but lacks the optimization to immediately recycle data using the session mechanism when there are no open transactions and the history retention period is zero.

Added Paths:
-----------
    branches/JOURNAL_HA_BRANCH/bigdata/src/architecture/RWStore.xls

Added: branches/JOURNAL_HA_BRANCH/bigdata/src/architecture/RWStore.xls
===================================================================
(Binary files differ)

Property changes on: branches/JOURNAL_HA_BRANCH/bigdata/src/architecture/RWStore.xls
___________________________________________________________________
Added: svn:mime-type
   + application/octet-stream
From: <tho...@us...> - 2010-12-11 14:11:56
Revision: 4002
http://bigdata.svn.sourceforge.net/bigdata/?rev=4002&view=rev
Author: thompsonbry
Date: 2010-12-11 14:11:49 +0000 (Sat, 11 Dec 2010)

Log Message:
-----------
Commented out a System.out.println.

Modified Paths:
--------------
    branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java

Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java
===================================================================
--- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java	2010-12-09 22:58:55 UTC (rev 4001)
+++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java	2010-12-11 14:11:49 UTC (rev 4002)
@@ -435,7 +435,7 @@
 		while ((nints * intAllocation) % modAllocation != 0)
 			nints++;
 
-		System.out.println("calcBitSize for " + alloc + " returns " + nints);
+//		System.out.println("calcBitSize for " + alloc + " returns " + nints);
 
 		return nints;
 	}
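The hunk above also shows the rounding idiom used by calcBitSize: grow nints until nints * intAllocation is an exact multiple of modAllocation. A hedged standalone rendering follows; the parameter meanings and the example values (32-bit ints aligned to a 64-bit modulus) are assumptions, since only the loop itself appears in the diff.

    /**
     * Illustrative sketch of the rounding loop in the hunk above: grow the
     * int count until the product is an exact multiple of the alignment
     * modulus. The starting value of nints and the semantics of
     * intAllocation / modAllocation are assumptions; only the while-loop
     * comes from the diff.
     */
    public class CalcBitSizeSketch {

        static int roundUp(int nints, int intAllocation, int modAllocation) {
            while ((nints * intAllocation) % modAllocation != 0)
                nints++;
            return nints;
        }

        public static void main(String[] args) {
            // e.g. 3 ints of 32 bits each, aligned to a 64-bit modulus -> 4 ints.
            System.out.println(roundUp(3, 32, 64)); // prints 4
        }
    }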
From: <btm...@us...> - 2010-12-09 22:59:02
Revision: 4001 http://bigdata.svn.sourceforge.net/bigdata/?rev=4001&view=rev Author: btmurphy Date: 2010-12-09 22:58:55 +0000 (Thu, 09 Dec 2010) Log Message: ----------- [branch dev-btm]: CHECKPOINT - changes & fixes to allow com.bigdata.jini tests to use the smart proxy based zookeeper; in particular, changes to the zookeeper shutdown code in JiniServicesHelper Modified Paths: -------------- branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/ServiceImpl.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/loadbalancer/ServiceImpl.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/process/ServiceImpl.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/quorum/ServiceImpl.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/util/JiniServicesHelper.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/transaction/ServiceImpl.java branches/dev-btm/bigdata-jini/src/java/com/bigdata/util/Util.java branches/dev-btm/bigdata-jini/src/resources/config/bigdataStandaloneTesting.config branches/dev-btm/bigdata-jini/src/test/com/bigdata/jini/start/AbstractFedZooTestCase.java branches/dev-btm/bigdata-jini/src/test/com/bigdata/service/jini/master/TestMappedRDFDataLoadMaster.java branches/dev-btm/bigdata-jini/src/test/com/bigdata/zookeeper/AbstractZooTestCase.java Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/ServiceImpl.java =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/ServiceImpl.java 2010-12-08 17:39:23 UTC (rev 4000) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/executor/ServiceImpl.java 2010-12-09 22:58:55 UTC (rev 4001) @@ -457,7 +457,7 @@ Boolean.FALSE); this.sdm = new ServiceDiscoveryManager(ldm, null, config); - if (zookeeperAccessor == null) { + if ( (zookeeperAccessor == null) || !(zookeeperAccessor.isOpen()) ) { setZookeeperConfigInfo(config, this.sdm); zookeeperAccessor = new ZooKeeperAccessor Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/loadbalancer/ServiceImpl.java =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/loadbalancer/ServiceImpl.java 2010-12-08 17:39:23 UTC (rev 4000) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/loadbalancer/ServiceImpl.java 2010-12-09 22:58:55 UTC (rev 4001) @@ -350,7 +350,13 @@ } //Service export and proxy creation - ServerEndpoint endpoint = TcpServerEndpoint.getInstance(0); + String exportIpAddr = + NicUtil.getIpAddress + ( "default.nic", + ConfigDeployUtil.getString("node.serviceNetwork"), + false ); + ServerEndpoint endpoint = + TcpServerEndpoint.getInstance(exportIpAddr, 0); InvocationLayerFactory ilFactory = new BasicILFactory(); Exporter defaultExporter = new BasicJeriExporter(endpoint, ilFactory, Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/process/ServiceImpl.java =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/process/ServiceImpl.java 2010-12-08 17:39:23 UTC (rev 4000) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/process/ServiceImpl.java 2010-12-09 22:58:55 UTC (rev 4001) @@ -346,7 +346,13 @@ this.scheduledExecutor = Executors.newScheduledThreadPool(1); // Service export and proxy creation - ServerEndpoint endpoint = TcpServerEndpoint.getInstance(0); + String exportIpAddr = + NicUtil.getIpAddress + ( "default.nic", + ConfigDeployUtil.getString("node.serviceNetwork"), + false ); + ServerEndpoint 
endpoint = + TcpServerEndpoint.getInstance(exportIpAddr, 0); InvocationLayerFactory ilFactory = new BasicILFactory(); Exporter defaultExporter = new BasicJeriExporter(endpoint, ilFactory, Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/quorum/ServiceImpl.java =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/quorum/ServiceImpl.java 2010-12-08 17:39:23 UTC (rev 4000) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/quorum/ServiceImpl.java 2010-12-09 22:58:55 UTC (rev 4001) @@ -289,7 +289,13 @@ serviceId = configStateInfo.getServiceId(); //Service export and proxy creation - ServerEndpoint endpoint = TcpServerEndpoint.getInstance(0); + String exportIpAddr = + NicUtil.getIpAddress + ( "default.nic", + ConfigDeployUtil.getString("node.serviceNetwork"), + false ); + ServerEndpoint endpoint = + TcpServerEndpoint.getInstance(exportIpAddr, 0); InvocationLayerFactory ilFactory = new BasicILFactory(); Exporter defaultExporter = new BasicJeriExporter(endpoint, ilFactory, Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/util/JiniServicesHelper.java =================================================================== --- branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/util/JiniServicesHelper.java 2010-12-08 17:39:23 UTC (rev 4000) +++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/service/jini/util/JiniServicesHelper.java 2010-12-09 22:58:55 UTC (rev 4001) @@ -57,6 +57,16 @@ import java.util.ArrayList; import java.util.UUID; +//BTM - FOR_ZOOKEEPER_SMART_PROXY - BEGIN +import com.bigdata.service.QuorumPeerService; +import com.bigdata.util.config.NicUtil; +import com.sun.jini.admin.DestroyAdmin; +import net.jini.admin.Administrable; +import net.jini.core.lookup.ServiceItem; +import net.jini.core.lookup.ServiceTemplate; +import net.jini.lookup.ServiceDiscoveryManager; +//BTM - FOR_ZOOKEEPER_SMART_PROXY - END + /** * A helper class that starts all the necessary services for a Jini federation. * This is used when testing, but NOT for benchmarking performance. For @@ -509,8 +519,12 @@ clientPort = getPort(2181/* suggestedPort */); final int peerPort = getPort(2888/* suggestedPort */); final int leaderPort = getPort(3888/* suggestedPort */); - final String servers = "1=localhost:" + peerPort + ":" - + leaderPort; +//BTM - PRE_ZOOKEEPER_SMART_PROXY - BEGIN +//BTM - PRE_ZOOKEEPER_SMART_PROXY final String servers = "1=localhost:" + peerPort + ":" +//BTM - PRE_ZOOKEEPER_SMART_PROXY + leaderPort; + String hostname = NicUtil.getIpAddress("default.nic", "default", true); + final String servers = "1="+hostname+":" + peerPort + ":" + leaderPort; +//BTM - PRE_ZOOKEEPER_SMART_PROXY - END //BTM - FOR_CLIENT_SERVICE - BEGIN //BTM - FOR_CLIENT_SERVICE options = new String[] { @@ -553,8 +567,7 @@ .getInstance(concat(args, options)); // start zookeeper (a server instance). 
-//BTM log.warn("\n---------------- JiniServicesHelper.innerStart >>> START ZOOKEEPER\n"); -//BTM com.bigdata.util.Util.printStr("TestBigdata.debug","\n---------------- JiniServicesHelper.innerStart >>> START ZOOKEEPER\n"); +System.out.println("\n---------------- JiniServicesHelper.innerStart >>> START ZOOKEEPER: args[0] = "+args[0]+"\n"); //BTM - PRE_ZOOKEEPER_SMART_PROXY - BEGIN //BTM - PRE_ZOOKEEPER_SMART_PROXY final int nstarted = ZookeeperProcessHelper.startZookeeper( //BTM - PRE_ZOOKEEPER_SMART_PROXY config, serviceListener); @@ -563,8 +576,7 @@ (com.bigdata.quorum.ServiceImpl.class, //BTM - was QuorumPeerMain.class config, serviceListener); //BTM - PRE_ZOOKEEPER_SMART_PROXY - END -//BTM log.warn("\n---------------- JiniServicesHelper.innerStart >>> START ZOOKEEPER - DONE\n"); -//BTM com.bigdata.util.Util.printStr("TestBigdata.debug","\n---------------- JiniServicesHelper.innerStart >>> START ZOOKEEPER - DONE\n"); +System.out.println("\n---------------- JiniServicesHelper.innerStart >>> START ZOOKEEPER - DONE\n"); if (nstarted != 1) { @@ -995,22 +1007,92 @@ //BTM log.warn("\n---------------- JiniServicesHelper.destroy BEGIN DESTROY ----------------\n"); //BTM com.bigdata.util.Util.printStr("TestBigdata.debug","\n---------------- JiniServicesHelper.destroy BEGIN DESTROY ----------------\n"); - ZooKeeper zookeeper = null; - - ZookeeperClientConfig zooConfig = null; - +//BTM - FOR_ZOOKEEPER_SMART_PROXY - BEGIN --------------------------------------------------- +//BTM - FOR_ZOOKEEPER_SMART_PROXY ZooKeeper zookeeper = null; +//BTM - FOR_ZOOKEEPER_SMART_PROXY +//BTM - FOR_ZOOKEEPER_SMART_PROXY ZookeeperClientConfig zooConfig = null; +//BTM - FOR_ZOOKEEPER_SMART_PROXY +//BTM - FOR_ZOOKEEPER_SMART_PROXY if (client != null && client.isConnected()) { +//BTM - FOR_ZOOKEEPER_SMART_PROXY +//BTM - FOR_ZOOKEEPER_SMART_PROXY zooConfig = client.getFederation().getZooConfig(); +//BTM - FOR_ZOOKEEPER_SMART_PROXY +//BTM - FOR_ZOOKEEPER_SMART_PROXY zookeeper = client.getFederation().getZookeeper(); +//BTM - FOR_ZOOKEEPER_SMART_PROXY +//BTM - FOR_ZOOKEEPER_SMART_PROXY client.disconnect(true/* immediateShutdown */); +//BTM - FOR_ZOOKEEPER_SMART_PROXY +//BTM - FOR_ZOOKEEPER_SMART_PROXY client = null; +//BTM - FOR_ZOOKEEPER_SMART_PROXY +//BTM - FOR_ZOOKEEPER_SMART_PROXY } +//BTM - FOR_ZOOKEEPER_SMART_PROXY + ServiceItem[] items = null; if (client != null && client.isConnected()) { - zooConfig = client.getFederation().getZooConfig(); - - zookeeper = client.getFederation().getZookeeper(); - + // 1. Clear out everything in zookeeper + ZookeeperClientConfig zooConfig = client.getFederation().getZooConfig(); + ZooKeeper zookeeper = client.getFederation().getZookeeper(); + try { +System.out.println("\n---------------- JiniServicesHelper.destroy BEGIN ZOOKEEPER DELETE"); + zookeeper.delete(zooConfig.zroot, -1); // version +System.out.println("---------------- JiniServicesHelper.destroy END ZOOKEEPER DELETE\n"); + } catch (Exception e) { + log.warn("zroot=" + zooConfig.zroot + " : "+ e.getLocalizedMessage(), e); +System.out.println("\n---------------- JiniServicesHelper.destroy END ZOOKEEPER DELETE\n"); + } + + //2. 
For graceful shutdown of QuorumPeerService + ServiceDiscoveryManager sdm = + client.getFederation().getServiceDiscoveryManager(); + Class[] quorumServiceType = + new Class[] {QuorumPeerService.class}; + ServiceTemplate quorumServiceTmpl = + new ServiceTemplate(null, quorumServiceType, null); + items = sdm.lookup(quorumServiceTmpl, Integer.MAX_VALUE, null); + // Graceful shutdown of QuorumPeerService + if (items != null) { + for (int i=0; i<items.length; i++) { + QuorumPeerService zk = (QuorumPeerService)(items[i].service); + try { + Object admin = ((Administrable)zk).getAdmin(); +System.out.println("\n---------------- JiniServicesHelper.destroy BEGIN QuorumPeerService DESTROY"); + ((DestroyAdmin)admin).destroy(); +System.out.println("---------------- JiniServicesHelper.destroy END QuorumPeerService DESTROY\n"); + } catch(Exception e) { + log.warn("failure on zookeeper destroy ["+zk+"]", e); +System.out.println("\n---------------- JiniServicesHelper.destroy END QuorumPeerService DESTROY\n"); + } + } + } else {//items == null + try { +System.out.println("\n---------------- JiniServicesHelper.destroy BEGIN org.apache.zookeeper.server.quorum.QuorumPeerMain KILL"); + ZooHelper.kill(clientPort); +System.out.println("---------------- JiniServicesHelper.destroy END org.apache.zookeeper.server.quorum.QuorumPeerMain KILL\n"); + } catch(Exception e) { + log.warn("failure on zookeeper kill " + +"[clientPort="+clientPort+"]", e); +System.out.println("\n---------------- JiniServicesHelper.destroy END org.apache.zookeeper.server.quorum.QuorumPeerMain KILL\n"); + } + } + + //3. Kill the process(es) in which zookeeper is running + for (ProcessHelper t : ((ServiceListener)serviceListener).running) { + if (t instanceof ZookeeperProcessHelper) { +System.out.println("\n*** KILLING ProcessHelper: "+t+"\n"); + try { + t.kill(true); + } catch(Exception e) { + log.error("exception during process ["+t+"]", e); + } + } + } + + //4. Disconnect client.disconnect(true/* immediateShutdown */); - client = null; - } +//BTM - FOR_ZOOKEEPER_SMART_PROXY - END ----------------------------------------------------- + + //BTM if (metadataServer0 != null) { //BTM //BTM metadataServer0.destroy(); @@ -1109,43 +1191,48 @@ txnService0 = null; } - if (zookeeper != null && zooConfig != null) { +//BTM - FOR_ZOOKEEPER_SMART_PROXY - BEGIN +//BTM - FOR_ZOOKEEPER_SMART_PROXY - Note: moved the shutdown/killing of zookeeper to beginning +//BTM - FOR_ZOOKEEPER_SMART_PROXY - of this method. Performing it here left the zookeeper +//BTM - FOR_ZOOKEEPER_SMART_PROXY - process still running; and when the new smart proxy +//BTM - FOR_ZOOKEEPER_SMART_PROXY - based QuorumPeerService is used instead of the +//BTM - FOR_ZOOKEEPER_SMART_PROXY - org.apache.zookeeper.server.quorum.QuorumPeerMain, +//BTM - FOR_ZOOKEEPER_SMART_PROXY - the process left running by the first test will +//BTM - FOR_ZOOKEEPER_SMART_PROXY - cause all subsequent tests to fail because of +//BTM - FOR_ZOOKEEPER_SMART_PROXY - BindException (port already in use). Destroying +//BTM - FOR_ZOOKEEPER_SMART_PROXY - the service and killing the process, as is done +//BTM - FOR_ZOOKEEPER_SMART_PROXY - above, addresses this issue. +//BTM - FOR_ZOOKEEPER_SMART_PROXY +//BTM - FOR_ZOOKEEPER_SMART_PROXY if (zookeeper != null && zooConfig != null) { +//BTM - FOR_ZOOKEEPER_SMART_PROXY +//BTM - FOR_ZOOKEEPER_SMART_PROXY try { +//BTM - FOR_ZOOKEEPER_SMART_PROXY // clear out everything in zookeeper for this federation. 
+//BTM - FOR_ZOOKEEPER_SMART_PROXY System.out.println("\n---------------- JiniServicesHelper.destroy BEGIN ZOOKEEPER DELETE\n"); +//BTM - FOR_ZOOKEEPER_SMART_PROXY zookeeper.delete(zooConfig.zroot, -1); // version +//BTM - FOR_ZOOKEEPER_SMART_PROXY System.out.println("\n---------------- JiniServicesHelper.destroy END ZOOKEEPER DELETE\n"); +//BTM - FOR_ZOOKEEPER_SMART_PROXY } catch (Exception e) { +//BTM - FOR_ZOOKEEPER_SMART_PROXY +//BTM - FOR_ZOOKEEPER_SMART_PROXY // ignore. +//BTM - FOR_ZOOKEEPER_SMART_PROXY log.warn("zroot=" + zooConfig.zroot + " : " +//BTM - FOR_ZOOKEEPER_SMART_PROXY + e.getLocalizedMessage(), e); +//BTM - FOR_ZOOKEEPER_SMART_PROXY System.out.println("\n---------------- JiniServicesHelper.destroy END ZOOKEEPER DELETE\n"); +//BTM - FOR_ZOOKEEPER_SMART_PROXY +//BTM - FOR_ZOOKEEPER_SMART_PROXY } +//BTM - FOR_ZOOKEEPER_SMART_PROXY +//BTM - FOR_ZOOKEEPER_SMART_PROXY } +//BTM - FOR_ZOOKEEPER_SMART_PROXY +//BTM - FOR_ZOOKEEPER_SMART_PROXY try { +//BTM - FOR_ZOOKEEPER_SMART_PROXY System.out.println("\n---------------- JiniServicesHelper.destroy >>> ZooHelper.kill(clientPort="+clientPort+")\n"); +//BTM - FOR_ZOOKEEPER_SMART_PROXY ZooHelper.kill(clientPort); +//BTM - FOR_ZOOKEEPER_SMART_PROXY System.out.println("\n---------------- JiniServicesHelper.destroy >>> ZooHelper.kill(clientPort="+clientPort+") - DONE\n"); +//BTM - FOR_ZOOKEEPER_SMART_PROXY } catch (Throwable t) { +//BTM - FOR_ZOOKEEPER_SMART_PROXY +//BTM - FOR_ZOOKEEPER_SMART_PROXY log.error("Could not kill zookeeper: clientPort=" + clientPort +//BTM - FOR_ZOOKEEPER_SMART_PROXY + " : " + t, t); +//BTM - FOR_ZOOKEEPER_SMART_PROXY System.out.println("\n---------------- JiniServicesHelper.destroy >>> ZooHelper.kill(clientPort="+clientPort+") - DONE\n"); +//BTM - FOR_ZOOKEEPER_SMART_PROXY } +//BTM - FOR_ZOOKEEPER_SMART_PROXY - END - try { - - // clear out everything in zookeeper for this federation. -//BTM log.warn("\n---------------- JiniServicesHelper.destroy BEGIN ZOOKEEPER DELETE\n"); -//BTM com.bigdata.util.Util.printStr("TestBigdata.debug","\n---------------- JiniServicesHelper.destroy BEGIN ZOOKEEPER DELETE\n"); - zookeeper.delete(zooConfig.zroot, -1/* version */); -//BTM log.warn("\n---------------- JiniServicesHelper.destroy END ZOOKEEPER DELETE\n"); -//BTM com.bigdata.util.Util.printStr("TestBigdata.debug","\n---------------- JiniServicesHelper.destroy END ZOOKEEPER DELETE\n"); - - } catch (Exception e) { - - // ignore. - log.warn("zroot=" + zooConfig.zroot + " : " - + e.getLocalizedMessage(), e); - - } - - } - - try { - -//BTM log.warn("\n---------------- JiniServicesHelper.destroy >>> ZooHelper.kill(clientPort="+clientPort+")\n"); -//BTM com.bigdata.util.Util.printStr("TestBigdata.debug","\n---------------- JiniServicesHelper.destroy >>> ZooHelper.kill(clientPort="+clientPort+")\n"); - - ZooHelper.kill(clientPort); - -//BTM log.warn("\n---------------- JiniServicesHelper.destroy >>> ZooHelper.kill(clientPort="+clientPort+") - DONE\n"); -//BTM com.bigdata.util.Util.printStr("TestBigdata.debug","\n---------------- JiniServicesHelper.destroy >>> ZooHelper.kill(clientPort="+clientPort+") - DONE\n"); - - } catch (Throwable t) { - - log.error("Could not kill zookeeper: clientPort=" + clientPort - + " : " + t, t); - } - if (zooDataDir != null && zooDataDir.exists()) { /* * Wait a bit and then try and delete the zookeeper directory. 
Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/transaction/ServiceImpl.java
===================================================================
--- branches/dev-btm/bigdata-jini/src/java/com/bigdata/transaction/ServiceImpl.java	2010-12-08 17:39:23 UTC (rev 4000)
+++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/transaction/ServiceImpl.java	2010-12-09 22:58:55 UTC (rev 4001)
@@ -349,7 +349,13 @@
     }
 
     //Service export and proxy creation
-    ServerEndpoint endpoint = TcpServerEndpoint.getInstance(0);
+    String exportIpAddr =
+        NicUtil.getIpAddress
+            ( "default.nic",
+              ConfigDeployUtil.getString("node.serviceNetwork"),
+              false );
+    ServerEndpoint endpoint =
+        TcpServerEndpoint.getInstance(exportIpAddr, 0);
     InvocationLayerFactory ilFactory = new BasicILFactory();
     Exporter defaultExporter = new BasicJeriExporter(endpoint, ilFactory,

Modified: branches/dev-btm/bigdata-jini/src/java/com/bigdata/util/Util.java
===================================================================
--- branches/dev-btm/bigdata-jini/src/java/com/bigdata/util/Util.java	2010-12-08 17:39:23 UTC (rev 4000)
+++ branches/dev-btm/bigdata-jini/src/java/com/bigdata/util/Util.java	2010-12-09 22:58:55 UTC (rev 4001)
@@ -47,6 +47,7 @@
 import com.bigdata.service.proxy.ClientRunnableBuffer;
 import com.bigdata.util.config.ConfigDeployUtil;
 import com.bigdata.util.config.LogUtil;
+import com.bigdata.util.config.NicUtil;
 
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -541,7 +542,6 @@
             throw new NullPointerException("null entryName");
         }
         Exporter exporter = null;
-        ServerEndpoint endpoint = TcpServerEndpoint.getInstance(0);
         InvocationLayerFactory ilFactory = new BasicILFactory();
         Exporter defaultExporter = getExporter(defaultEnableDgc,
                                                defaultKeepAlive);
@@ -561,8 +561,17 @@
                                  boolean keepAlive)
     {
-        Exporter exporter = null;
-        ServerEndpoint endpoint = TcpServerEndpoint.getInstance(0);
+        ServerEndpoint endpoint = null;
+        try {
+            String exportIpAddr =
+                NicUtil.getIpAddress
+                    ( "default.nic",
+                      ConfigDeployUtil.getString("node.serviceNetwork"),
+                      false );
+            endpoint = TcpServerEndpoint.getInstance(exportIpAddr, 0);
+        } catch(Exception e) {
+            endpoint = TcpServerEndpoint.getInstance(0);
+        }
         InvocationLayerFactory ilFactory = new BasicILFactory();
         return new BasicJeriExporter
                        (endpoint, ilFactory, enableDgc, keepAlive);

Modified: branches/dev-btm/bigdata-jini/src/resources/config/bigdataStandaloneTesting.config
===================================================================
--- branches/dev-btm/bigdata-jini/src/resources/config/bigdataStandaloneTesting.config	2010-12-08 17:39:23 UTC (rev 4000)
+++ branches/dev-btm/bigdata-jini/src/resources/config/bigdataStandaloneTesting.config	2010-12-09 22:58:55 UTC (rev 4001)
@@ -152,7 +152,8 @@
      *
      * Note: The default policy is completely open.
      */
-    private static policy = "policy.all";
+//BTM private static policy = "policy.all";
+    private static policy = ConfigMath.getAbsolutePath(new File("policy.all"));
 
     /**
      * Where jini is installed.

Modified: branches/dev-btm/bigdata-jini/src/test/com/bigdata/jini/start/AbstractFedZooTestCase.java
===================================================================
--- branches/dev-btm/bigdata-jini/src/test/com/bigdata/jini/start/AbstractFedZooTestCase.java	2010-12-08 17:39:23 UTC (rev 4000)
+++ branches/dev-btm/bigdata-jini/src/test/com/bigdata/jini/start/AbstractFedZooTestCase.java	2010-12-09 22:58:55 UTC (rev 4001)
@@ -154,8 +154,7 @@
 
         // if necessary, start zookeeper (a server instance).
 //BTM - PRE_ZOOKEEPER_SMART_PROXY - BEGIN
 //BTM - PRE_ZOOKEEPER_SMART_PROXY         ZookeeperProcessHelper.startZookeeper(config, listener);
-        ZookeeperProcessHelper.startZookeeper(com.bigdata.quorum.ServiceImpl.class, config, listener);
-//ZookeeperProcessHelper.startZookeeper(org.apache.zookeeper.server.quorum.QuorumPeerMain.class, config, listener);
+        ZookeeperProcessHelper.startZookeeper(com.bigdata.quorum.ServiceImpl.class, config, listener); //was QuorumPeerMain.class
 //BTM - PRE_ZOOKEEPER_SMART_PROXY - END
 
         /*

Modified: branches/dev-btm/bigdata-jini/src/test/com/bigdata/service/jini/master/TestMappedRDFDataLoadMaster.java
===================================================================
--- branches/dev-btm/bigdata-jini/src/test/com/bigdata/service/jini/master/TestMappedRDFDataLoadMaster.java	2010-12-08 17:39:23 UTC (rev 4000)
+++ branches/dev-btm/bigdata-jini/src/test/com/bigdata/service/jini/master/TestMappedRDFDataLoadMaster.java	2010-12-09 22:58:55 UTC (rev 4001)
@@ -155,8 +155,9 @@
         try {
 
             helper = new JiniServicesHelper(args, serviceImplRemote);
-
+System.out.println("\n\n------------------------------------- TestMappedRDFDataLoadMaster helper.start BEGIN --------\n");
             helper.start();
+System.out.println("\n\n------------------------------------- TestMappedRDFDataLoadMaster helper.start END_1 --------\n");
 
 //BTM - PRE_CLIENT_SERVICE - BEGIN
 //BTM - PRE_CLIENT_SERVICE             new MappedRDFDataLoadMaster(helper.getFederation()).execute();
@@ -172,6 +173,7 @@
 //BTM - PRE_CLIENT_SERVICE - END
 
         } finally {
+System.out.println("\n\n------------------------------------- TestMappedRDFDataLoadMaster helper.start END_2 --------\n");
 
             // delete the temp file containing the federation configuration.
             tempConfigFile.delete();

Modified: branches/dev-btm/bigdata-jini/src/test/com/bigdata/zookeeper/AbstractZooTestCase.java
===================================================================
--- branches/dev-btm/bigdata-jini/src/test/com/bigdata/zookeeper/AbstractZooTestCase.java	2010-12-08 17:39:23 UTC (rev 4000)
+++ branches/dev-btm/bigdata-jini/src/test/com/bigdata/zookeeper/AbstractZooTestCase.java	2010-12-09 22:58:55 UTC (rev 4001)
@@ -60,6 +60,7 @@
 
 //BTM - FOR_ZOOKEEPER_SMART_PROXY - BEGIN
 import com.bigdata.service.QuorumPeerService;
+import com.bigdata.util.config.NicUtil;
 import com.sun.jini.admin.DestroyAdmin;
 import net.jini.admin.Administrable;
 import net.jini.core.discovery.LookupLocator;
@@ -176,7 +177,7 @@
         final int leaderPort = getPort(3888/* suggestedPort */);
 //BTM - PRE_ZOOKEEPER_SMART_PROXY - BEGIN
 //BTM - PRE_ZOOKEEPER_SMART_PROXY         final String servers = "1=localhost:" + peerPort + ":" + leaderPort;
-        hostname = com.bigdata.util.config.NicUtil.getIpAddress("default.nic", "default", true);
+        hostname = NicUtil.getIpAddress("default.nic", "default", true);
         final String servers = "1="+hostname+":" + peerPort + ":" + leaderPort;
 //BTM - PRE_ZOOKEEPER_SMART_PROXY - END
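
A second theme of revision 4001 runs through ServiceImpl.java and Util.java above: instead of exporting on the wildcard address with TcpServerEndpoint.getInstance(0), the service resolves the address of its configured NIC and binds the JERI endpoint there, falling back to the wildcard only if that lookup fails. A condensed sketch, assuming NicUtil.getIpAddress and ConfigDeployUtil.getString behave as they are used in the diff (the class name below is hypothetical):

    import com.bigdata.util.config.ConfigDeployUtil;
    import com.bigdata.util.config.NicUtil;
    import net.jini.export.Exporter;
    import net.jini.jeri.BasicILFactory;
    import net.jini.jeri.BasicJeriExporter;
    import net.jini.jeri.ServerEndpoint;
    import net.jini.jeri.tcp.TcpServerEndpoint;

    /** Hypothetical condensation of the NIC-pinned exporter pattern. */
    class NicPinnedExporterSketch {

        static Exporter newExporter(boolean enableDgc, boolean keepAlive) {
            ServerEndpoint endpoint;
            try {
                // Resolve the address of the deployment-configured NIC so
                // that multi-homed hosts export proxies on a reachable
                // interface instead of whatever the wildcard bind picks.
                String exportIpAddr = NicUtil.getIpAddress(
                        "default.nic",
                        ConfigDeployUtil.getString("node.serviceNetwork"),
                        false);
                endpoint = TcpServerEndpoint.getInstance(exportIpAddr, 0);
            } catch (Exception e) {
                // Fall back to the pre-4001 behavior: all interfaces.
                endpoint = TcpServerEndpoint.getInstance(0);
            }
            return new BasicJeriExporter(endpoint, new BasicILFactory(),
                                         enableDgc, keepAlive);
        }
    }

The try/catch fallback mirrors the Util.getExporter(...) change; the ServiceImpl.java version, as the diff reads, lets a failed NIC lookup propagate instead.
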
From: <mrp...@us...> - 2010-12-08 17:39:29
Revision: 4000
          http://bigdata.svn.sourceforge.net/bigdata/?rev=4000&view=rev
Author:   mrpersonick
Date:     2010-12-08 17:39:23 +0000 (Wed, 08 Dec 2010)

Log Message:
-----------
adding test for optional join groups

Modified Paths:
--------------
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngineOptionalJoins.java

Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngineOptionalJoins.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngineOptionalJoins.java	2010-12-07 23:03:37 UTC (rev 3999)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngineOptionalJoins.java	2010-12-08 17:39:23 UTC (rev 4000)
@@ -706,11 +706,12 @@
         final int startId = 1;
         final int joinId1 = 2;
         final int predId1 = 3;
-        final int joinId2 = 4;
-        final int predId2 = 5;
-        final int joinId3 = 6;
-        final int predId3 = 7;
-        final int sliceId = 8;
+        final int condId = 4;
+        final int joinId2 = 5;
+        final int predId2 = 6;
+        final int joinId3 = 7;
+        final int predId3 = 8;
+        final int sliceId = 9;
 
         final IVariable<?> a = Var.var("a");
         final IVariable<?> b = Var.var("b");
@@ -736,8 +737,68 @@
                         BOpEvaluationContext.CONTROLLER),//
                 }));
 
+        final Predicate<?> pred1Op = new Predicate<E>(
+                new IVariableOrConstant[] { a, b }, NV
+                        .asMap(new NV[] {//
+                                new NV(Predicate.Annotations.RELATION_NAME,
+                                        new String[] { namespace }),//
+                                new NV(Predicate.Annotations.BOP_ID, predId1),//
+                                new NV(Annotations.TIMESTAMP, ITx.READ_COMMITTED),//
+                        }));
+
+        final Predicate<?> pred2Op = new Predicate<E>(
+                new IVariableOrConstant[] { b, c }, NV
+                        .asMap(new NV[] {//
+                                new NV(Predicate.Annotations.RELATION_NAME,
+                                        new String[] { namespace }),//
+                                new NV(Predicate.Annotations.BOP_ID, predId2),//
+                                new NV(Annotations.TIMESTAMP, ITx.READ_COMMITTED),//
+                        }));
+
+        final Predicate<?> pred3Op = new Predicate<E>(
+                new IVariableOrConstant[] { c, d }, NV
+                        .asMap(new NV[] {//
+                                new NV(Predicate.Annotations.RELATION_NAME,
+                                        new String[] { namespace }),//
+                                new NV(Predicate.Annotations.BOP_ID, predId3),//
+                                new NV(Annotations.TIMESTAMP, ITx.READ_COMMITTED),//
+                        }));
+
+        final PipelineOp join1Op = new PipelineJoin<E>(//
+                new BOp[]{startOp},//
+                new NV(Predicate.Annotations.BOP_ID, joinId1),//
+                new NV(PipelineJoin.Annotations.PREDICATE, pred1Op));
+
+        final IConstraint condition = new EQConstant(a, new Constant<String>("Paul"));
+
+        final ConditionalRoutingOp condOp = new ConditionalRoutingOp(new BOp[]{join1Op},
+                NV.asMap(new NV[]{//
+                    new NV(BOp.Annotations.BOP_ID, condId),
+                    new NV(PipelineOp.Annotations.SINK_REF, joinId2),
+                    new NV(PipelineOp.Annotations.ALT_SINK_REF, sliceId),
+                    new NV(ConditionalRoutingOp.Annotations.CONDITION, condition),
+                }));
+
+        final PipelineOp join2Op = new PipelineJoin<E>(//
+                new BOp[] { condOp },//
+                new NV(Predicate.Annotations.BOP_ID, joinId2),//
+                new NV(PipelineJoin.Annotations.PREDICATE, pred2Op),//
+                // join is optional.
+                new NV(PipelineJoin.Annotations.OPTIONAL, true),//
+                // optional target is the same as the default target.
+                new NV(PipelineOp.Annotations.ALT_SINK_REF, sliceId));
+
+        final PipelineOp join3Op = new PipelineJoin<E>(//
+                new BOp[] { join2Op },//
+                new NV(Predicate.Annotations.BOP_ID, joinId3),//
+                new NV(PipelineJoin.Annotations.PREDICATE, pred3Op),//
+                // join is optional.
+                new NV(PipelineJoin.Annotations.OPTIONAL, true),//
+                // optional target is the same as the default target.
+                new NV(PipelineOp.Annotations.ALT_SINK_REF, sliceId));
+
         final PipelineOp sliceOp = new SliceOp(//
-                new BOp[]{startOp},
+                new BOp[]{join3Op},
                 NV.asMap(new NV[] {//
                         new NV(BOp.Annotations.BOP_ID, sliceId),//
                         new NV(BOp.Annotations.EVALUATION_CONTEXT,
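
For orientation, here is my reading of the query plan this test assembles (the diagram is mine, not part of the commit). Solutions flow down the left edge; ALT_SINK_REF gives failed or filtered-out solutions a shortcut to the slice instead of silently dropping them:

    startOp
       |
    join1Op   binds (a, b)
       |
    condOp    a == "Paul" ? sink -> join2Op : altSink -> sliceOp
       |
    join2Op   binds (b, c); OPTIONAL, altSink = sliceOp
       |
    join3Op   binds (c, d); OPTIONAL, altSink = sliceOp
       |
    sliceOp

That routing is what makes the join group optional in the SPARQL sense: a solution with no match in join2Op or join3Op still reaches the slice carrying its earlier bindings, and the ConditionalRoutingOp acts as the filter deciding which solutions enter the optional group at all.
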