From: <tho...@us...> - 2014-07-16 15:50:48
Revision: 8557
http://sourceforge.net/p/bigdata/code/8557
Author: thompsonbry
Date: 2014-07-16 15:50:37 +0000 (Wed, 16 Jul 2014)
Log Message:
-----------
I have added support for hierarchical locking of the resources in a namespace to the AbstractTask class. There is a new test suite for hierarchical locking support.
We need to improve the GIST capabilities of AbstractTask before we can use it with RDF KBs. Right now, it only supports the B+Tree class and makes assumptions about ILocalBTreeView as the index type. The named solution set mechanisms use the HTree and Stream objects as well. Similar GIST support was introduced to the Journal about a year ago.
The journal, service, and NSS test suites are green.
See #585 GIST (Generalized Indices)
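For readers who want the locking rule in isolation, here is a minimal sketch of the two ideas described above: the prefix test that decides whether a requested index name is covered by a declared resource, and the scan that collects every index name spanned by a namespace. It is not the AbstractTask code. The class NamespaceLockSketch and the methods isDeclared() and spannedBy() are hypothetical names, a sorted set of strings stands in for Name2Addr, and the explicit length guard before charAt() is an assumption of this sketch rather than something taken from the commit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical illustration only; none of these names exist in bigdata.
public class NamespaceLockSketch {

    /**
     * True iff the declared resource is either the requested index name
     * itself or a namespace prefix of it.
     */
    public static boolean isDeclared(final String declared, final String requested) {
        if (requested.equals(declared)) {
            return true; // exact match on a named index.
        }
        // Prefix match: the character following the prefix must be the '.'
        // separator, so "kb" spans "kb.spo.osp" but not "kb1.spo".
        // The length check keeps charAt() in bounds.
        return requested.length() > declared.length()
                && requested.startsWith(declared)
                && requested.charAt(declared.length()) == '.';
    }

    /**
     * Returns the names in the sorted set that are spanned by the namespace.
     * Names sharing a prefix are contiguous in sort order, so the scan starts
     * at the prefix and stops at the first name that does not match.
     */
    public static List<String> spannedBy(final String namespace,
            final SortedSet<String> names) {
        final String prefix = namespace + ".";
        final List<String> out = new ArrayList<>();
        for (String name : names.tailSet(prefix)) {
            if (!name.startsWith(prefix)) {
                break; // left the namespace.
            }
            out.add(name);
        }
        return out;
    }

    public static void main(final String[] args) {
        final SortedSet<String> names = new TreeSet<>(Arrays.asList(
                "foo.bar", "foo.baz", "foo1.x", "goo", "goo.bar"));
        System.out.println(spannedBy("foo", names));
        System.out.println(isDeclared("kb", "kb.spo.osp"));
        System.out.println(isDeclared("kb", "kb1.spo"));
    }
}
```

The separator check is what keeps a lock on "kb" from contending with an unrelated namespace such as "kb1".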
Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractTask.java
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/journal/TestJournalBasics.java
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/service/TestAll.java
Added Paths:
-----------
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/journal/TestHierarchicalLockingTasks.java
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractTask.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractTask.java 2014-07-16 15:37:58 UTC (rev 8556)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractTask.java 2014-07-16 15:50:37 UTC (rev 8557)
@@ -387,10 +387,12 @@
for(String s : resource) {
final Name2Addr.Entry tmp = name2Addr.getEntry(s);
+
+ if (tmp != null) {
- if(tmp != null) {
-
/*
+ * Exact match on a named index.
+ *
* Add a read-only copy of the entry with additional state
* for tracking registration and dropping of named indices.
*
@@ -400,6 +402,37 @@
n2a.put(s, new Entry(tmp));
+ } else {
+
+ /**
+ * Add a read-only copy of the Name2Addr entry for all
+ * entries spanned by that namespace. This provides the
+ * additional state for tracking registration and dropping
+ * of named indices and also supports hierarchical locking
+ * patterns.
+ *
+ * Note: We do NOT fetch the indices here, just copy their
+ * last checkpoint metadata from Name2Addr.
+ *
+ * @see <a
+ * href="http://trac.bigdata.com/ticket/566"
+ * > Concurrent unisolated operations against multiple
+ * KBs </a>
+ */
+
+ final Iterator<String> itr = Name2Addr.indexNameScan(s,
+ name2Addr);
+
+ while (itr.hasNext()) {
+
+ final String t = itr.next();
+
+ final Name2Addr.Entry tmp2 = name2Addr.getEntry(t);
+
+ n2a.put(t, new Entry(tmp2));
+
+ }
+
}
}
@@ -517,7 +550,7 @@
* delegated to this method. First, the task can use this method directly.
* Second, the task can use {@link #getJournal()} and then use
* {@link IJournal#getIndex(String)} on that journal, which is simply
- * delegated to this method. See {@link IsolatedActionJournal}.
+ * delegated to this method. See {@link IsolatedActionJournal}.
*
* @param name
* The name of the index.
@@ -534,8 +567,12 @@
*
* @return The index.
*
- * @todo modify to return <code>null</code> if the index is not
- * registered?
+ * @todo modify to return <code>null</code> if the index is not registered?
+ *
+ * FIXME GIST. This will throw a ClassCastException if the returned
+ * index is an ILocalBTreeView.
+ *
+ * @see http://trac.bigdata.com/ticket/585 (GIST)
*/
@Override
synchronized final public ILocalBTreeView getIndex(final String name) {
@@ -955,7 +992,7 @@
* @author <a href="mailto:tho...@us...">Bryan
* Thompson</a>
*
- * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/675" >
+ * @see <a href="http://trac.bigdata.com/ticket/675" >
* Flush indices in parallel during checkpoint to reduce IO latency</a>
*/
private class CheckpointIndexTask implements Callable<Void> {
@@ -1003,7 +1040,7 @@
*
* @return The elapsed time in nanoseconds for this operation.
*
- * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/675"
+ * @see <a href="http://trac.bigdata.com/ticket/675"
* >Flush indices in parallel during checkpoint to reduce IO
* latency</a>
*/
@@ -1315,13 +1352,18 @@
* The transaction identifier -or- {@link ITx#UNISOLATED} IFF the
* operation is NOT isolated by a transaction -or-
* <code> - timestamp </code> to read from the most recent commit
- * point not later than the absolute value of <i>timestamp</i>
- * (a historical read).
+ * point not later than the absolute value of <i>timestamp</i> (a
+ * historical read).
* @param resource
* The resource on which the task will operate. E.g., the names
* of the index. When the task is an unisolated write task an
* exclusive lock will be requested on the named resource and the
* task will NOT run until it has obtained that lock.
+ * <p>
+ * The name may identify either a namespace or a concrete index
+ * object. If a concrete index object is discovered, only that
+ * index is isolated. Otherwise all indices having the same
+ * prefix as the namespace are isolated.
*/
protected AbstractTask(final IConcurrencyManager concurrencyManager,
final long timestamp, final String resource) {
@@ -1346,6 +1388,11 @@
* task an exclusive lock will be requested on each named
* resource and the task will NOT run until it has obtained those
* lock(s).
+ * <p>
+ * The name may identify either a namespace or a concrete index
+ * object. If a concrete index object is discovered, only that
+ * index is isolated. Otherwise all indices having the same
+ * prefix as the namespace are isolated.
*/
protected AbstractTask(final IConcurrencyManager concurrencyManager,
final long timestamp, final String[] resource) {
@@ -1453,7 +1500,7 @@
if (transactionManager.getTx(timestamp) == null) {
- /*
+ /**
* Start tx on this data service.
*
* FIXME This should be passing the [readsOnCommitTime] into
@@ -1464,11 +1511,11 @@
* submitted primarily for the clustered database
* deployment.
*
- * @see https://sourceforge.net/apps/trac/bigdata/ticket/266
+ * @see http://trac.bigdata.com/ticket/266
* (refactor native long tx id to thin object)
*
* @see <a
- * href="http://sourceforge.net/apps/trac/bigdata/ticket/546"
+ * href="http://trac.bigdata.com/ticket/546"
* > Add cache for access to historical index views on the
* Journal by name and commitTime. </a>
*/
@@ -1588,41 +1635,40 @@
}
/**
- * FIXME GROUP_COMMIT: Supporting this requires us to support
- * efficient scans of the indices in Name2Addr having the prefix
- * values declared by [resources] since getIndex(name) will fail if
- * the Name2Addr entry has not been buffered within the [n2a] cache.
+ * Look for a prefix that spans one or more resources.
*
- * @see <a
- * href="http://sourceforge.net/apps/trac/bigdata/ticket/753" >
- * HA doLocalAbort() should interrupt NSS requests and
+ * Note: Supporting this requires us to support efficient scans of
+ * the indices in Name2Addr since getIndex(name) will fail if the
+ * Name2Addr entry has not been buffered within the [n2a] cache.
+ *
+ * @see <a href="http://trac.bigdata.com/ticket/753" > HA
+ * doLocalAbort() should interrupt NSS requests and
* AbstractTasks </a>
- * @see <a
- * href="- http://sourceforge.net/apps/trac/bigdata/ticket/566"
- * > Concurrent unisolated operations against multiple KBs </a>
+ * @see <a href="http://trac.bigdata.com/ticket/566" > Concurrent
+ * unisolated operations against multiple KBs </a>
*/
-// if (theRequestedResource.startsWith(theDeclaredResource)) {
-//
-// // Possible prefix match.
-//
-// if (theRequestedResource.charAt(theDeclaredResource.length()) == '.') {
-//
-// /*
-// * Prefix match.
-// *
-// * E.g., name:="kb.spo.osp" and the task declared the
-// * resource "kb". In this case, "kb" is a PREFIX of the
-// * declared resource and the next character is the separator
-// * character for the resource names (this last point is
-// * important to avoid unintended contention between
-// * namespaces such as "kb" and "kb1").
-// */
-// return true;
-//
-// }
-//
-// }
+ if (theRequestedResource.startsWith(theDeclaredResource)) {
+
+ // Possible prefix match.
+
+ if (theRequestedResource.charAt(theDeclaredResource.length()) == '.') {
+ /*
+ * Prefix match.
+ *
+ * E.g., name:="kb.spo.osp" and the task declared the
+ * resource "kb". In this case, "kb" is a PREFIX of the
+ * requested resource and the next character is the separator
+ * character for the resource names (this last point is
+ * important to avoid unintended contention between
+ * namespaces such as "kb" and "kb1").
+ */
+ return true;
+
+ }
+
+ }
+
}
return false;
@@ -2488,7 +2534,7 @@
* FIXME GIST : Support registration of index types other than BTree
* (HTree, Stream, etc).
*
- * @see https://sourceforge.net/apps/trac/bigdata/ticket/585 (GIST)
+ * @see http://trac.bigdata.com/ticket/585 (GIST)
*/
throw new UnsupportedOperationException();
@@ -2522,14 +2568,14 @@
* Note: access to an unisolated index is governed by the AbstractTask.
*/
@Override
- public ICheckpointProtocol getUnisolatedIndex(String name) {
+ public ICheckpointProtocol getUnisolatedIndex(final String name) {
try {
/*
* FIXME GIST. This will throw a ClassCastException if the
* returned index is an ILocalBTreeView.
*
- * @see https://sourceforge.net/apps/trac/bigdata/ticket/585 (GIST)
+ * @see http://trac.bigdata.com/ticket/585 (GIST)
*/
return (ICheckpointProtocol) AbstractTask.this.getIndex(name);
@@ -2599,7 +2645,7 @@
* in a ClassCastException.
*
* @see <a
- * href="https://sourceforge.net/apps/trac/bigdata/ticket/585"
+ * href="http://trac.bigdata.com/ticket/585"
* > GIST </a>
*/
return (ICheckpointProtocol) resourceManager.getIndex(name, commitTime);
Added: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/journal/TestHierarchicalLockingTasks.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/journal/TestHierarchicalLockingTasks.java (rev 0)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/journal/TestHierarchicalLockingTasks.java 2014-07-16 15:50:37 UTC (rev 8557)
@@ -0,0 +1,378 @@
+/**
+
+Copyright (C) SYSTAP, LLC 2006-2007. All rights reserved.
+
+Contact:
+ SYSTAP, LLC
+ 4501 Tower Road
+ Greensboro, NC 27410
+ lic...@bi...
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+/*
+ * Created on Oct 15, 2007
+ */
+
+package com.bigdata.journal;
+
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import com.bigdata.btree.IIndex;
+import com.bigdata.btree.ILocalBTreeView;
+import com.bigdata.btree.IndexMetadata;
+
+/**
+ * Test suite for hierarchical locking of indices based on namespace prefixes.
+ * This test suite was introduced as part of the support for group commit for
+ * the RDF database layer in the non-scale-out modes. In order to be able to run
+ * RDF operations using job-based concurrency, we need to predeclare the
+ * namespace (rather than the individual indices) and then isolate all indices
+ * spanned by that namespace.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ *
+ * @see <a href="http://sourceforge.net/apps/trac/bigdata/ticket/566" >
+ * Concurrent unisolated operations against multiple KBs </a>
+ *
+ * FIXME GROUP COMMIT (hierarchical locking): We need to test each
+ * different kind of index access here (unisolated, isolated by a
+ * read/write tx, and isolated by a read-only tx, and read-historical (no
+ * isolation)).
+ */
+public class TestHierarchicalLockingTasks extends ProxyTestCase<Journal> {
+
+ /**
+ *
+ */
+ public TestHierarchicalLockingTasks() {
+ super();
+ }
+
+ /**
+ * @param name
+ */
+ public TestHierarchicalLockingTasks(final String name) {
+ super(name);
+ }
+
+ /**
+ * Test creates several named indices some of which have a shared namespace
+ * prefix and verifies that a lock declared for the namespace prefix permits
+ * that task to access any index in that namespace. The test also verifies
+ * that an index that does not share the namespace prefix may not be
+ * accessed by the task.
+ *
+ * @throws InterruptedException
+ * @throws ExecutionException
+ */
+ public void test_hierarchicalLocking_001() throws InterruptedException,
+ ExecutionException {
+
+ // namespace for declared locks.
+ final String[] namespace = new String[] { "foo" };
+
+ // indices that we create, write on, and verify.
+ final String[] allowed_indices = new String[] { "foo.bar", "foo.baz" };
+
+ // indices that are outside of the declared namespace. we verify that we
+ // can not access these indices.
+ final String[] disallowed_indices = new String[] { "goo", "goo.bar" };
+
+ final Journal journal = getStore();
+
+ try {
+
+ /*
+ * Create indices that we should not be able to see when holding
+ * just the lock for the namespace.
+ */
+ for (int i = 0; i < disallowed_indices.length; i++) {
+
+ final String name = disallowed_indices[i];
+
+ journal.submit(new RegisterIndexTask(journal
+ .getConcurrencyManager(), name, new IndexMetadata(name,
+ UUID.randomUUID()))).get();
+
+ }
+
+ /*
+ * Create indices that we should be able to see when holding the
+ * lock for the namespace.
+ */
+ for (int i = 0; i < allowed_indices.length; i++) {
+
+ final String name = allowed_indices[i];
+
+ journal.submit(new RegisterIndexTask(journal
+ .getConcurrencyManager(), name, new IndexMetadata(name,
+ UUID.randomUUID()))).get();
+
+ }
+
+ /*
+ * Submit task holding the lock for the namespace and verify that we
+ * can see the indices that are spanned by the namespace, but not
+ * those that are not spanned by the namespace.
+ */
+
+ final Future<Void> ft = journal
+ .submit(new AbstractTask<Void>(journal
+ .getConcurrencyManager(), ITx.UNISOLATED, namespace) {
+
+ @Override
+ protected Void doTask() throws Exception {
+
+ // Verify access to the indices in that namespace.
+ for (String name : allowed_indices) {
+
+ assertNotNull(name, getIndex(name));
+
+ }
+
+ /*
+ * Verify no access to the indices outside of that
+ * namespace.
+ */
+ for (String name : disallowed_indices) {
+
+ try {
+ // Attempt to access index.
+ getIndex(name);
+ fail("Expecting: "
+ + IllegalStateException.class
+ + " for " + name);
+ } catch (IllegalStateException ex) {
+ // log and ignore expected exception.
+ if (log.isInfoEnabled())
+ log.info("Ignoring expected exception");
+ }
+
+ }
+
+ // Done.
+ return null;
+ }
+ });
+
+ try {
+ // Await outcome.
+ ft.get();
+ } finally {
+ ft.cancel(true/* mayInterruptIfRunning */);
+ }
+
+ } finally {
+
+ journal.destroy();
+
+ }
+
+ }
+
+ /**
+ * Unit test for hierarchical locking verifies that we can declare a
+ * namespace for a task which then creates multiple indices spanned by that
+ * namespace.
+ *
+ * @throws InterruptedException
+ * @throws ExecutionException
+ */
+ public void test_hierarchicalLocking_create_destroy()
+ throws InterruptedException, ExecutionException {
+
+ // namespace for declared locks.
+ final String[] namespace = new String[] { "foo" };
+
+ // indices that we create, write on, and verify.
+ final String[] allowed_indices = new String[] { "foo.bar", "foo.baz" };
+
+ final Journal journal = getStore();
+
+ try {
+
+ final UUID[] uuids = journal.submit(
+ new AbstractTask<UUID[]>(journal.getConcurrencyManager(),
+ ITx.UNISOLATED, namespace) {
+
+ @Override
+ protected UUID[] doTask() throws Exception {
+
+ final UUID[] indexUUIDs = new UUID[allowed_indices.length];
+
+ /*
+ * Create indices that we should be able to see when
+ * holding the lock for the namespace.
+ */
+ for (int i = 0; i < allowed_indices.length; i++) {
+
+ final String name = allowed_indices[i];
+
+ IIndex ndx = getJournal().getIndex(name);
+
+ if (ndx != null) {
+
+ final UUID indexUUID = ndx.getIndexMetadata().getIndexUUID();
+
+ if (log.isInfoEnabled())
+ log.info("Index exists: name=" + name + ", indexUUID=" + indexUUID);
+
+ indexUUIDs[i] = indexUUID;
+
+ }
+
+ // register the index.
+ ndx = getJournal().registerIndex(
+ name,
+ new IndexMetadata(name, UUID
+ .randomUUID()));
+
+ final UUID indexUUID = ndx.getIndexMetadata().getIndexUUID();
+
+ if (log.isInfoEnabled())
+ log.info("Registered index: name=" + name + ", class="
+ + ndx.getClass() + ", indexUUID=" + indexUUID);
+
+ indexUUIDs[i] = indexUUID;
+
+ }
+
+ // Done
+ return indexUUIDs;
+ }
+
+ }).get();
+
+ // should be non-null.
+ assertNotNull(uuids);
+
+ // Should be non-null.
+ for (UUID uuid : uuids) {
+
+ assertNotNull(uuid);
+
+ }
+
+ /*
+ * Verify access to the newly created indices.
+ */
+ journal.submit(
+ new AbstractTask<Void>(journal.getConcurrencyManager(),
+ ITx.UNISOLATED, namespace) {
+
+ @Override
+ protected Void doTask() throws Exception {
+
+ /*
+ * Verify access to the newly created indices.
+ */
+ for (int i = 0; i < allowed_indices.length; i++) {
+
+ final String name = allowed_indices[i];
+
+ final ILocalBTreeView ndx = getIndex(name);
+
+ assertNotNull(ndx);
+
+ assertEquals(0L, ndx.rangeCount());
+
+ // add a key.
+ ndx.insert(new byte[] {},
+ new byte[] { (byte) i });
+
+ }
+
+ // Done
+ return null;
+ }
+
+ }).get();
+
+ /*
+ * Destroy the newly created indices.
+ */
+ journal.submit(
+ new AbstractTask<Void>(journal.getConcurrencyManager(),
+ ITx.UNISOLATED, namespace) {
+
+ @Override
+ protected Void doTask() throws Exception {
+
+ /*
+ * Drop the indices that were created while holding the
+ * lock for the namespace.
+ */
+ for (int i = 0; i < allowed_indices.length; i++) {
+
+ final String name = allowed_indices[i];
+
+ getJournal().dropIndex(name);
+
+ }
+
+ // Done
+ return null;
+ }
+
+ }).get();
+
+ /*
+ * Verify that the indices can no longer be accessed.
+ */
+ journal.submit(
+ new AbstractTask<Void>(journal.getConcurrencyManager(),
+ ITx.UNISOLATED, namespace) {
+
+ @Override
+ protected Void doTask() throws Exception {
+
+ /*
+ * Verify no access to the dropped indices.
+ */
+ for (int i = 0; i < allowed_indices.length; i++) {
+
+ final String name = allowed_indices[i];
+
+ try {
+ // Attempt to access index.
+ getIndex(name);
+ fail("Expecting: "
+ + IllegalStateException.class
+ + " for " + name);
+ } catch (IllegalStateException ex) {
+ // log and ignore expected exception.
+ if (log.isInfoEnabled())
+ log.info("Ignoring expected exception");
+ }
+
+ }
+
+ // Done
+ return null;
+ }
+
+ }).get();
+
+ } finally {
+
+ journal.destroy();
+
+ }
+
+ }
+
+}
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/journal/TestJournalBasics.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/journal/TestJournalBasics.java 2014-07-16 15:37:58 UTC (rev 8556)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/journal/TestJournalBasics.java 2014-07-16 15:50:37 UTC (rev 8557)
@@ -125,15 +125,25 @@
// suite.addTestSuite(TestAddDropIndexTask.class);
// test writing on one or more unisolated indices and verify read back after the commit.
suite.addTestSuite(TestUnisolatedWriteTasks.class);
+ // test suite for hierarchical locking (namespace prefixes).
+ suite.addTestSuite(TestHierarchicalLockingTasks.class);
+ // test suite for GIST operations using group commit.
+// suite.addTestSuite(TestGISTTasks.class);
// stress test of throughput when lock contention serializes unisolated writers.
suite.addTestSuite(StressTestLockContention.class);
// stress test of group commit.
suite.addTestSuite(StressTestGroupCommit.class);
// stress tests of writes on unisolated named indices using ConcurrencyManager.
suite.addTestSuite(StressTestConcurrentUnisolatedIndices.class);
- // stress tests of writes on unisolated named indices using UnisolatedReadWriteIndex.
- suite.addTestSuite(StressTestConcurrentUnisolatedIndices.class);
/*
+ * Stress tests of writes on unisolated named indices using
+ * UnisolatedReadWriteIndex.
+ *
+ * FIXME This test appears to cause #343 (Stochastic assert in
+ * AbstractBTree#writeNodeOrLeaf() in CI)
+ */
+// suite.addTestSuite(StressTestUnisolatedReadWriteIndex.class);
+ /*
* Stress test of concurrent transactions.
*
* Note: transactions use unisolated operations on the live indices when
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/service/TestAll.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/service/TestAll.java 2014-07-16 15:37:58 UTC (rev 8556)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/service/TestAll.java 2014-07-16 15:50:37 UTC (rev 8557)
@@ -28,8 +28,6 @@
package com.bigdata.service;
-
-
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
@@ -38,7 +36,6 @@
* Test suite for embedded services.
*
* @author <a href="mailto:tho...@us...">Bryan Thompson</a>
- * @version $Id$
*/
public class TestAll extends TestCase {