From: <tho...@us...> - 2013-12-17 19:24:51
|
Revision: 7672 http://bigdata.svn.sourceforge.net/bigdata/?rev=7672&view=rev Author: thompsonbry Date: 2013-12-17 19:24:45 +0000 (Tue, 17 Dec 2013) Log Message: ----------- The resetPipeline() code now takes the outer lock and then tears down the send/receive services and restarts them. This does not clear up the problem, especially when the HASendService is sending 50k packets. I tried replacing the sendLock with a semaphore to get more confidence that we were not yielding the lock and allowing more than one send at a time. This does not change anything. We need to drill down further on the behavior when the send/receive services are torn down and then set up again, and how this interacts with the timing of the forceRemoveService() and the re-replication of the message and payload from the leader. I have not yet drilled down on the logs in any depth for this version of the code. I expect that there is an interaction between the timing of those different activities such that we get a repeated series of resetPipeline() requests rather than actually recovering a pipeline in a known good order and clean-slate condition. See #724. 
Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-17 19:11:24 UTC (rev 7671) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-17 19:24:45 UTC (rev 7672) @@ -41,6 +41,7 @@ import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RunnableFuture; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; @@ -594,6 +595,14 @@ super.pipelineChange(oldDownStreamId, newDownStreamId); lock.lock(); try { + if (oldDownStreamId != null && newDownStreamId != null + && oldDownStreamId.equals(newDownStreamId)) { + /* + * Nothing to do. The pipeline is already configured + * correctly. + */ + return; + } // The address of the next service in the pipeline. final InetSocketAddress addrNext = newDownStreamId == null ? null : getAddrNext(newDownStreamId); @@ -1222,7 +1231,6 @@ private class ResetPipelineTaskImpl implements Callable<IHAPipelineResetResponse> { - @SuppressWarnings("unused") private final IHAPipelineResetRequest req; public ResetPipelineTaskImpl(final IHAPipelineResetRequest req) { @@ -1234,20 +1242,30 @@ @Override public IHAPipelineResetResponse call() throws Exception { - // FIXME Versus using the inner handler? 
-// innerEventHandler.pipelineUpstreamChange(); -// innerEventHandler.pipelineChange(oldDownStreamId, newDownStreamId); - log.warn("Will reset pipeline"); - if (receiveService != null) { + lock.lock(); + try { + return doRunWithLock(); + } finally { + lock.unlock(); + } + + } - receiveService.changeUpStream(); + private IHAPipelineResetResponse doRunWithLock() throws Exception { - } + log.warn("Will reset pipeline: req=" + req); - if (sendService != null) { + // tear down send and/or receive services. + innerEventHandler.tearDown(); - sendService.closeChannel(); - + // The current pipeline order. + final UUID[] pipelineOrder = member.getQuorum().getPipeline(); + // The index of this service in the pipeline order. + final int index = getIndex(serviceId, pipelineOrder); + if (index == 0) { + innerEventHandler.setUpSendService(); + } else if (index > 0) { + innerEventHandler.setUpReceiveService(); } return new HAPipelineResetResponse(); @@ -1748,14 +1766,14 @@ private final PipelineState<S> downstream; private final HASendService sendService; private final QuorumPipelineImpl<S> outerClass; - private final Lock sendLock; + private final Semaphore sendLock; public SendBufferTask(final QuorumMember<S> member, final long token, final IHASyncRequest req, final IHASendState snd, final IHAWriteMessage msg, final ByteBuffer b, final PipelineState<S> downstream, final HASendService sendService, - final QuorumPipelineImpl<S> outerClass, final Lock sendLock) { + final QuorumPipelineImpl<S> outerClass, final Semaphore sendLock) { this.member = member; this.token = token; @@ -1778,7 +1796,7 @@ * write pipeline at a time. 
*/ - sendLock.lock(); + sendLock.acquire(); try { @@ -1788,7 +1806,7 @@ } finally { - sendLock.unlock(); + sendLock.release(); } @@ -1920,25 +1938,29 @@ problemServiceId = priorAndNext[1]; - member.getActor().forceRemoveService(problemServiceId); - } else if (remoteCause != null) { problemServiceId = remoteCause.getProblemServiceId(); - member.getActor().forceRemoveService(problemServiceId); - } else { // Do not remove anybody. } + + if (problemServiceId != null) { + + // Force out the problem service. + member.getActor().forceRemoveService(problemServiceId); + + } } catch (Throwable e) { // Log and continue. - log.error("Problem on node removal", e); - + log.error("Problem on force remove: problemServiceId=" + + problemServiceId, e); + if(InnerCause.isInnerCause(e, InterruptedException.class)) { // Propagate interrupt. Thread.currentThread().interrupt(); @@ -2009,7 +2031,7 @@ */ private void resetPipeline(final long token, final UUID problemServiceId) { - log.error("Leader will reset pipeline: " + token); + log.error("Leader will reset pipeline: token=" + token+", problemServiceId=" + problemServiceId); /* * We will only message the services that are in the pipeline. @@ -2134,7 +2156,8 @@ * Lock used to ensure that at most one message is being sent along the * write pipeline at a time. */ - private final Lock sendLock = new ReentrantLock(); +// private final Lock sendLock = new ReentrantLock(); + private final Semaphore sendLock = new Semaphore(1/*permits*/); @Override public Future<Void> receiveAndReplicate(final IHASyncRequest req, This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |