From: <mar...@us...> - 2013-12-13 11:01:43
|
Revision: 7641 http://bigdata.svn.sourceforge.net/bigdata/?rev=7641&view=rev Author: martyncutcher Date: 2013-12-13 11:01:37 +0000 (Fri, 13 Dec 2013) Log Message: ----------- Implement downstream PipelineException check to handle larger quorums than 3 services Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-13 10:44:56 UTC (rev 7640) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-13 11:01:37 UTC (rev 7641) @@ -2091,15 +2091,23 @@ futSnd.cancel(true/* mayInterruptIfRunning */); } } catch (Throwable t) { - // determine next pipeline service id - // FIXME: should this check for problem from further downstream for - // quorums with > 3 services? - final UUID[] priorAndNext = member.getQuorum() - .getPipelinePriorAndNext(member.getServiceId()); - log.warn("Problem with downstream service: " + priorAndNext[1], + // determine the problem service, which may be further downstream + // if the Throwable contains a PipelineException innerCause + final PipelineException pe = (PipelineException) InnerCause + .getInnerCause(t, PipelineException.class); + final UUID problemService; + if (pe != null) { + problemService = pe.getProblemServiceId(); + } else { + final UUID[] priorAndNext = member.getQuorum() + .getPipelinePriorAndNext(member.getServiceId()); + problemService = priorAndNext[1]; + } + + log.warn("Problem with downstream service: " + problemService, t); - throw new PipelineException(priorAndNext[1], t); + throw new PipelineException(problemService, t); } // done This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-12-13 18:55:37
|
Revision: 7644 http://bigdata.svn.sourceforge.net/bigdata/?rev=7644&view=rev Author: thompsonbry Date: 2013-12-13 18:55:30 +0000 (Fri, 13 Dec 2013) Log Message: ----------- also modified the receiveAndReplicate code path to wait until both futures are done before dropping out of the while() loop. Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-13 16:59:23 UTC (rev 7643) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-13 18:55:30 UTC (rev 7644) @@ -1756,7 +1756,7 @@ * local Future and only check the remote Future every * second. Timeouts are ignored during this loop. */ - while (!futSnd.isDone() && !futRec.isDone()) { + while (!futSnd.isDone() || !futRec.isDone()) { /* * Make sure leader's quorum token remains valid for * ALL writes. @@ -1771,6 +1771,8 @@ } catch (TimeoutException ignore) { } } + + // Note: Both futures are DONE at this point! futSnd.get(); futRec.get(); @@ -2121,7 +2123,7 @@ * local Future and only check the remote Future every * second. Timeouts are ignored during this loop. */ - while (!futRec.isDone() && !futRep.isDone()) { + while (!futRec.isDone() || !futRep.isDone()) { /* * The token must remain valid, even if this service * is not joined with the met quorum. If fact, @@ -2139,6 +2141,8 @@ } catch (TimeoutException ignore) { } } + + // Note: Both futures are DONE at this point! futRec.get(); futRep.get(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-12-17 18:03:44
|
Revision: 7669 http://bigdata.svn.sourceforge.net/bigdata/?rev=7669&view=rev Author: thompsonbry Date: 2013-12-17 18:03:37 +0000 (Tue, 17 Dec 2013) Log Message: ----------- Bug fix to code looking for root cause of interrupted exception in launder throwable. It was examining the wrong stack trace. Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-17 18:00:44 UTC (rev 7668) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-17 18:03:37 UTC (rev 7669) @@ -1939,7 +1939,7 @@ // Log and continue. log.error("Problem on node removal", e); - if(InnerCause.isInnerCause(t, InterruptedException.class)) { + if(InnerCause.isInnerCause(e, InterruptedException.class)) { // Propagate interrupt. Thread.currentThread().interrupt(); } @@ -1970,7 +1970,7 @@ // Log and continue. log.error("Problem on reset pipeline", e); - if(InnerCause.isInnerCause(t, InterruptedException.class)) { + if(InnerCause.isInnerCause(e, InterruptedException.class)) { // Propagate interrupt. Thread.currentThread().interrupt(); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-12-17 19:24:51
|
Revision: 7672 http://bigdata.svn.sourceforge.net/bigdata/?rev=7672&view=rev Author: thompsonbry Date: 2013-12-17 19:24:45 +0000 (Tue, 17 Dec 2013) Log Message: ----------- The resetPipeline() code now takes the outer lock and then tears down the send/receive services and restarts them. This does not clear up the problem, especially when the HASendService is sending 50k packets. I tried replacing the sendLock with a semaphore to get more confidence that we were not yielding the lock and allowing more than one send at a time. This does not change anything. We need to drill down further on the behavior when the send/receive services are torn down and then set up again, and on how this interacts with the timing of the forceRemoveService() and the re-replication of the message and payload from the leader. I have not yet drilled down on the logs in any depth for this version of the code. I expect that there is an interaction between the timing of those different activities such that we get a repeated series of resetPipeline() requests rather than actually recovering a pipeline in a known-good order and clean-slate condition. See #724. 
Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-17 19:11:24 UTC (rev 7671) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-17 19:24:45 UTC (rev 7672) @@ -41,6 +41,7 @@ import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RunnableFuture; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; @@ -594,6 +595,14 @@ super.pipelineChange(oldDownStreamId, newDownStreamId); lock.lock(); try { + if (oldDownStreamId != null && newDownStreamId != null + && oldDownStreamId.equals(newDownStreamId)) { + /* + * Nothing to do. The pipeline is already configured + * correctly. + */ + return; + } // The address of the next service in the pipeline. final InetSocketAddress addrNext = newDownStreamId == null ? null : getAddrNext(newDownStreamId); @@ -1222,7 +1231,6 @@ private class ResetPipelineTaskImpl implements Callable<IHAPipelineResetResponse> { - @SuppressWarnings("unused") private final IHAPipelineResetRequest req; public ResetPipelineTaskImpl(final IHAPipelineResetRequest req) { @@ -1234,20 +1242,30 @@ @Override public IHAPipelineResetResponse call() throws Exception { - // FIXME Versus using the inner handler? 
-// innerEventHandler.pipelineUpstreamChange(); -// innerEventHandler.pipelineChange(oldDownStreamId, newDownStreamId); - log.warn("Will reset pipeline"); - if (receiveService != null) { + lock.lock(); + try { + return doRunWithLock(); + } finally { + lock.unlock(); + } + + } - receiveService.changeUpStream(); + private IHAPipelineResetResponse doRunWithLock() throws Exception { - } + log.warn("Will reset pipeline: req=" + req); - if (sendService != null) { + // tear down send and/or receive services. + innerEventHandler.tearDown(); - sendService.closeChannel(); - + // The current pipeline order. + final UUID[] pipelineOrder = member.getQuorum().getPipeline(); + // The index of this service in the pipeline order. + final int index = getIndex(serviceId, pipelineOrder); + if (index == 0) { + innerEventHandler.setUpSendService(); + } else if (index > 0) { + innerEventHandler.setUpReceiveService(); } return new HAPipelineResetResponse(); @@ -1748,14 +1766,14 @@ private final PipelineState<S> downstream; private final HASendService sendService; private final QuorumPipelineImpl<S> outerClass; - private final Lock sendLock; + private final Semaphore sendLock; public SendBufferTask(final QuorumMember<S> member, final long token, final IHASyncRequest req, final IHASendState snd, final IHAWriteMessage msg, final ByteBuffer b, final PipelineState<S> downstream, final HASendService sendService, - final QuorumPipelineImpl<S> outerClass, final Lock sendLock) { + final QuorumPipelineImpl<S> outerClass, final Semaphore sendLock) { this.member = member; this.token = token; @@ -1778,7 +1796,7 @@ * write pipeline at a time. 
*/ - sendLock.lock(); + sendLock.acquire(); try { @@ -1788,7 +1806,7 @@ } finally { - sendLock.unlock(); + sendLock.release(); } @@ -1920,25 +1938,29 @@ problemServiceId = priorAndNext[1]; - member.getActor().forceRemoveService(problemServiceId); - } else if (remoteCause != null) { problemServiceId = remoteCause.getProblemServiceId(); - member.getActor().forceRemoveService(problemServiceId); - } else { // Do not remove anybody. } + + if (problemServiceId != null) { + + // Force out the problem service. + member.getActor().forceRemoveService(problemServiceId); + + } } catch (Throwable e) { // Log and continue. - log.error("Problem on node removal", e); - + log.error("Problem on force remove: problemServiceId=" + + problemServiceId, e); + if(InnerCause.isInnerCause(e, InterruptedException.class)) { // Propagate interrupt. Thread.currentThread().interrupt(); @@ -2009,7 +2031,7 @@ */ private void resetPipeline(final long token, final UUID problemServiceId) { - log.error("Leader will reset pipeline: " + token); + log.error("Leader will reset pipeline: token=" + token+", problemServiceId=" + problemServiceId); /* * We will only message the services that are in the pipeline. @@ -2134,7 +2156,8 @@ * Lock used to ensure that at most one message is being sent along the * write pipeline at a time. */ - private final Lock sendLock = new ReentrantLock(); +// private final Lock sendLock = new ReentrantLock(); + private final Semaphore sendLock = new Semaphore(1/*permits*/); @Override public Future<Void> receiveAndReplicate(final IHASyncRequest req, This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <mar...@us...> - 2014-01-07 11:05:25
|
Revision: 7741 http://bigdata.svn.sourceforge.net/bigdata/?rev=7741&view=rev Author: martyncutcher Date: 2014-01-07 11:05:13 +0000 (Tue, 07 Jan 2014) Log Message: ----------- Change future checks to avoid deadlock if socket send blocks. It has not been possible to test this locally since failure has only been observed in CI on EC2 Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2014-01-06 21:12:19 UTC (rev 7740) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2014-01-07 11:05:13 UTC (rev 7741) @@ -1944,8 +1944,14 @@ * also ignored. We want to continue this loop until * both Futures are done. Interrupts are not trapped, so * an interrupt will still exit the loop. + * + * It appears that it is possible for futSnd to be blocked + * and not generate an error. If we do not exit the loop + * and check the futRec future in this case then we coul loop + * continuously. This does rather beg the question of + * whether we should only be checking futRec at this stage. */ - while (!futSnd.isDone() || !futRec.isDone()) { + while (!(futSnd.isDone() || futRec.isDone())) { /* * Make sure leader's quorum token remains valid for * ALL writes. @@ -2491,7 +2497,7 @@ @Override public Void call() throws Exception { - + // wrap the messages together. final HAMessageWrapper wrappedMsg = new HAMessageWrapper( req, snd, msg); @@ -2524,8 +2530,11 @@ * also ignored. We want to continue this loop until * both Futures are done. Interrupts are not trapped, so * an interrupt will still exit the loop. + * + * TODO: check the comparative logic with this and robustReplicate + * to confirm the equivalence of checking the different futures. 
*/ - while (!futRec.isDone() || !futRep.isDone()) { + while (!(futRec.isDone() || futRep.isDone())) { /* * The token must remain valid, even if this service * is not joined with the met quorum. If fact, @@ -2547,7 +2556,7 @@ } /* - * Note: Both futures are DONE at this point. However, + * Note: Both futures are DONE (or not - check condition above) at this point. However, * we want to check the remote Future for the downstream * service first in order to accurately report the * service that was the source of a pipeline replication This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <mar...@us...> - 2014-01-08 17:10:15
|
Revision: 7748 http://bigdata.svn.sourceforge.net/bigdata/?rev=7748&view=rev Author: martyncutcher Date: 2014-01-08 17:10:07 +0000 (Wed, 08 Jan 2014) Log Message: ----------- Modification to support possible future non-thick future for receiveAndReplicate to check for regression on CI. The intention is to comment out this code once we have verified it is a plausible approach. Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2014-01-08 15:14:09 UTC (rev 7747) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2014-01-08 17:10:07 UTC (rev 7748) @@ -1951,27 +1951,65 @@ * continuously. This does rather beg the question of * whether we should only be checking futRec at this stage. */ - while (!(futSnd.isDone() || futRec.isDone())) { + + /* + * Await the Futures, but spend more time waiting on the + * local Future and only check the remote Future every + * second. Timeouts are ignored during this loop - they + * are used to let us wait longer on the local Future + * than on the remote Future. ExecutionExceptions are + * also ignored. We want to continue this loop until + * both Futures are done. Interrupts are not trapped, so + * an interrupt will still exit the loop. + */ + while (!futSnd.isDone() || !futRec.isDone()) { /* * Make sure leader's quorum token remains valid for * ALL writes. */ member.assertLeader(token); try { - futSnd.get(1L, TimeUnit.SECONDS); + futSnd.get(10L, TimeUnit.MILLISECONDS); } catch (TimeoutException ignore) { } catch (ExecutionException ignore) { + /* + * Try the other Future with timeout and cancel + * if not done. + */ + try { + futRec.get(1L, TimeUnit.SECONDS); + } catch(TimeoutException ex) { // Ignore. 
+ } catch(ExecutionException ex) { // Ignore. + } finally { + futRec.cancel(true/* mayInterruptIfRunning */); + } + /* + * Note: Both futures are DONE at this point. + */ } try { futRec.get(10L, TimeUnit.MILLISECONDS); } catch (TimeoutException ignore) { } catch (ExecutionException ignore) { + /* + * Try the other Future with timeout and cancel + * if not done. + */ + try { + futSnd.get(10L, TimeUnit.MILLISECONDS); + } catch(TimeoutException ex) { // Ignore. + } catch(ExecutionException ex) { // Ignore. + } finally { + futSnd.cancel(true/* mayInterruptIfRunning */); + } + /* + * Note: Both futures are DONE at this point. + */ } } - + /* - * Note: Both futures are DONE at this point. However, - * we want to check the remote Future for the downstream + * Note: We want to check the remote Future for the downstream * service first in order to accurately report the * service that was the source of a pipeline replication * problem. @@ -2534,7 +2572,7 @@ * TODO: check the comparative logic with this and robustReplicate * to confirm the equivalence of checking the different futures. */ - while (!(futRec.isDone() || futRep.isDone())) { + while (!futRec.isDone() || !futRep.isDone()) { /* * The token must remain valid, even if this service * is not joined with the met quorum. If fact, @@ -2547,11 +2585,39 @@ futRec.get(1L, TimeUnit.SECONDS); } catch (TimeoutException ignore) { } catch (ExecutionException ignore) { + /* + * Try the other Future with timeout and cancel + * if not done. + */ + try { + futRep.get(1L, TimeUnit.SECONDS); + } catch(TimeoutException ex) { // Ignore. + } catch(ExecutionException ex) { // Ignore. + } finally { + futRep.cancel(true/* mayInterruptIfRunning */); + } + /* + * Note: Both futures are DONE at this point. + */ } try { futRep.get(10L, TimeUnit.MILLISECONDS); } catch (TimeoutException ignore) { } catch (ExecutionException ignore) { + /* + * Try the other Future with timeout and cancel + * if not done. 
+ */ + try { + futRec.get(1L, TimeUnit.SECONDS); + } catch(TimeoutException ex) { // Ignore. + } catch(ExecutionException ex) { // Ignore. + } finally { + futRec.cancel(true/* mayInterruptIfRunning */); + } + /* + * Note: Both futures are DONE at this point. + */ } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2014-01-09 17:44:51
|
Revision: 7752 http://bigdata.svn.sourceforge.net/bigdata/?rev=7752&view=rev Author: thompsonbry Date: 2014-01-09 17:44:44 +0000 (Thu, 09 Jan 2014) Log Message: ----------- Modified timeouts in loops for local and remote futures in QuorumPipelineImpl prior to merge to main branch. Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2014-01-08 17:58:02 UTC (rev 7751) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2014-01-09 17:44:44 UTC (rev 7752) @@ -1951,17 +1951,6 @@ * continuously. This does rather beg the question of * whether we should only be checking futRec at this stage. */ - - /* - * Await the Futures, but spend more time waiting on the - * local Future and only check the remote Future every - * second. Timeouts are ignored during this loop - they - * are used to let us wait longer on the local Future - * than on the remote Future. ExecutionExceptions are - * also ignored. We want to continue this loop until - * both Futures are done. Interrupts are not trapped, so - * an interrupt will still exit the loop. - */ while (!futSnd.isDone() || !futRec.isDone()) { /* * Make sure leader's quorum token remains valid for @@ -1969,7 +1958,7 @@ */ member.assertLeader(token); try { - futSnd.get(10L, TimeUnit.MILLISECONDS); + futSnd.get(500L, TimeUnit.MILLISECONDS); } catch (TimeoutException ignore) { } catch (ExecutionException ignore) { /* @@ -1977,7 +1966,7 @@ * if not done. */ try { - futRec.get(1L, TimeUnit.SECONDS); + futRec.get(500L, TimeUnit.MILLISECONDS); } catch(TimeoutException ex) { // Ignore. } catch(ExecutionException ex) { // Ignore. 
} finally { @@ -1988,7 +1977,7 @@ */ } try { - futRec.get(10L, TimeUnit.MILLISECONDS); + futRec.get(500L, TimeUnit.MILLISECONDS); } catch (TimeoutException ignore) { } catch (ExecutionException ignore) { /* @@ -1996,7 +1985,7 @@ * if not done. */ try { - futSnd.get(10L, TimeUnit.MILLISECONDS); + futSnd.get(500L, TimeUnit.MILLISECONDS); } catch(TimeoutException ex) { // Ignore. } catch(ExecutionException ex) { // Ignore. } finally { @@ -2582,7 +2571,7 @@ */ member.getQuorum().assertQuorum(token); try { - futRec.get(1L, TimeUnit.SECONDS); + futRec.get(500L, TimeUnit.MILLISECONDS); } catch (TimeoutException ignore) { } catch (ExecutionException ignore) { /* @@ -2590,7 +2579,7 @@ * if not done. */ try { - futRep.get(1L, TimeUnit.SECONDS); + futRep.get(500L, TimeUnit.MILLISECONDS); } catch(TimeoutException ex) { // Ignore. } catch(ExecutionException ex) { // Ignore. } finally { @@ -2601,7 +2590,7 @@ */ } try { - futRep.get(10L, TimeUnit.MILLISECONDS); + futRep.get(500L, TimeUnit.MILLISECONDS); } catch (TimeoutException ignore) { } catch (ExecutionException ignore) { /* @@ -2609,7 +2598,7 @@ * if not done. */ try { - futRec.get(1L, TimeUnit.SECONDS); + futRec.get(500L, TimeUnit.MILLISECONDS); } catch(TimeoutException ex) { // Ignore. } catch(ExecutionException ex) { // Ignore. } finally { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |