From: <tho...@us...> - 2013-10-31 13:29:53
Revision: 7503 http://bigdata.svn.sourceforge.net/bigdata/?rev=7503&view=rev Author: thompsonbry Date: 2013-10-31 13:29:45 +0000 (Thu, 31 Oct 2013) Log Message: ----------- Import of Martyn's code with my initial review edits. Modified Paths: -------------- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/btree/BytesUtil.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessage.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessageBase.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/msg/IHAWriteMessageBase.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/quorum/QuorumActor.java branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestAll.java branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive.java branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive3Nodes.java branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java branches/PIPELINE_RESYNC/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java branches/PIPELINE_RESYNC/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java Added Paths: ----------- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/PipelineException.java branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestBufferFraming.java Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/btree/BytesUtil.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/btree/BytesUtil.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/btree/BytesUtil.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -1575,7 +1575,10 @@ * @return The hex string. */ static public String toHexString(final byte[] buf) { - + + if (buf == null) + return "NULL"; + return toHexString(buf, buf.length); } @@ -1591,6 +1594,10 @@ * @return The hex string. */ static public String toHexString(final byte[] buf, int n) { + + if (buf == null) + return "NULL"; + n = n < buf.length ? n : buf.length; final StringBuffer out = new StringBuffer(); for (int i = 0; i < n; i++) { Added: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/PipelineException.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/PipelineException.java (rev 0) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/PipelineException.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -0,0 +1,56 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2013. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +package com.bigdata.ha; + +import java.util.UUID; + +/** + * PipelineException is thrown from RMI calls to communicate + * the root cause of a pipeline problem. The caller is then able + * to take action: for example to remove the problem service + * from the quorum. + */ +public class PipelineException extends RuntimeException { + + /** + * Generated ID + */ + private static final long serialVersionUID = 8019938954269914574L; + + /** The UUID of the service that could not be reached. */ + private final UUID serviceId; + + public PipelineException(final UUID serviceId, final Throwable t) { + super(t); + + this.serviceId = serviceId; + } + + /** Return the UUID of the service that could not be reached. */ + public UUID getProblemServiceId() { + return serviceId; + } + +} Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -1184,7 +1184,7 @@ final IHAWriteMessage msg) { // Use size and checksum from real IHAWriteMessage. - super(msg.getSize(),msg.getChk()); + super(msg.getSize(),msg.getChk(),msg.getToken()); this.req = req; // MAY be null; this.msg = msg; @@ -1473,7 +1473,22 @@ innerReplicate(0/* retryCount */); } catch (Throwable t) { + + final PipelineException pe = (PipelineException) InnerCause.getInnerCause(t, PipelineException.class); + if (pe != null) { + log.error("Really need to remove service " + pe.getProblemServiceId()); + final UUID psid = pe.getProblemServiceId(); + + try { + member.getActor().forceRemoveService(psid); + } catch (Exception e) { + log.warn("Problem on node removal", e); + + throw new RuntimeException(e); + } + } + // Note: Also see retrySend()'s catch block. if (InnerCause.isInnerCause(t, InterruptedException.class) // || InnerCause.isInnerCause(t, CancellationException.class) @@ -1714,7 +1729,7 @@ ExecutionException, IOException { // Get Future for send() outcome on local service. - final Future<Void> futSnd = sendService.send(b); + final Future<Void> futSnd = sendService.send(b, msg.getToken()); try { @@ -1754,6 +1769,18 @@ } } + } catch (Throwable t) { + // check inner cause for downstream PipelineException + final PipelineException pe = (PipelineException) InnerCause.getInnerCause(t, PipelineException.class); + if (pe != null) { + throw pe; // throw it upstream + } + + // determine next pipeline service id + final UUID[] priorAndNext = member.getQuorum().getPipelinePriorAndNext(member.getServiceId()); + log.warn("Problem with downstream service: " + priorAndNext[1], t); + + throw new PipelineException(priorAndNext[1], t); } finally { // cancel the local Future. futSnd.cancel(true/* mayInterruptIfRunning */); @@ -2022,6 +2049,12 @@ } } + } catch (Throwable t) { + // determine next pipeline service id + final UUID[] priorAndNext = member.getQuorum().getPipelinePriorAndNext(member.getServiceId()); + log.warn("Problem with downstream service: " + priorAndNext[1], t); + + throw new PipelineException(priorAndNext[1], t); } finally { // cancel the local Future. 
futSnd.cancel(true/* mayInterruptIfRunning */); Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessage.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessage.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessage.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -302,7 +302,7 @@ this.compressorKey = compressorKey; } - + /** * The initial version. * Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessageBase.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessageBase.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessageBase.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -28,7 +28,9 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; +import java.util.Random; +import com.bigdata.btree.BytesUtil; import com.bigdata.ha.pipeline.HAReceiveService; import com.bigdata.ha.pipeline.HASendService; @@ -55,7 +57,38 @@ /** The Alder32 checksum of the bytes to be transfered. */ private int chk; + /** A byte[] token that must prefix the message payload, needed to skip stale data from failed read tasks */ + private byte[] token; + + static private Random r = new Random(); + static final private int TOKEN_SIZE = 8; + + static private byte[] genToken() { + final byte[] token = new byte[TOKEN_SIZE]; + + while (!unique1(token)) { + r.nextBytes(token); + } + + return token; + } + /** + * Checks that the first byte is not repeated in the + * remaining bytes, this simplifies search for the token + * in the input stream. + */ + static private boolean unique1(final byte[] bytes) { + final byte b = bytes[0]; + for (int t = 1; t < bytes.length; t++) { + if (bytes[t] == b) + return false; + } + + return true; + } + + /** * * @param sze * The #of bytes of data to be transfered. @@ -63,6 +96,10 @@ * The Alder32 checksum of the bytes to be transfered. 
*/ public HAWriteMessageBase(final int sze, final int chk) { + this(sze, chk, genToken()); + } + + public HAWriteMessageBase(final int sze, final int chk, final byte[] token) { if (sze <= 0) throw new IllegalArgumentException(); @@ -71,6 +108,8 @@ this.chk = chk; + this.token = token; + } /** @@ -115,7 +154,7 @@ final IHAWriteMessageBase t = (IHAWriteMessageBase) obj; - return sze == t.getSize() && chk == t.getChk(); + return sze == t.getSize() && chk == t.getChk() && (BytesUtil.compareBytes(t.getToken(), getToken()) == 0); } @@ -143,6 +182,15 @@ chk = in.readInt(); + // read token + final int tlen = in.readInt(); + if (tlen == 0) { + token = null; + } else { + token = new byte[tlen]; + in.read(token); + } + } public void writeExternal(final ObjectOutput out) throws IOException { @@ -153,6 +201,17 @@ out.writeInt(chk); + if (token == null) { + out.writeInt(0); + } else { + out.writeInt(token.length); + out.write(token); + } } + @Override + public byte[] getToken() { + return token; + } + } Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/msg/IHAWriteMessageBase.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/msg/IHAWriteMessageBase.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/msg/IHAWriteMessageBase.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -37,4 +37,6 @@ /** The Alder32 checksum of the bytes to be transfered. */ int getChk(); + /** A byte[] token that must prefix the message payload, needed to skip stale data from failed read tasks */ + byte[] getToken(); } \ No newline at end of file Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -51,6 +51,7 @@ import org.apache.log4j.Logger; +import com.bigdata.btree.BytesUtil; import com.bigdata.ha.QuorumPipelineImpl; import com.bigdata.ha.msg.IHASyncRequest; import com.bigdata.ha.msg.IHAWriteMessage; @@ -564,7 +565,7 @@ try { readFuture.get(); } catch (Exception e) { - log.warn(e, e); + log.warn("Pipeline should have been drained", e); } lock.lockInterruptibly(); @@ -579,6 +580,7 @@ } finally { final Client client = clientRef.get(); if (client != null) { + log.warn("Closing client"); client.close(); } } @@ -958,29 +960,44 @@ } -// boolean success = false; -// try { + boolean success = false; + try { doReceiveAndReplicate(client); -// success = true; + success = true; // success. return null; -// } finally { -// try { -// if(success) { -// ack(client); -// } else { -// nack(client); -// } -// } catch (IOException ex) { -// // log and ignore. -// log.error(ex, ex); -// } -// } + } finally { + try { + if (!success) { + // Drain, assuming that localBuffer is sized to be able to receive the full message + // TODO; confirm this assumption + if (localBuffer.capacity() < message.getSize()) + log.error("Insufficient buffer capacity"); + + // TODO: confirm that it is not possible for the next message to be sent to the pipeline since the + // RMI may have already failed and the next message could be on the way. If so the drain may read the + // start of the next message. 
+ final int startDrain = localBuffer.position(); + final int msgSize = message.getSize(); + log.warn("Start drain at " + startDrain + ", message size: " + msgSize + ", blocking mode: " + client.client.isBlocking()); + while(localBuffer.position() < msgSize) { + if (client.client.read(localBuffer) <= 0) // either -1 or no bytes available + break; + } + log.warn("Drained the pipe of " + (localBuffer.position()-startDrain) + " bytes"); + } + } catch (IOException ex) { + // log and ignore. + log.error(ex, ex); + } + } } // call. private void doReceiveAndReplicate(final Client client) throws Exception { + + log.warn("doReceiveAndReplicate"); /* * We should now have parameters ready in the WriteMessage and can @@ -998,6 +1015,17 @@ // for debug retain number of low level reads int reads = 0; + // setup token values to search for any provided token prefix + final byte[] token = message.getToken(); + + boolean foundStart = token == null; // if null then not able to check + int tokenIndex = 0; + final byte[] tokenBuffer = token == null ? null : new byte[token.length]; + final ByteBuffer tokenBB = token == null ? null : ByteBuffer.wrap(tokenBuffer); + + if (log.isDebugEnabled()) + log.debug("Receive token: " + BytesUtil.toHexString(token)); + while (rem > 0 && !EOS) { // block up to the timeout. @@ -1049,15 +1077,68 @@ } + final Set<SelectionKey> keys = client.clientSelector .selectedKeys(); final Iterator<SelectionKey> iter = keys.iterator(); + int ntokenreads = 0; + int ntokenbytematches = 0; + while (iter.hasNext()) { iter.next(); iter.remove(); + + if (!foundStart) { + if (log.isDebugEnabled()) + log.debug("Looking for token, reads: " + ntokenreads); + // Note that the logic for finding the token bytes depends on the + // first byte in the token being unique! + // We have to be a bit clever to be sure we do not read beyond the + // token and therefore complicate the reading into the localBuffer. + // This is optimized for the normal case where the key token is read + // as the next bytes from the stream. In the worst case scenario this + // could read large amounts of data only a few bytes at a time, however + // this is not in reality a significant overhead. + while (tokenIndex < token.length ) { + final int remtok = token.length - tokenIndex; + tokenBB.limit(remtok); + tokenBB.position(0); + + final int rdLen = client.client.read(tokenBB); + for (int i = 0; i < rdLen; i++) { + if (tokenBuffer[i] != token[tokenIndex]) { + log.warn("TOKEN MISMATCH"); + tokenIndex = 0; + if (tokenBuffer[i] == token[tokenIndex]) { + tokenIndex++; + } + } else { + tokenIndex++; + ntokenbytematches++; + } + } + + ntokenreads++; + if (ntokenreads % 10000 == 0) { + if (log.isDebugEnabled()) + log.debug("...still looking, reads: " + ntokenreads); + } + + foundStart = tokenIndex == token.length; + } + + if (!foundStart) { // not sufficient data ready + // if (log.isDebugEnabled()) + log.warn("Not found token yet!"); + continue; + } else { + if (log.isDebugEnabled()) + log.debug("Found token after " + ntokenreads + " token reads and " + ntokenbytematches + " byte matches"); + } + } final int rdlen = client.client.read(localBuffer); @@ -1099,41 +1180,53 @@ * * Note: loop since addrNext might change asynchronously. 
*/ - while(true) { - if (rdlen != 0 && addrNextRef.get() != null) { - if (log.isTraceEnabled()) - log.trace("Incremental send of " + rdlen + " bytes"); - final ByteBuffer out = localBuffer.asReadOnlyBuffer(); - out.position(localBuffer.position() - rdlen); - out.limit(localBuffer.position()); - synchronized (sendService) { - /* - * Note: Code block is synchronized on [downstream] - * to make the decision to start the HASendService - * that relays to [addrNext] atomic. The - * HASendService uses [synchronized] for its public - * methods so we can coordinate this lock with its - * synchronization API. - */ - if (!sendService.isRunning()) { - /* - * Prepare send service for incremental - * transfers to the specified address. - */ - // Check for termination. - client.checkFirstCause(); - // Note: use then current addrNext! - sendService.start(addrNextRef.get()); - continue; - } - } - // Check for termination. - client.checkFirstCause(); - // Send and await Future. - sendService.send(out).get(); - } - break; - } + while (true) { + if (rdlen != 0 && addrNextRef.get() != null) { + if (log.isTraceEnabled()) + log.trace("Incremental send of " + rdlen + + " bytes"); + final ByteBuffer out = localBuffer + .asReadOnlyBuffer(); + out.position(localBuffer.position() - rdlen); + out.limit(localBuffer.position()); + synchronized (sendService) { + /* + * Note: Code block is synchronized on + * [downstream] to make the decision to start + * the HASendService that relays to [addrNext] + * atomic. The HASendService uses [synchronized] + * for its public methods so we can coordinate + * this lock with its synchronization API. + */ + if (!sendService.isRunning()) { + /* + * Prepare send service for incremental + * transfers to the specified address. + */ + // Check for termination. + client.checkFirstCause(); + // Note: use then current addrNext! + sendService.start(addrNextRef.get()); + continue; + } + } + // Check for termination. + client.checkFirstCause(); + + // Send and await Future, sending message token if at start of buffer + if (out.position() == 0) { + log.warn("Sending token " + BytesUtil.toHexString(message.getToken())); + } + try { + sendService.send(out, out.position() == 0 ? message.getToken() : null).get(); + } catch(Throwable t) { + log.warn("Send downstream error", t); + + throw new RuntimeException(t); + } + } + break; + } } } // while( rem > 0 ) @@ -1295,6 +1388,7 @@ localBuffer.limit(message.getSize()); localBuffer.position(0); messageReady.signalAll(); + if (log.isTraceEnabled()) log.trace("Will accept data for message: msg=" + msg); Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -38,6 +38,8 @@ import org.apache.log4j.Logger; +import com.bigdata.btree.BytesUtil; + /** * A service for sending raw {@link ByteBuffer}s across a socket. This service * supports the HA write pipeline. This service is designed to be paired with an @@ -281,7 +283,11 @@ * @todo throws IOException if the {@link SocketChannel} was not open and * could not be opened. 
*/ - public Future<Void> send(final ByteBuffer buffer) { +// public Future<Void> send(final ByteBuffer buffer) { +// return send(buffer, null); +// } + + public Future<Void> send(final ByteBuffer buffer, final byte[] token) { if (buffer == null) throw new IllegalArgumentException(); @@ -300,7 +306,9 @@ // reopenChannel(); - return tmp.submit(newIncSendTask(buffer.asReadOnlyBuffer())); + log.warn("Sending message with token: " + BytesUtil.toHexString(token)); + + return tmp.submit(newIncSendTask(buffer.asReadOnlyBuffer(), token)); } @@ -402,13 +410,14 @@ * * @param buffer * The buffer whose data are to be sent. + * @param token * * @return The task which will send the data to the configured * {@link InetSocketAddress}. */ - protected Callable<Void> newIncSendTask(final ByteBuffer buffer) { + protected Callable<Void> newIncSendTask(final ByteBuffer buffer, final byte[] token) { - return new IncSendTask(buffer); + return new IncSendTask(buffer, token); } @@ -460,8 +469,9 @@ // private final SocketChannel socketChannel; private final ByteBuffer data; + private final byte[] token; - public IncSendTask(/*final SocketChannel socketChannel, */final ByteBuffer data) { + public IncSendTask(/*final SocketChannel socketChannel, */final ByteBuffer data, final byte[] token) { // if (socketChannel == null) // throw new IllegalArgumentException(); @@ -472,12 +482,12 @@ // this.socketChannel = socketChannel; this.data = data; - + this.token = token; } public Void call() throws Exception { - // defer until we actually run. + // defer until we actually run. final SocketChannel socketChannel = reopenChannel(); if (!isRunning()) @@ -494,9 +504,22 @@ try { + int ntoken = 0; int nwritten = 0; while (nwritten < remaining) { + + log.warn("TOKEN: " + BytesUtil.toHexString(token) + ", written: " + (token == null ? false : ntoken == token.length)); + if (token != null && ntoken < token.length) { + final ByteBuffer tokenBB = ByteBuffer.wrap(token); + tokenBB.position(ntoken); + + ntoken += socketChannel.write(tokenBB); + + log.warn("Wrote " + ntoken + " token bytes"); + + continue; + } /* * Write the data. 
Depending on the channel, will either Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -2289,16 +2289,36 @@ abstract protected void doMemberAdd(); - abstract protected void doMemberRemove(); + final protected void doMemberRemove() { + doMemberRemove(serviceId); + } + abstract protected void doMemberRemove(UUID serviceId); + abstract protected void doCastVote(long lastCommitTime); - abstract protected void doWithdrawVote(); + final protected void doWithdrawVote() { + doWithdrawVote(serviceId); + } + abstract protected void doWithdrawVote(UUID serviceId); + abstract protected void doPipelineAdd(); - abstract protected void doPipelineRemove(); + final protected void doPipelineRemove() { + doPipelineRemove(serviceId); + } + abstract protected void doPipelineRemove(UUID serviceId); + + @Override + public void forceRemoveService(final UUID psid) { + doMemberRemove(psid); + doWithdrawVote(psid); + doPipelineRemove(psid); + doServiceLeave(psid); + } + /** * Invoked when our client will become the leader to (a) reorganize the * write pipeline such that our client is the first service in the write @@ -2396,8 +2416,12 @@ abstract protected void doServiceJoin(); - abstract protected void doServiceLeave(); + final protected void doServiceLeave() { + doServiceLeave(serviceId); + } + abstract protected void doServiceLeave(UUID serviceId); + abstract protected void doSetToken(long newToken); // abstract protected void doSetLastValidToken(long newToken); Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/quorum/QuorumActor.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/quorum/QuorumActor.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/quorum/QuorumActor.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -205,4 +205,21 @@ */ void clearToken(); + /** + * Remove the service from the quorum. This should be called when a problem + * with the service is reported to the quorum leader, for example as a + * result of a failed RMI request or failed socket level write replication. + * Such errors arise either from network connectivity or service death. + * These problems will generally be cured, but the heatbeat timeout to cure + * the problem can cause write replication to block. This method may be used + * to force the timely reordering of the pipeline in order to work around + * the replication problem. This is not a permenant disabling of the service + * - the service may be restarted or may recover and reenter the quorum at + * any time. + * + * @param serviceId + * The UUID of the service to be removed. 
+ */ + public void forceRemoveService(UUID serviceId); + } Modified: branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestAll.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestAll.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestAll.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -60,6 +60,9 @@ { final TestSuite suite = new TestSuite("write pipeline"); + + // Test message buffer framing idiom (not required for CI). + // suite.addTestSuite(TestBufferFraming.class); // Test of HASendService and HAReceiveService (2 nodes). suite.addTestSuite(TestHASendAndReceive.class); Added: branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestBufferFraming.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestBufferFraming.java (rev 0) +++ branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestBufferFraming.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -0,0 +1,191 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2013. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +package com.bigdata.ha.pipeline; + +import java.util.Random; + +import junit.framework.TestCase; + +/** + * This is a test suite for the buffer framing idiom. + * <p> + * BufferFraming is required to ensure that pipeline buffers are correctly + * identified by RMI messages. + * <p> + * There is currently a problem where receive tasks can be interrupted leaving + * data in the pipeline and subsequent data reads are unable to process the + * correct data. + * <p> + * A proposed solution is to prefix the buffer with an 8 byte identifier + * suitably unique to stochastically avoid problems of random matching errors. + * <p> + * This test class tests finding the offset of the long value in otherwise + * random data. Since it must read each byte this is complicated by the + * requirement to window shift. This complexity can be somewhat simplified by + * ensuring that each byte in the long key is unique. + * <p> + * Note: only the first byte in the key needs to be unique, guaranteeing that + * if a match attempt fails it is only necessary to check the failing character + * to see if that could be the start of a new match attempt. 
+ * + * @author Martyn Cutcher + */ +public class TestBufferFraming extends TestCase { + +// private static final Logger log = Logger +// .getLogger(junit.framework.Test.class); + + boolean unique(final byte[] bytes) { + for (int i = 0; i < bytes.length; i++) { + final byte b = bytes[i]; + for (int t = i + 1; t < bytes.length; t++) { + if (bytes[t] == b) + return false; + } + } + + return true; + } + + boolean unique1(final byte[] bytes) { + final byte b = bytes[0]; + for (int t = 1; t < bytes.length; t++) { + if (bytes[t] == b) + return false; + } + + return true; + } + + /** + * Returns n bytes of unique values. + * + * The unique values are important to simplify testing + * against data streams. + * + * In fact the only important aspect is that the initial byte + * is unique! This is sufficient to identify the start point + * of the key in a data stream. + */ + byte[] genKey(Random r, final int size) { + final byte[] ret = new byte[size]; + + while (!unique1(ret)) { + r.nextBytes(ret); + } + + return ret; + } + + /** + * Functional test on performance of key generation + */ + public void testGenKey() { + final Random r = new Random(); + + final int keys = 100000000; // 100M + + final long start = System.currentTimeMillis(); + for (int i = 0; i < keys; i++) { + genKey(r, 8); + } + final long end = System.currentTimeMillis(); + + final long throughputms = (keys / (end - start)); + + assertTrue(throughputms > 10000L); // should be able to produce more than 10M keys per second + } + + /** + * Let's write a string into the middle of a load + * of random data and identify it with our generated key. + */ + public void testEmbeddedMessage() { + doEmbeddedMessage(); + } + + public void testStressEmbeddedMessage() { + for (int t = 0; t < 1000; t++) { + doEmbeddedMessage(); + } + } + + public void doEmbeddedMessage() { + final Random r = new Random(); + final byte[] buffer = new byte[10000000]; // 10M bytes + r.nextBytes(buffer); + + final String tst = "Hello World"; + final byte[] tstbytes = tst.getBytes(); + + final byte[] key = genKey(r, 8); + + int offset = r.nextInt(9000000); // somewhere in first 9M bytes + + // copy string into buffer + copy(key, 0, buffer, offset); + copy(tstbytes, 0, buffer, offset+key.length); + + final int position = find(key, buffer); + + assertTrue(position == offset); + + final byte[] copy = new byte[tstbytes.length]; + copy(buffer, position+key.length, copy, 0); + + final String tstRead = new String(copy); + + assertTrue(tstRead.equals(tst)); + } + + void copy(byte[] src, int srcOffset, byte[] dst, int dstOffset) { + int len = Math.min(src.length, dst.length); + + for (int i = 0; i < len; i++) { + dst[dstOffset+i] = src[srcOffset+i]; + } + } + + int find(final byte[] key, final byte[] buffer) { + final int endPos = buffer.length - key.length; + for (int i = 0; i < endPos; i++) { + if (buffer[i] == key[0]) { + boolean match = true; + for (int t = 1; match && t < key.length; t++) { + match = buffer[i+t] == key[t]; + + if (!match) { + i += t-1; + } + } + if (match) { + return i; + } + } + } + + return -1; + } +} Modified: branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -24,6 +24,11 @@ package 
com.bigdata.ha.pipeline; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.Random; @@ -108,6 +113,32 @@ chk = null; } + + /** + * Need to check base message serialization + * + * @throws IOException + * @throws ClassNotFoundException + */ + public void testMessageSerialization() throws IOException, ClassNotFoundException { + final ByteArrayOutputStream boutstr = new ByteArrayOutputStream(); + final ObjectOutputStream obout = new ObjectOutputStream(boutstr); + + final HAWriteMessageBase src_msg = new HAWriteMessageBase(50, 23); + + obout.writeObject(src_msg); + obout.flush(); + + final ByteArrayInputStream binstr = new ByteArrayInputStream(boutstr.toByteArray()); + final ObjectInputStream obin = new ObjectInputStream(binstr); + + final Object dst_msg = obin.readObject(); + + assertTrue(src_msg.equals(dst_msg)); + + // now check that it would falsely compare against a different message + assertFalse(src_msg.equals(new HAWriteMessageBase(50, 23))); + } /** * Should we expect concurrency of the Socket send and RMI? It seems that we @@ -130,7 +161,7 @@ final IHAWriteMessageBase msg1 = new HAWriteMessageBase(50, chk.checksum(tst1)); final ByteBuffer rcv = ByteBuffer.allocate(2000); final Future<Void> futRec = receiveService.receiveData(msg1, rcv); - final Future<Void> futSnd = sendService.send(tst1); + final Future<Void> futSnd = sendService.send(tst1, msg1.getToken()); futSnd.get(timeout,TimeUnit.MILLISECONDS); futRec.get(timeout,TimeUnit.MILLISECONDS); assertEquals(tst1, rcv); @@ -140,7 +171,7 @@ final ByteBuffer tst2 = getRandomData(100); final IHAWriteMessageBase msg2 = new HAWriteMessageBase(100, chk.checksum(tst2)); final ByteBuffer rcv2 = ByteBuffer.allocate(2000); - final Future<Void> futSnd = sendService.send(tst2); + final Future<Void> futSnd = sendService.send(tst2, msg2.getToken()); final Future<Void> futRec = receiveService.receiveData(msg2, rcv2); futSnd.get(timeout,TimeUnit.MILLISECONDS); futRec.get(timeout,TimeUnit.MILLISECONDS); @@ -149,6 +180,40 @@ } + public void testSimpleExchangeWithTokens() throws InterruptedException, ExecutionException, TimeoutException { + + final long timeout = 5000;// ms + { + final ByteBuffer tst1 = getRandomData(50); + final IHAWriteMessageBase msg1 = new HAWriteMessageBase(50, chk.checksum(tst1)); + final ByteBuffer rcv = ByteBuffer.allocate(2000); + final Future<Void> futRec = receiveService.receiveData(msg1, rcv); + final Future<Void> futSnd = sendService.send(tst1, msg1.getToken()); + futSnd.get(timeout,TimeUnit.MILLISECONDS); + futRec.get(timeout,TimeUnit.MILLISECONDS); + assertEquals(tst1, rcv); + } + + { + // how throw some random data into the stream to force the tokens to do something + final ByteBuffer junk = getRandomData(10000); + final Future<Void> futSnd = sendService.send(junk, null); + futSnd.get(timeout,TimeUnit.MILLISECONDS); + } + + { + final ByteBuffer tst2 = getRandomData(100); + final IHAWriteMessageBase msg2 = new HAWriteMessageBase(100, chk.checksum(tst2)); + final ByteBuffer rcv2 = ByteBuffer.allocate(2000); + final Future<Void> futSnd = sendService.send(tst2, msg2.getToken()); + final Future<Void> futRec = receiveService.receiveData(msg2, rcv2); + futSnd.get(timeout,TimeUnit.MILLISECONDS); + futRec.get(timeout,TimeUnit.MILLISECONDS); + assertEquals(tst2, rcv2); + } + + } + /** * Sends a large number of random buffers, 
confirming successful * transmission. @@ -168,7 +233,7 @@ final ByteBuffer rcv = ByteBuffer.allocate(sze + r.nextInt(1024)); // FutureTask return ensures remote ready for Socket data final Future<Void> futRec = receiveService.receiveData(msg, rcv); - final Future<Void> futSnd = sendService.send(tst); + final Future<Void> futSnd = sendService.send(tst, msg.getToken()); futSnd.get(timeout,TimeUnit.MILLISECONDS); futRec.get(timeout,TimeUnit.MILLISECONDS); assertEquals(tst, rcv); // make sure buffer has been transmitted @@ -199,7 +264,7 @@ assertEquals(sze,tst.limit()); // FutureTask return ensures remote ready for Socket data final Future<Void> futRec = receiveService.receiveData(msg, rcv); - final Future<Void> futSnd = sendService.send(tst); + final Future<Void> futSnd = sendService.send(tst, msg.getToken()); futSnd.get(timeout,TimeUnit.MILLISECONDS); futRec.get(timeout,TimeUnit.MILLISECONDS); // make sure buffer has been transmitted Modified: branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive3Nodes.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive3Nodes.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive3Nodes.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -177,7 +177,7 @@ // rcv.limit(50); final Future<Void> futRec1 = receiveServiceB.receiveData(msg1, rcv1); final Future<Void> futRec2 = receiveServiceC.receiveData(msg1, rcv2); - final Future<Void> futSnd = sendServiceA.send(tst1); + final Future<Void> futSnd = sendServiceA.send(tst1, msg1.getToken()); futSnd.get(timeout,TimeUnit.MILLISECONDS); futRec1.get(timeout,TimeUnit.MILLISECONDS); futRec2.get(timeout,TimeUnit.MILLISECONDS); @@ -195,7 +195,7 @@ // rcv.limit(50); final Future<Void> futRec1 = receiveServiceB.receiveData(msg1, rcv1); final Future<Void> futRec2 = receiveServiceC.receiveData(msg1, rcv2); - final Future<Void> futSnd = sendServiceA.send(tst1); + final Future<Void> futSnd = sendServiceA.send(tst1, msg1.getToken()); while (!futSnd.isDone() && !futRec2.isDone()) { try { futSnd.get(10L, TimeUnit.MILLISECONDS); @@ -284,7 +284,7 @@ .receiveData(msg1, rcv1); final Future<Void> futRec2 = receiveServiceC .receiveData(msg1, rcv2); - final Future<Void> futSnd = sendServiceA.send(tst1); + final Future<Void> futSnd = sendServiceA.send(tst1, msg1.getToken()); futSnd.get(timeout,TimeUnit.MILLISECONDS); futRec1.get(timeout,TimeUnit.MILLISECONDS); futRec2.get(timeout,TimeUnit.MILLISECONDS); @@ -317,7 +317,7 @@ .receiveData(msg1, rcv1); // final Future<Void> futRec2 = receiveService2 // .receiveData(msg1, rcv2); - final Future<Void> futSnd = sendServiceA.send(tst1.duplicate()); + final Future<Void> futSnd = sendServiceA.send(tst1.duplicate(), msg1.getToken()); // Send will always succeed. futSnd.get(timeout, TimeUnit.MILLISECONDS); /* @@ -421,7 +421,7 @@ .receiveData(msg1, rcv1); final Future<Void> futRec2 = receiveServiceC .receiveData(msg1, rcv2); - final Future<Void> futSnd = sendServiceA.send(tst1); + final Future<Void> futSnd = sendServiceA.send(tst1, msg1.getToken()); // Send will always succeed. 
futSnd.get(timeout, TimeUnit.MILLISECONDS); /* @@ -498,7 +498,7 @@ // .receiveData(msg1, rcv1); final Future<Void> futRec2 = receiveServiceC .receiveData(msg1, rcv2); - final Future<Void> futSnd = sendServiceA.send(tst1); + final Future<Void> futSnd = sendServiceA.send(tst1, msg1.getToken()); futSnd.get(timeout,TimeUnit.MILLISECONDS); // futRec1.get(); futRec2.get(timeout,TimeUnit.MILLISECONDS); @@ -520,7 +520,7 @@ .receiveData(msg1, rcv1); final Future<Void> futRec2 = receiveServiceC .receiveData(msg1, rcv2); - final Future<Void> futSnd = sendServiceA.send(tst1); + final Future<Void> futSnd = sendServiceA.send(tst1, msg1.getToken()); futSnd.get(timeout,TimeUnit.MILLISECONDS); futRec1.get(timeout,TimeUnit.MILLISECONDS); futRec2.get(timeout,TimeUnit.MILLISECONDS); @@ -553,7 +553,7 @@ rcv1); // final Future<Void> futRec2 = receiveServiceC.receiveData(msg1, // rcv2); - final Future<Void> futSnd = sendServiceC.send(tst1); + final Future<Void> futSnd = sendServiceC.send(tst1, msg1.getToken()); futSnd.get(timeout, TimeUnit.MILLISECONDS); futRec1.get(timeout, TimeUnit.MILLISECONDS); // futRec2.get(timeout, TimeUnit.MILLISECONDS); @@ -576,7 +576,7 @@ rcv1); final Future<Void> futRec2 = receiveServiceA.receiveData(msg1, rcv2); - final Future<Void> futSnd = sendServiceC.send(tst1); + final Future<Void> futSnd = sendServiceC.send(tst1, msg1.getToken()); futSnd.get(timeout, TimeUnit.MILLISECONDS); futRec1.get(timeout, TimeUnit.MILLISECONDS); futRec2.get(timeout, TimeUnit.MILLISECONDS); @@ -665,7 +665,7 @@ // FutureTask return ensures remote ready for Socket data final Future<Void> futRec1 = receiveServiceB.receiveData(msg, rcv1); final Future<Void> futRec2 = receiveServiceC.receiveData(msg, rcv2); - final Future<Void> futSnd = sendServiceA.send(tst); + final Future<Void> futSnd = sendServiceA.send(tst, msg.getToken()); while (!futSnd.isDone() && !futRec1.isDone() && !futRec2.isDone()) { try { futSnd.get(10L, TimeUnit.MILLISECONDS); Modified: branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -846,32 +846,20 @@ fixture.memberAdd(serviceId); } - protected void doMemberRemove() { - fixture.memberRemove(serviceId); - } - protected void doCastVote(final long lastCommitTime) { fixture.castVote(serviceId, lastCommitTime); } - protected void doWithdrawVote() { - fixture.withdrawVote(serviceId); - } - protected void doPipelineAdd() { fixture.pipelineAdd(serviceId); } - protected void doPipelineRemove() { - fixture.pipelineRemove(serviceId); - } - protected void doServiceJoin() { fixture.serviceJoin(serviceId); } - protected void doServiceLeave() { - fixture.serviceLeave(serviceId); + protected void doServiceLeave(final UUID service) { + fixture.serviceLeave(service); } protected void doSetToken(final long newToken) { @@ -890,6 +878,21 @@ fixture.clearToken(); } + @Override + protected void doMemberRemove(UUID service) { + fixture.memberRemove(service); + } + + @Override + protected void doWithdrawVote(UUID service) { + fixture.withdrawVote(service); + } + + @Override + protected void doPipelineRemove(UUID service) { + fixture.pipelineRemove(service); + } + // /** // * {@inheritDoc} // * <p> Modified: 
branches/PIPELINE_RESYNC/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -327,7 +327,7 @@ } @Override - protected void doMemberRemove() { + protected void doMemberRemove(final UUID service) { // get a valid zookeeper connection object. final ZooKeeper zk; try { @@ -340,7 +340,7 @@ try { zk.delete(logicalServiceId + "/" + QUORUM + "/" + QUORUM_MEMBER + "/" + QUORUM_MEMBER_PREFIX - + serviceIdStr, -1/* anyVersion */); + + service.toString(), -1/* anyVersion */); } catch (NoNodeException e) { // ignore. } catch (KeeperException e) { @@ -414,7 +414,7 @@ } @Override - protected void doPipelineRemove() { + protected void doPipelineRemove(final UUID service) { // get a valid zookeeper connection object. final ZooKeeper zk; try { @@ -446,7 +446,7 @@ } final QuorumServiceState state = (QuorumServiceState) SerializerUtil .deserialize(b); - if (serviceId.equals(state.serviceUUID())) { + if (service.equals(state.serviceUUID())) { zk.delete(zpath + "/" + s, -1/* anyVersion */); return; } @@ -636,7 +636,7 @@ * handles a concurrent delete by a simple retry loop. */ @Override - protected void doWithdrawVote() { + protected void doWithdrawVote(final UUID service) { // zpath for votes. final String votesZPath = getVotesZPath(); if (log.isInfoEnabled()) @@ -724,7 +724,7 @@ Thread.currentThread().interrupt(); return; } - if (serviceId.equals(state.serviceUUID())) { + if (service.equals(state.serviceUUID())) { // found our vote. try { // delete our vote. @@ -761,7 +761,7 @@ } // done. if (log.isInfoEnabled()) - log.info("withdrawn: serviceId=" + serviceIdStr + log.info("withdrawn: serviceId=" + service.toString() + ", lastCommitTime=" + lastCommitTime); return; } catch (NoNodeException e) { @@ -836,7 +836,7 @@ } @Override - protected void doServiceLeave() { + protected void doServiceLeave(final UUID service) { // get a valid zookeeper connection object. final ZooKeeper zk; try { @@ -871,7 +871,7 @@ } final QuorumServiceState state = (QuorumServiceState) SerializerUtil .deserialize(b); - if (serviceId.equals(state.serviceUUID())) { + if (service.equals(state.serviceUUID())) { // Found this service. zk.delete(zpath + "/" + s, -1/* anyVersion */); return; @@ -2492,5 +2492,4 @@ } } - } Modified: branches/PIPELINE_RESYNC/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java =================================================================== --- branches/PIPELINE_RESYNC/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java 2013-10-31 13:12:26 UTC (rev 7502) +++ branches/PIPELINE_RESYNC/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java 2013-10-31 13:29:45 UTC (rev 7503) @@ -45,6 +45,7 @@ import com.bigdata.ha.HAGlue; import com.bigdata.ha.HAStatusEnum; +import com.bigdata.ha.IndexManagerCallable; import com.bigdata.ha.halog.HALogWriter; import com.bigdata.ha.halog.IHALogReader; import com.bigdata.ha.msg.HARootBlockRequest; @@ -1031,6 +1032,180 @@ } /** + * Start A+B+C in strict sequence. Wait until the quorum fully meets. Start + * a long running LOAD. While the LOAD is running, sure kill C (the last + * follower). Verify that the LOAD completes successfully with the remaining + * services (A+B). 
+ */ + public void testABC_LiveLoadRemainsMet_kill_C() throws Exception { + + // enforce join order + final ABC startup = new ABC(true /*sequential*/); + + final long token = awaitFullyMetQuorum(); + + // start concurrent task loads that continue until fully met + final FutureTask<Void> ft = new FutureTask<Void>(new LargeLoadTask( + token)); + + executorService.submit(ft); + + // allow load head start + Thread.sleep(300/* ms */); + + // Verify load is still running. + assertFalse(ft.isDone()); + + // Dump Zookeeper + log.warn("ZOOKEEPER\n" + dumpZoo()); + + kill(startup.serverC); + + awaitPipeline(20, TimeUnit.SECONDS, new HAGlue[] {startup.serverA, startup.serverB}); + + awaitMembers(new HAGlue[] {startup.serverA, startup.serverB}); + awaitJoined(new HAGlue[] {startup.serverA, startup.serverB}); + + // token must remain unchanged to indicate same quorum + assertEquals(token, awaitMetQuorum()); + + // Await LOAD, but with a timeout. + ft.get(longLoadTimeoutMillis, TimeUnit.MILLISECONDS); + + // token must remain unchanged to indicate same quorum + assertEquals(token, awaitMetQuorum()); + + } + + public void _testStressABC_LiveLoadRemainsMet_kill_C() throws Exception { + for (int i = 0; i < 5; i++) { + try { + testABC_LiveLoadRemainsMet_kill_C(); + } catch (Throwable t) { + fail("Run " + i, t); + } finally { + Thread.sleep(2000); + destroyAll(); + } + } + } + + /** + * Start A+B+C in strict sequence. Wait until the quorum fully meets. Start + * a long running LOAD. While the LOAD is running, sure kill B (the first + * follower). Verify that the LOAD completes successfully with the remaining + * services (A+C), after the leader re-orders the pipeline. + */ + public void testABC_LiveLoadRemainsMet_kill_B() throws Exception { + + // enforce join order + final ABC startup = new ABC(true /*sequential*/); + + final long token = awaitFullyMetQuorum(); + + // start concurrent task loads that continue until fully met + final FutureTask<Void> ft = new FutureTask<Void>(new LargeLoadTask( + token)); + + executorService.submit(ft); + + // allow load head start + Thread.sleep(300/* ms */); + + // Verify load is still running. + assertFalse(ft.isDone()); + + // Dump Zookeeper + log.warn("ZOOKEEPER\n" + dumpZoo()); + + kill(startup.serverB); + + awaitPipeline(10, TimeUnit.SECONDS, new HAGlue[] {startup.serverA, startup.serverC}); + + // also check members and joined + awaitMembers(new HAGlue[] {startup.serverA, startup.serverC}); + awaitJoined(new HAGlue[] {startup.serverA, startup.serverC}); + + // token must remain unchanged to indicate same quorum + assertEquals(token, awaitMetQuorum()); + + // Await LOAD, but with a timeout. + ft.get(longLoadTimeoutMillis, TimeUnit.MILLISECONDS); + + // token must remain unchanged to indicate same quorum + assertEquals(token, awaitMetQuorum()); + + } + + /** + * Instead of killing B this forces its removal from Zookeeper using the + * forceRemoveService method in a task submitted to the leader. + * + * @throws Exception + */ + public void testABC_LiveLoadRemainsMet_remove_B() throws Exception { + + // enforce join order + final ABC startup = new ABC(true /*sequential*/); + + final long token = awaitFullyMetQuorum(); + + // start concurrent task loads that continue until fully met + final FutureTask<Void> ft = new FutureTask<Void>(new LargeLoadTask( + token)); + + executorService.submit(ft); + + // allow load head start + Thread.sleep(300/* ms */); + + // Verify load is still running. 
+ assertFalse(ft.isDone()); + + // Dump Zookeeper + log.warn("ZOOKEEPER\n" + dumpZoo()); + + startup.serverA.submit(new ForceRemoveService(startup.serverB.getServiceId()), false); + + awaitPipeline(10, TimeUnit.SECONDS, new HAGlue[] {startup.serverA, startup.serverC}); + + // token must remain unchanged to indicate same quorum + assertEquals(token, awaitMetQuorum()); + + // Await LOAD, but with a timeout. + ft.get(longLoadTimeoutMillis, TimeUnit.MILLISECONDS); + + // token must remain unchanged to indicate same quorum + assertEquals(token, awaitMetQuorum()); + + // ...and in this case we might also expect the service to rejoin + awaitPipeline(10, TimeUnit.SECONDS, new HAGlue[] {startup.serverA, startup.serverC, startup.serverB}); + assertEquals(token, awaitFullyMetQuorum()); + + } + + static class... [truncated message content] |
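
For readers skimming the patch rather than applying it: the framing change above prefixes each replicated payload with an 8-byte token whose first byte does not recur inside the token, so a receiver can resynchronize by scanning a possibly stale stream for that first byte (see genToken()/unique1() in HAWriteMessageBase and find() in TestBufferFraming). The following is a minimal standalone sketch of that idiom, not the committed implementation; the class and method names, buffer sizes, and the byte[] stand-in for the socket stream are all illustrative only.

import java.util.Random;

public class TokenFramingSketch {

    /** True when the first byte does not recur in the rest of the token (illustrative helper). */
    static boolean uniqueFirstByte(final byte[] token) {
        for (int i = 1; i < token.length; i++) {
            if (token[i] == token[0])
                return false;
        }
        return true;
    }

    /** Generate an 8-byte token whose first byte is unique within the token. */
    static byte[] genToken(final Random r) {
        final byte[] token = new byte[8];
        do {
            r.nextBytes(token);
        } while (!uniqueFirstByte(token));
        return token;
    }

    /**
     * Scan the stream (modelled here as a byte[]) for the token.
     * Returns the offset of the first payload byte after the token, or -1.
     * Because token[0] is unique within the token, a failed partial match
     * cannot hide another match start inside the matched prefix, so we may
     * skip past the prefix safely.
     */
    static int findPayload(final byte[] token, final byte[] stream) {
        for (int i = 0; i + token.length <= stream.length; i++) {
            if (stream[i] != token[0])
                continue;
            int t = 1;
            while (t < token.length && stream[i + t] == token[t])
                t++;
            if (t == token.length)
                return i + token.length; // payload begins just after the token
            i += t - 1; // safe skip: token[0] cannot occur inside token[1..t-1]
        }
        return -1;
    }

    public static void main(final String[] args) {
        final Random r = new Random();

        // Stale bytes left behind by a failed read task, followed by token + payload.
        final byte[] stale = new byte[4096];
        r.nextBytes(stale);
        final byte[] token = genToken(r);
        final byte[] payload = "Hello World".getBytes();

        final byte[] stream = new byte[stale.length + token.length + payload.length];
        System.arraycopy(stale, 0, stream, 0, stale.length);
        System.arraycopy(token, 0, stream, stale.length, token.length);
        System.arraycopy(payload, 0, stream, stale.length + token.length, payload.length);

        final int off = findPayload(token, stream);
        if (off < 0) {
            System.out.println("token not found");
        } else {
            // Recover the payload that followed the token.
            System.out.println(new String(stream, off, stream.length - off));
        }
    }
}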