From: <tho...@us...> - 2013-12-13 16:54:38
|
Revision: 7642 http://bigdata.svn.sourceforge.net/bigdata/?rev=7642&view=rev Author: thompsonbry Date: 2013-12-13 16:54:30 +0000 (Fri, 13 Dec 2013) Log Message: ----------- Sync to Martyn and CI changes to the error handling for pipeline replication and some new unit tests. See #724 Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/PipelineDownstreamChange.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/PipelineUpstreamChange.java branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive.java branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive3Nodes.java branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JustKills.java Added Paths: ----------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/AbstractPipelineChangeException.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/AbstractPipelineException.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/ImmediateDownstreamReplicationException.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/NestedPipelineException.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/PipelineImmediateDownstreamReplicationException.java Removed Paths: ------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/PipelineException.java Deleted: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/PipelineException.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/PipelineException.java 2013-12-13 11:01:37 UTC (rev 7641) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/PipelineException.java 2013-12-13 16:54:30 UTC (rev 7642) 
@@ -1,56 +0,0 @@ -/** - -Copyright (C) SYSTAP, LLC 2006-2013. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -*/ - -package com.bigdata.ha; - -import java.util.UUID; - -/** - * PipelineException is thrown from RMI calls to communicate - * the root cause of a pipeline problem. The caller is then able - * to take action: for example to remove the problem service - * from the quorum. - */ -public class PipelineException extends RuntimeException { - - /** - * Generated ID - */ - private static final long serialVersionUID = 8019938954269914574L; - - /** The UUID of the service that could not be reached. */ - private final UUID serviceId; - - public PipelineException(final UUID serviceId, final Throwable t) { - super(t); - - this.serviceId = serviceId; - } - - /** Return the UUID of the service that could not be reached. 
*/ - public UUID getProblemServiceId() { - return serviceId; - } - -} Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-13 11:01:37 UTC (rev 7641) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-13 16:54:30 UTC (rev 7642) @@ -57,6 +57,9 @@ import com.bigdata.ha.pipeline.HAReceiveService; import com.bigdata.ha.pipeline.HAReceiveService.IHAReceiveCallback; import com.bigdata.ha.pipeline.HASendService; +import com.bigdata.ha.pipeline.ImmediateDownstreamReplicationException; +import com.bigdata.ha.pipeline.NestedPipelineException; +import com.bigdata.ha.pipeline.PipelineImmediateDownstreamReplicationException; import com.bigdata.io.DirectBufferPool; import com.bigdata.io.IBufferAccess; import com.bigdata.quorum.QCE; @@ -1729,91 +1732,147 @@ } private void doRunWithLock() throws InterruptedException, - ExecutionException, IOException { + ExecutionException, IOException { - try { - // Get Future for send() outcome on local service. - final Future<Void> futSnd = sendService - .send(b, snd.getMarker()); + // Get Future for send() outcome on local service. + final Future<Void> futSnd = sendService.send(b, snd.getMarker()); + try { + try { - try { + // Get Future for receive outcome on the remote service + // (RMI). + final Future<Void> futRec; + try { + futRec = downstream.service.receiveAndReplicate(req, + snd, msg); + } catch (IOException ex) { // RMI error. + throw new ImmediateDownstreamReplicationException(ex); + } - // Get Future for receive outcome on the remote service - // (RMI). - final Future<Void> futRec = downstream.service - .receiveAndReplicate(req, snd, msg); + try { - try { + /* + * Await the Futures, but spend more time waiting on the + * local Future and only check the remote Future every + * second. 
Timeouts are ignored during this loop. + */ + while (!futSnd.isDone() && !futRec.isDone()) { + /* + * Make sure leader's quorum token remains valid for + * ALL writes. + */ + member.assertLeader(token); + try { + futSnd.get(1L, TimeUnit.SECONDS); + } catch (TimeoutException ignore) { + } + try { + futRec.get(10L, TimeUnit.MILLISECONDS); + } catch (TimeoutException ignore) { + } + } + futSnd.get(); + futRec.get(); - /* - * Await the Futures, but spend more time waiting on the - * local Future and only check the remote Future every - * second. Timeouts are ignored during this loop. - */ - while (!futSnd.isDone() && !futRec.isDone()) { - /* - * Make sure leader's quorum token remains valid for - * ALL writes. - */ - member.assertLeader(token); - try { - futSnd.get(1L, TimeUnit.SECONDS); - } catch (TimeoutException ignore) { - } - try { - futRec.get(10L, TimeUnit.MILLISECONDS); - } catch (TimeoutException ignore) { - } - } - futSnd.get(); - futRec.get(); + } finally { + if (!futRec.isDone()) { + // cancel remote Future unless done. + futRec.cancel(true/* mayInterruptIfRunning */); + } + } - } finally { - if (!futRec.isDone()) { - // cancel remote Future unless done. - futRec.cancel(true/* mayInterruptIfRunning */); - } - } + } finally { + // cancel the local Future. + futSnd.cancel(true/* mayInterruptIfRunning */); + } + } catch (Throwable t) { + launderPipelineException(true/* isLeader */, member, t); + } + } + + } - } finally { - // cancel the local Future. - futSnd.cancel(true/* mayInterruptIfRunning */); - } + /** + * Launder an exception thrown during pipeline replication. 
+ * @param isLeader + * @param member + * @param t + */ + static private void launderPipelineException(final boolean isLeader, + final QuorumMember<?> member, final Throwable t) { - } catch (Throwable t) { - // check inner cause for downstream PipelineException - final PipelineException pe = (PipelineException) InnerCause - .getInnerCause(t, PipelineException.class); - final UUID problemService; - if (pe != null) { - // throw pe; // throw it upstream - already should have been - // handled - problemService = pe.getProblemServiceId(); - } else { - final UUID[] priorAndNext = member.getQuorum() - .getPipelinePriorAndNext(member.getServiceId()); - problemService = priorAndNext[1]; - } + log.warn("isLeader=" + isLeader + ", t=" + t, t); + + /* + * When non-null, some service downstream of this service had a problem + * replicating to a follower. + */ + final PipelineImmediateDownstreamReplicationException remoteCause = (PipelineImmediateDownstreamReplicationException) InnerCause.getInnerCause(t, + PipelineImmediateDownstreamReplicationException.class); - // determine next pipeline service id - log.warn("Problem with downstream service: " + problemService, - t); + /* + * When non-null, this service has a problem with replication to its + * immediate follower. + * + * Note: if [remoteCause!=null], then we DO NOT look for a direct cause + * (since there will be one wrapped up in the exception trace for some + * remote service rather than for this service). + */ + final ImmediateDownstreamReplicationException directCause = remoteCause == null ? 
(ImmediateDownstreamReplicationException) InnerCause + .getInnerCause(t, + ImmediateDownstreamReplicationException.class) + : null; - // Carry out remedial work directly - BAD - log.error("Really need to remove service " + problemService); + final UUID thisService = member.getServiceId(); - try { - member.getActor().forceRemoveService(problemService); - } catch (Exception e) { - log.warn("Problem on node removal", e); + final UUID[] priorAndNext = member.getQuorum().getPipelinePriorAndNext( + member.getServiceId()); - throw new RuntimeException(e); - } + if (isLeader) { + + try { + + if (directCause != null) { - throw new PipelineException(problemService, t); + member.getActor().forceRemoveService(priorAndNext[1]); - } - } + } else if (remoteCause != null) { + + final UUID problemService = remoteCause.getProblemServiceId(); + + member.getActor().forceRemoveService(problemService); + + } else { + + // Do not remove anybody. + + } + + } catch (Exception e) { + + log.error("Problem on node removal", e); + + throw new RuntimeException(e); + + } + + } + + if (directCause != null) { + + throw new PipelineImmediateDownstreamReplicationException( + thisService, priorAndNext, t); + + } else if (remoteCause != null) { + + throw new NestedPipelineException(t); + + } else { + + throw new RuntimeException(t); + + } + } /** @@ -1829,10 +1888,11 @@ /* * FIXME We should probably pass the quorum token through from the - * leader for ALL replicated writes. This uses the leader's quorum token - * when it is available (for a live write) and otherwise uses the - * current quorum token (for historical writes, since we are not - * providing the leader's token in this case). + * leader for ALL replicated writes [this is now done by the + * IHASendState but the code is not really using that data yet]. 
This + * uses the leader's quorum token when it is available (for a live + * write) and otherwise uses the current quorum token (for historical + * writes, since we are not providing the leader's token in this case). */ final long token = req == null ? msg.getQuorumToken() : member .getQuorum().token(); @@ -1966,8 +2026,8 @@ final HAMessageWrapper wrappedMsg = new HAMessageWrapper( req, snd, msg); - // Get Future for send() outcome on local service. - final Future<Void> futSnd = receiveService.receiveData(wrappedMsg, + // Get Future for receive() outcome on local service. + final Future<Void> futRec = receiveService.receiveData(wrappedMsg, b); try { @@ -1978,7 +2038,7 @@ // Verify token remains valid. member.getQuorum().assertQuorum(token); // Await the future. - return futSnd.get(1000, TimeUnit.MILLISECONDS); + return futRec.get(1000, TimeUnit.MILLISECONDS); } catch (TimeoutException ex) { // Timeout. Ignore and retry loop. Thread.sleep(100/* ms */); @@ -1989,7 +2049,7 @@ } finally { // cancel the local Future. - futSnd.cancel(true/*mayInterruptIfRunning*/); + futRec.cancel(true/*mayInterruptIfRunning*/); } @@ -2031,89 +2091,75 @@ } @Override - public Void call() throws Exception { + public Void call() throws Exception { - // wrap the messages together. - final HAMessageWrapper wrappedMsg = new HAMessageWrapper(req, snd, - msg); + // wrap the messages together. + final HAMessageWrapper wrappedMsg = new HAMessageWrapper( + req, snd, msg); - // Get Future for send() outcome on local service. - final Future<Void> futSnd = receiveService.receiveData(wrappedMsg, - b); + // Get Future for receive() outcome on local service. + final Future<Void> futRec = receiveService.receiveData(wrappedMsg, + b); - try { - try { + try { + try { - // Get future for receive outcome on the remote - // service. - final Future<Void> futRec = downstream.service - .receiveAndReplicate(req, snd, msg); + // Get Future for receive outcome on the remote service + // (RMI). 
+ final Future<Void> futRep; + try { + futRep = downstream.service.receiveAndReplicate(req, + snd, msg); + } catch (IOException ex) { // RMI error. + throw new ImmediateDownstreamReplicationException(ex); + } - try { + try { - /* - * Await the Futures, but spend more time waiting on the - * local Future and only check the remote Future every - * second. Timeouts are ignored during this loop. - */ - while (!futSnd.isDone() && !futRec.isDone()) { - /* - * The token must remain valid, even if this service - * is not joined with the met quorum. If fact, - * services MUST replicate writes regardless of - * whether or not they are joined with the met - * quorum, but only while there is a met quorum. - */ - member.getQuorum().assertQuorum(token); - try { - futSnd.get(1L, TimeUnit.SECONDS); - } catch (TimeoutException ignore) { - } - try { - futRec.get(10L, TimeUnit.MILLISECONDS); - } catch (TimeoutException ignore) { - } - } - futSnd.get(); - futRec.get(); + /* + * Await the Futures, but spend more time waiting on the + * local Future and only check the remote Future every + * second. Timeouts are ignored during this loop. + */ + while (!futRec.isDone() && !futRep.isDone()) { + /* + * The token must remain valid, even if this service + * is not joined with the met quorum. If fact, + * services MUST replicate writes regardless of + * whether or not they are joined with the met + * quorum, but only while there is a met quorum. + */ + member.getQuorum().assertQuorum(token); + try { + futRec.get(1L, TimeUnit.SECONDS); + } catch (TimeoutException ignore) { + } + try { + futRep.get(10L, TimeUnit.MILLISECONDS); + } catch (TimeoutException ignore) { + } + } + futRec.get(); + futRep.get(); - } finally { - if (!futRec.isDone()) { - // cancel remote Future unless done. - futRec.cancel(true/* mayInterruptIfRunning */); - } - } + } finally { + if (!futRep.isDone()) { + // cancel remote Future unless done. 
+ futRep.cancel(true/* mayInterruptIfRunning */); + } + } - } finally { - // Is it possible that this cancel conflicts with throwing - // the PipelineException? - // cancel the local Future. - futSnd.cancel(true/* mayInterruptIfRunning */); - } - } catch (Throwable t) { - // determine the problem service, which may be further downstream - // if the Throwable contains a PipelineException innerCause - final PipelineException pe = (PipelineException) InnerCause - .getInnerCause(t, PipelineException.class); - final UUID problemService; - if (pe != null) { - problemService = pe.getProblemServiceId(); - } else { - final UUID[] priorAndNext = member.getQuorum() - .getPipelinePriorAndNext(member.getServiceId()); - problemService = priorAndNext[1]; - } + } finally { + // cancel the local Future. + futRec.cancel(true/* mayInterruptIfRunning */); + } + } catch (Throwable t) { + launderPipelineException(false/* isLeader */, member, t); + } + // done + return null; + } - log.warn("Problem with downstream service: " + problemService, - t); - - throw new PipelineException(problemService, t); - } - - // done - return null; - } - } /** Added: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/AbstractPipelineChangeException.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/AbstractPipelineChangeException.java (rev 0) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/AbstractPipelineChangeException.java 2013-12-13 16:54:30 UTC (rev 7642) @@ -0,0 +1,56 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2007. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. 
+ +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Jun 7, 2013. + */ +package com.bigdata.ha.pipeline; + +/** + * A quorum related exception dealing with the write replication pipeline. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +abstract public class AbstractPipelineChangeException extends AbstractPipelineException { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public AbstractPipelineChangeException() { + } + + public AbstractPipelineChangeException(String message) { + super(message); + } + + public AbstractPipelineChangeException(Throwable cause) { + super(cause); + } + + public AbstractPipelineChangeException(String message, Throwable cause) { + super(message, cause); + } + +} Added: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/AbstractPipelineException.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/AbstractPipelineException.java (rev 0) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/AbstractPipelineException.java 2013-12-13 16:54:30 UTC (rev 7642) @@ -0,0 +1,55 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2007. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. 
+ +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +package com.bigdata.ha.pipeline; + +import com.bigdata.quorum.QuorumException; + +/** + * A quorum related exception dealing with the write replication pipeline. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +abstract public class AbstractPipelineException extends QuorumException { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public AbstractPipelineException() { + } + + public AbstractPipelineException(String message) { + super(message); + } + + public AbstractPipelineException(Throwable cause) { + super(cause); + } + + public AbstractPipelineException(String message, Throwable cause) { + super(message, cause); + } + +} Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-13 11:01:37 UTC (rev 7641) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-13 16:54:30 UTC (rev 7642) @@ -763,6 +763,10 @@ * more efficient bulk copy operations from the NIO buffer into a local * byte[] on the Java heap against which we then track the evolving * checksum of the data. + * + * FIXME Why isn't this buffer scoped to the outer HAReceiveService? By + * being an inner class field, we allocate it once per payload + * received.... 
*/ private final byte[] a = new byte[512]; @@ -1118,17 +1122,7 @@ callback.incReceive(message, reads, rdlen, rem); } -// if (downstreamFirstCause == null) { -// try { - forwardReceivedBytes(client, rdlen); -// } catch (ExecutionException ex) { -// log.error( -// "Downstream replication failure" -// + ": will drain payload and then rethrow exception: rootCause=" -// + ex, ex); -// downstreamFirstCause = ex; -// } -// } + forwardReceivedBytes(client, rdlen); } // while(itr.hasNext()) @@ -1154,28 +1148,6 @@ + ", actual=" + (int) chk.getValue()); } -// if (downstreamFirstCause != null) { -// -// /** -// * Replication to the downstream service failed. The payload has -// * been fully drained. This ensures that we do not leave part of -// * the payload in the upstream socket channel. We now wrap and -// * rethrow the root cause of the downstream failure. The leader -// * will handle this by forcing the remove of the downstream -// * service and then re-replicating the payload. -// * -// * @see <a -// * href="https://sourceforge.net/apps/trac/bigdata/ticket/724" -// * > HA wire pulling and sure kill testing </a> -// */ -// -// throw new RuntimeException( -// "Downstream replication failure: msg=" + message -// + ", ex=" + downstreamFirstCause, -// downstreamFirstCause); -// -// } - // Check for termination. 
client.checkFirstCause(); @@ -1211,6 +1183,7 @@ * * @throws ExecutionException * @throws InterruptedException + * @throws ImmediateDownstreamReplicationException * * @todo Since the downstream writes are against a blocking mode * channel, the receiver on this node runs in sync with the @@ -1222,8 +1195,11 @@ * HA wire pulling and sure kill testing </a> */ private void forwardReceivedBytes(final Client client, final int rdlen) - throws InterruptedException, ExecutionException { + throws InterruptedException, ExecutionException, + ImmediateDownstreamReplicationException { + while (true) { + if (rdlen != 0 && addrNextRef.get() != null) { if (log.isTraceEnabled()) log.trace("Incremental send of " + rdlen + " bytes"); @@ -1265,6 +1241,7 @@ : null).get(); } break; // break out of the inner while loop. + } // while(true) } Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-13 11:01:37 UTC (rev 7641) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-13 16:54:30 UTC (rev 7642) @@ -30,6 +30,7 @@ import java.nio.channels.SocketChannel; import java.nio.channels.WritableByteChannel; import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -38,6 +39,9 @@ import org.apache.log4j.Logger; +import com.bigdata.util.InnerCause; +import com.bigdata.util.concurrent.Haltable; + /** * A service for sending raw {@link ByteBuffer}s across a socket. This service * supports the HA write pipeline. 
This service is designed to be paired with an @@ -82,9 +86,6 @@ private static final Logger log = Logger.getLogger(HASendService.class); -// static final byte ACK = 1; -// static final byte NACK = 0; - /** * The Internet socket address of the receiving service. */ @@ -104,6 +105,7 @@ /* * Note: toString() must be thread-safe. */ + @Override public String toString() { return super.toString() + "{addrNext=" + addrNext + "}"; @@ -295,6 +297,9 @@ * @return The {@link Future} which can be used to await the outcome of this * operation. * + * @throws InterruptedException + * @throws ImmediateDownstreamReplicationException + * * @throws IllegalArgumentException * if the buffer is <code>null</code>. * @throws IllegalArgumentException @@ -305,8 +310,10 @@ * @todo throws IOException if the {@link SocketChannel} was not open and * could not be opened. */ - public Future<Void> send(final ByteBuffer buffer, final byte[] marker) { - + public Future<Void> send(final ByteBuffer buffer, final byte[] marker) + throws ImmediateDownstreamReplicationException, + InterruptedException { + if (buffer == null) throw new IllegalArgumentException(); @@ -324,10 +331,64 @@ // reopenChannel(); - return tmp.submit(newIncSendTask(buffer.asReadOnlyBuffer(), marker)); + try { + return tmp + .submit(newIncSendTask(buffer.asReadOnlyBuffer(), marker)); + + } catch (Throwable t) { + + launderThrowable(t); + + // make the compiler happy. + throw new AssertionError(); + + } + } + /** + * Test the {@link Throwable} for its root cause and distinguish between a + * root cause with immediate downstream replication, normal termination + * through {@link InterruptedException}, {@link CancellationException}, and + * nested {@link AbstractPipelineException}s thrown by a downstream service. 
+ * + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/724" > HA + * wire pulling and sure kill testing </a> + */ + private void launderThrowable(final Throwable t) + throws InterruptedException, + ImmediateDownstreamReplicationException { + + if (Haltable.isTerminationByInterrupt(t)) { + + // root cause is interrupt or cancellation exception. + throw new RuntimeException(t); + + } + + if (InnerCause.isInnerCause(t, AbstractPipelineException.class)) { + + /* + * The root cause is NOT the inability to replicate to our immediate + * downstream service. Instead, some service (not us) has a problem + * with pipeline replication. + */ + + throw new NestedPipelineException(t); + + } + + /* + * We have a problem with replication to our immediate downstream + * service. + */ + + throw new ImmediateDownstreamReplicationException(toString(), t); + + } + + /** * A series of timeouts used when we need to re-open the * {@link SocketChannel}. */ Added: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/ImmediateDownstreamReplicationException.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/ImmediateDownstreamReplicationException.java (rev 0) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/ImmediateDownstreamReplicationException.java 2013-12-13 16:54:30 UTC (rev 7642) @@ -0,0 +1,64 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2013. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +package com.bigdata.ha.pipeline; + +import java.io.IOException; + + +/** + * An exception thrown by the {@link HAReceiveService} when replication to the + * downstream service fails. The root cause can be an RMI error (can not connect + * or connection lost), a socket channel write error (can not connect, + * connection lost, etc.), or even a transitive error from further down the + * write pipeline. This exception DOES NOT decisively indicate the problem is + * with the immediate downstream service. The caller must inspect the root cause + * to make this determination. However, this exception DOES indicate that the + * problem is with downstream replication rather than with the receipt or + * handling of the payload on this service. 
+ * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public class ImmediateDownstreamReplicationException extends IOException { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public ImmediateDownstreamReplicationException() { + } + + public ImmediateDownstreamReplicationException(String message) { + super(message); + } + + public ImmediateDownstreamReplicationException(Throwable cause) { + super(cause); + } + + public ImmediateDownstreamReplicationException(String message, Throwable cause) { + super(message, cause); + } + +} Added: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/NestedPipelineException.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/NestedPipelineException.java (rev 0) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/NestedPipelineException.java 2013-12-13 16:54:30 UTC (rev 7642) @@ -0,0 +1,55 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2013. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +package com.bigdata.ha.pipeline; + +/** + * An exception that is used to wrap and rethrow a cause whose root cause is + * another {@link AbstractPipelineException}. 
+ * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public class NestedPipelineException extends AbstractPipelineException { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public NestedPipelineException() { + super(); + } + + public NestedPipelineException(String message, Throwable cause) { + super(message, cause); + } + + public NestedPipelineException(String message) { + super(message); + } + + public NestedPipelineException(Throwable cause) { + super(cause); + } + +} Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/PipelineDownstreamChange.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/PipelineDownstreamChange.java 2013-12-13 11:01:37 UTC (rev 7641) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/PipelineDownstreamChange.java 2013-12-13 16:54:30 UTC (rev 7642) @@ -42,7 +42,7 @@ * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> */ -public class PipelineDownstreamChange extends QuorumException { +public class PipelineDownstreamChange extends AbstractPipelineChangeException { /** * Copied: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/PipelineImmediateDownstreamReplicationException.java (from rev 7640, branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/PipelineException.java) =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/PipelineImmediateDownstreamReplicationException.java (rev 0) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/PipelineImmediateDownstreamReplicationException.java 2013-12-13 16:54:30 UTC (rev 7642) @@ -0,0 +1,113 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2013. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... 
+ +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +package com.bigdata.ha.pipeline; + +import java.util.UUID; + +/** + * Exception thrown when there is a problem with write replication from a + * service to its downstream service, including a problem with RMI to the + * downstream service or socket level write replication to the downstream + * service. This typed exception enables the leader to take action + * intended to cure the pipeline by forcing the problem service from the quorum. + * The problem service can then attempt to re-enter the quorum. + */ +public class PipelineImmediateDownstreamReplicationException extends + AbstractPipelineException { + + /** + * Generated ID + */ + private static final long serialVersionUID = 8019938954269914574L; + + /** + * The {@link UUID} of the service reporting a problem replicating writes to + * its downstream service. + */ + private final UUID serviceId; + + /** + * The prior and next service {@link UUID}s for the service reporting the + * problem. The problem is with the communication to the "next" service. + */ + private final UUID[] priorAndNext; + + /** + * + * @param serviceId + * The {@link UUID} of the service reporting a problem + * replicating writes to its downstream service. + * @param priorAndNext + * The prior and next service {@link UUID}s for the service + * reporting the problem.
The problem is with the communication + * to the "next" service. + * @param t + * The root cause exception. + */ + public PipelineImmediateDownstreamReplicationException(final UUID serviceId, + final UUID[] priorAndNext, final Throwable t) { + + super(t); + + this.serviceId = serviceId; + + this.priorAndNext = priorAndNext; + + } + + /** + * Return the {@link UUID} of the service reporting the problem. This is the + * service that attempted to replicate a payload to the downstream service + * and was unable to replicate the payload due to the reported root cause. + */ + public UUID getReportingServiceId() { + + return serviceId; + + } + + /** + * The prior and next service {@link UUID}s for the service reporting the + * problem. The problem is with the communication to the "next" service. + */ + public UUID[] getPriorAndNext() { + + return priorAndNext; + + } + + /** + * Return the {@link UUID} of the downstream service - the problem is + * reported for the communication channel between the reporting service and + * this downstream service. The downstream service is the service that + * should be forced out of the quorum in order to "fix" the pipeline. 
+ */ + public UUID getProblemServiceId() { + + return priorAndNext[1]; + + } + +} Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/PipelineUpstreamChange.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/PipelineUpstreamChange.java 2013-12-13 11:01:37 UTC (rev 7641) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/PipelineUpstreamChange.java 2013-12-13 16:54:30 UTC (rev 7642) @@ -29,7 +29,6 @@ import java.util.concurrent.CancellationException; import com.bigdata.ha.QuorumPipelineImpl; -import com.bigdata.quorum.QuorumException; /** * Exception thrown when the upstream service is changed by a pipeline @@ -42,7 +41,7 @@ * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> */ -public class PipelineUpstreamChange extends QuorumException { +public class PipelineUpstreamChange extends AbstractPipelineChangeException { /** * Modified: branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive.java 2013-12-13 11:01:37 UTC (rev 7641) +++ branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive.java 2013-12-13 16:54:30 UTC (rev 7642) @@ -114,8 +114,9 @@ * @throws ExecutionException * @throws InterruptedException * @throws TimeoutException + * @throws ImmediateDownstreamReplicationException */ - public void testSimpleExchange() throws InterruptedException, ExecutionException, TimeoutException { + public void testSimpleExchange() throws InterruptedException, ExecutionException, TimeoutException, ImmediateDownstreamReplicationException { final long timeout = 5000;// ms { @@ -149,9 +150,10 @@ * @throws TimeoutException * @throws ExecutionException * @throws InterruptedException + * @throws ImmediateDownstreamReplicationException */ public void 
testStress() throws TimeoutException, InterruptedException, - ExecutionException { + ExecutionException, ImmediateDownstreamReplicationException { final long timeout = 5000; // ms for (int i = 0; i < 100; i++) { Modified: branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive3Nodes.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive3Nodes.java 2013-12-13 11:01:37 UTC (rev 7641) +++ branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive3Nodes.java 2013-12-13 16:54:30 UTC (rev 7642) @@ -159,7 +159,7 @@ } public void testSimpleExchange() throws InterruptedException, - ExecutionException, TimeoutException { + ExecutionException, TimeoutException, ImmediateDownstreamReplicationException { final long timeout = 5000; // ms final ByteBuffer tst1 = getRandomData(50); @@ -177,7 +177,7 @@ assertEquals(rcv1, rcv2); } - public void testChecksumError() throws InterruptedException, ExecutionException + public void testChecksumError() throws InterruptedException, ExecutionException, ImmediateDownstreamReplicationException { final ByteBuffer tst1 = getRandomData(50); Modified: branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JustKills.java =================================================================== --- branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JustKills.java 2013-12-13 11:01:37 UTC (rev 7641) +++ branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JustKills.java 2013-12-13 16:54:30 UTC (rev 7642) @@ -87,6 +87,10 @@ final long token = awaitFullyMetQuorum(); + // await the initial KB commit on all services. 
+ awaitCommitCounter(1L, new HAGlue[] { startup.serverA, startup.serverB, + startup.serverC }); + // start concurrent task loads that continue until fully met final FutureTask<Void> ft = new FutureTask<Void>(new LargeLoadTask( token)); @@ -94,7 +98,7 @@ executorService.submit(ft); // allow load head start - Thread.sleep(300/* ms */); + Thread.sleep(2000/* ms */); // Verify load is still running. assertFalse(ft.isDone()); @@ -103,11 +107,10 @@ log.warn("ZOOKEEPER\n" + dumpZoo()); kill(startup.serverC); - - // FIXME: in the face of no implemented error propagation we can explicitly - // tell the leader to remove the killed service! - // startup.serverA.submit(new ForceRemoveService(getServiceCId()), true).get(); + // Note: Automatic. +// startup.serverA.submit(new ForceRemoveService(getServiceCId()), true).get(); + awaitPipeline(20, TimeUnit.SECONDS, new HAGlue[] {startup.serverA, startup.serverB}); // token must remain unchanged to indicate same quorum @@ -153,6 +156,10 @@ final long token = awaitFullyMetQuorum(); + // await the initial KB commit on all services. + awaitCommitCounter(1L, new HAGlue[] { startup.serverA, startup.serverB, + startup.serverC }); + // start concurrent task loads that continue until fully met final FutureTask<Void> ft = new FutureTask<Void>(new LargeLoadTask( token)); @@ -160,7 +167,7 @@ executorService.submit(ft); // allow load head start - Thread.sleep(1000/* ms */); + Thread.sleep(2000/* ms */); // Verify load is still running. assertFalse(ft.isDone()); @@ -170,8 +177,8 @@ kill(startup.serverB); - // FIXME: temporary call to explicitly remove the service prior to correct protocol - // startup.serverA.submit(new ForceRemoveService(getServiceBId()), true).get(); + // Note: automatic. 
+// startup.serverA.submit(new ForceRemoveService(getServiceBId()), true).get(); awaitPipeline(10, TimeUnit.SECONDS, new HAGlue[] {startup.serverA, startup.serverC}); @@ -311,15 +318,10 @@ // Dump Zookeeper log.warn("ZOOKEEPER\n" + dumpZoo()); - // Note: sure kill is done automatically when we hit the desired point - // in the write replication. - // kill(startup.serverB); + // Note: automatic. +// startup.serverA.submit(new ForceRemoveService(getServiceBId()), true) +// .get(); - // FIXME: temporary call to explicitly remove the service prior to - // correct protocol - // startup.serverA.submit(new ForceRemoveService(getServiceBId()), true) - // .get(); - awaitPipeline(10, TimeUnit.SECONDS, new HAGlue[] { startup.serverA, startup.serverC }); @@ -390,15 +392,10 @@ // Dump Zookeeper log.warn("ZOOKEEPER\n" + dumpZoo()); - // Note: sure kill is done automatically when we hit the desired point - // in the write replication. - // kill(startup.serverB); + // Note: automatic. +// startup.serverA.submit(new ForceRemoveService(getServiceCId()), true) +// .get(); - // FIXME: temporary call to explicitly remove the service prior to - // correct protocol - // startup.serverA.submit(new ForceRemoveService(getServiceCId()), true) - // .get(); - awaitPipeline(10, TimeUnit.SECONDS, new HAGlue[] { startup.serverA, startup.serverB }); @@ -415,11 +412,83 @@ // token must remain unchanged to indicate same quorum assertEquals(token, awaitMetQuorum()); - // TODO We could finally restart the killed service and verify resync. - } /** + * Start A+B+C in strict sequence. Wait until the quorum fully meets. Start + * a long running LOAD. While the LOAD is running, C will issue a sure kill + * of B (the 1st follower) when it has received a specified number of bytes + * of data from B. Verify that the LOAD completes successfully with the + * remaining services (A+C). 
+ */ + public void testABC_LiveLoadRemainsMet_C_kills_B_duringIncrementalReplication() + throws Exception { + + // enforce join order + final ABC startup = new ABC(true /* sequential */); + + final long token = awaitFullyMetQuorum(); + + // await the initial KB commit on all services. + awaitCommitCounter(1L, new HAGlue[] { startup.serverA, startup.serverB, + startup.serverC }); + + // Set a trigger to sure kill B once C reaches the specified + // replication point. + ((HAGlueTest) startup.serverC) + .failWriteReplication(new HAProgressListenerKillPID_1( + ((HAGlueTest) startup.serverB).getPID())); + + // start large load. + final FutureTask<Void> ft = new FutureTask<Void>(new LargeLoadTask( + token)); + + executorService.submit(ft); + + // Wait until B is killed. + assertCondition(new Runnable() { + public void run() { + try { + startup.serverB.getWritePipelineAddr(); + fail("B is still running."); + } catch (IOException ex) { + if (log.isInfoEnabled()) + log.info("Expected exception: B is no longer responding: " + + ex); + return; + } + } + }, 20000, TimeUnit.MILLISECONDS); + + // Verify load is still running. + assertFalse(ft.isDone()); + + // Dump Zookeeper + log.warn("ZOOKEEPER\n" + dumpZoo()); + + // Note: automatic. +// startup.serverA.submit(new ForceRemoveService(getServiceCId()), true) +// .get(); + + awaitPipeline(10, TimeUnit.SECONDS, new HAGlue[] { startup.serverA, + startup.serverC }); + + // also check members and joined + awaitMembers(new HAGlue[] { startup.serverA, startup.serverC }); + awaitJoined(new HAGlue[] { startup.serverA, startup.serverC }); + + // token must remain unchanged to indicate same quorum + assertEquals(token, awaitMetQuorum()); + + // Await LOAD, but with a timeout. + ft.get(longLoadTimeoutMillis, TimeUnit.MILLISECONDS); + + // token must remain unchanged to indicate same quorum + assertEquals(token, awaitMetQuorum()); + + } + + /** * Test where we start A+B+C in strict sequence. 
Once we observe that all * services have gone through the initial KB create, we do a sudden kill of * B. We then start the live load. This test explores what happens when A is @@ -471,8 +540,6 @@ * C. We then start the live load. This test explores what happens when A * and B are not yet aware that C is dead when the UPDATE operation starts. * - * Can I commit this - * * @throws Exception */ public void testABC_awaitKBCreate_killC_LiveLoadRemainsMet() @@ -495,9 +562,9 @@ executorService.submit(ft); - // FIXME RESYNC_PIPELINE: move into QuorumPipelineImpl. - // startup.serverA.submit(new ForceRemoveService(getServiceCId()), true) - // .get(); + // Note: Automatic. +// startup.serverA.submit(new ForceRemoveService(getServiceCId()), true) +// .get(); awaitPipeline(getZKSessionTimeout() + 5000, TimeUnit.MILLISECONDS, new HAGlue[] { startup.serverA, startup.serverB }); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |