From: <mar...@us...> - 2013-11-27 14:23:38
Revision: 7599 http://bigdata.svn.sourceforge.net/bigdata/?rev=7599&view=rev Author: martyncutcher Date: 2013-11-27 14:23:29 +0000 (Wed, 27 Nov 2013) Log Message: ----------- Updates for pipeline resync and postHACommit for jenkins CI job submission Modified Paths: -------------- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/btree/BTree.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/BasicBufferStrategy.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/DumpJournal.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/IBufferStrategy.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/RWStrategy.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/rwstore/RWStore.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/rwstore/sector/MemStrategy.java branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive3Nodes.java branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/journal/TestDumpJournal.java branches/PIPELINE_RESYNC/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java branches/PIPELINE_RESYNC/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/btree/BTree.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/btree/BTree.java 2013-11-26 18:30:22 UTC (rev 7598) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/btree/BTree.java 2013-11-27 14:23:29 UTC (rev 7599) @@ -1807,6 +1807,9 @@ final Checkpoint checkpoint; try { checkpoint = Checkpoint.load(store, addrCheckpoint); + + if (log.isDebugEnabled()) + log.debug("Checkpoint rootAddr: " + checkpoint.getRootAddr()); } catch (Throwable t) { throw new RuntimeException("Could not load Checkpoint: store=" + store + ", addrCheckpoint=" Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-11-26 18:30:22 UTC (rev 7598) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-11-27 14:23:29 UTC (rev 7599) @@ -1474,19 +1474,20 @@ } catch (Throwable t) { - final PipelineException pe = (PipelineException) InnerCause.getInnerCause(t, PipelineException.class); - if (pe != null) { - log.error("Really need to remove service " + pe.getProblemServiceId()); - final UUID psid = pe.getProblemServiceId(); - - try { - member.getActor().forceRemoveService(psid); - } catch (Exception e) { - log.warn("Problem on node removal", e); - - throw new RuntimeException(e); - } - } + // ORIGINAL TESTED GREEN for KillB and KillC +// final PipelineException pe = (PipelineException) InnerCause.getInnerCause(t, PipelineException.class); +// if (pe != null) { +// log.error("Really need to remove service " + pe.getProblemServiceId()); +// final UUID 
psid = pe.getProblemServiceId(); +// +// try { +// member.getActor().forceRemoveService(psid); +// } catch (Exception e) { +// log.warn("Problem on node removal", e); +// +// throw new RuntimeException(e); +// } +// } // Note: Also see retrySend()'s catch block. @@ -1560,7 +1561,25 @@ return; - } finally { + } catch (Exception t) { + + // THIS LOCATION WORKS! +// final PipelineException pe = (PipelineException) InnerCause.getInnerCause(t, PipelineException.class); +// if (pe != null) { +// log.error("Really need to remove service " + pe.getProblemServiceId()); +// final UUID psid = pe.getProblemServiceId(); +// +// try { +// member.getActor().forceRemoveService(psid); +// } catch (Exception e) { +// log.warn("Problem on node removal", e); +// +// throw new RuntimeException(e); +// } +// } + + throw t; + } finally { unlock(); @@ -1675,120 +1694,142 @@ * Task to send() a buffer to the follower. */ static private class SendBufferTask<S extends HAPipelineGlue> implements - Callable<Void> { + Callable<Void> { - private final QuorumMember<S> member; - private final long token; // member MUST remain leader for token. - private final IHASyncRequest req; - private final IHAWriteMessage msg; - private final ByteBuffer b; - private final PipelineState<S> downstream; - private final HASendService sendService; - private final Lock sendLock; + private final QuorumMember<S> member; + private final long token; // member MUST remain leader for token. + private final IHASyncRequest req; + private final IHAWriteMessage msg; + private final ByteBuffer b; + private final PipelineState<S> downstream; + private final HASendService sendService; + private final Lock sendLock; - public SendBufferTask(final QuorumMember<S> member, final long token, - final IHASyncRequest req, final IHAWriteMessage msg, - final ByteBuffer b, final PipelineState<S> downstream, - final HASendService sendService, final Lock sendLock) { + public SendBufferTask(final QuorumMember<S> member, final long token, + final IHASyncRequest req, final IHAWriteMessage msg, + final ByteBuffer b, final PipelineState<S> downstream, + final HASendService sendService, final Lock sendLock) { - this.member = member; - this.token = token; - this.req = req; // Note: MAY be null. - this.msg = msg; - this.b = b; - this.downstream = downstream; - this.sendService = sendService; - this.sendLock = sendLock; + this.member = member; + this.token = token; + this.req = req; // Note: MAY be null. + this.msg = msg; + this.b = b; + this.downstream = downstream; + this.sendService = sendService; + this.sendLock = sendLock; - } + } - public Void call() throws Exception { + public Void call() throws Exception { - /* - * Lock ensures that we do not have more than one request on the - * write pipeline at a time. - */ + /* + * Lock ensures that we do not have more than one request on the + * write pipeline at a time. + */ - sendLock.lock(); + sendLock.lock(); - try { + try { - doRunWithLock(); - - return null; - - } finally { - - sendLock.unlock(); - - } + doRunWithLock(); - } - - private void doRunWithLock() throws InterruptedException, - ExecutionException, IOException { + return null; - // Get Future for send() outcome on local service. - final Future<Void> futSnd = sendService.send(b, msg.getToken()); + } finally { - try { + sendLock.unlock(); - // Get Future for receive outcome on the remote service (RMI). 
- final Future<Void> futRec = downstream.service - .receiveAndReplicate(req, msg); + } - try { + } - /* - * Await the Futures, but spend more time waiting on the - * local Future and only check the remote Future every - * second. Timeouts are ignored during this loop. - */ - while (!futSnd.isDone() && !futRec.isDone()) { - /* - * Make sure leader's quorum token remains valid for ALL - * writes. - */ - member.assertLeader(token); - try { - futSnd.get(1L, TimeUnit.SECONDS); - } catch (TimeoutException ignore) { - } - try { - futRec.get(10L, TimeUnit.MILLISECONDS); - } catch (TimeoutException ignore) { - } - } - futSnd.get(); - futRec.get(); + private void doRunWithLock() throws InterruptedException, + ExecutionException, IOException { - } finally { - if (!futRec.isDone()) { - // cancel remote Future unless done. - futRec.cancel(true/* mayInterruptIfRunning */); - } - } + try { + // Get Future for send() outcome on local service. + final Future<Void> futSnd = sendService.send(b, msg.getToken()); - } catch (Throwable t) { - // check inner cause for downstream PipelineException - final PipelineException pe = (PipelineException) InnerCause.getInnerCause(t, PipelineException.class); - if (pe != null) { - throw pe; // throw it upstream - } - - // determine next pipeline service id - final UUID[] priorAndNext = member.getQuorum().getPipelinePriorAndNext(member.getServiceId()); - log.warn("Problem with downstream service: " + priorAndNext[1], t); - - throw new PipelineException(priorAndNext[1], t); - } finally { - // cancel the local Future. - futSnd.cancel(true/* mayInterruptIfRunning */); - } + try { - } - - } + // Get Future for receive outcome on the remote service + // (RMI). + final Future<Void> futRec = downstream.service + .receiveAndReplicate(req, msg); + + try { + + /* + * Await the Futures, but spend more time waiting on the + * local Future and only check the remote Future every + * second. Timeouts are ignored during this loop. + */ + while (!futSnd.isDone() && !futRec.isDone()) { + /* + * Make sure leader's quorum token remains valid for + * ALL writes. + */ + member.assertLeader(token); + try { + futSnd.get(1L, TimeUnit.SECONDS); + } catch (TimeoutException ignore) { + } + try { + futRec.get(10L, TimeUnit.MILLISECONDS); + } catch (TimeoutException ignore) { + } + } + futSnd.get(); + futRec.get(); + + } finally { + if (!futRec.isDone()) { + // cancel remote Future unless done. + futRec.cancel(true/* mayInterruptIfRunning */); + } + } + + } finally { + // cancel the local Future. 
+ futSnd.cancel(true/* mayInterruptIfRunning */); + } + + } catch (Throwable t) { + // check inner cause for downstream PipelineException + final PipelineException pe = (PipelineException) InnerCause + .getInnerCause(t, PipelineException.class); + final UUID problemService; + if (pe != null) { + // throw pe; // throw it upstream - already should have been + // handled + problemService = pe.getProblemServiceId(); + } else { + final UUID[] priorAndNext = member.getQuorum() + .getPipelinePriorAndNext(member.getServiceId()); + problemService = priorAndNext[1]; + } + + // determine next pipeline service id + log.warn("Problem with downstream service: " + problemService, + t); + + // Carry out remedial work directly - BAD + log.error("Really need to remove service " + problemService); + + try { + member.getActor().forceRemoveService(problemService); + } catch (Exception e) { + log.warn("Problem on node removal", e); + + throw new RuntimeException(e); + } + + throw new PipelineException(problemService, t); + + } + } + } /** * Lock used to ensure that at most one message is being sent along the @@ -1934,8 +1975,8 @@ final HAMessageWrapper wrappedMsg = new HAMessageWrapper( req, msg); - // Get Future for send() outcome on local service. - final Future<Void> futSnd = receiveService.receiveData(wrappedMsg, + // Get Future for receive() outcome on local service. + final Future<Void> futRcv = receiveService.receiveData(wrappedMsg, b); try { @@ -1946,7 +1987,7 @@ // Verify token remains valid. member.getQuorum().assertQuorum(token); // Await the future. - return futSnd.get(1000, TimeUnit.MILLISECONDS); + return futRcv.get(1000, TimeUnit.MILLISECONDS); } catch (TimeoutException ex) { // Timeout. Ignore and retry loop. Thread.sleep(100/* ms */); @@ -1957,7 +1998,7 @@ } finally { // cancel the local Future. - futSnd.cancel(true/*mayInterruptIfRunning*/); + futRcv.cancel(true/*mayInterruptIfRunning*/); } @@ -1995,74 +2036,76 @@ this.receiveService = receiveService; } - public Void call() throws Exception { + public Void call() throws Exception { - // wrap the messages together. - final HAMessageWrapper wrappedMsg = new HAMessageWrapper( - req, msg); + // wrap the messages together. + final HAMessageWrapper wrappedMsg = new HAMessageWrapper(req, msg); - // Get Future for send() outcome on local service. - final Future<Void> futSnd = receiveService.receiveData(wrappedMsg, - b); + // Get Future for receive() outcome on local service. + final Future<Void> futRcv = receiveService.receiveData(wrappedMsg, + b); + try { + try { - try { + // Get future for receive outcome on the remote + // service. + final Future<Void> futDRcv = downstream.service + .receiveAndReplicate(req, msg); - // Get future for receive outcome on the remote - // service. - final Future<Void> futRec = downstream.service - .receiveAndReplicate(req, msg); + try { - try { + /* + * Await the Futures, but spend more time waiting on the + * local Future and only check the remote Future every + * second. Timeouts are ignored during this loop. + */ + while (!futRcv.isDone() && !futDRcv.isDone()) { + /* + * The token must remain valid, even if this service + * is not joined with the met quorum. If fact, + * services MUST replicate writes regardless of + * whether or not they are joined with the met + * quorum, but only while there is a met quorum. 
+ */ + member.getQuorum().assertQuorum(token); + try { + futRcv.get(1L, TimeUnit.SECONDS); + } catch (TimeoutException ignore) { + } + try { + futDRcv.get(10L, TimeUnit.MILLISECONDS); + } catch (TimeoutException ignore) { + } + } + futRcv.get(); + futDRcv.get(); - /* - * Await the Futures, but spend more time - * waiting on the local Future and only check - * the remote Future every second. Timeouts are - * ignored during this loop. - */ - while (!futSnd.isDone() && !futRec.isDone()) { - /* - * The token must remain valid, even if this service is - * not joined with the met quorum. If fact, services - * MUST replicate writes regardless of whether or not - * they are joined with the met quorum, but only while - * there is a met quorum. - */ - member.getQuorum().assertQuorum(token); - try { - futSnd.get(1L, TimeUnit.SECONDS); - } catch (TimeoutException ignore) { - } - try { - futRec.get(10L, TimeUnit.MILLISECONDS); - } catch (TimeoutException ignore) { - } - } - futSnd.get(); - futRec.get(); + } finally { + if (!futDRcv.isDone()) { + // cancel remote Future unless done. + futDRcv.cancel(true/* mayInterruptIfRunning */); + } + } - } finally { - if (!futRec.isDone()) { - // cancel remote Future unless done. - futRec - .cancel(true/* mayInterruptIfRunning */); - } - } + } finally { + // Is it possible that this cancel conflicts with throwing + // the PipelineException? + // cancel the local Future. + futRcv.cancel(true/* mayInterruptIfRunning */); + } + } catch (Throwable t) { + // determine next pipeline service id + final UUID[] priorAndNext = member.getQuorum() + .getPipelinePriorAndNext(member.getServiceId()); + log.warn("Problem with downstream service: " + priorAndNext[1], + t); - } catch (Throwable t) { - // determine next pipeline service id - final UUID[] priorAndNext = member.getQuorum().getPipelinePriorAndNext(member.getServiceId()); - log.warn("Problem with downstream service: " + priorAndNext[1], t); - - throw new PipelineException(priorAndNext[1], t); - } finally { - // cancel the local Future. 
- futSnd.cancel(true/* mayInterruptIfRunning */); - } + throw new PipelineException(priorAndNext[1], t); + } - // done - return null; - } + // done + return null; + } } Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-11-26 18:30:22 UTC (rev 7598) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-11-27 14:23:29 UTC (rev 7599) @@ -27,14 +27,18 @@ import java.io.IOException; import java.net.BindException; import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousCloseException; +import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; +import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; @@ -52,6 +56,7 @@ import org.apache.log4j.Logger; import com.bigdata.btree.BytesUtil; +import com.bigdata.ha.PipelineException; import com.bigdata.ha.QuorumPipelineImpl; import com.bigdata.ha.msg.IHASyncRequest; import com.bigdata.ha.msg.IHAWriteMessage; @@ -962,8 +967,35 @@ boolean success = false; try { - doReceiveAndReplicate(client); - success = true; + while (!success) { + try { + log.warn("Receiving"); + doReceiveAndReplicate(client); + log.warn("DONE"); + success = true; + } catch (ClosedChannelException cce) { // closed then re-open + + final ServerSocket socket = server.socket(); + + log.warn("Closed upstream? " + socket.getChannel().isOpen(), cce); + + socket.bind(socket.getLocalSocketAddress()); + + server.socket().getChannel().isOpen(); + + awaitAccept(); + + log.warn("Creating new client"); + + client = new Client(server);//, sendService, addrNext); + + // save off reference and round we go + clientRef.set(client); + } catch (Throwable t) { + log.warn("Unexpected Error", t); + throw new RuntimeException(t); + } + } // success. return null; } finally { @@ -1110,7 +1142,8 @@ final int rdLen = client.client.read(tokenBB); for (int i = 0; i < rdLen; i++) { if (tokenBuffer[i] != token[tokenIndex]) { - log.warn("TOKEN MISMATCH"); + if (ntokenreads < 2) + log.warn("TOKEN MISMATCH"); tokenIndex = 0; if (tokenBuffer[i] == token[tokenIndex]) { tokenIndex++; @@ -1222,7 +1255,7 @@ } catch(Throwable t) { log.warn("Send downstream error", t); - throw new RuntimeException(t); + throw new RuntimeException(t); } } break; Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-11-26 18:30:22 UTC (rev 7598) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-11-27 14:23:29 UTC (rev 7599) @@ -509,7 +509,7 @@ while (nwritten < remaining) { - log.warn("TOKEN: " + BytesUtil.toHexString(token) + ", written: " + (token == null ? false : ntoken == token.length)); + // log.warn("TOKEN: " + BytesUtil.toHexString(token) + ", written: " + (token == null ? 
false : ntoken == token.length)); if (token != null && ntoken < token.length) { final ByteBuffer tokenBB = ByteBuffer.wrap(token); tokenBB.position(ntoken); @@ -551,10 +551,26 @@ * buffer. */ - final int nbytes = socketChannel.write(data); + final int nbytes; + if (log.isDebugEnabled()) { // add debug latency + final int limit = data.limit(); + if (data.position() < (limit-50000)) { + data.limit(data.position()+50000); + } + nbytes = socketChannel.write(data); + data.limit(limit); + + nwritten += nbytes; + log.debug("Written " + nwritten + " of total " + data.limit()); + + if (nwritten < limit) { + Thread.sleep(2); + } + } else { + nbytes = socketChannel.write(data); + nwritten += nbytes; + } - nwritten += nbytes; - if (log.isTraceEnabled()) log.trace("Sent " + nbytes + " bytes with " + nwritten + " of out " + remaining + " written so far"); @@ -729,4 +745,21 @@ } + public void resetSocket() { + try { + final SocketChannel socketChannel = this.socketChannel.get(); + if (socketChannel != null) { + try { + socketChannel.close(); + } catch (IOException ex) { + log.error("Ignoring exception during reetSocket: " + ex, ex); + } finally { + this.socketChannel.set(null); + } + } + } finally { + reopenChannel(); + } + } + } Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java 2013-11-26 18:30:22 UTC (rev 7598) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java 2013-11-27 14:23:29 UTC (rev 7599) @@ -39,6 +39,7 @@ import org.apache.log4j.Logger; import com.bigdata.io.FileChannelUtility; +import com.bigdata.journal.jini.ha.HAJournalTest.StoreState; import com.bigdata.mdi.IResourceMetadata; import com.bigdata.rawstore.AbstractRawWormStore; import com.bigdata.rawstore.Bytes; @@ -687,4 +688,14 @@ // public boolean isFlushed() { // return true; // } + + + /** + * Default StoreState implementation to be overridden + * as appropriate. + */ + synchronized public StoreState getStoreState() { + return new StoreState(); + } + } Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-11-26 18:30:22 UTC (rev 7598) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-11-27 14:23:29 UTC (rev 7599) @@ -3380,7 +3380,9 @@ metaStartAddr, metaBitsAddr, old.getStoreType(), old.getCreateTime(), old.getCloseTime(), old.getVersion(), store.checker); - + + + log.warn("CommitRecordIndexAddr: " + commitRecordIndexAddr + ", strategy: " + _bufferStrategy.getClass() + ", physicalAddress: " + _bufferStrategy.getPhysicalAddress(commitRecordIndexAddr)); } /** @@ -3545,7 +3547,18 @@ private void commitHA() { try { + + if (log.isDebugEnabled()) { + final long rootAddr = store._commitRecordIndex.getRootAddr(); + log.debug("CommitRecordIndex RootAddr: " + rootAddr + ", physical address: " + store.getPhysicalAddress(rootAddr)); + + if (_bufferStrategy instanceof IRWStrategy) { + final RWStore rwstore = ((RWStrategy) _bufferStrategy).getStore(); + log.debug(rwstore.showAllocatorList()); + } + } + if(!prepare2Phase()) { // PREPARE rejected. @@ -4623,6 +4636,8 @@ * * Note: For this code path we DO NOT cache the index view. 
*/ + if (log.isDebugEnabled()) + log.debug("reading CommitRecordIndex from PhysicalAddress: " + _bufferStrategy.getPhysicalAddress(addr)); ndx = (CommitRecordIndex) BTree.load(this, addr, false/* readOnly */); @@ -7589,6 +7604,9 @@ } + if (log.isDebugEnabled()) + log.debug("RBV with CommitRecordIndex at PhysicalAddress: " + _bufferStrategy.getPhysicalAddress(rootBlock.getCommitRecordIndexAddr())); + } // doInnerRun() } // Commit2PhaseTask Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/BasicBufferStrategy.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/BasicBufferStrategy.java 2013-11-26 18:30:22 UTC (rev 7598) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/BasicBufferStrategy.java 2013-11-27 14:23:29 UTC (rev 7599) @@ -30,6 +30,7 @@ import com.bigdata.counters.CounterSet; import com.bigdata.io.FileChannelUtility; +import com.bigdata.journal.jini.ha.HAJournalTest.StoreState; /** * Implements logic to read from and write on a buffer. This is sufficient Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/DumpJournal.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/DumpJournal.java 2013-11-26 18:30:22 UTC (rev 7598) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/DumpJournal.java 2013-11-27 14:23:29 UTC (rev 7599) @@ -381,6 +381,17 @@ final boolean dumpHistory, final boolean dumpPages, final boolean dumpIndices, final boolean showTuples) { + /** + * Start a transaction. This will bracket all index access and protect + * the data on the journal from concurrent recycling. + * + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/762"> + * DumpJournal does not protect against concurrent updates (NSS) + * </a> + */ + final long tx = journal.newTx(ITx.READ_COMMITTED); + try { + final FileMetadata fmd = journal.getFileMetadata(); if (fmd != null) { @@ -600,6 +611,9 @@ dumpPages, dumpIndices, showTuples); } + } finally { + journal.abort(tx); + } } @@ -614,7 +628,7 @@ } - public void dumpGlobalRowStore(final PrintWriter out) { + private void dumpGlobalRowStore(final PrintWriter out) { final SparseRowStore grs = journal.getGlobalRowStore(journal .getLastCommitTime()); @@ -826,7 +840,7 @@ * * @return */ - public String dumpRawRecord(final long addr) { + private String dumpRawRecord(final long addr) { if (journal.getBufferStrategy() instanceof IRWStrategy) { /** @@ -984,6 +998,7 @@ } } case Stream: + @SuppressWarnings("unused") final Stream stream = (Stream) ndx; /* * Note: We can't do anything here with a Stream, but we do @@ -1004,41 +1019,4 @@ } - /** - * Return the data in the buffer. - */ - public static byte[] getBytes(ByteBuffer buf) { - - if (buf.hasArray() && buf.arrayOffset() == 0 && buf.position() == 0 - && buf.limit() == buf.capacity()) { - - /* - * Return the backing array. - */ - - return buf.array(); - - } - - /* - * Copy the expected data into a byte[] using a read-only view on the - * buffer so that we do not mess with its position, mark, or limit. 
- */ - final byte[] a; - { - - buf = buf.asReadOnlyBuffer(); - - final int len = buf.remaining(); - - a = new byte[len]; - - buf.get(a); - - } - - return a; - - } - } Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/IBufferStrategy.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/IBufferStrategy.java 2013-11-26 18:30:22 UTC (rev 7598) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/IBufferStrategy.java 2013-11-27 14:23:29 UTC (rev 7599) @@ -28,6 +28,7 @@ import java.nio.ByteBuffer; import com.bigdata.counters.CounterSet; +import com.bigdata.journal.jini.ha.HAJournalTest.StoreState; import com.bigdata.rawstore.IAddressManager; import com.bigdata.rawstore.IMRMW; import com.bigdata.rawstore.IRawStore; @@ -276,6 +277,15 @@ */ public boolean useChecksums(); + /** + * A StoreState object references critical transient data that can be used + * to determine a degree of consistency between stores, specifically for an + * HA context. + * + * @return the StoreState + */ + public StoreState getStoreState(); + // /** // * Determines whether there are outstanding writes to the underlying store // */ Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/RWStrategy.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/RWStrategy.java 2013-11-26 18:30:22 UTC (rev 7598) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/RWStrategy.java 2013-11-27 14:23:29 UTC (rev 7599) @@ -49,6 +49,7 @@ import com.bigdata.ha.msg.IHAWriteMessage; import com.bigdata.io.IBufferAccess; import com.bigdata.io.writecache.WriteCacheService; +import com.bigdata.journal.jini.ha.HAJournalTest.StoreState; import com.bigdata.mdi.IResourceMetadata; import com.bigdata.quorum.Quorum; import com.bigdata.quorum.QuorumException; @@ -905,6 +906,11 @@ public WriteCacheService getWriteCacheService() { return m_store.getWriteCacheService(); } + + @Override + public StoreState getStoreState() { + return m_store.getStoreState(); + } // @Override // public boolean isFlushed() { Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2013-11-26 18:30:22 UTC (rev 7598) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2013-11-27 14:23:29 UTC (rev 7599) @@ -178,7 +178,9 @@ "Address committed but not set in transients"); } - m_store.showWriteCacheDebug(paddr); + m_store.showWriteCacheDebug(paddr); + + log.warn("Physical address " + paddr + " not accessible for Allocator of size " + m_size); return 0L; } Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2013-11-26 18:30:22 UTC (rev 7598) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2013-11-27 14:23:29 UTC (rev 7599) @@ -101,6 +101,7 @@ import com.bigdata.journal.IRootBlockView; import com.bigdata.journal.RootBlockView; import com.bigdata.journal.StoreTypeEnum; +import com.bigdata.journal.jini.ha.HAJournalTest.StoreState; import com.bigdata.quorum.Quorum; import com.bigdata.quorum.QuorumException; import 
com.bigdata.rawstore.IAllocationContext; @@ -1222,9 +1223,9 @@ /* * Utility to encapsulate RootBlock interpreation */ - static class RootBlockInfo { + class RootBlockInfo { - static int nextAllocation(final IRootBlockView rb) { + int nextAllocation(final IRootBlockView rb) { final long nxtOffset = rb.getNextOffset(); // next allocation to be made (in -32K units). @@ -1237,13 +1238,11 @@ return ret == 0 ? -(1 + META_ALLOCATION) : ret; } - /* - * Meta-Allocations stored as {int address; int[8] bits}, so each block - * holds 8*32=256 allocation slots of 1K totaling 256K. - * - * The returned int array is a flattened list of these int[9] blocks - */ - static int[] metabits(final IRootBlockView rb, final ReopenFileChannel reopener) throws IOException { + final int[] m_metabits; + final long m_storageStatsAddr; + final long m_lastDeferredReleaseTime; + + RootBlockInfo(final IRootBlockView rb) throws IOException { final long rawmbaddr = rb.getMetaBitsAddr(); /* @@ -1265,17 +1264,17 @@ */ final byte[] buf = new byte[metaBitsStore * 4]; - FileChannelUtility.readAll(reopener, ByteBuffer.wrap(buf), pmaddr); + FileChannelUtility.readAll(m_reopener, ByteBuffer.wrap(buf), pmaddr); final DataInputStream strBuf = new DataInputStream(new ByteArrayInputStream(buf)); // Can handle minor store version incompatibility strBuf.readInt(); // STORE VERSION - strBuf.readLong(); // Last Deferred Release Time + m_lastDeferredReleaseTime = strBuf.readLong(); // Last Deferred Release Time strBuf.readInt(); // cDefaultMetaBitsSize final int allocBlocks = strBuf.readInt(); - strBuf.readLong(); // m_storageStatsAddr + m_storageStatsAddr = strBuf.readLong(); // m_storageStatsAddr // step over those reserved ints for (int i = 0; i < cReservedMetaBits; i++) { @@ -1300,8 +1299,15 @@ * Meta-Allocations stored as {int address; int[8] bits}, so each block * holds 8*32=256 allocation slots of 1K totaling 256K. */ - return ret; + m_metabits = ret; } + + /* + * Meta-Allocations stored as {int address; int[8] bits}, so each block + * holds 8*32=256 allocation slots of 1K totaling 256K. + * + * The returned int array is a flattened list of these int[9] blocks + */ } /** @@ -1451,7 +1457,9 @@ for (int i = 0; i < m_metaBitsSize; i++) { m_metaBits[i] = strBuf.readInt(); } - m_metaTransientBits = (int[]) m_metaBits.clone(); + // m_metaTransientBits = (int[]) m_metaBits.clone(); + + syncMetaTransients(); final int numFixed = m_allocSizes.length; @@ -1478,6 +1486,18 @@ + ", " + m_metaBitsAddr); } } + + /** + * Uses System.arraycopy rather than clone() to duplicate the + * metaBits to the metaTransientBits, which will be faster. 
+ */ + private void syncMetaTransients() { + if (m_metaTransientBits == null || m_metaTransientBits.length != m_metaBits.length) { + m_metaTransientBits = (int[]) m_metaBits.clone(); + } else { + System.arraycopy(m_metaBits, 0, m_metaTransientBits, 0, m_metaTransientBits.length); + } + } // /* // * Called when store is opened to make sure any deferred frees are @@ -2842,6 +2862,11 @@ isolatedWrites = isolatedWrites || fa.reset(m_writeCacheService, m_committedNextAllocation); } + /** + * Now clone the transient metabits for protection if this service becomes leader + */ + syncMetaTransients(); + if (!isolatedWrites) { /** * Now we should be able to unwind any unused allocators and unused @@ -3114,7 +3139,7 @@ // to provide control // writeFileSpec(); - m_metaTransientBits = (int[]) m_metaBits.clone(); + syncMetaTransients(); // Must be called from AbstractJournal commitNow after writeRootBlock // postCommit(); @@ -3500,6 +3525,9 @@ (b * cDefaultMetaBitsSize) + 1, cDefaultMetaBitsSize-1); if (ret != -1) { + // The assumption is that this bit is also NOT set in m_metaBits + assert !tstBit(m_metaBits, ret); + return ret; } } @@ -6194,14 +6222,40 @@ log.trace("Allocator " + index + ", size: " + xfa.m_size + ", startAddress: " + xfa.getStartAddr() + ", allocated: " + (xfa.getAllocatedSlots()/xfa.m_size)); } } - + + // Update m_metaBits addr and m_nextAllocation to ensure able to allocate as well as read! + { + final long nxtOffset = rbv.getNextOffset(); + + // next allocation to be made (in -32K units). + m_nextAllocation = -(int) (nxtOffset >> 32); + + if (m_nextAllocation == 0) { + throw new IllegalStateException("Invalid state for non-empty store"); + } + + m_committedNextAllocation = m_nextAllocation; + + final long savedMetaBitsAddr = m_metaBitsAddr; + // latched offset of the metabits region. + m_metaBitsAddr = -(int) nxtOffset; + + if (savedMetaBitsAddr != m_metaBitsAddr) + log.warn("Old metaBitsAddr: " + savedMetaBitsAddr + ", new metaBitsAddr: " + m_metaBitsAddr); + } + final ArrayList<FixedAllocator> nallocs = new ArrayList<FixedAllocator>(); // current metabits final int[] oldmetabits = m_metaBits; // new metabits - m_metaBits = RootBlockInfo.metabits(rbv, m_reopener); + final RootBlockInfo rbi = new RootBlockInfo(rbv); + m_metaBits = rbi.m_metabits; + // and grab the last deferred release and storageStats! 
+ m_lastDeferredReleaseTime = rbi.m_lastDeferredReleaseTime; + m_storageStatsAddr = rbi.m_storageStatsAddr; + if(log.isTraceEnabled()) log.trace("Metabits length: " + m_metaBits.length); @@ -6903,6 +6957,16 @@ } + public String showAllocatorList() { + final StringBuilder sb = new StringBuilder(); + + for (int index = 0; index < m_allocs.size(); index++) { + final FixedAllocator xfa = m_allocs.get(index); + sb.append("Allocator " + index + ", size: " + xfa.m_size + ", startAddress: " + xfa.getStartAddr() + ", allocated: " + xfa.getAllocatedSlots() + "\n"); + } + + return sb.toString(); + } // /** // * // * @return whether WCS is flushed @@ -6913,6 +6977,75 @@ // return this.m_writeCacheService.isFlushed(); // } + public static class RWStoreState extends StoreState { + + /** + * Generated ID + */ + private static final long serialVersionUID = 4315400143557397323L; + + /* + * Transient state necessary for consistent ha leader transition + */ + int m_fileSize; + int m_nextAllocation; + int m_committedNextAllocation; + long m_minReleaseAge; + long m_lastDeferredReleaseTime; + long m_storageStatsAddr; + int m_allocsSize; + int m_metaBitsAddr; + int m_metaBitsSize; + + public boolean equals(final Object obj) { + if (obj == null || !(obj instanceof RWStoreState)) + return false; + final RWStoreState other = (RWStoreState) obj; + return m_fileSize == other.m_fileSize + && m_nextAllocation == other.m_nextAllocation + && m_committedNextAllocation == other.m_committedNextAllocation + && m_minReleaseAge == other.m_minReleaseAge + && m_lastDeferredReleaseTime == other.m_lastDeferredReleaseTime + && m_storageStatsAddr == other.m_storageStatsAddr + && m_allocsSize == other.m_allocsSize + && m_metaBitsAddr == other.m_metaBitsAddr + && m_metaBitsSize == other.m_metaBitsSize; + } + + public String toString() { + final StringBuilder sb = new StringBuilder(); + + sb.append("RWStoreState\n"); + sb.append("fileSize: " + m_fileSize + "\n"); + sb.append("nextAllocation: " + m_nextAllocation + "\n"); + sb.append("committedNextAllocation: " + m_committedNextAllocation + "\n"); + sb.append("minReleaseAge: " + m_minReleaseAge + "\n"); + sb.append("lastDeferredReleaseTime: " + m_lastDeferredReleaseTime + "\n"); + sb.append("storageStatsAddr: " + m_storageStatsAddr + "\n"); + sb.append("allocsSize: " + m_allocsSize + "\n"); + sb.append("metaBitsAddr: " + m_metaBitsAddr + "\n"); + sb.append("metaBitsSize: " + m_metaBitsSize + "\n"); + + return sb.toString(); + } + } + + public StoreState getStoreState() { + final RWStoreState ret = new RWStoreState(); + + ret.m_fileSize = m_fileSize; + ret.m_nextAllocation = m_nextAllocation; + ret.m_committedNextAllocation = m_committedNextAllocation; + ret.m_minReleaseAge = m_minReleaseAge; + ret.m_lastDeferredReleaseTime = m_lastDeferredReleaseTime; + ret.m_storageStatsAddr = m_storageStatsAddr; + ret.m_allocsSize = m_allocs.size(); + ret.m_metaBitsAddr = m_metaBitsAddr; + ret.m_metaBitsSize = m_metaBits.length; + + return ret; + } + // public void prepareForRebuild(final HARebuildRequest req) { // assert m_rebuildRequest == null; // Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/rwstore/sector/MemStrategy.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/rwstore/sector/MemStrategy.java 2013-11-26 18:30:22 UTC (rev 7598) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/rwstore/sector/MemStrategy.java 2013-11-27 14:23:29 UTC (rev 7599) @@ -41,6 +41,7 @@ import 
com.bigdata.journal.IRootBlockView; import com.bigdata.journal.RootBlockView; import com.bigdata.journal.StoreTypeEnum; +import com.bigdata.journal.jini.ha.HAJournalTest.StoreState; import com.bigdata.mdi.IResourceMetadata; import com.bigdata.rawstore.IAddressManager; import com.bigdata.rawstore.IAllocationContext; @@ -497,6 +498,11 @@ return m_dirty; } + @Override + public StoreState getStoreState() { + throw new UnsupportedOperationException(); + } + // @Override // public boolean isFlushed() { // return true; Modified: branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive3Nodes.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive3Nodes.java 2013-11-26 18:30:22 UTC (rev 7598) +++ branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive3Nodes.java 2013-11-27 14:23:29 UTC (rev 7599) @@ -25,14 +25,25 @@ package com.bigdata.ha.pipeline; import java.io.IOException; +import java.io.InputStream; import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.Random; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import com.bigdata.btree.BytesUtil; import com.bigdata.ha.msg.HAWriteMessageBase; import com.bigdata.ha.msg.IHAWriteMessageBase; import com.bigdata.io.DirectBufferPool; @@ -169,22 +180,29 @@ public void testSimpleExchange() throws InterruptedException, ExecutionException, TimeoutException { - final long timeout = 5000; // ms + doSimpleExchange(); + } + + private void doSimpleExchange() throws InterruptedException, + ExecutionException, TimeoutException { + + final long timeout = 5000; // ms final ByteBuffer tst1 = getRandomData(50); - final IHAWriteMessageBase msg1 = new HAWriteMessageBase(50, chk.checksum(tst1)); + final IHAWriteMessageBase msg1 = new HAWriteMessageBase(50, + chk.checksum(tst1)); final ByteBuffer rcv1 = ByteBuffer.allocate(2000); final ByteBuffer rcv2 = ByteBuffer.allocate(2000); // rcv.limit(50); final Future<Void> futRec1 = receiveServiceB.receiveData(msg1, rcv1); final Future<Void> futRec2 = receiveServiceC.receiveData(msg1, rcv2); final Future<Void> futSnd = sendServiceA.send(tst1, msg1.getToken()); - futSnd.get(timeout,TimeUnit.MILLISECONDS); - futRec1.get(timeout,TimeUnit.MILLISECONDS); - futRec2.get(timeout,TimeUnit.MILLISECONDS); + futSnd.get(timeout, TimeUnit.MILLISECONDS); + futRec1.get(timeout, TimeUnit.MILLISECONDS); + futRec2.get(timeout, TimeUnit.MILLISECONDS); assertEquals(tst1, rcv1); assertEquals(rcv1, rcv2); } - + public void testChecksumError() throws InterruptedException, ExecutionException { @@ -711,4 +729,139 @@ } } + public void testSimpleReset() throws InterruptedException, + ExecutionException, TimeoutException { + + doSimpleExchange(); + + sendServiceA.resetSocket(); + + doSimpleExchange(); + } + + /** + * The use of threaded tasks in the send/receive service makes it difficult to + * observer the socket state changes. 
+ * + * So let's begin by writing some tests over the raw sockets. + * + * @throws IOException + */ + public void testDirectSockets() throws IOException { + final InetSocketAddress serverAddr = new InetSocketAddress(getPort(0)); + + final ServerSocket ss = new ServerSocket(); + ss.bind(serverAddr); + + final SocketChannel cs = SocketChannel.open(); + + cs.connect(serverAddr); + + final Random r = new Random(); + final byte[] data = new byte[200]; + r.nextBytes(data); + + final ByteBuffer src = ByteBuffer.wrap(data); + + cs.write(src); + + final byte[] dst = new byte[200]; + + final Socket readSckt1 = ss.accept(); + + InputStream instr = readSckt1.getInputStream(); + + instr.read(dst); + + assertTrue(BytesUtil.bytesEqual(data, dst)); + + // now write some more data into the channel and then close it + cs.write(ByteBuffer.wrap(data)); + + // close the client socket + cs.close(); + + // and see what happens when we try to accept the data + // we expect it to hang and timeout! + assertTimout(1, TimeUnit.SECONDS, new Callable<Void>() { + + @Override + public Void call() throws Exception { + ss.accept(); + + return null; + }}); + + // Now try writing some more data + try { + cs.write(ByteBuffer.wrap(data)); + fail("Expected closed channel exception"); + } catch (ClosedChannelException e) { + // expected + } + + // the old stream should be closed + try { + final int rdlen = instr.read(dst); // should be closed + fail("Expected closed socket exception, rdlen: " + rdlen); + } catch (Exception e) { + // expected; + } + + // if so then should we explicitly close its socket? + readSckt1.close(); + + final SocketChannel cs2 = SocketChannel.open(); + cs2.connect(serverAddr); + cs2.write(ByteBuffer.wrap(data)); + + // Now we should be able to accept the new write + final AtomicReference<Socket> av = new AtomicReference<Socket>(); + assertNoTimout(1, TimeUnit.SECONDS, new Callable<Void>() { + + @Override + public Void call() throws Exception { + av.set(ss.accept()); + + return null; + }}); + + // the new socket and associated stream should be good to go + av.get().getInputStream().read(dst); + + assertTrue(BytesUtil.bytesEqual(data, dst)); + + + } + + private void assertTimout(long timeout, TimeUnit unit, Callable<Void> callable) { + final ExecutorService es = Executors.newSingleThreadExecutor(); + try { + final Future<Void> ret = es.submit(callable); + ret.get(timeout, unit); + fail("Expected timeout"); + } catch (TimeoutException e) { + // that is expected + return; + } catch (Exception e) { + fail("Expected timeout"); + } finally { + es.shutdown(); + } + } + + private void assertNoTimout(long timeout, TimeUnit unit, Callable<Void> callable) { + final ExecutorService es = Executors.newSingleThreadExecutor(); + try { + final Future<Void> ret = es.submit(callable); + ret.get(timeout, unit); + } catch (TimeoutException e) { + fail("Unexpected timeout"); + } catch (Exception e) { + fail("Unexpected Exception", e); + } finally { + es.shutdown(); + } + } + } Modified: branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/journal/TestDumpJournal.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/journal/TestDumpJournal.java 2013-11-26 18:30:22 UTC (rev 7598) +++ branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/journal/TestDumpJournal.java 2013-11-27 14:23:29 UTC (rev 7599) @@ -29,15 +29,23 @@ package com.bigdata.journal; import java.io.IOException; +import java.util.LinkedList; +import java.util.List; import java.util.UUID; 
+import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; import com.bigdata.btree.AbstractBTreeTestCase; import com.bigdata.btree.BTree; import com.bigdata.btree.HTreeIndexMetadata; import com.bigdata.btree.IndexMetadata; import com.bigdata.btree.keys.KV; +import com.bigdata.concurrent.FutureTaskMon; import com.bigdata.htree.HTree; +import com.bigdata.rwstore.IRWStrategy; +import com.bigdata.util.concurrent.LatchedExecutor; /** * Test suite for {@link DumpJournal}. @@ -66,8 +74,10 @@ /** * @param name */ - public TestDumpJournal(String name) { + public TestDumpJournal(final String name) { + super(name); + } /** @@ -361,4 +371,229 @@ } + /** + * Unit test for {@link DumpJournal} with concurrent updates against the + * backing store. This is intended primarily to detect failures to protect + * against the recycling model associated with the {@link IRWStrategy}. + * + * @throws Exception + * + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/762"> + * DumpJournal does not protect against concurrent updates (NSS) </a> + */ + public void test_dumpJournal_concurrent_updates() throws Exception { + + final String PREFIX = "testIndex#"; + final int NUM_INDICES = 4; + + Journal src = getStore(getProperties()); + + try { + + for (int i = 0; i < NUM_INDICES; i++) { + + // register an index + final String name = PREFIX + i; + + src.registerIndex(new IndexMetadata(name, UUID.randomUUID())); + { + + // lookup the index. + final BTree ndx = src.getIndex(name); + + // #of tuples to write. + final int ntuples = r.nextInt(1000); + + // generate random data. + final KV[] a = AbstractBTreeTestCase + .getRandomKeyValues(ntuples); + + // write tuples (in random order) + for (KV kv : a) { + + ndx.insert(kv.key, kv.val); + + if (r.nextInt(100) < 10) { + + // randomly increment the counter (10% of the time). + ndx.getCounter().incrementAndGet(); + + } + + } + + } + + } + + // commit the journal (!) + src.commit(); + + /** + * Task to run various DumpJournal requests. + */ + final class DumpTask implements Callable<Void> { + + private final Journal src; + + public DumpTask(final Journal src) { + + this.src = src; + + } + + public Void call() throws Exception { + + new DumpJournal(src) + .dumpJournal(false/* dumpHistory */, + true/* dumpPages */, + false/* dumpIndices */, false/* showTuples */); + + new DumpJournal(src) + .dumpJournal(true/* dumpHistory */, + true/* dumpPages */, true/* dumpIndices */, + false/* showTuples */); + + // test again w/o dumpPages + new DumpJournal(src) + .dumpJournal(true/* dumpHistory */, + false/* dumpPages */, + true/* dumpIndices */, false/* showTuples */); + + return (Void) null; + + } + + } + + final class UpdateTask implements Callable<Void> { + + private final Journal src; + + public UpdateTask(final Journal src) { + + this.src = src; + + } + + public Void call() throws Exception { + + /* + * Now write some more data, going through a series of commit + * points. This let's us check access to historical commit points. + */ + for (int j = 0; j < 10; j++) { + + for (int i = 0; i < NUM_INDICES; i++) { + + // register an index + final String name = PREFIX + i; + + // lookup the index. + final BTree ndx = src.getIndex(name); + + // #of tuples to write. + final int ntuples = r.nextInt(1000); + + // generate random data. 
+                            final KV[] a = AbstractBTreeTestCase
+                                    .getRandomKeyValues(ntuples);
+
+                            // write tuples (in random order)
+                            for (KV kv : a) {
+
+                                ndx.insert(kv.key, kv.val);
+
+                                if (r.nextInt(100) < 10) {
+
+                                    // randomly increment the counter (10% of the time).
+
... [truncated message content]
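
Editor's note: the replication path changed above awaits two Futures at once, the local send() outcome and the remote receiveAndReplicate() outcome. The sketch below isolates that pattern with plain java.util.concurrent stand-ins; the two submitted tasks and the assertStillLeader callback are placeholders, not bigdata APIs. Poll the local Future a second at a time, peek at the remote Future for 10ms, re-check liveness on each pass, and cancel whatever is still outstanding on the way out.

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DualFutureAwait {

    static void awaitBoth(final Future<Void> futSnd, final Future<Void> futRec,
            final Runnable assertStillLeader) throws InterruptedException,
            ExecutionException {
        try {
            try {
                // Spend most of the time on the local Future and only
                // check the remote Future briefly. Timeouts are ignored
                // inside this loop.
                while (!futSnd.isDone() && !futRec.isDone()) {
                    assertStillLeader.run(); // throws if leadership is lost
                    try {
                        futSnd.get(1L, TimeUnit.SECONDS);
                    } catch (TimeoutException ignore) {
                    }
                    try {
                        futRec.get(10L, TimeUnit.MILLISECONDS);
                    } catch (TimeoutException ignore) {
                    }
                }
                futSnd.get(); // surface any local send error
                futRec.get(); // surface any downstream replicate error
            } finally {
                if (!futRec.isDone()) {
                    // cancel remote Future unless done
                    futRec.cancel(true/* mayInterruptIfRunning */);
                }
            }
        } finally {
            // always cancel the local Future
            futSnd.cancel(true/* mayInterruptIfRunning */);
        }
    }

    public static void main(String[] args) throws Exception {
        final ExecutorService es = Executors.newFixedThreadPool(2);
        try {
            final Future<Void> local = es.submit(() -> {
                Thread.sleep(200); // stand-in for pushing bytes on the socket
                return (Void) null;
            });
            final Future<Void> remote = es.submit(() -> {
                Thread.sleep(300); // stand-in for the follower draining them
                return (Void) null;
            });
            awaitBoth(local, remote, () -> { /* assertLeader(token) stand-in */ });
            System.out.println("replicated");
        } finally {
            es.shutdown();
        }
    }
}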
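The new getStoreState()/RWStoreState machinery exists so HA services can compare the transient allocator state that must agree after a leader transition. A minimal sketch follows, using the same field names as RWStore.RWStoreState in the diff; the check(...) helper and the hashCode override are illustrative additions, not part of the commit.

import java.io.Serializable;

public class StoreStateCheck {

    static class RWStoreState implements Serializable {

        private static final long serialVersionUID = 1L;

        // Transient state necessary for a consistent HA leader transition.
        int m_fileSize;
        int m_nextAllocation;
        int m_committedNextAllocation;
        long m_minReleaseAge;
        long m_lastDeferredReleaseTime;
        long m_storageStatsAddr;
        int m_allocsSize;
        int m_metaBitsAddr;
        int m_metaBitsSize;

        @Override
        public boolean equals(final Object obj) {
            if (!(obj instanceof RWStoreState))
                return false;
            final RWStoreState o = (RWStoreState) obj;
            return m_fileSize == o.m_fileSize
                    && m_nextAllocation == o.m_nextAllocation
                    && m_committedNextAllocation == o.m_committedNextAllocation
                    && m_minReleaseAge == o.m_minReleaseAge
                    && m_lastDeferredReleaseTime == o.m_lastDeferredReleaseTime
                    && m_storageStatsAddr == o.m_storageStatsAddr
                    && m_allocsSize == o.m_allocsSize
                    && m_metaBitsAddr == o.m_metaBitsAddr
                    && m_metaBitsSize == o.m_metaBitsSize;
        }

        @Override
        public int hashCode() { // keep equals/hashCode consistent
            return m_fileSize ^ m_nextAllocation ^ m_metaBitsAddr;
        }
    }

    /** Compare a leader snapshot against a follower snapshot. */
    static void check(final RWStoreState leader, final RWStoreState follower) {
        if (!leader.equals(follower))
            throw new IllegalStateException(
                    "Store state divergence between leader and follower");
    }
}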
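The DumpJournal change brackets the entire dump in a read-committed transaction so the RWStore cannot recycle addresses out from under the dump (ticket #762). Reduced to its essentials, the pattern is the few lines below; this assumes the bigdata classes on the classpath and that journal is an open com.bigdata.journal.Journal, so it is a sketch of the idiom rather than runnable in isolation.

final long tx = journal.newTx(ITx.READ_COMMITTED);
try {
    // ... walk commit records and indices here; the read-committed tx
    // pins the commit point so concurrent commits cannot recycle the
    // addresses being dumped.
} finally {
    // For a read-only tx, abort() simply releases the read lock.
    journal.abort(tx);
}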
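Finally, syncMetaTransients() replaces the per-commit clone() of the metabits with an arraycopy into the existing transient array whenever the shape is unchanged, avoiding a fresh allocation on every commit or reset. A standalone sketch of the same logic; names mirror the diff, and returning the array stands in for the field assignment inside RWStore.

public final class MetaBitsSync {

    /**
     * Refresh the transient copy of the metabits. Reuse the existing
     * array via System.arraycopy when the lengths match; fall back to
     * clone() only when the array must be (re)allocated.
     */
    static int[] syncMetaTransients(final int[] metaBits,
            int[] metaTransientBits) {
        if (metaTransientBits == null
                || metaTransientBits.length != metaBits.length) {
            metaTransientBits = metaBits.clone();
        } else {
            System.arraycopy(metaBits, 0, metaTransientBits, 0,
                    metaTransientBits.length);
        }
        return metaTransientBits;
    }
}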