From: <tho...@us...> - 2013-12-18 15:21:45
|
Revision: 7674 http://bigdata.svn.sourceforge.net/bigdata/?rev=7674&view=rev Author: thompsonbry Date: 2013-12-18 15:21:37 +0000 (Wed, 18 Dec 2013) Log Message: ----------- Added logic in the resetPipeline implementation to await a pipeline change (up to a timeout) such that the problem service is no longer a neighbor of a given service. See #724. Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineResetRequest.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/IHAPipelineResetRequest.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineResetRequest.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineResetRequest.java 2013-12-18 15:03:44 UTC (rev 7673) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineResetRequest.java 2013-12-18 15:21:37 UTC (rev 7674) @@ -23,6 +23,8 @@ */ package com.bigdata.ha; +import java.util.UUID; + public class HAPipelineResetRequest implements IHAPipelineResetRequest { /** @@ -31,11 +33,16 @@ private static final long serialVersionUID = 1L; private long token; - - public HAPipelineResetRequest(final long token) { + private UUID problemServiceId; + private long timeoutNanos; + + public HAPipelineResetRequest(final long token, + final UUID problemServiceId, final long timeoutNanos) { this.token = token; + this.problemServiceId = problemServiceId; + this.timeoutNanos = timeoutNanos; } - + @Override public long token() { return token; @@ -43,7 +50,18 @@ @Override public String toString() { - return super.toString() + "{token=" + token + "}"; + return super.toString() + "{token=" + token + ", problemServiceId=" + + problemServiceId + ", timeoutNanos=" + timeoutNanos + "}"; } + + @Override + public UUID getProblemServiceId() { + return problemServiceId; + } + + @Override + public long getTimeoutNanos() { + return timeoutNanos; + } } Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/IHAPipelineResetRequest.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/IHAPipelineResetRequest.java 2013-12-18 15:03:44 UTC (rev 7673) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/IHAPipelineResetRequest.java 2013-12-18 15:21:37 UTC (rev 7674) @@ -23,6 +23,8 @@ */ package com.bigdata.ha; +import java.util.UUID; + import com.bigdata.ha.msg.IHAMessage; /** @@ -36,5 +38,21 @@ * The quorum token in effect on the leader when this request was generated. */ long token(); + + /** + * The {@link UUID} of the service that the leader has forced from the + * pipeline + * + * @return The {@link UUID} of the problem service -or- <code>null</code> if + * the leader did not identify a problem service. + */ + UUID getProblemServiceId(); + /** + * How long to await the state where the problem service is no longer part + * of the write pipeline for a service that is upstream or downstream of the + * problem service. + */ + long getTimeoutNanos(); + } Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-18 15:03:44 UTC (rev 7673) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-18 15:21:37 UTC (rev 7674) @@ -46,7 +46,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import org.apache.log4j.Logger; @@ -229,6 +229,12 @@ */ private final ReentrantLock lock = new ReentrantLock(); + /** + * Condition signalled when the write replication pipeline has been changed + * (either the upstream and/or downstream service was changed). + */ + private final Condition pipelineChanged = lock.newCondition(); + /** send service (iff this is the leader). */ private HASendService sendService; @@ -595,6 +601,10 @@ super.pipelineChange(oldDownStreamId, newDownStreamId); lock.lock(); try { + if (oldDownStreamId == newDownStreamId) { + // Nothing to do (both null or same UUID reference). + return; + } if (oldDownStreamId != null && newDownStreamId != null && oldDownStreamId.equals(newDownStreamId)) { /* @@ -604,8 +614,10 @@ return; } // The address of the next service in the pipeline. - final InetSocketAddress addrNext = newDownStreamId == null ? null - : getAddrNext(newDownStreamId); +// final InetSocketAddress addrNext = getAddrNext(newDownStreamId); + final PipelineState<S> nextState = getAddrNext(newDownStreamId); + final InetSocketAddress addrNext = nextState == null ? null + : nextState.addr; if (log.isInfoEnabled()) log.info("oldDownStreamId=" + oldDownStreamId + ",newDownStreamId=" + newDownStreamId @@ -636,7 +648,10 @@ receiveService.changeDownStream(addrNext); } // populate and/or clear the cache. - cachePipelineState(newDownStreamId); + pipelineStateRef.set(nextState); +// cachePipelineState(newDownStreamId); + // Signal pipeline change. + pipelineChanged.signalAll(); if (log.isDebugEnabled()) log.debug("pipelineChange - done."); } finally { @@ -657,6 +672,8 @@ if (log.isInfoEnabled()) log.info("receiveService=" + receiveService); receiveService.changeUpStream(); + // Signal pipeline change. + pipelineChanged.signalAll(); } } finally { lock.unlock(); @@ -688,7 +705,7 @@ * * @return It's {@link InetSocketAddress} */ - private InetSocketAddress getAddrNext(final UUID downStreamId) { + private PipelineState<S> getAddrNext(final UUID downStreamId) { if (downStreamId == null) return null; @@ -697,9 +714,10 @@ try { - final InetSocketAddress addrNext = service.getWritePipelineAddr(); + final InetSocketAddress addrNext = service + .getWritePipelineAddr(); - return addrNext; + return new PipelineState<S>(service, addrNext); } catch (IOException e) { @@ -757,50 +775,52 @@ } // clear cache. pipelineStateRef.set(null); + // Signal pipeline change. + pipelineChanged.signalAll(); } finally { lock.unlock(); } } - /** - * Populate or clear the {@link #pipelineState} cache. - * <p> - * Note: The only times we need to populate the {@link #pipelineState} are - * in response to a {@link #pipelineChange(UUID, UUID)} event or in response - * to message a {@link #pipelineElectedLeader()} event. - * - * @param downStreamId - * The downstream service {@link UUID}. - */ - private void cachePipelineState(final UUID downStreamId) { - - if (downStreamId == null) { - - pipelineStateRef.set(null); - - return; - - } - - final S nextService = member.getService(downStreamId); - - final PipelineState<S> pipelineState = new PipelineState<S>(); - - try { - - pipelineState.addr = nextService.getWritePipelineAddr(); - - } catch (IOException e) { - - throw new RuntimeException(e); - - } - - pipelineState.service = nextService; - - pipelineStateRef.set(pipelineState); - - } +// /** +// * Populate or clear the {@link #pipelineState} cache. +// * <p> +// * Note: The only times we need to populate the {@link #pipelineState} are +// * in response to a {@link #pipelineChange(UUID, UUID)} event or in response +// * to message a {@link #pipelineElectedLeader()} event. +// * +// * @param downStreamId +// * The downstream service {@link UUID}. +// */ +// private void cachePipelineState(final UUID downStreamId) { +// +// if (downStreamId == null) { +// +// pipelineStateRef.set(null); +// +// return; +// +// } +// +// final S nextService = member.getService(downStreamId); +// +// final PipelineState<S> pipelineState = new PipelineState<S>(); +// +// try { +// +// pipelineState.addr = nextService.getWritePipelineAddr(); +// +// } catch (IOException e) { +// +// throw new RuntimeException(e); +// +// } +// +// pipelineState.service = nextService; +// +// pipelineStateRef.set(pipelineState); +// +// } /** * Setup the send service. @@ -825,15 +845,20 @@ * handle downstreamId != null conditionally. */ final UUID downstreamId = member.getDownstreamServiceId(); - if (downstreamId != null) { - // The address of the next service in the pipeline. - final InetSocketAddress addrNext = member.getService( - downstreamId).getWritePipelineAddr(); + // The address of the next service in the pipeline. + final PipelineState<S> nextState = getAddrNext(downstreamId); + if (nextState != null) { +// // The address of the next service in the pipeline. +// final InetSocketAddress addrNext = member.getService( +// downstreamId).getWritePipelineAddr(); // Start the send service. - sendService.start(addrNext); + sendService.start(nextState.addr); } // populate and/or clear the cache. - cachePipelineState(downstreamId); + pipelineStateRef.set(nextState); +// cachePipelineState(downstreamId); + // Signal pipeline change. + pipelineChanged.signalAll(); } catch (Throwable t) { try { tearDown(); @@ -865,11 +890,16 @@ final InetSocketAddress addrSelf = member.getService() .getWritePipelineAddr(); // Address of the downstream service (if any). - final InetSocketAddress addrNext = downstreamId == null ? null - : member.getService(downstreamId).getWritePipelineAddr(); +// final InetSocketAddress addrNext = downstreamId == null ? null +// : member.getService(downstreamId).getWritePipelineAddr(); +// final InetSocketAddress addrNext = getAddrNext(downstreamId); + final PipelineState<S> nextServiceState = getAddrNext(downstreamId); + final InetSocketAddress addrNext = nextServiceState == null ? null + : nextServiceState.addr; // Setup the receive service. - receiveService = new HAReceiveService<HAMessageWrapper>(addrSelf, - addrNext, new IHAReceiveCallback<HAMessageWrapper>() { + receiveService = new HAReceiveService<HAMessageWrapper>( + addrSelf, addrNext, + new IHAReceiveCallback<HAMessageWrapper>() { @Override public void callback(final HAMessageWrapper msg, final ByteBuffer data) throws Exception { @@ -892,6 +922,8 @@ // Start the receive service - will not return until service is // running receiveService.start(); + // Signal pipeline change. + pipelineChanged.signalAll(); } catch (Throwable t) { /* * Always tear down if there was a setup problem to avoid leaking @@ -1251,21 +1283,92 @@ } + /** + * If there is an identified problem service and that service is either + * our upstream or downstream service, then we need to wait until we + * observe a pipeline change event such that it is no longer our + * upstream or downstream service. Otherwise we can go ahead and reset + * our pipeline. + * + * TODO What if this service is the problem service? + */ + private boolean isProblemServiceOurNeighbor() { + + final UUID psid = req.getProblemServiceId(); + + if (psid == null) { + + return false; + + } + + final UUID[] priorAndNext = member.getQuorum() + .getPipelinePriorAndNext(member.getServiceId()); + + if (psid.equals(priorAndNext[0])) + return true; + + if (psid.equals(priorAndNext[1])) + return true; + + return false; + + } + private IHAPipelineResetResponse doRunWithLock() throws Exception { log.warn("Will reset pipeline: req=" + req); - // tear down send and/or receive services. - innerEventHandler.tearDown(); + final long begin = System.nanoTime(); + final long timeout = req.getTimeoutNanos(); + long remaining = timeout; - // The current pipeline order. - final UUID[] pipelineOrder = member.getQuorum().getPipeline(); - // The index of this service in the pipeline order. - final int index = getIndex(serviceId, pipelineOrder); - if (index == 0) { - innerEventHandler.setUpSendService(); - } else if (index > 0) { - innerEventHandler.setUpReceiveService(); + if (isProblemServiceOurNeighbor()) { + + log.warn("Problem service is our neighbor."); + + do { + + pipelineChanged.await(remaining, TimeUnit.NANOSECONDS); + + // remaining = timeout - elapsed + remaining = timeout - (begin - System.nanoTime()); + + } while (isProblemServiceOurNeighbor() && remaining > 0); + + if (isProblemServiceOurNeighbor()) { + + /* + * Timeout elapsed. + * + * Note: This could be a false timeout, e.g., the problem + * service left and re-entered and is still our neighbor. + * However, the leader will just log and ignore the problem. + * If the pipeline is working again, then all is good. If + * not, then it will force out the problem service and reset + * the pipeline again. + */ + throw new TimeoutException(); + + } + + } else { + + log.warn("Problem service is not our neighbor."); + + // tear down send and/or receive services. + innerEventHandler.tearDown(); + + // The current pipeline order. + final UUID[] pipelineOrder = member.getQuorum().getPipeline(); + // The index of this service in the pipeline order. + final int index = getIndex(serviceId, pipelineOrder); + if (index == 0) { + innerEventHandler.setUpSendService(); + } else if (index > 0) { + innerEventHandler.setUpReceiveService(); + } + } return new HAPipelineResetResponse(); @@ -1835,7 +1938,12 @@ /* * Await the Futures, but spend more time waiting on the * local Future and only check the remote Future every - * second. Timeouts are ignored during this loop. + * second. Timeouts are ignored during this loop - they + * are used to let us wait longer on the local Future + * than on the remote Future. ExecutionExceptions are + * also ignored. We want to continue this loop until + * both Futures are done. Interrupts are not trapped, so + * an interrupt will still exit the loop. */ while (!futSnd.isDone() || !futRec.isDone()) { /* @@ -1846,16 +1954,24 @@ try { futSnd.get(1L, TimeUnit.SECONDS); } catch (TimeoutException ignore) { + } catch (ExecutionException ignore) { } try { futRec.get(10L, TimeUnit.MILLISECONDS); } catch (TimeoutException ignore) { + } catch (ExecutionException ignore) { } } - // Note: Both futures are DONE at this point! + /* + * Note: Both futures are DONE at this point. However, + * we want to check the remote Future for the downstream + * service first in order to accurately report the + * service that was the source of a pipeline replication + * problem. + */ + futRec.get(); futSnd.get(); - futRec.get(); } finally { if (!futRec.isDone()) { @@ -1989,8 +2105,8 @@ } catch (Throwable e) { - // Log and continue. - log.error("Problem on reset pipeline", e); + // Log and continue. Details are logged by resetPipeline(). + log.warn("Problem(s) on reset pipeline: " + e); if(InnerCause.isInnerCause(e, InterruptedException.class)) { // Propagate interrupt. @@ -2046,7 +2162,9 @@ member.assertLeader(token); - final IHAPipelineResetRequest msg = new HAPipelineResetRequest(token); + // TODO Configure timeout on HAJournalServer. + final IHAPipelineResetRequest msg = new HAPipelineResetRequest(token, + problemServiceId, TimeUnit.MILLISECONDS.toNanos(5000)); /* * To minimize latency, we first submit the futures for the other @@ -2400,7 +2518,12 @@ /* * Await the Futures, but spend more time waiting on the * local Future and only check the remote Future every - * second. Timeouts are ignored during this loop. + * second. Timeouts are ignored during this loop - they + * are used to let us wait longer on the local Future + * than on the remote Future. ExecutionExceptions are + * also ignored. We want to continue this loop until + * both Futures are done. Interrupts are not trapped, so + * an interrupt will still exit the loop. */ while (!futRec.isDone() || !futRep.isDone()) { /* @@ -2414,17 +2537,25 @@ try { futRec.get(1L, TimeUnit.SECONDS); } catch (TimeoutException ignore) { + } catch (ExecutionException ignore) { } try { futRep.get(10L, TimeUnit.MILLISECONDS); } catch (TimeoutException ignore) { + } catch (ExecutionException ignore) { } } - // Note: Both futures are DONE at this point! + /* + * Note: Both futures are DONE at this point. However, + * we want to check the remote Future for the downstream + * service first in order to accurately report the + * service that was the source of a pipeline replication + * problem. + */ futRec.get(); futRep.get(); - + } finally { if (!futRep.isDone()) { // cancel remote Future unless done. @@ -2522,10 +2653,17 @@ */ public S service; + /** Deserialization constructor. */ + @SuppressWarnings("unused") public PipelineState() { } + public PipelineState(final S service, final InetSocketAddress addr) { + this.service = service; + this.addr = addr; + } + @SuppressWarnings("unchecked") public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |