From: <tho...@us...> - 2013-11-13 17:19:07
Revision: 7537
http://bigdata.svn.sourceforge.net/bigdata/?rev=7537&view=rev
Author: thompsonbry
Date: 2013-11-13 17:19:00 +0000 (Wed, 13 Nov 2013)
Log Message:
-----------
Refactored IIndexManagerCallable out of the HAGlue interface and into its own top-level interface in com.bigdata.ha.
Also some changes to DumpJournal and TestDumpJournal for an as-yet unresolved issue: DumpJournal cannot currently be run while there is concurrent mutation on the Journal.
Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/HAGlue.java
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/IndexManagerCallable.java
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/DumpJournal.java
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/journal/TestDumpJournal.java
Added Paths:
-----------
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/IIndexManagerCallable.java
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/HAGlue.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/HAGlue.java 2013-11-13 17:14:29 UTC (rev 7536)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/HAGlue.java 2013-11-13 17:19:00 UTC (rev 7537)
@@ -25,11 +25,9 @@
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.Serializable;
import java.rmi.Remote;
import java.security.DigestException;
import java.security.NoSuchAlgorithmException;
-import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -48,7 +46,6 @@
import com.bigdata.ha.msg.IHASnapshotRequest;
import com.bigdata.ha.msg.IHASnapshotResponse;
import com.bigdata.journal.AbstractJournal;
-import com.bigdata.journal.IIndexManager;
import com.bigdata.journal.jini.ha.HAJournalServer;
import com.bigdata.quorum.AsynchronousQuorumCloseException;
import com.bigdata.quorum.QuorumException;
@@ -299,7 +296,6 @@
Future<Void> rebuildFromLeader(IHARemoteRebuildRequest req)
throws IOException;
-
/**
* Run the caller's task on the service.
*
@@ -314,35 +310,5 @@
*/
public <T> Future<T> submit(IIndexManagerCallable<T> callable,
boolean asyncFuture) throws IOException;
-
-
- public interface IIndexManagerCallable<T> extends Serializable, Callable<T> {
-
- /**
- * Invoked before the task is executed to provide a reference to the
- * {@link IIndexManager} on which it is executing.
- *
- * @param indexManager
- * The index manager on the service.
- *
- * @throws IllegalArgumentException
- * if the argument is <code>null</code>
- * @throws IllegalStateException
- * if {@link #setIndexManager(IIndexManager)} has already been
- * invoked and was set with a different value.
- */
- void setIndexManager(IIndexManager indexManager);
-
- /**
- * Return the {@link IIndexManager}.
- *
- * @return The data service and never <code>null</code>.
- *
- * @throws IllegalStateException
- * if {@link #setIndexManager(IIndexManager)} has not been invoked.
- */
- IIndexManager getIndexManager();
-
- }
}
Added: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/IIndexManagerCallable.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/IIndexManagerCallable.java (rev 0)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/IIndexManagerCallable.java 2013-11-13 17:19:00 UTC (rev 7537)
@@ -0,0 +1,66 @@
+/**
+
+Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved.
+
+Contact:
+ SYSTAP, LLC
+ 4501 Tower Road
+ Greensboro, NC 27410
+ lic...@bi...
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+package com.bigdata.ha;
+
+import java.io.Serializable;
+import java.util.concurrent.Callable;
+
+import com.bigdata.journal.IIndexManager;
+
+/**
+ * Interface allows arbitrary tasks to be submitted to an {@link HAGlue} service
+ * for evaluation.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ * @param <T>
+ */
+public interface IIndexManagerCallable<T> extends Serializable, Callable<T> {
+
+ /**
+ * Invoked before the task is executed to provide a reference to the
+ * {@link IIndexManager} on which it is executing.
+ *
+ * @param indexManager
+ * The index manager on the service.
+ *
+ * @throws IllegalArgumentException
+ * if the argument is <code>null</code>
+ * @throws IllegalStateException
+ * if {@link #setIndexManager(IIndexManager)} has already been
+ * invoked and was set with a different value.
+ */
+ void setIndexManager(IIndexManager indexManager);
+
+ /**
+ * Return the {@link IIndexManager}.
+ *
+ * @return The index manager and never <code>null</code>.
+ *
+ * @throws IllegalStateException
+ * if {@link #setIndexManager(IIndexManager)} has not been
+ * invoked.
+ */
+ IIndexManager getIndexManager();
+
+}
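
For orientation, a minimal sketch of a task written against the extracted interface, together with the client-side submit. Only IIndexManagerCallable, HAGlue.submit(), and getLastCommitTime() are taken from the sources above; the task class, its name, and the helper method are invented for illustration.

    import java.util.concurrent.Future;

    import com.bigdata.ha.HAGlue;
    import com.bigdata.ha.IIndexManagerCallable;
    import com.bigdata.journal.AbstractJournal;
    import com.bigdata.journal.IIndexManager;

    /**
     * Illustrative task: report the last commit time of the journal on the
     * service where the task runs. A concrete task could also extend the
     * IndexManagerCallable base class instead of implementing the interface
     * directly.
     */
    public class LastCommitTimeTask implements IIndexManagerCallable<Long> {

        private static final long serialVersionUID = 1L;

        /** Transient: set by the receiving service, never serialized. */
        private transient IIndexManager indexManager;

        @Override
        public void setIndexManager(final IIndexManager indexManager) {
            if (indexManager == null)
                throw new IllegalArgumentException();
            if (this.indexManager != null && this.indexManager != indexManager)
                throw new IllegalStateException();
            this.indexManager = indexManager;
        }

        @Override
        public IIndexManager getIndexManager() {
            if (indexManager == null)
                throw new IllegalStateException();
            return indexManager;
        }

        @Override
        public Long call() throws Exception {
            // Runs on the remote service. The cast assumes the index manager
            // there is a journal, as in an HAJournalServer deployment.
            return ((AbstractJournal) getIndexManager()).getLastCommitTime();
        }

        /** Client-side sketch: submit the task and block for the result. */
        public static long lastCommitTimeOf(final HAGlue service)
                throws Exception {
            final Future<Long> f = service.submit(new LastCommitTimeTask(),
                    true/* asyncFuture */);
            return f.get();
        }
    }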
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/IndexManagerCallable.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/IndexManagerCallable.java 2013-11-13 17:14:29 UTC (rev 7536)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/IndexManagerCallable.java 2013-11-13 17:19:00 UTC (rev 7537)
@@ -2,7 +2,6 @@
import org.apache.log4j.Logger;
-import com.bigdata.ha.HAGlue.IIndexManagerCallable;
import com.bigdata.journal.IIndexManager;
import com.bigdata.journal.jini.ha.HAJournal;
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-11-13 17:14:29 UTC (rev 7536)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-11-13 17:19:00 UTC (rev 7537)
@@ -100,6 +100,7 @@
import com.bigdata.ha.HAGlue;
import com.bigdata.ha.HAStatusEnum;
import com.bigdata.ha.HATXSGlue;
+import com.bigdata.ha.IIndexManagerCallable;
import com.bigdata.ha.IJoinedAndNonJoinedServices;
import com.bigdata.ha.JoinedAndNonJoinedServices;
import com.bigdata.ha.PrepareRequest;
@@ -3106,8 +3107,8 @@
*/
private final long commitToken;
- /** The #of bytes on the journal as of the previous commit point. */
- private final long byteCountBefore;
+// /** The #of bytes on the journal as of the previous commit point. */
+// private final long byteCountBefore;
/**
* The commit counter that will be assigned to the new commit point.
@@ -3150,8 +3151,8 @@
this.old = store._rootBlock;
- // #of bytes on the journal as of the previous commit point.
- this.byteCountBefore = store._rootBlock.getNextOffset();
+// // #of bytes on the journal as of the previous commit point.
+// this.byteCountBefore = store._rootBlock.getNextOffset();
this.newCommitCounter = old.getCommitCounter() + 1;
@@ -5857,12 +5858,14 @@
/*
* We also need to discard any active read/write tx since there
- * is no longer a quorum and a read/write tx was running on the
+ * is no longer a quorum. This will hit both read-only
+ * transactions running on any service (not necessarily the
+ * leader) and read/write transactions if this service was the
* old leader.
*
- * We do not need to discard read-only tx since the committed
- * state should remain valid even when a quorum is lost.
- * However, it would be a bit odd to leave read-only
+ * Note: We do not need to discard read-only tx since the
+ * committed state should remain valid even when a quorum is
+ * lost. However, it would be a bit odd to leave read-only
* transactions running if you could not start a new read-only
* because the quorum is not met.
*/
@@ -5874,7 +5877,17 @@
*
* FIXME HA : Abort the unisolated connection? (esp for group
* commit and the NSS level SPARQL and REST API unisolated
- * operations).
+ * operations). Maybe we can wrap the execute of the UpdateTask
+ * and the execution of the REST Mutation API methods in a
+ * well-known ThreadGuard and then do interruptAll() to force
+ * the cancelation of any running task? We could also wrap any
+ * IIndexManagerCallable in HAGlue.submit() with a FutureTask
+ * implementation that uses the appropriate ThreadGuard to
+ * ensure that any unisolated tasks are cancelled (that is
+ * actually overkill since it would not differentiate TX based
+ * operations from unisolated operations - we could also use
+ * that ThreadGuard in AbstractTask). Add unit tests for both
+ * UPDATE and other REST mutation methods.
*
* @see <a
* href="https://sourceforge.net/apps/trac/bigdata/ticket/753"
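
The ThreadGuard wrapping floated in the FIXME above could look roughly like the sketch below. The guard class and its wiring are written from scratch here to illustrate the idea; they are assumptions, not the com.bigdata classes and not part of this commit.

    import java.util.Collections;
    import java.util.Set;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ConcurrentHashMap;

    /**
     * Minimal guard: a task registers its thread for the duration of a call,
     * and interruptAll() interrupts whatever is registered at that moment
     * (e.g., when the quorum breaks).
     */
    class SimpleThreadGuard {

        private final Set<Thread> threads = Collections
                .newSetFromMap(new ConcurrentHashMap<Thread, Boolean>());

        /** Run the task with the current thread registered in the guard. */
        public <T> T guard(final Callable<T> task) throws Exception {
            final Thread t = Thread.currentThread();
            threads.add(t);
            try {
                return task.call();
            } finally {
                threads.remove(t);
            }
        }

        /** Interrupt every thread currently running a guarded task. */
        public void interruptAll() {
            for (Thread t : threads)
                t.interrupt();
        }
    }

Under that scheme, HAGlue.submit() and the NSS mutation entry points would route their work through guard(...), and the quorum-break handler above would call interruptAll() to cancel any running unisolated tasks.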
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/DumpJournal.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/DumpJournal.java 2013-11-13 17:14:29 UTC (rev 7536)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/DumpJournal.java 2013-11-13 17:19:00 UTC (rev 7537)
@@ -381,6 +381,18 @@
final boolean dumpHistory, final boolean dumpPages,
final boolean dumpIndices, final boolean showTuples) {
+// Note: This does not fix the issue.
+// /**
+// * Start a transaction. This will bracket all index access and protect
+// * the data on the journal from concurrent recycling.
+// *
+// * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/762">
+// * DumpJournal does not protect against concurrent updates (NSS)
+// * </a>
+// */
+// final long tx = journal.newTx(ITx.READ_COMMITTED);
+// try {
+//
final FileMetadata fmd = journal.getFileMetadata();
if (fmd != null) {
@@ -600,6 +612,9 @@
dumpPages, dumpIndices, showTuples);
}
+// } finally {
+// journal.abort(tx);
+// }
}
@@ -614,7 +629,7 @@
}
- public void dumpGlobalRowStore(final PrintWriter out) {
+ private void dumpGlobalRowStore(final PrintWriter out) {
final SparseRowStore grs = journal.getGlobalRowStore(journal
.getLastCommitTime());
@@ -826,7 +841,7 @@
*
* @return
*/
- public String dumpRawRecord(final long addr) {
+ private String dumpRawRecord(final long addr) {
if (journal.getBufferStrategy() instanceof IRWStrategy) {
/**
@@ -984,6 +999,7 @@
}
}
case Stream:
+ @SuppressWarnings("unused")
final Stream stream = (Stream) ndx;
/*
* Note: We can't do anything here with a Stream, but we do
@@ -1004,41 +1020,4 @@
}
- /**
- * Return the data in the buffer.
- */
- public static byte[] getBytes(ByteBuffer buf) {
-
- if (buf.hasArray() && buf.arrayOffset() == 0 && buf.position() == 0
- && buf.limit() == buf.capacity()) {
-
- /*
- * Return the backing array.
- */
-
- return buf.array();
-
- }
-
- /*
- * Copy the expected data into a byte[] using a read-only view on the
- * buffer so that we do not mess with its position, mark, or limit.
- */
- final byte[] a;
- {
-
- buf = buf.asReadOnlyBuffer();
-
- final int len = buf.remaining();
-
- a = new byte[len];
-
- buf.get(a);
-
- }
-
- return a;
-
- }
-
}
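
Written out, the bracketing that the commented-out lines above attempt looks like the sketch below (per the note in this commit it does not by itself resolve ticket #762; the helper method name is invented for illustration):

    /**
     * Bracket a dump in a read-committed tx so that the commit point being
     * read is protected against recycling while the dump runs.
     */
    static void dumpProtected(final Journal journal) {
        final long tx = journal.newTx(ITx.READ_COMMITTED);
        try {
            new DumpJournal(journal).dumpJournal(false/* dumpHistory */,
                    true/* dumpPages */, false/* dumpIndices */,
                    false/* showTuples */);
        } finally {
            // Release the read lock.
            journal.abort(tx);
        }
    }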
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/journal/TestDumpJournal.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/journal/TestDumpJournal.java 2013-11-13 17:14:29 UTC (rev 7536)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/journal/TestDumpJournal.java 2013-11-13 17:19:00 UTC (rev 7537)
@@ -29,15 +29,23 @@
package com.bigdata.journal;
import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
import java.util.UUID;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
import com.bigdata.btree.AbstractBTreeTestCase;
import com.bigdata.btree.BTree;
import com.bigdata.btree.HTreeIndexMetadata;
import com.bigdata.btree.IndexMetadata;
import com.bigdata.btree.keys.KV;
+import com.bigdata.concurrent.FutureTaskMon;
import com.bigdata.htree.HTree;
+import com.bigdata.rwstore.IRWStrategy;
+import com.bigdata.util.concurrent.LatchedExecutor;
/**
* Test suite for {@link DumpJournal}.
@@ -66,8 +74,10 @@
/**
* @param name
*/
- public TestDumpJournal(String name) {
+ public TestDumpJournal(final String name) {
+
super(name);
+
}
/**
@@ -361,4 +371,254 @@
}
+ /**
+ * Unit test for {@link DumpJournal} with concurrent updates against the
+ * backing store. This is intended primarily to detect failures to protect
+ * against the recycling model associated with the {@link IRWStrategy}.
+ *
+ * @throws Exception
+ *
+ * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/762">
+ * DumpJournal does not protect against concurrent updates (NSS) </a>
+ */
+ public void test_dumpJournal_concurrent_updates() throws Exception {
+
+ final String PREFIX = "testIndex#";
+ final int NUM_INDICES = 4;
+
+ Journal src = getStore(getProperties());
+
+ try {
+
+ for (int i = 0; i < NUM_INDICES; i++) {
+
+ // register an index
+ final String name = PREFIX + i;
+
+ src.registerIndex(new IndexMetadata(name, UUID.randomUUID()));
+ {
+
+ // lookup the index.
+ final BTree ndx = src.getIndex(name);
+
+ // #of tuples to write.
+ final int ntuples = r.nextInt(1000);
+
+ // generate random data.
+ final KV[] a = AbstractBTreeTestCase
+ .getRandomKeyValues(ntuples);
+
+ // write tuples (in random order)
+ for (KV kv : a) {
+
+ ndx.insert(kv.key, kv.val);
+
+ if (r.nextInt(100) < 10) {
+
+ // randomly increment the counter (10% of the time).
+ ndx.getCounter().incrementAndGet();
+
+ }
+
+ }
+
+ }
+
+ }
+
+ // commit the journal (!)
+ src.commit();
+
+ /**
+ * Task to run various DumpJournal requests.
+ */
+ final class DumpTask implements Callable<Void> {
+
+ private final Journal src;
+
+ public DumpTask(final Journal src) {
+
+ this.src = src;
+
+ }
+ @Override
+ public Void call() throws Exception {
+
+ new DumpJournal(src)
+ .dumpJournal(false/* dumpHistory */,
+ true/* dumpPages */,
+ false/* dumpIndices */, false/* showTuples */);
+
+ new DumpJournal(src)
+ .dumpJournal(true/* dumpHistory */,
+ true/* dumpPages */, true/* dumpIndices */,
+ false/* showTuples */);
+
+ // test again w/o dumpPages
+ new DumpJournal(src)
+ .dumpJournal(true/* dumpHistory */,
+ false/* dumpPages */,
+ true/* dumpIndices */, false/* showTuples */);
+
+ return (Void) null;
+
+ }
+
+ }
+
+ final class UpdateTask implements Callable<Void> {
+
+ private final Journal src;
+
+ public UpdateTask(final Journal src) {
+
+ this.src = src;
+
+ }
+ @Override
+ public Void call() throws Exception {
+
+ /*
+ * Now write some more data, going through a series of commit
+ * points. This lets us check access to historical commit points.
+ */
+ for (int j = 0; j < 10; j++) {
+
+ for (int i = 0; i < NUM_INDICES; i++) {
+
+ // the name of an index registered above.
+ final String name = PREFIX + i;
+
+ // lookup the index.
+ final BTree ndx = src.getIndex(name);
+
+ // #of tuples to write.
+ final int ntuples = r.nextInt(1000);
+
+ // generate random data.
+ final KV[] a = AbstractBTreeTestCase
+ .getRandomKeyValues(ntuples);
+
+ // write tuples (in random order)
+ for (KV kv : a) {
+
+ ndx.insert(kv.key, kv.val);
+
+ if (r.nextInt(100) < 10) {
+
+ // randomly increment the counter (10% of the time).
+ ndx.getCounter().incrementAndGet();
+
+ }
+
+ }
+
+ }
+
+ log.info("Will commit");
+ src.commit();
+ log.info("Did commit");
+
+ }
+
+ return (Void) null;
+ }
+ }
+
+ final List<FutureTask<Void>> tasks1 = new LinkedList<FutureTask<Void>>();
+ final List<FutureTask<Void>> tasks2 = new LinkedList<FutureTask<Void>>();
+ final List<FutureTask<Void>> tasksAll = new LinkedList<FutureTask<Void>>();
+
+ // Set up an executor that limits parallelism.
+ final LatchedExecutor executor1 = new LatchedExecutor(
+ src.getExecutorService(), 1/* nparallel */);
+
+ // Set up an executor that limits parallelism.
+ final LatchedExecutor executor2 = new LatchedExecutor(
+ src.getExecutorService(), 1/* nparallel */);
+
+ try {
+
+ // Tasks to run.
+ tasks1.add(new FutureTaskMon<Void>(new DumpTask(src)));
+ tasks1.add(new FutureTaskMon<Void>(new DumpTask(src)));
+ tasks1.add(new FutureTaskMon<Void>(new DumpTask(src)));
+
+ tasks2.add(new FutureTaskMon<Void>(new UpdateTask(src)));
+ tasks2.add(new FutureTaskMon<Void>(new UpdateTask(src)));
+ tasks2.add(new FutureTaskMon<Void>(new UpdateTask(src)));
+
+ // Schedule the tasks.
+ for (FutureTask<Void> ft : tasks1)
+ executor1.execute(ft);
+ for (FutureTask<Void> ft : tasks2)
+ executor2.execute(ft);
+
+ log.info("Blocking for futures");
+
+ // Wait for tasks.
+ tasksAll.addAll(tasks1);
+ tasksAll.addAll(tasks2);
+ int ndone = 0;
+ for (FutureTask<Void> ft : tasksAll) {
+
+ /*
+ * Check Future.
+ *
+ * Note: sanity check for test termination w/ timeout.
+ */
+
+ try {
+ ft.get(2, TimeUnit.MINUTES);
+ } catch (ExecutionException ex) {
+ log.error("ndone=" + ndone, ex);
+ throw ex;
+ }
+
+ log.info("ndone=" + ndone);
+ ndone++;
+ }
+
+ } finally {
+
+ // Ensure tasks are terminated.
+ for (FutureTask<Void> ft : tasksAll) {
+
+ ft.cancel(true/*mayInterruptIfRunning*/);
+
+ }
+
+ }
+
+ if (src.isStable()) {
+
+ src = reopenStore(src);
+
+ // Try running the DumpTask again.
+ new DumpTask(src).call();
+
+ }
+
+ } finally {
+
+ src.destroy();
+
+ }
+
+ }
+ /** Stress test to look for different failure modes. */
+ public void _testStress_dumpJournal_concurrent_updates() throws Exception {
+ final int LIMIT = 20;
+ for (int i = 0; i < LIMIT; i++) {
+ if (i > 1)
+ setUp();
+ try {
+ test_dumpJournal_concurrent_updates();
+ } catch (Exception ex) {
+ log.fatal("FAILURE: i=" + i + ", cause=" + ex);
+ }
+ if (i + 1 < LIMIT)
+ tearDown();
+ }
+ }
}