From: <tho...@us...> - 2013-12-17 17:28:04
|
Revision: 7667 http://bigdata.svn.sourceforge.net/bigdata/?rev=7667&view=rev Author: thompsonbry Date: 2013-12-17 17:27:53 +0000 (Tue, 17 Dec 2013) Log Message: ----------- Added a resetPipeline() method to HAPipelineGlue. This is intended to reset each service that is in the pipeline when the leader launders a pipeline exception. See #724 Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineGlue.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumCommitImpl.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipeline.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/quorum/QuorumClient.java branches/MGC_1_3_0/bigdata/src/test/com/bigdata/io/writecache/TestWORMWriteCacheService.java branches/MGC_1_3_0/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServer.java branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/quorum/zk/MockQuorumMember.java Added Paths: ----------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/AbstractMessageTask.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineResetRequest.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineResetResponse.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/IHAPipelineResetRequest.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/IHAPipelineResetResponse.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/quorum/ServiceLookup.java Added: 
branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/AbstractMessageTask.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/AbstractMessageTask.java (rev 0) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/AbstractMessageTask.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -0,0 +1,87 @@ +package com.bigdata.ha; + +import java.io.IOException; +import java.rmi.Remote; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; + +import com.bigdata.ha.msg.IHAMessage; +import com.bigdata.quorum.ServiceLookup; + +/** + * Helper class submits the RMI for a PREPARE, COMMIT, or ABORT message. This is + * used to execute the different requests in parallel on a local executor + * service. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +abstract class AbstractMessageTask<S extends Remote, T, M extends IHAMessage> + implements Callable<T> { + + private final ServiceLookup<S> serviceLookup; + private final UUID serviceId; + protected final M msg; + + public AbstractMessageTask(final ServiceLookup<S> serviceLookup, + final UUID serviceId, final M msg) { + + this.serviceLookup = serviceLookup; + + this.serviceId = serviceId; + + this.msg = msg; + + } + + @Override + final public T call() throws Exception { + + /* + * Note: This code MAY be interrupted at any point if the Future for the + * task is cancelled. If it is interrupted during the RMI, then the + * expectation is that the NIO will be interrupted in a timely manner + * throwing back some sort of IOException indicating the asynchronous + * close of the IO channel or cancel of the RMI. + */ + + // Resolve proxy for remote service. + final S service = serviceLookup.getService(serviceId); + + // RMI. + final Future<T> ft = doRMI(service); + + try { + + /* + * Await the inner Future for the RMI. 
+ * + * Note: In fact, this is a ThickFuture so it is already done by the + * time the RMI returns. + */ + + return ft.get(); + + } finally { + + ft.cancel(true/* mayInterruptIfRunning */); + + } + + } + + /** + * Invoke the specific RMI using the message supplied to the constructor. + * + * @param service + * The service (resolved from the service {@link UUID} supplied + * to the constructor). + * + * @return The result of submitting that RMI to the remote service. + * + * @throws IOException + * if there is a problem with the RMI. + */ + abstract protected Future<T> doRMI(final S service) throws IOException; + +} \ No newline at end of file Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineGlue.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineGlue.java 2013-12-17 01:00:14 UTC (rev 7666) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineGlue.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -43,6 +43,8 @@ import com.bigdata.ha.msg.IHAWriteMessage; import com.bigdata.ha.msg.IHAWriteSetStateRequest; import com.bigdata.ha.msg.IHAWriteSetStateResponse; +import com.bigdata.ha.pipeline.HAReceiveService; +import com.bigdata.ha.pipeline.HASendService; import com.bigdata.io.writecache.WriteCache; import com.bigdata.journal.WriteExecutorService; import com.bigdata.service.proxy.ThickFuture; @@ -121,6 +123,26 @@ Future<Void> moveToEndOfPipeline() throws IOException; /** + * Reset the pipeline (blocking). This message is used to handle an error in + * pipeline replication. If replication fails, the socket connections both + * upstream and downstream of the point of failure can be left in an + * indeterminate state with partially buffered data. 
In order to bring the + * pipeline back into a known state (without forcing a quorum break) we + * message each service in the pipeline to reset its + * {@link HAReceiveService} (including the inner {@link HASendService}). The + * next message and payload relayed from the leader will cause new socket + * connections to be established. + * + * @param req The request. + * + * @return The {@link Future} for the operation on the remote service. + * + * @throws IOException + */ + Future<IHAPipelineResetResponse> resetPipeline(IHAPipelineResetRequest req) + throws IOException; + + /** * Accept metadata describing an NIO buffer transfer along the write * pipeline. This method is never invoked on the master. It is only invoked * on the failover nodes, including the last node in the failover chain. Added: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineResetRequest.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineResetRequest.java (rev 0) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineResetRequest.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -0,0 +1,49 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +package com.bigdata.ha; + +public class HAPipelineResetRequest implements IHAPipelineResetRequest { + + /** + * + */ + private static final long serialVersionUID = 1L; + + private long token; + + public HAPipelineResetRequest(final long token) { + this.token = token; + } + + @Override + public long token() { + return token; + } + + @Override + public String toString() { + return super.toString() + "{token=" + token + "}"; + } + +} Added: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineResetResponse.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineResetResponse.java (rev 0) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineResetResponse.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -0,0 +1,33 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +package com.bigdata.ha; + +public class HAPipelineResetResponse implements IHAPipelineResetResponse { + + /** + * + */ + private static final long serialVersionUID = 1L; + +} Added: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/IHAPipelineResetRequest.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/IHAPipelineResetRequest.java (rev 0) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/IHAPipelineResetRequest.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -0,0 +1,40 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +package com.bigdata.ha; + +import com.bigdata.ha.msg.IHAMessage; + +/** + * Message requesting a pipeline reset on a service. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public interface IHAPipelineResetRequest extends IHAMessage { + + /** + * The quorum token in effect on the leader when this request was generated. 
+ */ + long token(); + +} Added: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/IHAPipelineResetResponse.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/IHAPipelineResetResponse.java (rev 0) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/IHAPipelineResetResponse.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -0,0 +1,35 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +package com.bigdata.ha; + +import com.bigdata.ha.msg.IHAMessage; + +/** + * Message reporting the outcome of a pipeline reset on a service. 
+ * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public interface IHAPipelineResetResponse extends IHAMessage { + +} Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumCommitImpl.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumCommitImpl.java 2013-12-17 01:00:14 UTC (rev 7666) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumCommitImpl.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -28,7 +28,6 @@ import java.util.LinkedList; import java.util.List; import java.util.UUID; -import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -44,14 +43,12 @@ import com.bigdata.ha.msg.IHA2PhaseAbortMessage; import com.bigdata.ha.msg.IHA2PhaseCommitMessage; import com.bigdata.ha.msg.IHA2PhasePrepareMessage; -import com.bigdata.ha.msg.IHAMessage; import com.bigdata.journal.IRootBlockView; import com.bigdata.quorum.Quorum; import com.bigdata.quorum.QuorumMember; import com.bigdata.quorum.QuorumStateChangeListener; import com.bigdata.quorum.QuorumStateChangeListenerBase; -import com.bigdata.service.proxy.ThickFuture; -import com.bigdata.util.InnerCause; +import com.bigdata.quorum.ServiceLookup; import com.bigdata.util.concurrent.ExecutionExceptions; /** @@ -59,9 +56,9 @@ */ public class QuorumCommitImpl<S extends HACommitGlue> extends QuorumStateChangeListenerBase implements QuorumCommit<S>, - QuorumStateChangeListener { + QuorumStateChangeListener, ServiceLookup<HACommitGlue> { - static private transient final Logger log = Logger + static transient final Logger log = Logger .getLogger(QuorumCommitImpl.class); private final QuorumMember<S> member; @@ -83,62 +80,13 @@ return member.getQuorum(); } - - private HACommitGlue getService(final UUID serviceId) { + public HACommitGlue getService(final UUID serviceId) { + return 
member.getService(serviceId); } - - /** - * Cancel the requests on the remote services (RMI). This is a best effort - * implementation. Any RMI related errors are trapped and ignored in order - * to be robust to failures in RMI when we try to cancel the futures. - * <p> - * NOte: This is not being done in parallel. However, due to a DGC thread - * leak issue, we now use {@link ThickFuture}s. Thus, the tasks that are - * being cancelled are all local tasks running on the - * {@link #executorService}. If that local task is doing an RMI, then - * cancelling it will cause an interrupt in the NIO request. - */ - private <F extends Future<T>, T> void cancelFutures(final List<F> futures) { - if (log.isInfoEnabled()) - log.info(""); - - for (F f : futures) { - - if (f == null) { - - continue; - - } - - try { - - if (!f.isDone()) { - - f.cancel(true/* mayInterruptIfRunning */); - - } - - } catch (Throwable t) { - - if (InnerCause.isInnerCause(t, InterruptedException.class)) { - - // Propagate interrupt. - Thread.currentThread().interrupt(); - - } - - // ignored (to be robust). - - } - - } - - } - /** * {@inheritDoc} * <p> @@ -270,7 +218,7 @@ * remote service. We will await this task below. */ final Future<Boolean> rf = executorService - .submit(new PrepareMessageTask(serviceId, + .submit(new PrepareMessageTask(this, serviceId, msgForJoinedService)); // add to list of futures we will check. @@ -379,7 +327,7 @@ } finally { - cancelFutures(localFutures); + QuorumServiceBase.cancelFutures(localFutures); } @@ -469,7 +417,7 @@ * remote service. */ final Future<Void> rf = executorService - .submit(new CommitMessageTask(serviceId, + .submit(new CommitMessageTask(this, serviceId, msgJoinedService)); // add to list of futures we will check. @@ -533,7 +481,7 @@ } finally { // Ensure that all futures are cancelled. - cancelFutures(localFutures); + QuorumServiceBase.cancelFutures(localFutures); } @@ -580,7 +528,7 @@ * remote service. 
*/ final Future<Void> rf = executorService - .submit(new AbortMessageTask(serviceId, msg)); + .submit(new AbortMessageTask(this, serviceId, msg)); // add to list of futures we will check. localFutures.add(rf); @@ -643,86 +591,24 @@ } finally { // Ensure that all futures are cancelled. - cancelFutures(localFutures); + QuorumServiceBase.cancelFutures(localFutures); } } - /** - * Helper class submits the RMI for a PREPARE, COMMIT, or ABORT message. - * This is used to execute the different requests in parallel on a local - * executor service. - * - * @author <a href="mailto:tho...@us...">Bryan - * Thompson</a> - */ - private abstract class AbstractMessageTask<T, M extends IHAMessage> - implements Callable<T> { + static private class PrepareMessageTask extends + AbstractMessageTask<HACommitGlue, Boolean, IHA2PhasePrepareMessage> { - private final UUID serviceId; - protected final M msg; + public PrepareMessageTask( + final ServiceLookup<HACommitGlue> serviceLookup, + final UUID serviceId, final IHA2PhasePrepareMessage msg) { - public AbstractMessageTask(final UUID serviceId, final M msg) { + super(serviceLookup, serviceId, msg); - this.serviceId = serviceId; - - this.msg = msg; - } @Override - final public T call() throws Exception { - - /* - * Note: This code MAY be interrupted at any point if the Future for - * the task is cancelled. If it is interrupted during the RMI, then - * the expectation is that the NIO will be interrupted in a timely - * manner throwing back some sort of IOException indicating the - * asynchronous close of the IO channel or cancel of the RMI. - */ - - // Resolve proxy for remote service. - final HACommitGlue service = getService(serviceId); - - // RMI. - final Future<T> ft = doRMI(service); - - try { - - /* - * Await the inner Future for the RMI. - * - * Note: In fact, this is a ThickFuture so it is already done by - * the time the RMI returns. 
- */ - - return ft.get(); - - } finally { - - ft.cancel(true/* mayInterruptIfRunning */); - - } - - } - - abstract protected Future<T> doRMI(final HACommitGlue service) - throws IOException; - - } - - private class PrepareMessageTask extends - AbstractMessageTask<Boolean, IHA2PhasePrepareMessage> { - - public PrepareMessageTask(final UUID serviceId, - final IHA2PhasePrepareMessage msg) { - - super(serviceId, msg); - - } - - @Override protected Future<Boolean> doRMI(final HACommitGlue service) throws IOException { @@ -732,13 +618,14 @@ } - private class CommitMessageTask extends - AbstractMessageTask<Void, IHA2PhaseCommitMessage> { + static private class CommitMessageTask extends + AbstractMessageTask<HACommitGlue, Void, IHA2PhaseCommitMessage> { - public CommitMessageTask(final UUID serviceId, - final IHA2PhaseCommitMessage msg) { + public CommitMessageTask( + final ServiceLookup<HACommitGlue> serviceLookup, + final UUID serviceId, final IHA2PhaseCommitMessage msg) { - super(serviceId, msg); + super(serviceLookup, serviceId, msg); } @@ -751,13 +638,14 @@ } - private class AbortMessageTask extends - AbstractMessageTask<Void, IHA2PhaseAbortMessage> { + static private class AbortMessageTask extends + AbstractMessageTask<HACommitGlue, Void, IHA2PhaseAbortMessage> { - public AbortMessageTask(final UUID serviceId, - final IHA2PhaseAbortMessage msg) { + public AbortMessageTask( + final ServiceLookup<HACommitGlue> serviceLookup, + final UUID serviceId, final IHA2PhaseAbortMessage msg) { - super(serviceId, msg); + super(serviceLookup, serviceId, msg); } @@ -769,5 +657,5 @@ } } - + } Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipeline.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipeline.java 2013-12-17 01:00:14 UTC (rev 7666) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipeline.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -36,6 +36,8 @@ import 
com.bigdata.ha.msg.IHASendState; import com.bigdata.ha.msg.IHASyncRequest; import com.bigdata.ha.msg.IHAWriteMessage; +import com.bigdata.ha.pipeline.HAReceiveService; +import com.bigdata.ha.pipeline.HASendService; import com.bigdata.io.writecache.WriteCache; import com.bigdata.journal.IRootBlockView; import com.bigdata.quorum.Quorum; @@ -95,6 +97,26 @@ Future<Void> receiveAndReplicate(IHASyncRequest req, IHASendState snd, IHAWriteMessage msg) throws IOException; + /** + * Reset the pipeline (blocking). This message is used to handle an error in + * pipeline replication. If replication fails, the socket connections both + * upstream and downstream of the point of failure can be left in an + * indeterminate state with partially buffered data. In order to bring the + * pipeline back into a known state (without forcing a quorum break) we + * message each service in the pipeline to reset its + * {@link HAReceiveService} (including the inner {@link HASendService}). The + * next message and payload relayed from the leader will cause new socket + * connections to be established. + * + * @param req The request. + * + * @return The {@link Future} for the operation on the local service. 
+ * + * @throws IOException + */ + Future<IHAPipelineResetResponse> resetPipeline(IHAPipelineResetRequest req) + throws IOException; + /* * Note: Method removed since it does not appear necessary to let this * service out of the scope of the QuorumPipelineImpl and the send service Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-17 01:00:14 UTC (rev 7666) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -30,6 +30,8 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; @@ -70,7 +72,9 @@ import com.bigdata.quorum.QuorumStateChangeEventEnum; import com.bigdata.quorum.QuorumStateChangeListener; import com.bigdata.quorum.QuorumStateChangeListenerBase; +import com.bigdata.quorum.ServiceLookup; import com.bigdata.util.InnerCause; +import com.bigdata.util.concurrent.ExecutionExceptions; /** * {@link QuorumPipeline} implementation. @@ -1197,6 +1201,61 @@ } + @Override + public Future<IHAPipelineResetResponse> resetPipeline( + final IHAPipelineResetRequest req) throws IOException { + + final FutureTask<IHAPipelineResetResponse> ft = new FutureTask<IHAPipelineResetResponse>( + new ResetPipelineTaskImpl(req)); + + member.getExecutor().submit(ft); + + return ft; + + } + + /** + * Task resets the pipeline on this service. 
+ * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ + private class ResetPipelineTaskImpl implements + Callable<IHAPipelineResetResponse> { + + @SuppressWarnings("unused") + private final IHAPipelineResetRequest req; + + public ResetPipelineTaskImpl(final IHAPipelineResetRequest req) { + + this.req = req; + + } + + @Override + public IHAPipelineResetResponse call() throws Exception { + + // FIXME Versus using the inner handler? +// innerEventHandler.pipelineUpstreamChange(); +// innerEventHandler.pipelineChange(oldDownStreamId, newDownStreamId); + + if (receiveService != null) { + + receiveService.changeUpStream(); + + } + + if (sendService != null) { + + sendService.closeChannel(); + + } + + return new HAPipelineResetResponse(); + + } + + } + /* * This is the leader, so send() the buffer. */ @@ -1558,7 +1617,8 @@ final ByteBuffer b = this.b.duplicate(); new SendBufferTask<S>(member, quorumToken, req, snd, msg, b, - downstream, sendService, sendLock).call(); + downstream, sendService, QuorumPipelineImpl.this, + sendLock).call(); return; @@ -1687,13 +1747,15 @@ private final ByteBuffer b; private final PipelineState<S> downstream; private final HASendService sendService; + private final QuorumPipelineImpl<S> outerClass; private final Lock sendLock; public SendBufferTask(final QuorumMember<S> member, final long token, final IHASyncRequest req, final IHASendState snd, final IHAWriteMessage msg, final ByteBuffer b, final PipelineState<S> downstream, - final HASendService sendService, final Lock sendLock) { + final HASendService sendService, + final QuorumPipelineImpl outerClass, final Lock sendLock) { this.member = member; this.token = token; @@ -1703,6 +1765,7 @@ this.b = b; this.downstream = downstream; this.sendService = sendService; + this.outerClass = outerClass; this.sendLock = sendLock; } @@ -1785,27 +1848,33 @@ } finally { // cancel the local Future. - futSnd.cancel(true/* mayInterruptIfRunning */); // FIXME Cancel hits wrong send() payload. 
+ futSnd.cancel(true/* mayInterruptIfRunning */); } } catch (Throwable t) { - launderPipelineException(true/* isLeader */, member, t); + launderPipelineException(true/* isLeader */, token, member, outerClass, t); } } - } + } // class SendBufferTask /** * Launder an exception thrown during pipeline replication. * * @param isLeader * <code>true</code> iff this service is the quorum leader. + * @param token + * The quorum token. * @param member * The {@link QuorumMember} for this service. + * @param outerClass + * The outer class - required for {@link #resetPipeline()}. * @param t * The throwable. */ - static private void launderPipelineException(final boolean isLeader, - final QuorumMember<?> member, final Throwable t) { + static private <S extends HAPipelineGlue> void launderPipelineException( + final boolean isLeader, final long token, + final QuorumMember<S> member, final QuorumPipelineImpl outerClass, + final Throwable t) { log.warn("isLeader=" + isLeader + ", t=" + t, t); @@ -1838,6 +1907,11 @@ try { + /* + * If we can identify the problem service, then force it out of + * the pipeline. It can re-enter the pipeline once it + * transitions through its ERROR state. + */ if (directCause != null) { member.getActor().forceRemoveService(priorAndNext[1]); @@ -1854,6 +1928,23 @@ } + /** + * Reset the pipeline on each service (including the leader). If + * replication fails, the socket connections both upstream and + * downstream of the point of failure can be left in an + * indeterminate state with partially buffered data. In order to + * bring the pipeline back into a known state (without forcing a + * quorum break) we message each service in the pipeline to + * reset its HAReceiveService (including the inner + * HASendService. The next message and payload relayed from the + * leader will cause new socket connections to be established. 
+ * + * @see <a + * href="https://sourceforge.net/apps/trac/bigdata/ticket/724" + * > HA Wire Pulling and Sudden Kills </a> + */ + outerClass.resetPipeline(token); + } catch (Exception e) { log.error("Problem on node removal", e); @@ -1880,8 +1971,145 @@ } } - + /** + * Issue concurrent requests to each service in the pipeline to reset the + * pipeline on that service. The request to the leader is executed in the + * caller's thread so it will own whatever locks the caller already owns - + * this is done to avoid deadlock. + * + * @param token + * The quorum token on the leader. + * @param member + */ + private void resetPipeline(final long token) { + + log.error("Leader will reset pipeline: " + token); + + /* + * We will only message the services that are in the pipeline. + * + * For services (other than the leader) in the quorum, submit the + * RunnableFutures to an Executor. + * + * For the leader, we do this in the caller's thread (to avoid + * possible deadlocks). + */ + final UUID[] pipelineIds = member.getQuorum().getPipeline(); + + member.assertLeader(token); + + final IHAPipelineResetRequest msg = new HAPipelineResetRequest(token); + + /* + * To minimize latency, we first submit the futures for the other + * services and then do f.run() on the leader. + */ + final List<Future<IHAPipelineResetResponse>> localFutures = new LinkedList<Future<IHAPipelineResetResponse>>(); + + try { + + for (int i = 1; i < pipelineIds.length; i++) { + + final UUID serviceId = pipelineIds[i]; + + /* + * Submit task on local executor. The task will do an RMI to the + * remote service. + */ + final Future<IHAPipelineResetResponse> rf = member + .getExecutor().submit( + new PipelineResetMessageTask(member, + serviceId, msg)); + + // add to list of futures we will check. + localFutures.add(rf); + + } + + { + /* + * Run the operation on the leader using a local method call + * (non-RMI) in the caller's thread to avoid deadlock. 
+ */ + member.assertLeader(token); + final FutureTask<IHAPipelineResetResponse> ft = new FutureTask<IHAPipelineResetResponse>( + new ResetPipelineTaskImpl(msg)); + localFutures.add(ft); + ft.run();// run on the leader. + } + /* + * Check the futures for the other services in the quorum. + */ + final List<Throwable> causes = new LinkedList<Throwable>(); + for (Future<IHAPipelineResetResponse> ft : localFutures) { + try { + ft.get(); // TODO Timeout? + } catch (InterruptedException ex) { + log.error(ex, ex); + causes.add(ex); + } catch (ExecutionException ex) { + log.error(ex, ex); + causes.add(ex); + } catch (RuntimeException ex) { + /* + * Note: ClientFuture.get() can throw a RuntimeException + * if there is a problem with the RMI call. In this case + * we do not know whether the Future is done. + */ + log.error(ex, ex); + causes.add(ex); + } finally { + // Note: cancelling a *local* Future wrapping an RMI. + ft.cancel(true/* mayInterruptIfRunning */); + } + } + + /* + * If there were any errors, then throw an exception listing them. + * + * TODO But only throw an exception for the joined services. + * Non-joined services, we just log an error (or simply do not tell + * them to do an abort()). + */ + if (!causes.isEmpty()) { + // Throw exception back to the leader. + if (causes.size() == 1) + throw new RuntimeException(causes.get(0)); + throw new RuntimeException("remote errors: nfailures=" + + causes.size(), new ExecutionExceptions(causes)); + } + + } finally { + + // Ensure that all futures are cancelled. 
+ QuorumServiceBase.cancelFutures(localFutures); + + } + + } + + static private class PipelineResetMessageTask<S extends HAPipelineGlue> + extends + AbstractMessageTask<S, IHAPipelineResetResponse, IHAPipelineResetRequest> { + + public PipelineResetMessageTask(final ServiceLookup<S> serviceLookup, + final UUID serviceId, final IHAPipelineResetRequest msg) { + + super(serviceLookup, serviceId, msg); + + } + + @Override + protected Future<IHAPipelineResetResponse> doRMI(final S service) + throws IOException { + + return service.resetPipeline(msg); + } + + } + + /** * Lock used to ensure that at most one message is being sent along the * write pipeline at a time. */ @@ -1974,7 +2202,7 @@ ft = new FutureTask<Void>(new ReceiveAndReplicateTask<S>( member, token, req, snd, msg, b, downstream, - receiveService)); + receiveService, QuorumPipelineImpl.this)); } @@ -2078,13 +2306,15 @@ private final ByteBuffer b; private final PipelineState<S> downstream; private final HAReceiveService<HAMessageWrapper> receiveService; + private final QuorumPipelineImpl<S> outerClass; public ReceiveAndReplicateTask(final QuorumMember<S> member, final long token, final IHASyncRequest req, final IHASendState snd, final IHAWriteMessage msg, final ByteBuffer b, final PipelineState<S> downstream, - final HAReceiveService<HAMessageWrapper> receiveService) { + final HAReceiveService<HAMessageWrapper> receiveService, + final QuorumPipelineImpl<S> outerClass) { this.member = member; this.token = token; @@ -2094,6 +2324,7 @@ this.b = b; this.downstream = downstream; this.receiveService = receiveService; + this.outerClass = outerClass; } @Override @@ -2159,10 +2390,11 @@ } finally { // cancel the local Future. - futRec.cancel(true/* mayInterruptIfRunning */); // FIXME Cancel hits wrong send() payload? 
+ futRec.cancel(true/* mayInterruptIfRunning */); } } catch (Throwable t) { - launderPipelineException(false/* isLeader */, member, t); + launderPipelineException(false/* isLeader */, token, member, + outerClass, t); } // done return null; Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java 2013-12-17 01:00:14 UTC (rev 7666) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -29,6 +29,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.List; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -44,6 +45,8 @@ import com.bigdata.journal.IRootBlockView; import com.bigdata.journal.Journal; import com.bigdata.quorum.AbstractQuorumMember; +import com.bigdata.service.proxy.ThickFuture; +import com.bigdata.util.InnerCause; /** * Abstract implementation provides the logic for distributing messages for the @@ -249,6 +252,14 @@ } + @Override + public Future<IHAPipelineResetResponse> resetPipeline( + IHAPipelineResetRequest req) throws IOException { + + return pipelineImpl.resetPipeline(req); + + } + /** * Core implementation handles the message and payload when received on a * service. @@ -430,4 +441,54 @@ } + /** + * Cancel the requests on the remote services (RMI). This is a best effort + * implementation. Any RMI related errors are trapped and ignored in order + * to be robust to failures in RMI when we try to cancel the futures. + * <p> + * Note: This is not being done in parallel. However, due to a DGC thread + * leak issue, we now use {@link ThickFuture}s. Thus, the tasks that are + * being cancelled are all local tasks running on the + * {@link #executorService}. 
If that local task is doing an RMI, then + * cancelling it will cause an interrupt in the NIO request. + */ + public static <F extends Future<T>, T> void cancelFutures( + final List<F> futures) { + + if (log.isInfoEnabled()) + log.info(""); + + for (F f : futures) { + + if (f == null) { + + continue; + + } + + try { + + if (!f.isDone()) { + + f.cancel(true/* mayInterruptIfRunning */); + + } + + } catch (Throwable t) { + + if (InnerCause.isInnerCause(t, InterruptedException.class)) { + + // Propagate interrupt. + Thread.currentThread().interrupt(); + + } + + // ignored (to be robust). + + } + + } + + } + } Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-17 01:00:14 UTC (rev 7666) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -117,9 +117,14 @@ * data to a downstream service. 
*/ private final HASendService sendService; - - private final ExecutorService executor = Executors.newSingleThreadExecutor(); - + + public HASendService getSendService() { + return sendService; + } + + private final ExecutorService executor = Executors + .newSingleThreadExecutor(); + // private ServerSocketChannel server; // private FutureTask<Void> readFuture; Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-17 01:00:14 UTC (rev 7666) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -258,16 +258,7 @@ return; } try { - final SocketChannel socketChannel = this.socketChannel.get(); - if (socketChannel != null) { - try { - socketChannel.close(); - } catch (IOException ex) { - log.error("Ignoring exception during close: " + ex, ex); - } finally { - this.socketChannel.set(null); - } - } + closeSocketChannelNoBlock(); } finally { // shutdown executor. tmp.shutdownNow(); @@ -279,6 +270,33 @@ } /** + * Close the {@link SocketChannel} to the downstream service (blocking). + */ + public void closeChannel() { + synchronized (this.socketChannel) { + closeSocketChannelNoBlock(); + } + } + + /** + * Close the {@link SocketChannel} to the downstream service (non-blocking). + */ + private void closeSocketChannelNoBlock() { + final SocketChannel socketChannel = this.socketChannel.get(); + if (socketChannel != null) { + try { + socketChannel.close(); + } catch (IOException ex) { + log.error("Ignoring exception during close: " + ex, ex); + } finally { + this.socketChannel.set(null); + } + if (log.isInfoEnabled()) + log.info("Closed socket channel"); + } + } + + /** * Send the bytes {@link ByteBuffer#remaining()} in the buffer to the * configured {@link InetSocketAddress}. 
* <p> @@ -393,7 +411,7 @@ * A series of timeouts used when we need to re-open the * {@link SocketChannel}. */ - private final static long[] retryMillis = new long[] { 1, 5, 10, 50, 100, 250, 500 }; + private final static long[] retryMillis = new long[] { 1, 5, 10, 50, 100, 250, 250, 250, 250 }; /** * (Re-)open the {@link SocketChannel} if it is closed and this service is @@ -448,12 +466,14 @@ socketChannel.set(sc = openChannel(addrNext.get())); if (log.isInfoEnabled()) - log.info("Opened channel on try: " + tryno); + log.info("Opened channel on try: " + tryno + + ", addrNext=" + addrNext); } catch (IOException e) { if (log.isInfoEnabled()) - log.info("Failed to open channel on try: " + tryno); + log.info("Failed to open channel on try: " + tryno + + ", addrNext=" + addrNext); if (tryno < retryMillis.length) { @@ -671,7 +691,7 @@ */ final int nbytes; - if (false&&log.isDebugEnabled()) { // add debug latency + if (false || log.isDebugEnabled()) { // FIXME add debug latency final int limit = data.limit(); if (data.position() < (limit - 50000)) { data.limit(data.position() + 50000); Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-12-17 01:00:14 UTC (rev 7666) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -100,6 +100,8 @@ import com.bigdata.ha.HAGlue; import com.bigdata.ha.HAStatusEnum; import com.bigdata.ha.HATXSGlue; +import com.bigdata.ha.IHAPipelineResetRequest; +import com.bigdata.ha.IHAPipelineResetResponse; import com.bigdata.ha.IIndexManagerCallable; import com.bigdata.ha.IJoinedAndNonJoinedServices; import com.bigdata.ha.JoinedAndNonJoinedServices; @@ -8018,6 +8020,14 @@ return getProxy(ft); } + @Override + public Future<IHAPipelineResetResponse> resetPipeline( + final IHAPipelineResetRequest req) 
throws IOException { + final Future<IHAPipelineResetResponse> f = quorum.getClient() + .resetPipeline(req); + return getProxy(f); + } + /* * HATXSGlue. * @@ -8243,7 +8253,6 @@ } - }; /** Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/quorum/QuorumClient.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/quorum/QuorumClient.java 2013-12-17 01:00:14 UTC (rev 7666) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/quorum/QuorumClient.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -41,7 +41,8 @@ * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ -public interface QuorumClient<S extends Remote> extends QuorumListener { +public interface QuorumClient<S extends Remote> extends QuorumListener, + ServiceLookup<S> { /** * The fully qualified identifier of the logical service whose quorum state Added: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/quorum/ServiceLookup.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/quorum/ServiceLookup.java (rev 0) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/quorum/ServiceLookup.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -0,0 +1,51 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Jun 2, 2010 + */ +package com.bigdata.quorum; + +import java.rmi.Remote; +import java.util.UUID; + +public interface ServiceLookup<S extends Remote> { + + /** + * Return the remote interface used to perform HA operations on a member of + * quorum. + * + * @param serviceId + * The {@link UUID} associated with the service. + * + * @return The remote interface for that quorum member. + * + * @throws IllegalArgumentException + * if the argument is <code>null</code> + * @throws QuorumException + * if there is no {@link Quorum} member with that + * <i>serviceId</i>. + */ + S getService(UUID serviceId); + +} Modified: branches/MGC_1_3_0/bigdata/src/test/com/bigdata/io/writecache/TestWORMWriteCacheService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/io/writecache/TestWORMWriteCacheService.java 2013-12-17 01:00:14 UTC (rev 7666) +++ branches/MGC_1_3_0/bigdata/src/test/com/bigdata/io/writecache/TestWORMWriteCacheService.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -49,6 +49,8 @@ import com.bigdata.ha.HAGlueBase; import com.bigdata.ha.HAPipelineGlue; +import com.bigdata.ha.IHAPipelineResetRequest; +import com.bigdata.ha.IHAPipelineResetResponse; import com.bigdata.ha.QuorumPipeline; import com.bigdata.ha.QuorumPipelineImpl; import com.bigdata.ha.msg.IHALogRequest; @@ -271,6 +273,12 @@ throw new UnsupportedOperationException(); } + @Override + public Future<IHAPipelineResetResponse> resetPipeline( + final IHAPipelineResetRequest req) throws IOException { + throw new UnsupportedOperationException(); + } + } // class MockHAPipelineGlue /** @@ -493,6 +501,12 @@ // NOP } + + @Override + public Future<IHAPipelineResetResponse> resetPipeline( + IHAPipelineResetRequest req) throws 
IOException { + throw new UnsupportedOperationException(); + } } // MockQuorumMemberImpl Modified: branches/MGC_1_3_0/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java 2013-12-17 01:00:14 UTC (rev 7666) +++ branches/MGC_1_3_0/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -56,6 +56,8 @@ import org.apache.log4j.Logger; import com.bigdata.ha.HAPipelineGlue; +import com.bigdata.ha.IHAPipelineResetRequest; +import com.bigdata.ha.IHAPipelineResetResponse; import com.bigdata.ha.msg.IHALogRequest; import com.bigdata.ha.msg.IHALogRootBlocksRequest; import com.bigdata.ha.msg.IHALogRootBlocksResponse; @@ -1335,6 +1337,12 @@ throw new UnsupportedOperationException(); } + @Override + public Future<IHAPipelineResetResponse> resetPipeline( + IHAPipelineResetRequest req) throws IOException { + throw new UnsupportedOperationException(); + } + } // MockService } // MockQuorumMember Modified: branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java =================================================================== --- branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java 2013-12-17 01:00:14 UTC (rev 7666) +++ branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -54,6 +54,8 @@ import com.bigdata.counters.PIDUtil; import com.bigdata.ha.HAGlue; import com.bigdata.ha.HAStatusEnum; +import com.bigdata.ha.IHAPipelineResetRequest; +import com.bigdata.ha.IHAPipelineResetResponse; import com.bigdata.ha.QuorumService; import com.bigdata.ha.RunState; import com.bigdata.ha.msg.IHA2PhaseAbortMessage; @@ -1112,6 +1114,15 @@ } @Override + public Future<IHAPipelineResetResponse> resetPipeline( + final IHAPipelineResetRequest req) throws IOException { + + 
checkMethod("resetPipeline", new Class[] {}); + + return super.resetPipeline(req); + } + + @Override public Future<Void> receiveAndReplicate(final IHASyncRequest req, final IHASendState snd, final IHAWriteMessage msg) throws IOException { Modified: branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServer.java =================================================================== --- branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServer.java 2013-12-17 01:00:14 UTC (rev 7666) +++ branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServer.java 2013-12-17 17:27:53 UTC (rev 7667) @@ -26,16 +26,27 @@ */ package com.bigdata.journal.jini.ha; +import java.io.IOException; import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import net.jini.config.Configuration; import net.jini.core.lookup.ServiceID; import com.bigdata.ha.HAGlue; import com.bigdata.ha.HAStatusEnum; +import com.bigdata.ha.IndexManagerCallable; import com.bigdata.ha.msg.HADigestRequest; +import com.bigdata.journal.jini.ha.HAJournalTest.HAGlueTest; +import com.bigdata.util.InnerCause; +import com.bigdata.util.concurrent.DaemonThreadFactory; /** * Life cycle and related tests for a single remote {@link HAJournalServer} out @@ -347,4 +358,153 @@ } + /** + * This test is used to characterize what happens when we interrupt an RMI. + * Most methods on the {@link HAGlue} interface are synchronous - they block + * while some behavior is executed. 
This is even true for some methods that + * return a {@link Future} in order to avoid overhead associated with the + * export of a proxy and DGC thread leaks (since fixed in River). + * <p> + * This unit test sets up a service and then issues an RMI that invokes a + * {@link Thread#sleep(long)} method on the service. The thread that issues + * the RMI is then interrupted during the sleep. + * + * @throws Exception + */ + public void test_interruptRMI() throws Exception { + + // Start a service. + final HAGlue serverA = startA(); + + final AtomicReference<Throwable> localCause = new AtomicReference<Throwable>(); + + final ExecutorService executorService = Executors + .newSingleThreadScheduledExecutor(DaemonThreadFactory + .defaultThreadFactory()); + + try { + + final FutureTask<Void> localFuture = new FutureTask<Void>( + new Callable<Void>() { + + @Override + public Void call() throws Exception { + + try { + final Future<Void> ft = ((HAGlueTest) serverA) + .submit(new SleepTask(6000/* ms */), + false/* asyncFuture */); + + return ft.get(); + } catch (Throwable t) { + localCause.set(t); + log.error(t, t); + throw new RuntimeException(t); + } finally { + log.warn("Local submit of remote task is done."); + } + } + }); + /* + * Submit task that will execute sleep on A. This task will block + * until A finishes its sleep. When we cancel this task, the RMI to + * A will be interrupted. + */ + executorService.execute(localFuture); + + // Wait a bit to ensure that the task was started on A. + Thread.sleep(2000/* ms */); + + // interrupt the local future. will cause interrupt of the RMI. + localFuture.cancel(true/*mayInterruptIfRunning*/); + + } finally { + + executorService.shutdownNow(); + + } + + /* + * The local root cause of the RMI failure is an InterruptedException. + * + * Note: There is a data race between when the [localCause] is set and + * when we exit the code block above. 
This is because we are + * interrupting the local task and have no means to await the completion + * of its error handling routine which sets the [localCause]. + */ + { + assertCondition(new Runnable() { + @Override + public void run() { + final Throwable tmp = localCause.get(); + assertNotNull(tmp); + assertTrue(InnerCause.isInnerCause(tmp, + InterruptedException.class)); + } + }, 10000/*timeout*/, TimeUnit.MILLISECONDS); + } + + /* + * Verify the root cause as observed by A for the interrupt. It should + * also be an InterruptedException. + * + * Note: Again, there is a data race. + * + * Note: Because we might retry this, we do NOT use the getAndClearXXX() + * method to recover the remote exception. + */ + { + assertCondition(new Runnable() { + @Override + public void run() { + Throwable tmp; + try { + tmp = ((HAGlueTest) serverA).getLastRootCause(); + } catch (IOException e) { + throw new RuntimeException(e); + } + assertNotNull(tmp); + log.warn("Received non-null lastRootCause=" + tmp, tmp); + assertTrue(InnerCause.isInnerCause(tmp, + InterruptedException.class)); + } + }, 10000/* timeout */, TimeUnit.MILLISECONDS); + } + + } + + /** + * Task sleeps for a specified duration. + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + */ + private static class SleepTask extends IndexManagerCallable<Void> { + + private static final long serial... [truncated message content] |