From: <tho...@us...> - 2013-10-31 13:29:53
|
Revision: 7503 http://bigdata.svn.sourceforge.net/bigdata/?rev=7503&view=rev Author: thompsonbry Date: 2013-10-31 13:29:45 +0000 (Thu, 31 Oct 2013) Log Message: ----------- Import of Martyn's code with my initial review edits. Modified Paths: -------------- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/btree/BytesUtil.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessage.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessageBase.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/msg/IHAWriteMessageBase.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/quorum/QuorumActor.java branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestAll.java branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive.java branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive3Nodes.java branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java branches/PIPELINE_RESYNC/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java branches/PIPELINE_RESYNC/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java Added Paths: ----------- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/PipelineException.java branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestBufferFraming.java Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/btree/BytesUtil.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/btree/BytesUtil.java 2013-10-31 13:12:26 UTC (rev 7502) 
+++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/btree/BytesUtil.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -1575,7 +1575,10 @@ * @return The hex string. */ static public String toHexString(final byte[] buf) { - + + if (buf == null) + return "NULL"; + return toHexString(buf, buf.length); } @@ -1591,6 +1594,10 @@ * @return The hex string. */ static public String toHexString(final byte[] buf, int n) { + + if (buf == null) + return "NULL"; + n = n < buf.length ? n : buf.length; final StringBuffer out = new StringBuffer(); for (int i = 0; i < n; i++) { Added: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/PipelineException.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/PipelineException.java (rev 0) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/PipelineException.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -0,0 +1,56 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2013. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +package com.bigdata.ha; + +import java.util.UUID; + +/** + * PipelineException is thrown from RMI calls to communicate + * the root cause of a pipeline problem. 
The caller is then able + * to take action: for example to remove the problem service + * from the quorum. + */ +public class PipelineException extends RuntimeException { + + /** + * Generated ID + */ + private static final long serialVersionUID = 8019938954269914574L; + + /** The UUID of the service that could not be reached. */ + private final UUID serviceId; + + public PipelineException(final UUID serviceId, final Throwable t) { + super(t); + + this.serviceId = serviceId; + } + + /** Return the UUID of the service that could not be reached. */ + public UUID getProblemServiceId() { + return serviceId; + } + +} Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -1184,7 +1184,7 @@ final IHAWriteMessage msg) { // Use size and checksum from real IHAWriteMessage. - super(msg.getSize(),msg.getChk()); + super(msg.getSize(),msg.getChk(),msg.getToken()); this.req = req; // MAY be null; this.msg = msg; @@ -1473,7 +1473,22 @@ innerReplicate(0/* retryCount */); } catch (Throwable t) { + + final PipelineException pe = (PipelineException) InnerCause.getInnerCause(t, PipelineException.class); + if (pe != null) { + log.error("Really need to remove service " + pe.getProblemServiceId()); + final UUID psid = pe.getProblemServiceId(); + + try { + member.getActor().forceRemoveService(psid); + } catch (Exception e) { + log.warn("Problem on node removal", e); + + throw new RuntimeException(e); + } + } + // Note: Also see retrySend()'s catch block. 
if (InnerCause.isInnerCause(t, InterruptedException.class) // || InnerCause.isInnerCause(t, CancellationException.class) @@ -1714,7 +1729,7 @@ ExecutionException, IOException { // Get Future for send() outcome on local service. - final Future<Void> futSnd = sendService.send(b); + final Future<Void> futSnd = sendService.send(b, msg.getToken()); try { @@ -1754,6 +1769,18 @@ } } + } catch (Throwable t) { + // check inner cause for downstream PipelineException + final PipelineException pe = (PipelineException) InnerCause.getInnerCause(t, PipelineException.class); + if (pe != null) { + throw pe; // throw it upstream + } + + // determine next pipeline service id + final UUID[] priorAndNext = member.getQuorum().getPipelinePriorAndNext(member.getServiceId()); + log.warn("Problem with downstream service: " + priorAndNext[1], t); + + throw new PipelineException(priorAndNext[1], t); } finally { // cancel the local Future. futSnd.cancel(true/* mayInterruptIfRunning */); @@ -2022,6 +2049,12 @@ } } + } catch (Throwable t) { + // determine next pipeline service id + final UUID[] priorAndNext = member.getQuorum().getPipelinePriorAndNext(member.getServiceId()); + log.warn("Problem with downstream service: " + priorAndNext[1], t); + + throw new PipelineException(priorAndNext[1], t); } finally { // cancel the local Future. futSnd.cancel(true/* mayInterruptIfRunning */); Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessage.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessage.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessage.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -302,7 +302,7 @@ this.compressorKey = compressorKey; } - + /** * The initial version. 
* Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessageBase.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessageBase.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessageBase.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -28,7 +28,9 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; +import java.util.Random; +import com.bigdata.btree.BytesUtil; import com.bigdata.ha.pipeline.HAReceiveService; import com.bigdata.ha.pipeline.HASendService; @@ -55,7 +57,38 @@ /** The Alder32 checksum of the bytes to be transfered. */ private int chk; + /** A byte[] token that must prefix the message payload, needed to skip stale data from failed read tasks */ + private byte[] token; + + static private Random r = new Random(); + static final private int TOKEN_SIZE = 8; + + static private byte[] genToken() { + final byte[] token = new byte[TOKEN_SIZE]; + + while (!unique1(token)) { + r.nextBytes(token); + } + + return token; + } + /** + * Checks that the first byte is not repeated in the + * remaining bytes, this simplifies search for the token + * in the input stream. + */ + static private boolean unique1(final byte[] bytes) { + final byte b = bytes[0]; + for (int t = 1; t < bytes.length; t++) { + if (bytes[t] == b) + return false; + } + + return true; + } + + /** * * @param sze * The #of bytes of data to be transfered. @@ -63,6 +96,10 @@ * The Alder32 checksum of the bytes to be transfered. 
*/ public HAWriteMessageBase(final int sze, final int chk) { + this(sze, chk, genToken()); + } + + public HAWriteMessageBase(final int sze, final int chk, final byte[] token) { if (sze <= 0) throw new IllegalArgumentException(); @@ -71,6 +108,8 @@ this.chk = chk; + this.token = token; + } /** @@ -115,7 +154,7 @@ final IHAWriteMessageBase t = (IHAWriteMessageBase) obj; - return sze == t.getSize() && chk == t.getChk(); + return sze == t.getSize() && chk == t.getChk() && (BytesUtil.compareBytes(t.getToken(), getToken()) == 0); } @@ -143,6 +182,15 @@ chk = in.readInt(); + // read token + final int tlen = in.readInt(); + if (tlen == 0) { + token = null; + } else { + token = new byte[tlen]; + in.read(token); + } + } public void writeExternal(final ObjectOutput out) throws IOException { @@ -153,6 +201,17 @@ out.writeInt(chk); + if (token == null) { + out.writeInt(0); + } else { + out.writeInt(token.length); + out.write(token); + } } + @Override + public byte[] getToken() { + return token; + } + } Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/msg/IHAWriteMessageBase.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/msg/IHAWriteMessageBase.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/msg/IHAWriteMessageBase.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -37,4 +37,6 @@ /** The Alder32 checksum of the bytes to be transfered. 
*/ int getChk(); + /** A byte[] token that must prefix the message payload, needed to skip stale data from failed read tasks */ + byte[] getToken(); } \ No newline at end of file Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -51,6 +51,7 @@ import org.apache.log4j.Logger; +import com.bigdata.btree.BytesUtil; import com.bigdata.ha.QuorumPipelineImpl; import com.bigdata.ha.msg.IHASyncRequest; import com.bigdata.ha.msg.IHAWriteMessage; @@ -564,7 +565,7 @@ try { readFuture.get(); } catch (Exception e) { - log.warn(e, e); + log.warn("Pipeline should have been drained", e); } lock.lockInterruptibly(); @@ -579,6 +580,7 @@ } finally { final Client client = clientRef.get(); if (client != null) { + log.warn("Closing client"); client.close(); } } @@ -958,29 +960,44 @@ } -// boolean success = false; -// try { + boolean success = false; + try { doReceiveAndReplicate(client); -// success = true; + success = true; // success. return null; -// } finally { -// try { -// if(success) { -// ack(client); -// } else { -// nack(client); -// } -// } catch (IOException ex) { -// // log and ignore. -// log.error(ex, ex); -// } -// } + } finally { + try { + if (!success) { + // Drain, assuming that localBuffer is sized to be able to receive the full message + // TODO; confirm this assumption + if (localBuffer.capacity() < message.getSize()) + log.error("Insufficient buffer capacity"); + + // TODO: confirm that it is not possible for the next message to be sent to the pipeline since the + // RMI may have already failed and the next message could be on the way. If so the drain may read the + // start of the next message. 
+ final int startDrain = localBuffer.position(); + final int msgSize = message.getSize(); + log.warn("Start drain at " + startDrain + ", message size: " + msgSize + ", blocking mode: " + client.client.isBlocking()); + while(localBuffer.position() < msgSize) { + if (client.client.read(localBuffer) <= 0) // either -1 or no bytes available + break; + } + log.warn("Drained the pipe of " + (localBuffer.position()-startDrain) + " bytes"); + } + } catch (IOException ex) { + // log and ignore. + log.error(ex, ex); + } + } } // call. private void doReceiveAndReplicate(final Client client) throws Exception { + + log.warn("doReceiveAndReplicate"); /* * We should now have parameters ready in the WriteMessage and can @@ -998,6 +1015,17 @@ // for debug retain number of low level reads int reads = 0; + // setup token values to search for any provided token prefix + final byte[] token = message.getToken(); + + boolean foundStart = token == null; // if null then not able to check + int tokenIndex = 0; + final byte[] tokenBuffer = token == null ? null : new byte[token.length]; + final ByteBuffer tokenBB = token == null ? null : ByteBuffer.wrap(tokenBuffer); + + if (log.isDebugEnabled()) + log.debug("Receive token: " + BytesUtil.toHexString(token)); + while (rem > 0 && !EOS) { // block up to the timeout. @@ -1049,15 +1077,68 @@ } + final Set<SelectionKey> keys = client.clientSelector .selectedKeys(); final Iterator<SelectionKey> iter = keys.iterator(); + int ntokenreads = 0; + int ntokenbytematches = 0; + while (iter.hasNext()) { iter.next(); iter.remove(); + + if (!foundStart) { + if (log.isDebugEnabled()) + log.debug("Looking for token, reads: " + ntokenreads); + // Note that the logic for finding the token bytes depends on the + // first byte in the token being unique! + // We have to be a bit clever to be sure we do not read beyond the + // token and therefore complicate the reading into the localBuffer. 
+ // This is optimized for the normal case where the key token is read + // as the next bytes from the stream. In the worst case scenario this + // could read large amounts of data only a few bytes at a time, however + // this is not in reality a significant overhead. + while (tokenIndex < token.length ) { + final int remtok = token.length - tokenIndex; + tokenBB.limit(remtok); + tokenBB.position(0); + + final int rdLen = client.client.read(tokenBB); + for (int i = 0; i < rdLen; i++) { + if (tokenBuffer[i] != token[tokenIndex]) { + log.warn("TOKEN MISMATCH"); + tokenIndex = 0; + if (tokenBuffer[i] == token[tokenIndex]) { + tokenIndex++; + } + } else { + tokenIndex++; + ntokenbytematches++; + } + } + + ntokenreads++; + if (ntokenreads % 10000 == 0) { + if (log.isDebugEnabled()) + log.debug("...still looking, reads: " + ntokenreads); + } + + foundStart = tokenIndex == token.length; + } + + if (!foundStart) { // not sufficient data ready + // if (log.isDebugEnabled()) + log.warn("Not found token yet!"); + continue; + } else { + if (log.isDebugEnabled()) + log.debug("Found token after " + ntokenreads + " token reads and " + ntokenbytematches + " byte matches"); + } + } final int rdlen = client.client.read(localBuffer); @@ -1099,41 +1180,53 @@ * * Note: loop since addrNext might change asynchronously. */ - while(true) { - if (rdlen != 0 && addrNextRef.get() != null) { - if (log.isTraceEnabled()) - log.trace("Incremental send of " + rdlen + " bytes"); - final ByteBuffer out = localBuffer.asReadOnlyBuffer(); - out.position(localBuffer.position() - rdlen); - out.limit(localBuffer.position()); - synchronized (sendService) { - /* - * Note: Code block is synchronized on [downstream] - * to make the decision to start the HASendService - * that relays to [addrNext] atomic. The - * HASendService uses [synchronized] for its public - * methods so we can coordinate this lock with its - * synchronization API. 
- */ - if (!sendService.isRunning()) { - /* - * Prepare send service for incremental - * transfers to the specified address. - */ - // Check for termination. - client.checkFirstCause(); - // Note: use then current addrNext! - sendService.start(addrNextRef.get()); - continue; - } - } - // Check for termination. - client.checkFirstCause(); - // Send and await Future. - sendService.send(out).get(); - } - break; - } + while (true) { + if (rdlen != 0 && addrNextRef.get() != null) { + if (log.isTraceEnabled()) + log.trace("Incremental send of " + rdlen + + " bytes"); + final ByteBuffer out = localBuffer + .asReadOnlyBuffer(); + out.position(localBuffer.position() - rdlen); + out.limit(localBuffer.position()); + synchronized (sendService) { + /* + * Note: Code block is synchronized on + * [downstream] to make the decision to start + * the HASendService that relays to [addrNext] + * atomic. The HASendService uses [synchronized] + * for its public methods so we can coordinate + * this lock with its synchronization API. + */ + if (!sendService.isRunning()) { + /* + * Prepare send service for incremental + * transfers to the specified address. + */ + // Check for termination. + client.checkFirstCause(); + // Note: use then current addrNext! + sendService.start(addrNextRef.get()); + continue; + } + } + // Check for termination. + client.checkFirstCause(); + + // Send and await Future, sending message token if at start of buffer + if (out.position() == 0) { + log.warn("Sending token " + BytesUtil.toHexString(message.getToken())); + } + try { + sendService.send(out, out.position() == 0 ? 
message.getToken() : null).get(); + } catch(Throwable t) { + log.warn("Send downstream error", t); + + throw new RuntimeException(t); + } + } + break; + } } } // while( rem > 0 ) @@ -1295,6 +1388,7 @@ localBuffer.limit(message.getSize()); localBuffer.position(0); messageReady.signalAll(); + if (log.isTraceEnabled()) log.trace("Will accept data for message: msg=" + msg); Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -38,6 +38,8 @@ import org.apache.log4j.Logger; +import com.bigdata.btree.BytesUtil; + /** * A service for sending raw {@link ByteBuffer}s across a socket. This service * supports the HA write pipeline. This service is designed to be paired with an @@ -281,7 +283,11 @@ * @todo throws IOException if the {@link SocketChannel} was not open and * could not be opened. */ - public Future<Void> send(final ByteBuffer buffer) { +// public Future<Void> send(final ByteBuffer buffer) { +// return send(buffer, null); +// } + + public Future<Void> send(final ByteBuffer buffer, final byte[] token) { if (buffer == null) throw new IllegalArgumentException(); @@ -300,7 +306,9 @@ // reopenChannel(); - return tmp.submit(newIncSendTask(buffer.asReadOnlyBuffer())); + log.warn("Sending message with token: " + BytesUtil.toHexString(token)); + + return tmp.submit(newIncSendTask(buffer.asReadOnlyBuffer(), token)); } @@ -402,13 +410,14 @@ * * @param buffer * The buffer whose data are to be sent. + * @param token * * @return The task which will send the data to the configured * {@link InetSocketAddress}. 
*/ - protected Callable<Void> newIncSendTask(final ByteBuffer buffer) { + protected Callable<Void> newIncSendTask(final ByteBuffer buffer, final byte[] token) { - return new IncSendTask(buffer); + return new IncSendTask(buffer, token); } @@ -460,8 +469,9 @@ // private final SocketChannel socketChannel; private final ByteBuffer data; + private final byte[] token; - public IncSendTask(/*final SocketChannel socketChannel, */final ByteBuffer data) { + public IncSendTask(/*final SocketChannel socketChannel, */final ByteBuffer data, final byte[] token) { // if (socketChannel == null) // throw new IllegalArgumentException(); @@ -472,12 +482,12 @@ // this.socketChannel = socketChannel; this.data = data; - + this.token = token; } public Void call() throws Exception { - // defer until we actually run. + // defer until we actually run. final SocketChannel socketChannel = reopenChannel(); if (!isRunning()) @@ -494,9 +504,22 @@ try { + int ntoken = 0; int nwritten = 0; while (nwritten < remaining) { + + log.warn("TOKEN: " + BytesUtil.toHexString(token) + ", written: " + (token == null ? false : ntoken == token.length)); + if (token != null && ntoken < token.length) { + final ByteBuffer tokenBB = ByteBuffer.wrap(token); + tokenBB.position(ntoken); + + ntoken += socketChannel.write(tokenBB); + + log.warn("Wrote " + ntoken + " token bytes"); + + continue; + } /* * Write the data. 
Depending on the channel, will either Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -2289,16 +2289,36 @@ abstract protected void doMemberAdd(); - abstract protected void doMemberRemove(); + final protected void doMemberRemove() { + doMemberRemove(serviceId); + } + abstract protected void doMemberRemove(UUID serviceId); + abstract protected void doCastVote(long lastCommitTime); - abstract protected void doWithdrawVote(); + final protected void doWithdrawVote() { + doWithdrawVote(serviceId); + } + abstract protected void doWithdrawVote(UUID serviceId); + abstract protected void doPipelineAdd(); - abstract protected void doPipelineRemove(); + final protected void doPipelineRemove() { + doPipelineRemove(serviceId); + } + abstract protected void doPipelineRemove(UUID serviceId); + + @Override + public void forceRemoveService(final UUID psid) { + doMemberRemove(psid); + doWithdrawVote(psid); + doPipelineRemove(psid); + doServiceLeave(psid); + } + /** * Invoked when our client will become the leader to (a) reorganize the * write pipeline such that our client is the first service in the write @@ -2396,8 +2416,12 @@ abstract protected void doServiceJoin(); - abstract protected void doServiceLeave(); + final protected void doServiceLeave() { + doServiceLeave(serviceId); + } + abstract protected void doServiceLeave(UUID serviceId); + abstract protected void doSetToken(long newToken); // abstract protected void doSetLastValidToken(long newToken); Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/quorum/QuorumActor.java =================================================================== --- 
branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/quorum/QuorumActor.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/quorum/QuorumActor.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -205,4 +205,21 @@ */ void clearToken(); + /** + * Remove the service from the quorum. This should be called when a problem + * with the service is reported to the quorum leader, for example as a + * result of a failed RMI request or failed socket level write replication. + * Such errors arise either from network connectivity or service death. + * These problems will generally be cured, but the heartbeat timeout to cure + * the problem can cause write replication to block. This method may be used + * to force the timely reordering of the pipeline in order to work around + * the replication problem. This is not a permanent disabling of the service + * - the service may be restarted or may recover and reenter the quorum at + * any time. + * + * @param serviceId + * The UUID of the service to be removed. + */ + public void forceRemoveService(UUID serviceId); + } Modified: branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestAll.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestAll.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestAll.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -60,6 +60,9 @@ { final TestSuite suite = new TestSuite("write pipeline"); + + // Test message buffer framing idiom (not required for CI). + // suite.addTestSuite(TestBufferFraming.class); // Test of HASendService and HAReceiveService (2 nodes).
suite.addTestSuite(TestHASendAndReceive.class); Added: branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestBufferFraming.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestBufferFraming.java (rev 0) +++ branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestBufferFraming.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -0,0 +1,191 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2013. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +package com.bigdata.ha.pipeline; + +import java.util.Random; + +import junit.framework.TestCase; + +/** + * This is a test suite for the buffer framing idiom. + * <p> + * BufferFraming is required to ensure that pipeline buffers are correctly + * identified by RMI messages. + * <p> + * There is currently a problem where receive tasks can be interrupted leaving + * data in the pipeline and subsequent data reads are unable to process the + * correct data. + * <p> + * A proposed solution is to prefix the buffer with an 8 byte identifier + * suitably unique to stochastically avoid problems of random matching errors. + * <p> + * This test class tests finding the offset of the long value in otherwise + * random data. 
Since it must read each byte this is complicated by the + * requirement to window shift. This complexity can be somewhat simplified by + * ensuring that each byte in the long key is unique. + * <p> + * Note: only the first byte in the key needs to be unique, guaranteeing that + * if a match attempt fails it is only necessary to check the failing character + * to see if that could be the start of a new match attempt. + * + * @author Martyn Cutcher + */ +public class TestBufferFraming extends TestCase { + +// private static final Logger log = Logger +// .getLogger(junit.framework.Test.class); + + boolean unique(final byte[] bytes) { + for (int i = 0; i < bytes.length; i++) { + final byte b = bytes[i]; + for (int t = i + 1; t < bytes.length; t++) { + if (bytes[t] == b) + return false; + } + } + + return true; + } + + boolean unique1(final byte[] bytes) { + final byte b = bytes[0]; + for (int t = 1; t < bytes.length; t++) { + if (bytes[t] == b) + return false; + } + + return true; + } + + /** + * Returns n bytes of unique values. + * + * The unique values are important to simplify testing + * against data streams. + * + * In fact the only important aspect is that the initial byte + * is unique! This is sufficient to identify the start point + * of the key in a data stream. 
+ */ + byte[] genKey(Random r, final int size) { + final byte[] ret = new byte[size]; + + while (!unique1(ret)) { + r.nextBytes(ret); + } + + return ret; + } + + /** + * Functional test on performance of key generation + */ + public void testGenKey() { + final Random r = new Random(); + + final int keys = 100000000; // 100M + + final long start = System.currentTimeMillis(); + for (int i = 0; i < keys; i++) { + genKey(r, 8); + } + final long end = System.currentTimeMillis(); + + final long throughputms = (keys / (end - start)); + + assertTrue(throughputms > 10000L); // should be able to produce more than 10M keys per second + } + + /** + * Let's write a string into the middle of a load + * of random data and identify it with our generated key. + */ + public void testEmbeddedMessage() { + doEmbeddedMessage(); + } + + public void testStressEmbeddedMessage() { + for (int t = 0; t < 1000; t++) { + doEmbeddedMessage(); + } + } + + public void doEmbeddedMessage() { + final Random r = new Random(); + final byte[] buffer = new byte[10000000]; // 10M bytes + r.nextBytes(buffer); + + final String tst = "Hello World"; + final byte[] tstbytes = tst.getBytes(); + + final byte[] key = genKey(r, 8); + + int offset = r.nextInt(9000000); // somewhere in first 9M bytes + + // copy string into buffer + copy(key, 0, buffer, offset); + copy(tstbytes, 0, buffer, offset+key.length); + + final int position = find(key, buffer); + + assertTrue(position == offset); + + final byte[] copy = new byte[tstbytes.length]; + copy(buffer, position+key.length, copy, 0); + + final String tstRead = new String(copy); + + assertTrue(tstRead.equals(tst)); + } + + void copy(byte[] src, int srcOffset, byte[] dst, int dstOffset) { + int len = Math.min(src.length, dst.length); + + for (int i = 0; i < len; i++) { + dst[dstOffset+i] = src[srcOffset+i]; + } + } + + int find(final byte[] key, final byte[] buffer) { + final int endPos = buffer.length - key.length; + for (int i = 0; i < endPos; i++) { + if (buffer[i] 
== key[0]) { + boolean match = true; + for (int t = 1; match && t < key.length; t++) { + match = buffer[i+t] == key[t]; + + if (!match) { + i += t-1; + } + } + if (match) { + return i; + } + } + } + + return -1; + } +} Modified: branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -24,6 +24,11 @@ package com.bigdata.ha.pipeline; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.Random; @@ -108,6 +113,32 @@ chk = null; } + + /** + * Need to check base message serialization + * + * @throws IOException + * @throws ClassNotFoundException + */ + public void testMessageSerialization() throws IOException, ClassNotFoundException { + final ByteArrayOutputStream boutstr = new ByteArrayOutputStream(); + final ObjectOutputStream obout = new ObjectOutputStream(boutstr); + + final HAWriteMessageBase src_msg = new HAWriteMessageBase(50, 23); + + obout.writeObject(src_msg); + obout.flush(); + + final ByteArrayInputStream binstr = new ByteArrayInputStream(boutstr.toByteArray()); + final ObjectInputStream obin = new ObjectInputStream(binstr); + + final Object dst_msg = obin.readObject(); + + assertTrue(src_msg.equals(dst_msg)); + + // now check that it would falsely compare against a different message + assertFalse(src_msg.equals(new HAWriteMessageBase(50, 23))); + } /** * Should we expect concurrency of the Socket send and RMI? 
It seems that we @@ -130,7 +161,7 @@ final IHAWriteMessageBase msg1 = new HAWriteMessageBase(50, chk.checksum(tst1)); final ByteBuffer rcv = ByteBuffer.allocate(2000); final Future<Void> futRec = receiveService.receiveData(msg1, rcv); - final Future<Void> futSnd = sendService.send(tst1); + final Future<Void> futSnd = sendService.send(tst1, msg1.getToken()); futSnd.get(timeout,TimeUnit.MILLISECONDS); futRec.get(timeout,TimeUnit.MILLISECONDS); assertEquals(tst1, rcv); @@ -140,7 +171,7 @@ final ByteBuffer tst2 = getRandomData(100); final IHAWriteMessageBase msg2 = new HAWriteMessageBase(100, chk.checksum(tst2)); final ByteBuffer rcv2 = ByteBuffer.allocate(2000); - final Future<Void> futSnd = sendService.send(tst2); + final Future<Void> futSnd = sendService.send(tst2, msg2.getToken()); final Future<Void> futRec = receiveService.receiveData(msg2, rcv2); futSnd.get(timeout,TimeUnit.MILLISECONDS); futRec.get(timeout,TimeUnit.MILLISECONDS); @@ -149,6 +180,40 @@ } + public void testSimpleExchangeWithTokens() throws InterruptedException, ExecutionException, TimeoutException { + + final long timeout = 5000;// ms + { + final ByteBuffer tst1 = getRandomData(50); + final IHAWriteMessageBase msg1 = new HAWriteMessageBase(50, chk.checksum(tst1)); + final ByteBuffer rcv = ByteBuffer.allocate(2000); + final Future<Void> futRec = receiveService.receiveData(msg1, rcv); + final Future<Void> futSnd = sendService.send(tst1, msg1.getToken()); + futSnd.get(timeout,TimeUnit.MILLISECONDS); + futRec.get(timeout,TimeUnit.MILLISECONDS); + assertEquals(tst1, rcv); + } + + { + // how throw some random data into the stream to force the tokens to do something + final ByteBuffer junk = getRandomData(10000); + final Future<Void> futSnd = sendService.send(junk, null); + futSnd.get(timeout,TimeUnit.MILLISECONDS); + } + + { + final ByteBuffer tst2 = getRandomData(100); + final IHAWriteMessageBase msg2 = new HAWriteMessageBase(100, chk.checksum(tst2)); + final ByteBuffer rcv2 = ByteBuffer.allocate(2000); 
+ final Future<Void> futSnd = sendService.send(tst2, msg2.getToken()); + final Future<Void> futRec = receiveService.receiveData(msg2, rcv2); + futSnd.get(timeout,TimeUnit.MILLISECONDS); + futRec.get(timeout,TimeUnit.MILLISECONDS); + assertEquals(tst2, rcv2); + } + + } + /** * Sends a large number of random buffers, confirming successful * transmission. @@ -168,7 +233,7 @@ final ByteBuffer rcv = ByteBuffer.allocate(sze + r.nextInt(1024)); // FutureTask return ensures remote ready for Socket data final Future<Void> futRec = receiveService.receiveData(msg, rcv); - final Future<Void> futSnd = sendService.send(tst); + final Future<Void> futSnd = sendService.send(tst, msg.getToken()); futSnd.get(timeout,TimeUnit.MILLISECONDS); futRec.get(timeout,TimeUnit.MILLISECONDS); assertEquals(tst, rcv); // make sure buffer has been transmitted @@ -199,7 +264,7 @@ assertEquals(sze,tst.limit()); // FutureTask return ensures remote ready for Socket data final Future<Void> futRec = receiveService.receiveData(msg, rcv); - final Future<Void> futSnd = sendService.send(tst); + final Future<Void> futSnd = sendService.send(tst, msg.getToken()); futSnd.get(timeout,TimeUnit.MILLISECONDS); futRec.get(timeout,TimeUnit.MILLISECONDS); // make sure buffer has been transmitted Modified: branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive3Nodes.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive3Nodes.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive3Nodes.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -177,7 +177,7 @@ // rcv.limit(50); final Future<Void> futRec1 = receiveServiceB.receiveData(msg1, rcv1); final Future<Void> futRec2 = receiveServiceC.receiveData(msg1, rcv2); - final Future<Void> futSnd = sendServiceA.send(tst1); + final Future<Void> futSnd = sendServiceA.send(tst1, 
msg1.getToken()); futSnd.get(timeout,TimeUnit.MILLISECONDS); futRec1.get(timeout,TimeUnit.MILLISECONDS); futRec2.get(timeout,TimeUnit.MILLISECONDS); @@ -195,7 +195,7 @@ // rcv.limit(50); final Future<Void> futRec1 = receiveServiceB.receiveData(msg1, rcv1); final Future<Void> futRec2 = receiveServiceC.receiveData(msg1, rcv2); - final Future<Void> futSnd = sendServiceA.send(tst1); + final Future<Void> futSnd = sendServiceA.send(tst1, msg1.getToken()); while (!futSnd.isDone() && !futRec2.isDone()) { try { futSnd.get(10L, TimeUnit.MILLISECONDS); @@ -284,7 +284,7 @@ .receiveData(msg1, rcv1); final Future<Void> futRec2 = receiveServiceC .receiveData(msg1, rcv2); - final Future<Void> futSnd = sendServiceA.send(tst1); + final Future<Void> futSnd = sendServiceA.send(tst1, msg1.getToken()); futSnd.get(timeout,TimeUnit.MILLISECONDS); futRec1.get(timeout,TimeUnit.MILLISECONDS); futRec2.get(timeout,TimeUnit.MILLISECONDS); @@ -317,7 +317,7 @@ .receiveData(msg1, rcv1); // final Future<Void> futRec2 = receiveService2 // .receiveData(msg1, rcv2); - final Future<Void> futSnd = sendServiceA.send(tst1.duplicate()); + final Future<Void> futSnd = sendServiceA.send(tst1.duplicate(), msg1.getToken()); // Send will always succeed. futSnd.get(timeout, TimeUnit.MILLISECONDS); /* @@ -421,7 +421,7 @@ .receiveData(msg1, rcv1); final Future<Void> futRec2 = receiveServiceC .receiveData(msg1, rcv2); - final Future<Void> futSnd = sendServiceA.send(tst1); + final Future<Void> futSnd = sendServiceA.send(tst1, msg1.getToken()); // Send will always succeed. 
futSnd.get(timeout, TimeUnit.MILLISECONDS); /* @@ -498,7 +498,7 @@ // .receiveData(msg1, rcv1); final Future<Void> futRec2 = receiveServiceC .receiveData(msg1, rcv2); - final Future<Void> futSnd = sendServiceA.send(tst1); + final Future<Void> futSnd = sendServiceA.send(tst1, msg1.getToken()); futSnd.get(timeout,TimeUnit.MILLISECONDS); // futRec1.get(); futRec2.get(timeout,TimeUnit.MILLISECONDS); @@ -520,7 +520,7 @@ .receiveData(msg1, rcv1); final Future<Void> futRec2 = receiveServiceC .receiveData(msg1, rcv2); - final Future<Void> futSnd = sendServiceA.send(tst1); + final Future<Void> futSnd = sendServiceA.send(tst1, msg1.getToken()); futSnd.get(timeout,TimeUnit.MILLISECONDS); futRec1.get(timeout,TimeUnit.MILLISECONDS); futRec2.get(timeout,TimeUnit.MILLISECONDS); @@ -553,7 +553,7 @@ rcv1); // final Future<Void> futRec2 = receiveServiceC.receiveData(msg1, // rcv2); - final Future<Void> futSnd = sendServiceC.send(tst1); + final Future<Void> futSnd = sendServiceC.send(tst1, msg1.getToken()); futSnd.get(timeout, TimeUnit.MILLISECONDS); futRec1.get(timeout, TimeUnit.MILLISECONDS); // futRec2.get(timeout, TimeUnit.MILLISECONDS); @@ -576,7 +576,7 @@ rcv1); final Future<Void> futRec2 = receiveServiceA.receiveData(msg1, rcv2); - final Future<Void> futSnd = sendServiceC.send(tst1); + final Future<Void> futSnd = sendServiceC.send(tst1, msg1.getToken()); futSnd.get(timeout, TimeUnit.MILLISECONDS); futRec1.get(timeout, TimeUnit.MILLISECONDS); futRec2.get(timeout, TimeUnit.MILLISECONDS); @@ -665,7 +665,7 @@ // FutureTask return ensures remote ready for Socket data final Future<Void> futRec1 = receiveServiceB.receiveData(msg, rcv1); final Future<Void> futRec2 = receiveServiceC.receiveData(msg, rcv2); - final Future<Void> futSnd = sendServiceA.send(tst); + final Future<Void> futSnd = sendServiceA.send(tst, msg.getToken()); while (!futSnd.isDone() && !futRec1.isDone() && !futRec2.isDone()) { try { futSnd.get(10L, TimeUnit.MILLISECONDS); Modified: 
branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -846,32 +846,20 @@ fixture.memberAdd(serviceId); } - protected void doMemberRemove() { - fixture.memberRemove(serviceId); - } - protected void doCastVote(final long lastCommitTime) { fixture.castVote(serviceId, lastCommitTime); } - protected void doWithdrawVote() { - fixture.withdrawVote(serviceId); - } - protected void doPipelineAdd() { fixture.pipelineAdd(serviceId); } - protected void doPipelineRemove() { - fixture.pipelineRemove(serviceId); - } - protected void doServiceJoin() { fixture.serviceJoin(serviceId); } - protected void doServiceLeave() { - fixture.serviceLeave(serviceId); + protected void doServiceLeave(final UUID service) { + fixture.serviceLeave(service); } protected void doSetToken(final long newToken) { @@ -890,6 +878,21 @@ fixture.clearToken(); } + @Override + protected void doMemberRemove(UUID service) { + fixture.memberRemove(service); + } + + @Override + protected void doWithdrawVote(UUID service) { + fixture.withdrawVote(service); + } + + @Override + protected void doPipelineRemove(UUID service) { + fixture.pipelineRemove(service); + } + // /** // * {@inheritDoc} // * <p> Modified: branches/PIPELINE_RESYNC/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -327,7 +327,7 @@ } @Override - protected void doMemberRemove() { + 
protected void doMemberRemove(final UUID service) { // get a valid zookeeper connection object. final ZooKeeper zk; try { @@ -340,7 +340,7 @@ try { zk.delete(logicalServiceId + "/" + QUORUM + "/" + QUORUM_MEMBER + "/" + QUORUM_MEMBER_PREFIX - + serviceIdStr, -1/* anyVersion */); + + service.toString(), -1/* anyVersion */); } catch (NoNodeException e) { // ignore. } catch (KeeperException e) { @@ -414,7 +414,7 @@ } @Override - protected void doPipelineRemove() { + protected void doPipelineRemove(final UUID service) { // get a valid zookeeper connection object. final ZooKeeper zk; try { @@ -446,7 +446,7 @@ } final QuorumServiceState state = (QuorumServiceState) SerializerUtil .deserialize(b); - if (serviceId.equals(state.serviceUUID())) { + if (service.equals(state.serviceUUID())) { zk.delete(zpath + "/" + s, -1/* anyVersion */); return; } @@ -636,7 +636,7 @@ * handles a concurrent delete by a simple retry loop. */ @Override - protected void doWithdrawVote() { + protected void doWithdrawVote(final UUID service) { // zpath for votes. final String votesZPath = getVotesZPath(); if (log.isInfoEnabled()) @@ -724,7 +724,7 @@ Thread.currentThread().interrupt(); return; } - if (serviceId.equals(state.serviceUUID())) { + if (service.equals(state.serviceUUID())) { // found our vote. try { // delete our vote. @@ -761,7 +761,7 @@ } // done. if (log.isInfoEnabled()) - log.info("withdrawn: serviceId=" + serviceIdStr + log.info("withdrawn: serviceId=" + service.toString() + ", lastCommitTime=" + lastCommitTime); return; } catch (NoNodeException e) { @@ -836,7 +836,7 @@ } @Override - protected void doServiceLeave() { + protected void doServiceLeave(final UUID service) { // get a valid zookeeper connection object. final ZooKeeper zk; try { @@ -871,7 +871,7 @@ } final QuorumServiceState state = (QuorumServiceState) SerializerUtil .deserialize(b); - if (serviceId.equals(state.serviceUUID())) { + if (service.equals(state.serviceUUID())) { // Found this service. 
zk.delete(zpath + "/" + s, -1/* anyVersion */); return; @@ -2492,5 +2492,4 @@ } } - } Modified: branches/PIPELINE_RESYNC/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -45,6 +45,7 @@ import com.bigdata.ha.HAGlue; import com.bigdata.ha.HAStatusEnum; +import com.bigdata.ha.IndexManagerCallable; import com.bigdata.ha.halog.HALogWriter; import com.bigdata.ha.halog.IHALogReader; import com.bigdata.ha.msg.HARootBlockRequest; @@ -1031,6 +1032,180 @@ } /** + * Start A+B+C in strict sequence. Wait until the quorum fully meets. Start + * a long running LOAD. While the LOAD is running, sure kill C (the last + * follower). Verify that the LOAD completes successfully with the remaining + * services (A+B). + */ + public void testABC_LiveLoadRemainsMet_kill_C() throws Exception { + + // enforce join order + final ABC startup = new ABC(true /*sequential*/); + + final long token = awaitFullyMetQuorum(); + + // start concurrent task loads that continue until fully met + final FutureTask<Void> ft = new FutureTask<Void>(new LargeLoadTask( + token)); + + executorService.submit(ft); + + // allow load head start + Thread.sleep(300/* ms */); + + // Verify load is still running. 
+ assertFalse(ft.isDone()); + + // Dump Zookeeper + log.warn("ZOOKEEPER\n" + dumpZoo()); + + kill(startup.serverC); + + awaitPipeline(20, TimeUnit.SECONDS, new HAGlue[] {startup.serverA, startup.serverB}); + + awaitMembers(new HAGlue[] {startup.serverA, startup.serverB}); + awaitJoined(new HAGlue[] {startup.serverA, startup.serverB}); + + // token must remain unchanged to indicate same quorum + assertEquals(token, awaitMetQuorum()); + + // Await LOAD, but with a timeout. + ft.get(longLoadTimeoutMillis, TimeUnit.MILLISECONDS); + + // token must remain unchanged to indicate same quorum + assertEquals(token, awaitMetQuorum()); + + } + + public void _testStressABC_LiveLoadRemainsMet_kill_C() throws Exception { + for (int i = 0; i < 5; i++) { + try { + testABC_LiveLoadRemainsMet_kill_C(); + } catch (Throwable t) { + fail("Run " + i, t); + } finally { + Thread.sleep(2000); + destroyAll(); + } + } + } + + /** + * Start A+B+C in strict sequence. Wait until the quorum fully meets. Start + * a long running LOAD. While the LOAD is running, sure kill B (the first + * follower). Verify that the LOAD completes successfully with the remaining + * services (A+C), after the leader re-orders the pipeline. + */ + public void testABC_LiveLoadRemainsMet_kill_B() throws Exception { + + // enforce join order + final ABC startup = new ABC(true /*sequential*/); + + final long token = awaitFullyMetQuorum(); + + // start concurrent task loads that continue until fully met + final FutureTask<Void> ft = new FutureTask<Void>(new LargeLoadTask( + token)); + + executorService.submit(ft); + + // allow load head start + Thread.sleep(300/* ms */); + + // Verify load is still running. 
+ assertFalse(ft.isDone()); + + // Dump Zookeeper + log.warn("ZOOKEEPER\n" + dumpZoo()); + + kill(startup.serverB); + + awaitPipeline(10, TimeUnit.SECONDS, new HAGlue[] {startup.serverA, startup.serverC}); + + // also check members and joined + awaitMembers(new HAGlue[] {startup.serverA, startup.serverC}); + awaitJoined(new HAGlue[] {startup.serverA, startup.serverC}); + + // token must remain unchanged to indicate same quorum + assertEquals(token, awaitMetQuorum()); + + // Await LOAD, but with a timeout. + ft.get(longLoadTimeoutMillis, TimeUnit.MILLISECONDS); + + // token must remain unchanged to indicate same quorum + assertEquals(token, awaitMetQuorum()); + + } + + /** + * Instead of killing B this forces its removal from Zookeeper using the + * forceRemoveService method in a task submitted to the leader. + * + * @throws Exception + */ + public void testABC_LiveLoadRemainsMet_remove_B() throws Exception { + + // enforce join order + final ABC startup = new ABC(true /*sequential*/); + + final long token = awaitFullyMetQuorum(); + + // start concurrent task loads that continue until fully met + final FutureTask<Void> ft = new FutureTask<Void>(new LargeLoadTask( + token)); + + executorService.submit(ft); + + // allow load head start + Thread.sleep(300/* ms */); + + // Verify load is still running. + assertFalse(ft.isDone()); + + // Dump Zookeeper + log.warn("ZOOKEEPER\n" + dumpZoo()); + + startup.serverA.submit(new ForceRemoveService(startup.serverB.getServiceId()), false); + + awaitPipeline(10, TimeUnit.SECONDS, new HAGlue[] {startup.serverA, startup.serverC}); + + // token must remain unchanged to indicate same quorum + assertEquals(token, awaitMetQuorum()); + + // Await LOAD, but with a timeout. 
+ ft.get(longLoadTimeoutMillis, TimeUnit.MILLISECONDS); + + // token must remain unchanged to indicate same quorum + assertEquals(token, awaitMetQuorum()); + + // ...and in this case we might also expect the service to rejoin + awaitPipeline(10, TimeUnit.SECONDS, new HAGlue[] {startup.serverA, startup.serverC, startup.serverB}); + assertEquals(token, awaitFullyMetQuorum()); + + } + + static class... [truncated message content] |
From: <mar...@us...> - 2013-11-27 14:23:38
|
Revision: 7599 http://bigdata.svn.sourceforge.net/bigdata/?rev=7599&view=rev Author: martyncutcher Date: 2013-11-27 14:23:29 +0000 (Wed, 27 Nov 2013) Log Message: ----------- Updates for pipeline resync and postHACommit for jenkins CI job submission Modified Paths: -------------- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/btree/BTree.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/BasicBufferStrategy.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/DumpJournal.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/IBufferStrategy.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/RWStrategy.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/rwstore/RWStore.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/rwstore/sector/MemStrategy.java branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive3Nodes.java branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/journal/TestDumpJournal.java branches/PIPELINE_RESYNC/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java branches/PIPELINE_RESYNC/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/btree/BTree.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/btree/BTree.java 2013-11-26 18:30:22 UTC (rev 7598) +++ 
branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/btree/BTree.java 2013-11-27 14:23:29 UTC (rev 7599) @@ -1807,6 +1807,9 @@ final Checkpoint checkpoint; try { checkpoint = Checkpoint.load(store, addrCheckpoint); + + if (log.isDebugEnabled()) + log.debug("Checkpoint rootAddr: " + checkpoint.getRootAddr()); } catch (Throwable t) { throw new RuntimeException("Could not load Checkpoint: store=" + store + ", addrCheckpoint=" Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-11-26 18:30:22 UTC (rev 7598) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-11-27 14:23:29 UTC (rev 7599) @@ -1474,19 +1474,20 @@ } catch (Throwable t) { - final PipelineException pe = (PipelineException) InnerCause.getInnerCause(t, PipelineException.class); - if (pe != null) { - log.error("Really need to remove service " + pe.getProblemServiceId()); - final UUID psid = pe.getProblemServiceId(); - - try { - member.getActor().forceRemoveService(psid); - } catch (Exception e) { - log.warn("Problem on node removal", e); - - throw new RuntimeException(e); - } - } + // ORIGINAL TESTED GREEN for KillB and KillC +// final PipelineException pe = (PipelineException) InnerCause.getInnerCause(t, PipelineException.class); +// if (pe != null) { +// log.error("Really need to remove service " + pe.getProblemServiceId()); +// final UUID psid = pe.getProblemServiceId(); +// +// try { +// member.getActor().forceRemoveService(psid); +// } catch (Exception e) { +// log.warn("Problem on node removal", e); +// +// throw new RuntimeException(e); +// } +// } // Note: Also see retrySend()'s catch block. @@ -1560,7 +1561,25 @@ return; - } finally { + } catch (Exception t) { + + // THIS LOCATION WORKS! 
+// final PipelineException pe = (PipelineException) InnerCause.getInnerCause(t, PipelineException.class); +// if (pe != null) { +// log.error("Really need to remove service " + pe.getProblemServiceId()); +// final UUID psid = pe.getProblemServiceId(); +// +// try { +// member.getActor().forceRemoveService(psid); +// } catch (Exception e) { +// log.warn("Problem on node removal", e); +// +// throw new RuntimeException(e); +// } +// } + + throw t; + } finally { unlock(); @@ -1675,120 +1694,142 @@ * Task to send() a buffer to the follower. */ static private class SendBufferTask<S extends HAPipelineGlue> implements - Callable<Void> { + Callable<Void> { - private final QuorumMember<S> member; - private final long token; // member MUST remain leader for token. - private final IHASyncRequest req; - private final IHAWriteMessage msg; - private final ByteBuffer b; - private final PipelineState<S> downstream; - private final HASendService sendService; - private final Lock sendLock; + private final QuorumMember<S> member; + private final long token; // member MUST remain leader for token. + private final IHASyncRequest req; + private final IHAWriteMessage msg; + private final ByteBuffer b; + private final PipelineState<S> downstream; + private final HASendService sendService; + private final Lock sendLock; - public SendBufferTask(final QuorumMember<S> member, final long token, - final IHASyncRequest req, final IHAWriteMessage msg, - final ByteBuffer b, final PipelineState<S> downstream, - final HASendService sendService, final Lock sendLock) { + public SendBufferTask(final QuorumMember<S> member, final long token, + final IHASyncRequest req, final IHAWriteMessage msg, + final ByteBuffer b, final PipelineState<S> downstream, + final HASendService sendService, final Lock sendLock) { - this.member = member; - this.token = token; - this.req = req; // Note: MAY be null. 
- this.msg = msg; - this.b = b; - this.downstream = downstream; - this.sendService = sendService; - this.sendLock = sendLock; + this.member = member; + this.token = token; + this.req = req; // Note: MAY be null. + this.msg = msg; + this.b = b; + this.downstream = downstream; + this.sendService = sendService; + this.sendLock = sendLock; - } + } - public Void call() throws Exception { + public Void call() throws Exception { - /* - * Lock ensures that we do not have more than one request on the - * write pipeline at a time. - */ + /* + * Lock ensures that we do not have more than one request on the + * write pipeline at a time. + */ - sendLock.lock(); + sendLock.lock(); - try { + try { - doRunWithLock(); - - return null; - - } finally { - - sendLock.unlock(); - - } + doRunWithLock(); - } - - private void doRunWithLock() throws InterruptedException, - ExecutionException, IOException { + return null; - // Get Future for send() outcome on local service. - final Future<Void> futSnd = sendService.send(b, msg.getToken()); + } finally { - try { + sendLock.unlock(); - // Get Future for receive outcome on the remote service (RMI). - final Future<Void> futRec = downstream.service - .receiveAndReplicate(req, msg); + } - try { + } - /* - * Await the Futures, but spend more time waiting on the - * local Future and only check the remote Future every - * second. Timeouts are ignored during this loop. - */ - while (!futSnd.isDone() && !futRec.isDone()) { - /* - * Make sure leader's quorum token remains valid for ALL - * writes. - */ - member.assertLeader(token); - try { - futSnd.get(1L, TimeUnit.SECONDS); - } catch (TimeoutException ignore) { - } - try { - futRec.get(10L, TimeUnit.MILLISECONDS); - } catch (TimeoutException ignore) { - } - } - futSnd.get(); - futRec.get(); + private void doRunWithLock() throws InterruptedException, + ExecutionException, IOException { - } finally { - if (!futRec.isDone()) { - // cancel remote Future unless done. 
- futRec.cancel(true/* mayInterruptIfRunning */); - } - } + try { + // Get Future for send() outcome on local service. + final Future<Void> futSnd = sendService.send(b, msg.getToken()); - } catch (Throwable t) { - // check inner cause for downstream PipelineException - final PipelineException pe = (PipelineException) InnerCause.getInnerCause(t, PipelineException.class); - if (pe != null) { - throw pe; // throw it upstream - } - - // determine next pipeline service id - final UUID[] priorAndNext = member.getQuorum().getPipelinePriorAndNext(member.getServiceId()); - log.warn("Problem with downstream service: " + priorAndNext[1], t); - - throw new PipelineException(priorAndNext[1], t); - } finally { - // cancel the local Future. - futSnd.cancel(true/* mayInterruptIfRunning */); - } + try { - } - - } + // Get Future for receive outcome on the remote service + // (RMI). + final Future<Void> futRec = downstream.service + .receiveAndReplicate(req, msg); + + try { + + /* + * Await the Futures, but spend more time waiting on the + * local Future and only check the remote Future every + * second. Timeouts are ignored during this loop. + */ + while (!futSnd.isDone() && !futRec.isDone()) { + /* + * Make sure leader's quorum token remains valid for + * ALL writes. + */ + member.assertLeader(token); + try { + futSnd.get(1L, TimeUnit.SECONDS); + } catch (TimeoutException ignore) { + } + try { + futRec.get(10L, TimeUnit.MILLISECONDS); + } catch (TimeoutException ignore) { + } + } + futSnd.get(); + futRec.get(); + + } finally { + if (!futRec.isDone()) { + // cancel remote Future unless done. + futRec.cancel(true/* mayInterruptIfRunning */); + } + } + + } finally { + // cancel the local Future. 
+ futSnd.cancel(true/* mayInterruptIfRunning */); + } + + } catch (Throwable t) { + // check inner cause for downstream PipelineException + final PipelineException pe = (PipelineException) InnerCause + .getInnerCause(t, PipelineException.class); + final UUID problemService; + if (pe != null) { + // throw pe; // throw it upstream - already should have been + // handled + problemService = pe.getProblemServiceId(); + } else { + final UUID[] priorAndNext = member.getQuorum() + .getPipelinePriorAndNext(member.getServiceId()); + problemService = priorAndNext[1]; + } + + // determine next pipeline service id + log.warn("Problem with downstream service: " + problemService, + t); + + // Carry out remedial work directly - BAD + log.error("Really need to remove service " + problemService); + + try { + member.getActor().forceRemoveService(problemService); + } catch (Exception e) { + log.warn("Problem on node removal", e); + + throw new RuntimeException(e); + } + + throw new PipelineException(problemService, t); + + } + } + } /** * Lock used to ensure that at most one message is being sent along the @@ -1934,8 +1975,8 @@ final HAMessageWrapper wrappedMsg = new HAMessageWrapper( req, msg); - // Get Future for send() outcome on local service. - final Future<Void> futSnd = receiveService.receiveData(wrappedMsg, + // Get Future for receive() outcome on local service. + final Future<Void> futRcv = receiveService.receiveData(wrappedMsg, b); try { @@ -1946,7 +1987,7 @@ // Verify token remains valid. member.getQuorum().assertQuorum(token); // Await the future. - return futSnd.get(1000, TimeUnit.MILLISECONDS); + return futRcv.get(1000, TimeUnit.MILLISECONDS); } catch (TimeoutException ex) { // Timeout. Ignore and retry loop. Thread.sleep(100/* ms */); @@ -1957,7 +1998,7 @@ } finally { // cancel the local Future. 
- futSnd.cancel(true/*mayInterruptIfRunning*/); + futRcv.cancel(true/*mayInterruptIfRunning*/); } @@ -1995,74 +2036,76 @@ this.receiveService = receiveService; } - public Void call() throws Exception { + public Void call() throws Exception { - // wrap the messages together. - final HAMessageWrapper wrappedMsg = new HAMessageWrapper( - req, msg); + // wrap the messages together. + final HAMessageWrapper wrappedMsg = new HAMessageWrapper(req, msg); - // Get Future for send() outcome on local service. - final Future<Void> futSnd = receiveService.receiveData(wrappedMsg, - b); + // Get Future for receive() outcome on local service. + final Future<Void> futRcv = receiveService.receiveData(wrappedMsg, + b); + try { + try { - try { + // Get future for receive outcome on the remote + // service. + final Future<Void> futDRcv = downstream.service + .receiveAndReplicate(req, msg); - // Get future for receive outcome on the remote - // service. - final Future<Void> futRec = downstream.service - .receiveAndReplicate(req, msg); + try { - try { + /* + * Await the Futures, but spend more time waiting on the + * local Future and only check the remote Future every + * second. Timeouts are ignored during this loop. + */ + while (!futRcv.isDone() && !futDRcv.isDone()) { + /* + * The token must remain valid, even if this service + * is not joined with the met quorum. If fact, + * services MUST replicate writes regardless of + * whether or not they are joined with the met + * quorum, but only while there is a met quorum. + */ + member.getQuorum().assertQuorum(token); + try { + futRcv.get(1L, TimeUnit.SECONDS); + } catch (TimeoutException ignore) { + } + try { + futDRcv.get(10L, TimeUnit.MILLISECONDS); + } catch (TimeoutException ignore) { + } + } + futRcv.get(); + futDRcv.get(); - /* - * Await the Futures, but spend more time - * waiting on the local Future and only check - * the remote Future every second. Timeouts are - * ignored during this loop. 
- */ - while (!futSnd.isDone() && !futRec.isDone()) { - /* - * The token must remain valid, even if this service is - * not joined with the met quorum. If fact, services - * MUST replicate writes regardless of whether or not - * they are joined with the met quorum, but only while - * there is a met quorum. - */ - member.getQuorum().assertQuorum(token); - try { - futSnd.get(1L, TimeUnit.SECONDS); - } catch (TimeoutException ignore) { - } - try { - futRec.get(10L, TimeUnit.MILLISECONDS); - } catch (TimeoutException ignore) { - } - } - futSnd.get(); - futRec.get(); + } finally { + if (!futDRcv.isDone()) { + // cancel remote Future unless done. + futDRcv.cancel(true/* mayInterruptIfRunning */); + } + } - } finally { - if (!futRec.isDone()) { - // cancel remote Future unless done. - futRec - .cancel(true/* mayInterruptIfRunning */); - } - } + } finally { + // Is it possible that this cancel conflicts with throwing + // the PipelineException? + // cancel the local Future. + futRcv.cancel(true/* mayInterruptIfRunning */); + } + } catch (Throwable t) { + // determine next pipeline service id + final UUID[] priorAndNext = member.getQuorum() + .getPipelinePriorAndNext(member.getServiceId()); + log.warn("Problem with downstream service: " + priorAndNext[1], + t); - } catch (Throwable t) { - // determine next pipeline service id - final UUID[] priorAndNext = member.getQuorum().getPipelinePriorAndNext(member.getServiceId()); - log.warn("Problem with downstream service: " + priorAndNext[1], t); - - throw new PipelineException(priorAndNext[1], t); - } finally { - // cancel the local Future. 
- futSnd.cancel(true/* mayInterruptIfRunning */); - } + throw new PipelineException(priorAndNext[1], t); + } - // done - return null; - } + // done + return null; + } } Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-11-26 18:30:22 UTC (rev 7598) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-11-27 14:23:29 UTC (rev 7599) @@ -27,14 +27,18 @@ import java.io.IOException; import java.net.BindException; import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousCloseException; +import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; +import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; @@ -52,6 +56,7 @@ import org.apache.log4j.Logger; import com.bigdata.btree.BytesUtil; +import com.bigdata.ha.PipelineException; import com.bigdata.ha.QuorumPipelineImpl; import com.bigdata.ha.msg.IHASyncRequest; import com.bigdata.ha.msg.IHAWriteMessage; @@ -962,8 +967,35 @@ boolean success = false; try { - doReceiveAndReplicate(client); - success = true; + while (!success) { + try { + log.warn("Receiving"); + doReceiveAndReplicate(client); + log.warn("DONE"); + success = true; + } catch (ClosedChannelException cce) { // closed then re-open + + final ServerSocket socket = server.socket(); + + log.warn("Closed upstream? 
" + socket.getChannel().isOpen(), cce); + + socket.bind(socket.getLocalSocketAddress()); + + server.socket().getChannel().isOpen(); + + awaitAccept(); + + log.warn("Creating new client"); + + client = new Client(server);//, sendService, addrNext); + + // save off reference and round we go + clientRef.set(client); + } catch (Throwable t) { + log.warn("Unexpected Error", t); + throw new RuntimeException(t); + } + } // success. return null; } finally { @@ -1110,7 +1142,8 @@ final int rdLen = client.client.read(tokenBB); for (int i = 0; i < rdLen; i++) { if (tokenBuffer[i] != token[tokenIndex]) { - log.warn("TOKEN MISMATCH"); + if (ntokenreads < 2) + log.warn("TOKEN MISMATCH"); tokenIndex = 0; if (tokenBuffer[i] == token[tokenIndex]) { tokenIndex++; @@ -1222,7 +1255,7 @@ } catch(Throwable t) { log.warn("Send downstream error", t); - throw new RuntimeException(t); + throw new RuntimeException(t); } } break; Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-11-26 18:30:22 UTC (rev 7598) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-11-27 14:23:29 UTC (rev 7599) @@ -509,7 +509,7 @@ while (nwritten < remaining) { - log.warn("TOKEN: " + BytesUtil.toHexString(token) + ", written: " + (token == null ? false : ntoken == token.length)); + // log.warn("TOKEN: " + BytesUtil.toHexString(token) + ", written: " + (token == null ? false : ntoken == token.length)); if (token != null && ntoken < token.length) { final ByteBuffer tokenBB = ByteBuffer.wrap(token); tokenBB.position(ntoken); @@ -551,10 +551,26 @@ * buffer. 
*/ - final int nbytes = socketChannel.write(data); + final int nbytes; + if (log.isDebugEnabled()) { // add debug latency + final int limit = data.limit(); + if (data.position() < (limit-50000)) { + data.limit(data.position()+50000); + } + nbytes = socketChannel.write(data); + data.limit(limit); + + nwritten += nbytes; + log.debug("Written " + nwritten + " of total " + data.limit()); + + if (nwritten < limit) { + Thread.sleep(2); + } + } else { + nbytes = socketChannel.write(data); + nwritten += nbytes; + } - nwritten += nbytes; - if (log.isTraceEnabled()) log.trace("Sent " + nbytes + " bytes with " + nwritten + " of out " + remaining + " written so far"); @@ -729,4 +745,21 @@ } + public void resetSocket() { + try { + final SocketChannel socketChannel = this.socketChannel.get(); + if (socketChannel != null) { + try { + socketChannel.close(); + } catch (IOException ex) { + log.error("Ignoring exception during reetSocket: " + ex, ex); + } finally { + this.socketChannel.set(null); + } + } + } finally { + reopenChannel(); + } + } + } Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java 2013-11-26 18:30:22 UTC (rev 7598) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java 2013-11-27 14:23:29 UTC (rev 7599) @@ -39,6 +39,7 @@ import org.apache.log4j.Logger; import com.bigdata.io.FileChannelUtility; +import com.bigdata.journal.jini.ha.HAJournalTest.StoreState; import com.bigdata.mdi.IResourceMetadata; import com.bigdata.rawstore.AbstractRawWormStore; import com.bigdata.rawstore.Bytes; @@ -687,4 +688,14 @@ // public boolean isFlushed() { // return true; // } + + + /** + * Default StoreState implementation to be overridden + * as appropriate. 
+ */ + synchronized public StoreState getStoreState() { + return new StoreState(); + } + } Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-11-26 18:30:22 UTC (rev 7598) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-11-27 14:23:29 UTC (rev 7599) @@ -3380,7 +3380,9 @@ metaStartAddr, metaBitsAddr, old.getStoreType(), old.getCreateTime(), old.getCloseTime(), old.getVersion(), store.checker); - + + + log.warn("CommitRecordIndexAddr: " + commitRecordIndexAddr + ", strategy: " + _bufferStrategy.getClass() + ", physicalAddress: " + _bufferStrategy.getPhysicalAddress(commitRecordIndexAddr)); } /** @@ -3545,7 +3547,18 @@ private void commitHA() { try { + + if (log.isDebugEnabled()) { + final long rootAddr = store._commitRecordIndex.getRootAddr(); + log.debug("CommitRecordIndex RootAddr: " + rootAddr + ", physical address: " + store.getPhysicalAddress(rootAddr)); + + if (_bufferStrategy instanceof IRWStrategy) { + final RWStore rwstore = ((RWStrategy) _bufferStrategy).getStore(); + log.debug(rwstore.showAllocatorList()); + } + } + if(!prepare2Phase()) { // PREPARE rejected. @@ -4623,6 +4636,8 @@ * * Note: For this code path we DO NOT cache the index view. 
*/ + if (log.isDebugEnabled()) + log.debug("reading CommitRecordIndex from PhysicalAddress: " + _bufferStrategy.getPhysicalAddress(addr)); ndx = (CommitRecordIndex) BTree.load(this, addr, false/* readOnly */); @@ -7589,6 +7604,9 @@ } + if (log.isDebugEnabled()) + log.debug("RBV with CommitRecordIndex at PhysicalAddress: " + _bufferStrategy.getPhysicalAddress(rootBlock.getCommitRecordIndexAddr())); + } // doInnerRun() } // Commit2PhaseTask Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/BasicBufferStrategy.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/BasicBufferStrategy.java 2013-11-26 18:30:22 UTC (rev 7598) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/BasicBufferStrategy.java 2013-11-27 14:23:29 UTC (rev 7599) @@ -30,6 +30,7 @@ import com.bigdata.counters.CounterSet; import com.bigdata.io.FileChannelUtility; +import com.bigdata.journal.jini.ha.HAJournalTest.StoreState; /** * Implements logic to read from and write on a buffer. This is sufficient Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/DumpJournal.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/DumpJournal.java 2013-11-26 18:30:22 UTC (rev 7598) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/DumpJournal.java 2013-11-27 14:23:29 UTC (rev 7599) @@ -381,6 +381,17 @@ final boolean dumpHistory, final boolean dumpPages, final boolean dumpIndices, final boolean showTuples) { + /** + * Start a transaction. This will bracket all index access and protect + * the data on the journal from concurrent recycling. 
+ * + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/762"> + * DumpJournal does not protect against concurrent updates (NSS) + * </a> + */ + final long tx = journal.newTx(ITx.READ_COMMITTED); + try { + final FileMetadata fmd = journal.getFileMetadata(); if (fmd != null) { @@ -600,6 +611,9 @@ dumpPages, dumpIndices, showTuples); } + } finally { + journal.abort(tx); + } } @@ -614,7 +628,7 @@ } - public void dumpGlobalRowStore(final PrintWriter out) { + private void dumpGlobalRowStore(final PrintWriter out) { final SparseRowStore grs = journal.getGlobalRowStore(journal .getLastCommitTime()); @@ -826,7 +840,7 @@ * * @return */ - public String dumpRawRecord(final long addr) { + private String dumpRawRecord(final long addr) { if (journal.getBufferStrategy() instanceof IRWStrategy) { /** @@ -984,6 +998,7 @@ } } case Stream: + @SuppressWarnings("unused") final Stream stream = (Stream) ndx; /* * Note: We can't do anything here with a Stream, but we do @@ -1004,41 +1019,4 @@ } - /** - * Return the data in the buffer. - */ - public static byte[] getBytes(ByteBuffer buf) { - - if (buf.hasArray() && buf.arrayOffset() == 0 && buf.position() == 0 - && buf.limit() == buf.capacity()) { - - /* - * Return the backing array. - */ - - return buf.array(); - - } - - /* - * Copy the expected data into a byte[] using a read-only view on the - * buffer so that we do not mess with its position, mark, or limit. 
- */ - final byte[] a; - { - - buf = buf.asReadOnlyBuffer(); - - final int len = buf.remaining(); - - a = new byte[len]; - - buf.get(a); - - } - - return a; - - } - } Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/IBufferStrategy.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/IBufferStrategy.java 2013-11-26 18:30:22 UTC (rev 7598) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/IBufferStrategy.java 2013-11-27 14:23:29 UTC (rev 7599) @@ -28,6 +28,7 @@ import java.nio.ByteBuffer; import com.bigdata.counters.CounterSet; +import com.bigdata.journal.jini.ha.HAJournalTest.StoreState; import com.bigdata.rawstore.IAddressManager; import com.bigdata.rawstore.IMRMW; import com.bigdata.rawstore.IRawStore; @@ -276,6 +277,15 @@ */ public boolean useChecksums(); + /** + * A StoreState object references critical transient data that can be used + * to determine a degree of consistency between stores, specifically for an + * HA context. 
+ * + * @return the StoreState + */ + public StoreState getStoreState(); + // /** // * Determines whether there are outstanding writes to the underlying store // */ Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/RWStrategy.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/RWStrategy.java 2013-11-26 18:30:22 UTC (rev 7598) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/journal/RWStrategy.java 2013-11-27 14:23:29 UTC (rev 7599) @@ -49,6 +49,7 @@ import com.bigdata.ha.msg.IHAWriteMessage; import com.bigdata.io.IBufferAccess; import com.bigdata.io.writecache.WriteCacheService; +import com.bigdata.journal.jini.ha.HAJournalTest.StoreState; import com.bigdata.mdi.IResourceMetadata; import com.bigdata.quorum.Quorum; import com.bigdata.quorum.QuorumException; @@ -905,6 +906,11 @@ public WriteCacheService getWriteCacheService() { return m_store.getWriteCacheService(); } + + @Override + public StoreState getStoreState() { + return m_store.getStoreState(); + } // @Override // public boolean isFlushed() { Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2013-11-26 18:30:22 UTC (rev 7598) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/rwstore/FixedAllocator.java 2013-11-27 14:23:29 UTC (rev 7599) @@ -178,7 +178,9 @@ "Address committed but not set in transients"); } - m_store.showWriteCacheDebug(paddr); + m_store.showWriteCacheDebug(paddr); + + log.warn("Physical address " + paddr + " not accessible for Allocator of size " + m_size); return 0L; } Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- 
branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2013-11-26 18:30:22 UTC (rev 7598) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2013-11-27 14:23:29 UTC (rev 7599) @@ -101,6 +101,7 @@ import com.bigdata.journal.IRootBlockView; import com.bigdata.journal.RootBlockView; import com.bigdata.journal.StoreTypeEnum; +import com.bigdata.journal.jini.ha.HAJournalTest.StoreState; import com.bigdata.quorum.Quorum; import com.bigdata.quorum.QuorumException; import com.bigdata.rawstore.IAllocationContext; @@ -1222,9 +1223,9 @@ /* * Utility to encapsulate RootBlock interpreation */ - static class RootBlockInfo { + class RootBlockInfo { - static int nextAllocation(final IRootBlockView rb) { + int nextAllocation(final IRootBlockView rb) { final long nxtOffset = rb.getNextOffset(); // next allocation to be made (in -32K units). @@ -1237,13 +1238,11 @@ return ret == 0 ? -(1 + META_ALLOCATION) : ret; } - /* - * Meta-Allocations stored as {int address; int[8] bits}, so each block - * holds 8*32=256 allocation slots of 1K totaling 256K. 
- * - * The returned int array is a flattened list of these int[9] blocks - */ - static int[] metabits(final IRootBlockView rb, final ReopenFileChannel reopener) throws IOException { + final int[] m_metabits; + final long m_storageStatsAddr; + final long m_lastDeferredReleaseTime; + + RootBlockInfo(final IRootBlockView rb) throws IOException { final long rawmbaddr = rb.getMetaBitsAddr(); /* @@ -1265,17 +1264,17 @@ */ final byte[] buf = new byte[metaBitsStore * 4]; - FileChannelUtility.readAll(reopener, ByteBuffer.wrap(buf), pmaddr); + FileChannelUtility.readAll(m_reopener, ByteBuffer.wrap(buf), pmaddr); final DataInputStream strBuf = new DataInputStream(new ByteArrayInputStream(buf)); // Can handle minor store version incompatibility strBuf.readInt(); // STORE VERSION - strBuf.readLong(); // Last Deferred Release Time + m_lastDeferredReleaseTime = strBuf.readLong(); // Last Deferred Release Time strBuf.readInt(); // cDefaultMetaBitsSize final int allocBlocks = strBuf.readInt(); - strBuf.readLong(); // m_storageStatsAddr + m_storageStatsAddr = strBuf.readLong(); // m_storageStatsAddr // step over those reserved ints for (int i = 0; i < cReservedMetaBits; i++) { @@ -1300,8 +1299,15 @@ * Meta-Allocations stored as {int address; int[8] bits}, so each block * holds 8*32=256 allocation slots of 1K totaling 256K. */ - return ret; + m_metabits = ret; } + + /* + * Meta-Allocations stored as {int address; int[8] bits}, so each block + * holds 8*32=256 allocation slots of 1K totaling 256K. 
+ * + * The returned int array is a flattened list of these int[9] blocks + */ } /** @@ -1451,7 +1457,9 @@ for (int i = 0; i < m_metaBitsSize; i++) { m_metaBits[i] = strBuf.readInt(); } - m_metaTransientBits = (int[]) m_metaBits.clone(); + // m_metaTransientBits = (int[]) m_metaBits.clone(); + + syncMetaTransients(); final int numFixed = m_allocSizes.length; @@ -1478,6 +1486,18 @@ + ", " + m_metaBitsAddr); } } + + /** + * Uses System.arraycopy rather than clone() to duplicate the + * metaBits to the metaTransientBits, which will be faster. + */ + private void syncMetaTransients() { + if (m_metaTransientBits == null || m_metaTransientBits.length != m_metaBits.length) { + m_metaTransientBits = (int[]) m_metaBits.clone(); + } else { + System.arraycopy(m_metaBits, 0, m_metaTransientBits, 0, m_metaTransientBits.length); + } + } // /* // * Called when store is opened to make sure any deferred frees are @@ -2842,6 +2862,11 @@ isolatedWrites = isolatedWrites || fa.reset(m_writeCacheService, m_committedNextAllocation); } + /** + * Now clone the transient metabits for protection if this service becomes leader + */ + syncMetaTransients(); + if (!isolatedWrites) { /** * Now we should be able to unwind any unused allocators and unused @@ -3114,7 +3139,7 @@ // to provide control // writeFileSpec(); - m_metaTransientBits = (int[]) m_metaBits.clone(); + syncMetaTransients(); // Must be called from AbstractJournal commitNow after writeRootBlock // postCommit(); @@ -3500,6 +3525,9 @@ (b * cDefaultMetaBitsSize) + 1, cDefaultMetaBitsSize-1); if (ret != -1) { + // The assumption is that this bit is also NOT set in m_metaBits + assert !tstBit(m_metaBits, ret); + return ret; } } @@ -6194,14 +6222,40 @@ log.trace("Allocator " + index + ", size: " + xfa.m_size + ", startAddress: " + xfa.getStartAddr() + ", allocated: " + (xfa.getAllocatedSlots()/xfa.m_size)); } } - + + // Update m_metaBits addr and m_nextAllocation to ensure able to allocate as well as read! 
+ { + final long nxtOffset = rbv.getNextOffset(); + + // next allocation to be made (in -32K units). + m_nextAllocation = -(int) (nxtOffset >> 32); + + if (m_nextAllocation == 0) { + throw new IllegalStateException("Invalid state for non-empty store"); + } + + m_committedNextAllocation = m_nextAllocation; + + final long savedMetaBitsAddr = m_metaBitsAddr; + // latched offset of the metabits region. + m_metaBitsAddr = -(int) nxtOffset; + + if (savedMetaBitsAddr != m_metaBitsAddr) + log.warn("Old metaBitsAddr: " + savedMetaBitsAddr + ", new metaBitsAddr: " + m_metaBitsAddr); + } + final ArrayList<FixedAllocator> nallocs = new ArrayList<FixedAllocator>(); // current metabits final int[] oldmetabits = m_metaBits; // new metabits - m_metaBits = RootBlockInfo.metabits(rbv, m_reopener); + final RootBlockInfo rbi = new RootBlockInfo(rbv); + m_metaBits = rbi.m_metabits; + // and grab the last deferred release and storageStats! + m_lastDeferredReleaseTime = rbi.m_lastDeferredReleaseTime; + m_storageStatsAddr = rbi.m_storageStatsAddr; + if(log.isTraceEnabled()) log.trace("Metabits length: " + m_metaBits.length); @@ -6903,6 +6957,16 @@ } + public String showAllocatorList() { + final StringBuilder sb = new StringBuilder(); + + for (int index = 0; index < m_allocs.size(); index++) { + final FixedAllocator xfa = m_allocs.get(index); + sb.append("Allocator " + index + ", size: " + xfa.m_size + ", startAddress: " + xfa.getStartAddr() + ", allocated: " + xfa.getAllocatedSlots() + "\n"); + } + + return sb.toString(); + } // /** // * // * @return whether WCS is flushed @@ -6913,6 +6977,75 @@ // return this.m_writeCacheService.isFlushed(); // } + public static class RWStoreState extends StoreState { + + /** + * Generated ID + */ + private static final long serialVersionUID = 4315400143557397323L; + + /* + * Transient state necessary for consistent ha leader transition + */ + int m_fileSize; + int m_nextAllocation; + int m_committedNextAllocation; + long m_minReleaseAge; + long 
m_lastDeferredReleaseTime; + long m_storageStatsAddr; + int m_allocsSize; + int m_metaBitsAddr; + int m_metaBitsSize; + + public boolean equals(final Object obj) { + if (obj == null || !(obj instanceof RWStoreState)) + return false; + final RWStoreState other = (RWStoreState) obj; + return m_fileSize == other.m_fileSize + && m_nextAllocation == other.m_nextAllocation + && m_committedNextAllocation == other.m_committedNextAllocation + && m_minReleaseAge == other.m_minReleaseAge + && m_lastDeferredReleaseTime == other.m_lastDeferredReleaseTime + && m_storageStatsAddr == other.m_storageStatsAddr + && m_allocsSize == other.m_allocsSize + && m_metaBitsAddr == other.m_metaBitsAddr + && m_metaBitsSize == other.m_metaBitsSize; + } + + public String toString() { + final StringBuilder sb = new StringBuilder(); + + sb.append("RWStoreState\n"); + sb.append("fileSize: " + m_fileSize + "\n"); + sb.append("nextAllocation: " + m_nextAllocation + "\n"); + sb.append("committedNextAllocation: " + m_committedNextAllocation + "\n"); + sb.append("minReleaseAge: " + m_minReleaseAge + "\n"); + sb.append("lastDeferredReleaseTime: " + m_lastDeferredReleaseTime + "\n"); + sb.append("storageStatsAddr: " + m_storageStatsAddr + "\n"); + sb.append("allocsSize: " + m_allocsSize + "\n"); + sb.append("metaBitsAddr: " + m_metaBitsAddr + "\n"); + sb.append("metaBitsSize: " + m_metaBitsSize + "\n"); + + return sb.toString(); + } + } + + public StoreState getStoreState() { + final RWStoreState ret = new RWStoreState(); + + ret.m_fileSize = m_fileSize; + ret.m_nextAllocation = m_nextAllocation; + ret.m_committedNextAllocation = m_committedNextAllocation; + ret.m_minReleaseAge = m_minReleaseAge; + ret.m_lastDeferredReleaseTime = m_lastDeferredReleaseTime; + ret.m_storageStatsAddr = m_storageStatsAddr; + ret.m_allocsSize = m_allocs.size(); + ret.m_metaBitsAddr = m_metaBitsAddr; + ret.m_metaBitsSize = m_metaBits.length; + + return ret; + } + // public void prepareForRebuild(final HARebuildRequest req) { // 
assert m_rebuildRequest == null; // Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/rwstore/sector/MemStrategy.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/rwstore/sector/MemStrategy.java 2013-11-26 18:30:22 UTC (rev 7598) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/rwstore/sector/MemStrategy.java 2013-11-27 14:23:29 UTC (rev 7599) @@ -41,6 +41,7 @@ import com.bigdata.journal.IRootBlockView; import com.bigdata.journal.RootBlockView; import com.bigdata.journal.StoreTypeEnum; +import com.bigdata.journal.jini.ha.HAJournalTest.StoreState; import com.bigdata.mdi.IResourceMetadata; import com.bigdata.rawstore.IAddressManager; import com.bigdata.rawstore.IAllocationContext; @@ -497,6 +498,11 @@ return m_dirty; } + @Override + public StoreState getStoreState() { + throw new UnsupportedOperationException(); + } + // @Override // public boolean isFlushed() { // return true; Modified: branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive3Nodes.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive3Nodes.java 2013-11-26 18:30:22 UTC (rev 7598) +++ branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive3Nodes.java 2013-11-27 14:23:29 UTC (rev 7599) @@ -25,14 +25,25 @@ package com.bigdata.ha.pipeline; import java.io.IOException; +import java.io.InputStream; import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.Random; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; 
import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import com.bigdata.btree.BytesUtil; import com.bigdata.ha.msg.HAWriteMessageBase; import com.bigdata.ha.msg.IHAWriteMessageBase; import com.bigdata.io.DirectBufferPool; @@ -169,22 +180,29 @@ public void testSimpleExchange() throws InterruptedException, ExecutionException, TimeoutException { - final long timeout = 5000; // ms + doSimpleExchange(); + } + + private void doSimpleExchange() throws InterruptedException, + ExecutionException, TimeoutException { + + final long timeout = 5000; // ms final ByteBuffer tst1 = getRandomData(50); - final IHAWriteMessageBase msg1 = new HAWriteMessageBase(50, chk.checksum(tst1)); + final IHAWriteMessageBase msg1 = new HAWriteMessageBase(50, + chk.checksum(tst1)); final ByteBuffer rcv1 = ByteBuffer.allocate(2000); final ByteBuffer rcv2 = ByteBuffer.allocate(2000); // rcv.limit(50); final Future<Void> futRec1 = receiveServiceB.receiveData(msg1, rcv1); final Future<Void> futRec2 = receiveServiceC.receiveData(msg1, rcv2); final Future<Void> futSnd = sendServiceA.send(tst1, msg1.getToken()); - futSnd.get(timeout,TimeUnit.MILLISECONDS); - futRec1.get(timeout,TimeUnit.MILLISECONDS); - futRec2.get(timeout,TimeUnit.MILLISECONDS); + futSnd.get(timeout, TimeUnit.MILLISECONDS); + futRec1.get(timeout, TimeUnit.MILLISECONDS); + futRec2.get(timeout, TimeUnit.MILLISECONDS); assertEquals(tst1, rcv1); assertEquals(rcv1, rcv2); } - + public void testChecksumError() throws InterruptedException, ExecutionException { @@ -711,4 +729,139 @@ } } + public void testSimpleReset() throws InterruptedException, + ExecutionException, TimeoutException { + + doSimpleExchange(); + + sendServiceA.resetSocket(); + + doSimpleExchange(); + } + + /** + * The use of threaded tasks in the send/receive service makes it difficult to + * observer the socket state changes. 
+ * + * So let's begin by writing some tests over the raw sockets. + * + * @throws IOException + */ + public void testDirectSockets() throws IOException { + final InetSocketAddress serverAddr = new InetSocketAddress(getPort(0)); + + final ServerSocket ss = new ServerSocket(); + ss.bind(serverAddr); + + final SocketChannel cs = SocketChannel.open(); + + cs.connect(serverAddr); + + final Random r = new Random(); + final byte[] data = new byte[200]; + r.nextBytes(data); + + final ByteBuffer src = ByteBuffer.wrap(data); + + cs.write(src); + + final byte[] dst = new byte[200]; + + final Socket readSckt1 = ss.accept(); + + InputStream instr = readSckt1.getInputStream(); + + instr.read(dst); + + assertTrue(BytesUtil.bytesEqual(data, dst)); + + // now write some more data into the channel and then close it + cs.write(ByteBuffer.wrap(data)); + + // close the client socket + cs.close(); + + // and see what happens when we try to accept the data + // we expect it to hang and timeout! + assertTimout(1, TimeUnit.SECONDS, new Callable<Void>() { + + @Override + public Void call() throws Exception { + ss.accept(); + + return null; + }}); + + // Now try writing some more data + try { + cs.write(ByteBuffer.wrap(data)); + fail("Expected closed channel exception"); + } catch (ClosedChannelException e) { + // expected + } + + // the old stream should be closed + try { + final int rdlen = instr.read(dst); // should be closed + fail("Expected closed socket exception, rdlen: " + rdlen); + } catch (Exception e) { + // expected; + } + + // if so then should we explicitly close its socket? 
+ readSckt1.close(); + + final SocketChannel cs2 = SocketChannel.open(); + cs2.connect(serverAddr); + cs2.write(ByteBuffer.wrap(data)); + + // Now we should be able to accept the new write + final AtomicReference<Socket> av = new AtomicReference<Socket>(); + assertNoTimout(1, TimeUnit.SECONDS, new Callable<Void>() { + + @Override + public Void call() throws Exception { + av.set(ss.accept()); + + return null; + }}); + + // the new socket and associated stream should be good to go + av.get().getInputStream().read(dst); + + assertTrue(BytesUtil.bytesEqual(data, dst)); + + + } + + private void assertTimout(long timeout, TimeUnit unit, Callable<Void> callable) { + final ExecutorService es = Executors.newSingleThreadExecutor(); + try { + final Future<Void> ret = es.submit(callable); + ret.get(timeout, unit); + fail("Expected timeout"); + } catch (TimeoutException e) { + // that is expected + return; + } catch (Exception e) { + fail("Expected timeout"); + } finally { + es.shutdown(); + } + } + + private void assertNoTimout(long timeout, TimeUnit unit, Callable<Void> callable) { + final ExecutorService es = Executors.newSingleThreadExecutor(); + try { + final Future<Void> ret = es.submit(callable); + ret.get(timeout, unit); + } catch (TimeoutException e) { + fail("Unexpected timeout"); + } catch (Exception e) { + fail("Unexpected Exception", e); + } finally { + es.shutdown(); + } + } + } Modified: branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/journal/TestDumpJournal.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/journal/TestDumpJournal.java 2013-11-26 18:30:22 UTC (rev 7598) +++ branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/journal/TestDumpJournal.java 2013-11-27 14:23:29 UTC (rev 7599) @@ -29,15 +29,23 @@ package com.bigdata.journal; import java.io.IOException; +import java.util.LinkedList; +import java.util.List; import java.util.UUID; +import 
java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; import com.bigdata.btree.AbstractBTreeTestCase; import com.bigdata.btree.BTree; import com.bigdata.btree.HTreeIndexMetadata; import com.bigdata.btree.IndexMetadata; import com.bigdata.btree.keys.KV; +import com.bigdata.concurrent.FutureTaskMon; import com.bigdata.htree.HTree; +import com.bigdata.rwstore.IRWStrategy; +import com.bigdata.util.concurrent.LatchedExecutor; /** * Test suite for {@link DumpJournal}. @@ -66,8 +74,10 @@ /** * @param name */ - public TestDumpJournal(String name) { + public TestDumpJournal(final String name) { + super(name); + } /** @@ -361,4 +371,229 @@ } + /** + * Unit test for {@link DumpJournal} with concurrent updates against the + * backing store. This is intended primarily to detect failures to protect + * against the recycling model associated with the {@link IRWStrategy}. + * + * @throws Exception + * + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/762"> + * DumpJournal does not protect against concurrent updates (NSS) </a> + */ + public void test_dumpJournal_concurrent_updates() throws Exception { + + final String PREFIX = "testIndex#"; + final int NUM_INDICES = 4; + + Journal src = getStore(getProperties()); + + try { + + for (int i = 0; i < NUM_INDICES; i++) { + + // register an index + final String name = PREFIX + i; + + src.registerIndex(new IndexMetadata(name, UUID.randomUUID())); + { + + // lookup the index. + final BTree ndx = src.getIndex(name); + + // #of tuples to write. + final int ntuples = r.nextInt(1000); + + // generate random data. + final KV[] a = AbstractBTreeTestCase + .getRandomKeyValues(ntuples); + + // write tuples (in random order) + for (KV kv : a) { + + ndx.insert(kv.key, kv.val); + + if (r.nextInt(100) < 10) { + + // randomly increment the counter (10% of the time). 
+ ndx.getCounter().incrementAndGet(); + + } + + } + + } + + } + + // commit the journal (!) + src.commit(); + + /** + * Task to run various DumpJournal requests. + */ + final class DumpTask implements Callable<Void> { + + private final Journal src; + + public DumpTask(final Journal src) { + + this.src = src; + + } + + public Void call() throws Exception { + + new DumpJournal(src) + .dumpJournal(false/* dumpHistory */, + true/* dumpPages */, + false/* dumpIndices */, false/* showTuples */); + + new DumpJournal(src) + .dumpJournal(true/* dumpHistory */, + true/* dumpPages */, true/* dumpIndices */, + false/* showTuples */); + + // test again w/o dumpPages + new DumpJournal(src) + .dumpJournal(true/* dumpHistory */, + false/* dumpPages */, + true/* dumpIndices */, false/* showTuples */); + + return (Void) null; + + } + + } + + final class UpdateTask implements Callable<Void> { + + private final Journal src; + + public UpdateTask(final Journal src) { + + this.src = src; + + } + + public Void call() throws Exception { + + /* + * Now write some more data, going through a series of commit + * points. This let's us check access to historical commit points. + */ + for (int j = 0; j < 10; j++) { + + for (int i = 0; i < NUM_INDICES; i++) { + + // register an index + final String name = PREFIX + i; + + // lookup the index. + final BTree ndx = src.getIndex(name); + + // #of tuples to write. + final int ntuples = r.nextInt(1000); + + // generate random data. + final KV[] a = AbstractBTreeTestCase + .getRandomKeyValues(ntuples); + + // write tuples (in random order) + for (KV kv : a) { + + ndx.insert(kv.key, kv.val); + + if (r.nextInt(100) < 10) { + + // randomly increment the counter (10% of the time). + ... [truncated message content] |