From: <tho...@us...> - 2013-11-13 17:19:07
|
Revision: 7537 http://bigdata.svn.sourceforge.net/bigdata/?rev=7537&view=rev Author: thompsonbry Date: 2013-11-13 17:19:00 +0000 (Wed, 13 Nov 2013) Log Message: ----------- Refactored IIndexManagerCallable out of the HAGlue interface. Some changes related to DumpJournal and TestDumpJournal for an as-yet unresolved issue with the inability to run DumpJournal with a concurrent mutation on the Journal. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/HAGlue.java branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/IndexManagerCallable.java branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/DumpJournal.java branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/journal/TestDumpJournal.java Added Paths: ----------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/IIndexManagerCallable.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/HAGlue.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/HAGlue.java 2013-11-13 17:14:29 UTC (rev 7536) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/HAGlue.java 2013-11-13 17:19:00 UTC (rev 7537) @@ -25,11 +25,9 @@ import java.io.FileNotFoundException; import java.io.IOException; -import java.io.Serializable; import java.rmi.Remote; import java.security.DigestException; import java.security.NoSuchAlgorithmException; -import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -48,7 +46,6 @@ import com.bigdata.ha.msg.IHASnapshotRequest; import com.bigdata.ha.msg.IHASnapshotResponse; import com.bigdata.journal.AbstractJournal; -import com.bigdata.journal.IIndexManager; import com.bigdata.journal.jini.ha.HAJournalServer; import com.bigdata.quorum.AsynchronousQuorumCloseException; import com.bigdata.quorum.QuorumException; @@ -299,7 +296,6 @@ Future<Void> rebuildFromLeader(IHARemoteRebuildRequest req) throws IOException; - /** * Run the caller's task on the service. * @@ -314,35 +310,5 @@ */ public <T> Future<T> submit(IIndexManagerCallable<T> callable, boolean asyncFuture) throws IOException; - - - public interface IIndexManagerCallable<T> extends Serializable, Callable<T> { - - /** - * Invoked before the task is executed to provide a reference to the - * {@link IIndexManager} on which it is executing. - * - * @param indexManager - * The index manager on the service. - * - * @throws IllegalArgumentException - * if the argument is <code>null</code> - * @throws IllegalStateException - * if {@link #setIndexManager(IIndexManager)} has already been - * invoked and was set with a different value. - */ - void setIndexManager(IIndexManager indexManager); - - /** - * Return the {@link IIndexManager}. - * - * @return The data service and never <code>null</code>. - * - * @throws IllegalStateException - * if {@link #setIndexManager(IIndexManager)} has not been invoked. - */ - IIndexManager getIndexManager(); - - } } Added: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/IIndexManagerCallable.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/IIndexManagerCallable.java (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/IIndexManagerCallable.java 2013-11-13 17:19:00 UTC (rev 7537) @@ -0,0 +1,66 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +package com.bigdata.ha; + +import java.io.Serializable; +import java.util.concurrent.Callable; + +import com.bigdata.journal.IIndexManager; + +/** + * Interface allows arbitrary tasks to be submitted to an {@link HAGlue} service + * for evaluation. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @param <T> + */ +public interface IIndexManagerCallable<T> extends Serializable, Callable<T> { + + /** + * Invoked before the task is executed to provide a reference to the + * {@link IIndexManager} on which it is executing. + * + * @param indexManager + * The index manager on the service. + * + * @throws IllegalArgumentException + * if the argument is <code>null</code> + * @throws IllegalStateException + * if {@link #setIndexManager(IIndexManager)} has already been + * invoked and was set with a different value. + */ + void setIndexManager(IIndexManager indexManager); + + /** + * Return the {@link IIndexManager}. + * + * @return The data service and never <code>null</code>. + * + * @throws IllegalStateException + * if {@link #setIndexManager(IIndexManager)} has not been + * invoked. + */ + IIndexManager getIndexManager(); + +} Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/IndexManagerCallable.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/IndexManagerCallable.java 2013-11-13 17:14:29 UTC (rev 7536) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/IndexManagerCallable.java 2013-11-13 17:19:00 UTC (rev 7537) @@ -2,7 +2,6 @@ import org.apache.log4j.Logger; -import com.bigdata.ha.HAGlue.IIndexManagerCallable; import com.bigdata.journal.IIndexManager; import com.bigdata.journal.jini.ha.HAJournal; Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-11-13 17:14:29 UTC (rev 7536) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-11-13 17:19:00 UTC (rev 7537) @@ -100,6 +100,7 @@ import com.bigdata.ha.HAGlue; import com.bigdata.ha.HAStatusEnum; import com.bigdata.ha.HATXSGlue; +import com.bigdata.ha.IIndexManagerCallable; import com.bigdata.ha.IJoinedAndNonJoinedServices; import com.bigdata.ha.JoinedAndNonJoinedServices; import com.bigdata.ha.PrepareRequest; @@ -3106,8 +3107,8 @@ */ private final long commitToken; - /** The #of bytes on the journal as of the previous commit point. */ - private final long byteCountBefore; +// /** The #of bytes on the journal as of the previous commit point. */ +// private final long byteCountBefore; /** * The commit counter that will be assigned to the new commit point. @@ -3150,8 +3151,8 @@ this.old = store._rootBlock; - // #of bytes on the journal as of the previous commit point. - this.byteCountBefore = store._rootBlock.getNextOffset(); +// // #of bytes on the journal as of the previous commit point. +// this.byteCountBefore = store._rootBlock.getNextOffset(); this.newCommitCounter = old.getCommitCounter() + 1; @@ -5857,12 +5858,14 @@ /* * We also need to discard any active read/write tx since there - * is no longer a quorum and a read/write tx was running on the + * is no longer a quorum. This will hit both read-only + * transactions running on any service (not necessarily the + * leader) and read/write transactions if this service was the * old leader. * - * We do not need to discard read-only tx since the committed - * state should remain valid even when a quorum is lost. - * However, it would be a bit odd to leave read-only + * Note: We do not need to discard read-only tx since the + * committed state should remain valid even when a quorum is + * lost. However, it would be a bit odd to leave read-only * transactions running if you could not start a new read-only * because the quorum is not met. */ @@ -5874,7 +5877,17 @@ * * FIXME HA : Abort the unisolated connection? (esp for group * commit and the NSS level SPARQL and REST API unisolated - * operations). + * operations). Maybe we can wrap the execute of the UpdateTask + * and the execution of the REST Mutation API methods in a + * well-known ThreadGuard and then do interruptAll() to force + * the cancelation of any running task? We could also wrap any + * IIndexManagerCallable in HAGlue.submit() with a FutureTask + * implementation that uses the appropriate ThreadGuard to + * ensure that any unisolated tasks are cancelled (that is + * actually overkill since it would not differentiate TX based + * operations from unisolated operations - we could also use + * that ThreadGuard in AbstractTask). Add unit tests for both + * UPDATE and other REST mutation methods. * * @see <a * href="https://sourceforge.net/apps/trac/bigdata/ticket/753" Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/DumpJournal.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/DumpJournal.java 2013-11-13 17:14:29 UTC (rev 7536) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/DumpJournal.java 2013-11-13 17:19:00 UTC (rev 7537) @@ -381,6 +381,18 @@ final boolean dumpHistory, final boolean dumpPages, final boolean dumpIndices, final boolean showTuples) { +// Note: This does not fix the issue. +// /** +// * Start a transaction. This will bracket all index access and protect +// * the data on the journal from concurrent recycling. +// * +// * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/762"> +// * DumpJournal does not protect against concurrent updates (NSS) +// * </a> +// */ +// final long tx = journal.newTx(ITx.READ_COMMITTED); +// try { +// final FileMetadata fmd = journal.getFileMetadata(); if (fmd != null) { @@ -600,6 +612,9 @@ dumpPages, dumpIndices, showTuples); } +// } finally { +// journal.abort(tx); +// } } @@ -614,7 +629,7 @@ } - public void dumpGlobalRowStore(final PrintWriter out) { + private void dumpGlobalRowStore(final PrintWriter out) { final SparseRowStore grs = journal.getGlobalRowStore(journal .getLastCommitTime()); @@ -826,7 +841,7 @@ * * @return */ - public String dumpRawRecord(final long addr) { + private String dumpRawRecord(final long addr) { if (journal.getBufferStrategy() instanceof IRWStrategy) { /** @@ -984,6 +999,7 @@ } } case Stream: + @SuppressWarnings("unused") final Stream stream = (Stream) ndx; /* * Note: We can't do anything here with a Stream, but we do @@ -1004,41 +1020,4 @@ } - /** - * Return the data in the buffer. - */ - public static byte[] getBytes(ByteBuffer buf) { - - if (buf.hasArray() && buf.arrayOffset() == 0 && buf.position() == 0 - && buf.limit() == buf.capacity()) { - - /* - * Return the backing array. - */ - - return buf.array(); - - } - - /* - * Copy the expected data into a byte[] using a read-only view on the - * buffer so that we do not mess with its position, mark, or limit. - */ - final byte[] a; - { - - buf = buf.asReadOnlyBuffer(); - - final int len = buf.remaining(); - - a = new byte[len]; - - buf.get(a); - - } - - return a; - - } - } Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/journal/TestDumpJournal.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/journal/TestDumpJournal.java 2013-11-13 17:14:29 UTC (rev 7536) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/journal/TestDumpJournal.java 2013-11-13 17:19:00 UTC (rev 7537) @@ -29,15 +29,23 @@ package com.bigdata.journal; import java.io.IOException; +import java.util.LinkedList; +import java.util.List; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; import com.bigdata.btree.AbstractBTreeTestCase; import com.bigdata.btree.BTree; import com.bigdata.btree.HTreeIndexMetadata; import com.bigdata.btree.IndexMetadata; import com.bigdata.btree.keys.KV; +import com.bigdata.concurrent.FutureTaskMon; import com.bigdata.htree.HTree; +import com.bigdata.rwstore.IRWStrategy; +import com.bigdata.util.concurrent.LatchedExecutor; /** * Test suite for {@link DumpJournal}. @@ -66,8 +74,10 @@ /** * @param name */ - public TestDumpJournal(String name) { + public TestDumpJournal(final String name) { + super(name); + } /** @@ -361,4 +371,254 @@ } + /** + * Unit test for {@link DumpJournal} with concurrent updates against the + * backing store. This is intended primarily to detect failures to protect + * against the recycling model associated with the {@link IRWStrategy}. + * + * @throws Exception + * + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/762"> + * DumpJournal does not protect against concurrent updates (NSS) </a> + */ + public void test_dumpJournal_concurrent_updates() throws Exception { + + final String PREFIX = "testIndex#"; + final int NUM_INDICES = 4; + + Journal src = getStore(getProperties()); + + try { + + for (int i = 0; i < NUM_INDICES; i++) { + + // register an index + final String name = PREFIX + i; + + src.registerIndex(new IndexMetadata(name, UUID.randomUUID())); + { + + // lookup the index. + final BTree ndx = src.getIndex(name); + + // #of tuples to write. + final int ntuples = r.nextInt(1000); + + // generate random data. + final KV[] a = AbstractBTreeTestCase + .getRandomKeyValues(ntuples); + + // write tuples (in random order) + for (KV kv : a) { + + ndx.insert(kv.key, kv.val); + + if (r.nextInt(100) < 10) { + + // randomly increment the counter (10% of the time). + ndx.getCounter().incrementAndGet(); + + } + + } + + } + + } + + // commit the journal (!) + src.commit(); + + /** + * Task to run various DumpJournal requests. + */ + final class DumpTask implements Callable<Void> { + + private final Journal src; + + public DumpTask(final Journal src) { + + this.src = src; + + } + @Override + public Void call() throws Exception { + + new DumpJournal(src) + .dumpJournal(false/* dumpHistory */, + true/* dumpPages */, + false/* dumpIndices */, false/* showTuples */); + + new DumpJournal(src) + .dumpJournal(true/* dumpHistory */, + true/* dumpPages */, true/* dumpIndices */, + false/* showTuples */); + + // test again w/o dumpPages + new DumpJournal(src) + .dumpJournal(true/* dumpHistory */, + false/* dumpPages */, + true/* dumpIndices */, false/* showTuples */); + + return (Void) null; + + } + + } + + final class UpdateTask implements Callable<Void> { + + private final Journal src; + + public UpdateTask(final Journal src) { + + this.src = src; + + } + @Override + public Void call() throws Exception { + + /* + * Now write some more data, going through a series of commit + * points. This let's us check access to historical commit points. + */ + for (int j = 0; j < 10; j++) { + + for (int i = 0; i < NUM_INDICES; i++) { + + // register an index + final String name = PREFIX + i; + + // lookup the index. + final BTree ndx = src.getIndex(name); + + // #of tuples to write. + final int ntuples = r.nextInt(1000); + + // generate random data. + final KV[] a = AbstractBTreeTestCase + .getRandomKeyValues(ntuples); + + // write tuples (in random order) + for (KV kv : a) { + + ndx.insert(kv.key, kv.val); + + if (r.nextInt(100) < 10) { + + // randomly increment the counter (10% of the time). + ndx.getCounter().incrementAndGet(); + + } + + } + + } + + log.info("Will commit"); + src.commit(); + log.info("Did commit"); + + } + + return (Void) null; + } + } + + final List<FutureTask<Void>> tasks1 = new LinkedList<FutureTask<Void>>(); + final List<FutureTask<Void>> tasks2 = new LinkedList<FutureTask<Void>>(); + final List<FutureTask<Void>> tasksAll = new LinkedList<FutureTask<Void>>(); + + // Setup executor that limits parallelism. + final LatchedExecutor executor1 = new LatchedExecutor( + src.getExecutorService(), 1/* nparallel */); + + // Setup executor that limits parallelism. + final LatchedExecutor executor2 = new LatchedExecutor( + src.getExecutorService(), 1/* nparallel */); + + try { + + // Tasks to run. + tasks1.add(new FutureTaskMon<Void>(new DumpTask(src))); + tasks1.add(new FutureTaskMon<Void>(new DumpTask(src))); + tasks1.add(new FutureTaskMon<Void>(new DumpTask(src))); + + tasks2.add(new FutureTaskMon<Void>(new UpdateTask(src))); + tasks2.add(new FutureTaskMon<Void>(new UpdateTask(src))); + tasks2.add(new FutureTaskMon<Void>(new UpdateTask(src))); + + // Schedule the tasks. + for (FutureTask<Void> ft : tasks1) + executor1.execute(ft); + for (FutureTask<Void> ft : tasks2) + executor2.execute(ft); + + log.info("Blocking for futures"); + + // Wait for tasks. + tasksAll.addAll(tasks1); + tasksAll.addAll(tasks2); + int ndone = 0; + for (FutureTask<Void> ft : tasksAll) { + + /* + * Check Future. + * + * Note: sanity check for test termination w/ timeout. + */ + + try { + ft.get(2, TimeUnit.MINUTES); + } catch (ExecutionException ex) { + log.error("ndone=" + ndone, ex); + throw ex; + } + + log.info("ndone=" + ndone); + ndone++; + } + + } finally { + + // Ensure tasks are terminated. + for (FutureTask<Void> ft : tasksAll) { + + ft.cancel(true/*mayInterruptIfRunning*/); + + } + + } + + if (src.isStable()) { + + src = reopenStore(src); + + // Try running the DumpTask again. + new DumpTask(src).call(); + + } + + } finally { + + src.destroy(); + + } + + } + /** Stress test to look for different failure modes. */ + public void _testStress_dumpJournal_concurrent_updates() throws Exception { + final int LIMIT = 20; + for (int i = 0; i < LIMIT; i++) { + if (i > 1) + setUp(); + try { + test_dumpJournal_concurrent_updates(); + } catch (Exception ex) { + log.fatal("FAILURE: i=" + i + ", cause=" + ex); + } + if (i + 1 < LIMIT) + tearDown(); + } + } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |