From: <tho...@us...> - 2013-05-18 13:07:13
|
Revision: 7145 http://bigdata.svn.sourceforge.net/bigdata/?rev=7145&view=rev Author: thompsonbry Date: 2013-05-18 13:07:04 +0000 (Sat, 18 May 2013) Log Message: ----------- Bug fix for the consensus releaseTime protocol. It was hitting a clock skew error and then failing to message back to the leader with an appropriate response such that the barrier would break and the commit could either proceed or be rejected. @see https://sourceforge.net/apps/trac/bigdata/ticket/677 (HA deadlock under UPDATE + QUERY) Modified Paths: -------------- branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/Journal.java branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java branches/READ_CACHE/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java branches/READ_CACHE/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java branches/READ_CACHE/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java Modified: branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-05-18 11:52:12 UTC (rev 7144) +++ branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-05-18 13:07:04 UTC (rev 7145) @@ -7010,12 +7010,15 @@ public void gatherMinimumVisibleCommitTime( final IHAGatherReleaseTimeRequest req) throws IOException { - if (haLog.isInfoEnabled()) haLog.info("req=" + req); + if (haLog.isInfoEnabled()) + haLog.info("req=" + req); // Clear the old outcome. Reference SHOULD be null. Ensure not running. 
final Future<Void> oldFuture = gatherFuture.getAndSet(null); - if(oldFuture!=null&&!oldFuture.isDone()) oldFuture.cancel(true/*mayInterruptIfRunning*/); + if (oldFuture != null && !oldFuture.isDone()) + oldFuture.cancel(true/* mayInterruptIfRunning */); + final Callable<Void> task = ((AbstractHATransactionService) AbstractJournal.this .getLocalTransactionManager() .getTransactionService()) @@ -7038,12 +7041,48 @@ final IHANotifyReleaseTimeRequest req) throws IOException, InterruptedException, BrokenBarrierException { + /* + * Note: Pass through [req] without checks. We need to get this + * message to the CyclicBarrier regardless of whether it is + * well-formed or valid. + */ + return ((HATXSGlue) AbstractJournal.this .getLocalTransactionManager().getTransactionService()) .notifyEarliestCommitTime(req); } + + /** + * This exposes the clock used to assign transaction identifiers and + * commit times. It is being exposed to support certain kinds of + * overrides for unit tests. + * <p> + * Note: This method is NOT exposed to RMI. However, it can still be + * overridden by the unit tests. + * + * @return The next timestamp from that clock. + */ + public long nextTimestamp() { + try { + + return AbstractJournal.this.getLocalTransactionManager() + .getTransactionService().nextTimestamp(); + + } catch (IOException ex) { + + /* + * Note: This is a local method call. IOException will not be + * thrown. + */ + + throw new RuntimeException(ex); + + } + + } + /* * IService */ Modified: branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/Journal.java =================================================================== --- branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/Journal.java 2013-05-18 11:52:12 UTC (rev 7144) +++ branches/READ_CACHE/bigdata/src/java/com/bigdata/journal/Journal.java 2013-05-18 13:07:04 UTC (rev 7145) @@ -362,7 +362,17 @@ /** * The message from each of those followers providing their local - * earliest visible commit point. 
+ * earliest visible commit point. + * <p> + * Note: The {@link ConcurrentHashMap} does NOT allow <code>null</code> + * values. Further the {@link IHANotifyReleaseTimeRequest} specifies the + * serviceId of the follower. Therefore, a follower whose + * {@link GatherTask} fails MUST provide a "mock" + * {@link IHANotifyReleaseTimeRequest} that it will use to wait at the + * {@link CyclicBarrier}. + * + * @see InnerJournalTransactionService#notifyEarliestCommitTime(IHANotifyReleaseTimeRequest) + * @see GatherTask */ final private Map<UUID, IHANotifyReleaseTimeRequest> responses = new ConcurrentHashMap<UUID, IHANotifyReleaseTimeRequest>(); @@ -1347,12 +1357,26 @@ this.req = req; } - + + /** + * Note: This needs to be robust to most kinds of errors. However, + * if the quorum breaks (leader leaves) or if a follower leaves that + * was joined with the met quorum as of the atomic decision point in + * commitNow(), then that change will be detected by the leader and + * it will break the {@link CyclicBarrier}. + */ public Void call() throws Exception { if (log.isInfoEnabled()) log.info("Running gather on follower"); + /* + * These variables are set in the try {} below. If we can + * discover the leader, then we will eventually respond either + * in the try{} or in the finally{}. + */ + long now = 0L; + UUID serviceId = null; HAGlue leader = null; boolean didNotifyLeader = false; @@ -1361,27 +1385,59 @@ try { - // This timestamp is used to help detect clock skew. - final long now = nextTimestamp(); - - // Verify event on leader occurs before event on follower. - assertBefore(req.getTimestampOnLeader(), now); - final long token = req.token(); + /* + * We do not need to handle the case where the token is + * invalid. The leader will reset() the CyclicBarrier for + * this case. + */ getQuorum().assertQuorum(token); + /* + * If the quorumService is null because this service is + * shutting down then the leader will notice the + * serviceLeave() and reset() the CyclicBarrier. 
+ */ final QuorumService<HAGlue> quorumService = getQuorum() .getClient(); + // The serviceId for this service. + serviceId = quorumService.getServiceId(); + + /* + * This timestamp is used to help detect clock skew. + * + * Note: This deliberately uses the (non-remote) + * nextTimestamp() method on BasicHA. This is being done so + * we can write a unit test of the GatherTask that imposes + * clock skew by overriding the next value to be returned + * by that method. + */ + now = ((BasicHA)quorumService.getService()).nextTimestamp(); + + /* + * If the token is invalid, making it impossible for us to + * discover and message the leader, then the leader will + * reset() the CyclicBarrier. + */ + leader = quorumService.getLeader(token); + + /* + * Note: At this point we have everything we need to form up + * our response. If we hit an assertion, we will still + * respond in the finally {} block below. + */ + + /* Verify event on leader occurs before event on follower. + */ + assertBefore(req.getTimestampOnLeader(), now); + if (!quorumService.isFollower(token)) throw new QuorumException(); - leader = quorumService.getLeader(token); + final IHANotifyReleaseTimeRequest req2 = newHANotifyReleaseTimeRequest(serviceId); - final IHANotifyReleaseTimeRequest req2 = newHANotifyReleaseTimeRequest(quorumService - .getServiceId()); - /* * RMI to leader. * @@ -1464,12 +1520,16 @@ if (!didNotifyLeader && leader != null) { /* - * Send a [null] to the leader so it does not block - * forever waiting for our response. + * Send mock response to the leader so it does not block + * forever waiting for our response. The mock response MUST + * include our correct serviceId. 
*/ try { - leader.notifyEarliestCommitTime(null/* resp */); + final IHANotifyReleaseTimeRequest resp = new HANotifyReleaseTimeRequest( + serviceId, 0L/* pinnedCommitTime */, + 1L/* pinnedCommitCounter */, now/* timestamp */); + leader.notifyEarliestCommitTime(resp); } catch (Throwable t2) { log.error(t2, t2); } @@ -1503,18 +1563,33 @@ * request as this barrier instance. That will let us detect a service * that responds late (after a transient disconnect) when the leader has * moved on to another commit. See BarrierState#token for more on this. - * [Note that [req] can be [null if the follower was unable to produce a - * valid response.] + * [Note that [req] can not safely be [null] since the follower must + * self-report its serviceId.] */ @Override public IHANotifyReleaseTimeResponse notifyEarliestCommitTime( final IHANotifyReleaseTimeRequest req) throws IOException, InterruptedException, BrokenBarrierException { + /* + * Note: Do NOT error check [req] until we are in the try{} / + * finally {} below that will do the CyclicBarrier.await(). + */ + final BarrierState barrierState = barrierRef.get(); - if (barrierState == null) + if (barrierState == null) { + + /* + * If the BarrierState reference has been cleared then it is not + * possible for us to count down at the barrier for this message + * (since the CyclicBarrier is gone). Otherwise, we will await() + * at the CyclicBarrier regardless of the message. + */ + throw new IllegalStateException(); + + } try { @@ -1534,6 +1609,9 @@ * Note: We want to await() on the barrier even if there is an * error in the try{} block. This is necessary to decrement the * barrier count down to zero. + * + * TODO If there is an error, we could reset() the barrier + * instead. */ // follower blocks on Thread on the leader here. @@ -1545,9 +1623,10 @@ final IHANotifyReleaseTimeResponse resp = barrierState.consensus; if (resp == null) { - - throw new RuntimeException("No consensus"); - + /* + * Log error, but return anyway. 
+ */ + haLog.error("No consensus"); } return resp; Modified: branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java =================================================================== --- branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java 2013-05-18 11:52:12 UTC (rev 7144) +++ branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java 2013-05-18 13:07:04 UTC (rev 7145) @@ -1795,6 +1795,17 @@ + journal.getRootBlockView().getCommitCounter()); innerRunStateStr.append(", haReady=" + getHAReady()); innerRunStateStr.append(", haStatus=" + getHAStatus()); + // Parenthesize the conditional: '+' binds tighter than '==' and '?:', so without this the null test is applied to the concatenated string. + innerRunStateStr + .append(", serviceId=" + (quorumService == null ? "N/A" + : quorumService.getServiceId())); + /* + * TODO This is not a TXS timestamp. That would be more useful but I + * want to avoid taking the TXS lock. [It looks like the TXS does + * not need that synchronized keyword on nextTimestamp(). Try + * removing it and then using it here.] + */ + innerRunStateStr.append(", now=" + System.currentTimeMillis()); final String msg = server.getOperatorAlert(); if (msg != null) innerRunStateStr.append(", msg=[" + msg + "]"); Modified: branches/READ_CACHE/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java =================================================================== --- branches/READ_CACHE/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java 2013-05-18 11:52:12 UTC (rev 7144) +++ branches/READ_CACHE/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java 2013-05-18 13:07:04 UTC (rev 7145) @@ -42,6 +42,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import net.jini.config.Configuration; import net.jini.config.ConfigurationException; @@ -185,6 +186,22 @@ */ public void voteNo() throws IOException; + /** + * Set the next value 
to be reported by {@link BasicHA#nextTimestamp()}. + * <p> + * Note: Only a few specific methods call against + * {@link BasicHA#nextTimestamp()}. They do so precisely because they + * were written to allow us to override the clock in the test suite + * using this method. + * + * @param nextTimestamp + * when <code>-1L</code> the behavior will revert to the + * default. Any other value will be the next value reported + * by {@link BasicHA#nextTimestamp()}, after which the + * behavior will revert to the default. + */ + public void setNextTimestamp(long nextTimestamp) throws IOException; + } /** @@ -314,6 +331,8 @@ */ private final AtomicBoolean voteNo = new AtomicBoolean(false); + private final AtomicLong nextTimestamp = new AtomicLong(-1L); + private HAGlueTestImpl(final UUID serviceId) { super(serviceId); @@ -374,7 +393,14 @@ public void voteNo() throws IOException { voteNo.set(true); } + + @Override + public void setNextTimestamp(long nextTimestamp) throws IOException { + this.nextTimestamp.set(nextTimestamp); + + } + /** * Conditionally fail the method if (a) it is registered in the * {@link #failSet} and (b) it is due to fail on this invocation. @@ -720,7 +746,28 @@ return super.notifyEarliestCommitTime(req); } + + /** + * {@inheritDoc} + * <p> + * Note: This is NOT an RMI method, but we want to be able to override + * it anyway to test the releaseTime consensus protocol. 
+ */ + @Override + public long nextTimestamp() { + final long t = nextTimestamp.getAndSet(-1L); + + if (t == -1L) { + + return super.nextTimestamp(); + + } + + return t; + + } + /* * HACommitGlue */ Modified: branches/READ_CACHE/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java =================================================================== --- branches/READ_CACHE/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java 2013-05-18 11:52:12 UTC (rev 7144) +++ branches/READ_CACHE/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java 2013-05-18 13:07:04 UTC (rev 7145) @@ -550,6 +550,24 @@ } /** + * This stress test was written after seeing rare failures in + * testStartAB_C_MultiTransactionResync_5tx_then_200ms_delay. + * + * Currently it offers a reliable failure. + * + * @throws Exception + */ + public void _testStressStartAB_C_MultiTransactionResync() + throws Exception { + + for (int i = 0; i < 50; i++) { + doStartAB_C_MultiTransactionResync(200, 5); + destroyAll(); + } + + } + + /** * Test where C starts after <i>initialTransactions</i> on A+B. A series of * transactions are issued with the specified delay. 
* Modified: branches/READ_CACHE/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java =================================================================== --- branches/READ_CACHE/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java 2013-05-18 11:52:12 UTC (rev 7144) +++ branches/READ_CACHE/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java 2013-05-18 13:07:04 UTC (rev 7145) @@ -27,6 +27,7 @@ package com.bigdata.journal.jini.ha; import java.util.UUID; +import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -36,6 +37,7 @@ import com.bigdata.ha.HAGlue; import com.bigdata.ha.HAStatusEnum; import com.bigdata.ha.msg.IHA2PhasePrepareMessage; +import com.bigdata.ha.msg.IHANotifyReleaseTimeRequest; import com.bigdata.journal.jini.ha.HAJournalTest.HAGlueTest; import com.bigdata.journal.jini.ha.HAJournalTest.SpuriousTestException; import com.bigdata.util.InnerCause; @@ -263,7 +265,7 @@ awaitCommitCounter(2L, startup.serverA, startup.serverC); /* - * FIXME Unlike the test above, if there is a problem making the RMI + * Note: Unlike the test above, if there is a problem making the RMI * call, then B will not go through its doRejectedCommit() handler and * will not enter the ERROR state directly. We need to have B notice * that it is no longer at the same commit point, e.g., by observing a @@ -273,20 +275,20 @@ * does not know that there was an attempt to PREPARE since it did not * get the prepare2Phase() message. * - * - Modify HAJournalServer to enter the error state if we observe a + * - Modified HAJournalServer to enter the error state if we observe a * live write cache block for a commitCounter != the expected * commitCounter. * - * - Modify commit2Phase() to accept the #of services that are + * - Modified commit2Phase() to accept the #of services that are * participating in the commit. 
If it is not a full quorum, then we can * not purge the HA logs in commit2Phase() regardless of what the quorum * state looks like. * - * - Modify this test to do another transaction. B can not notice the + * - Modified this test to do another transaction. B can not notice the * problem until there is another write cache flushed through the * pipeline. * - * - Modify this test to await B to move to the end of the pipeline, + * - Modified this test to await B to move to the end of the pipeline, * resync, and rejoin. */ @@ -320,5 +322,122 @@ assertEquals(token, quorum.token()); } - + + /** + * This test forces clock skew on one of the followers causing it to + * encounter an error in its GatherTask. This models the problem that was + * causing a deadlock in an HA3 cluster with BSBM UPDATE running on the + * leader (EXPLORE was running on the follower, but analysis of the root + * cause shows that this was not required to trip the deadlock). The + * deadlock was caused by clock skew resulting in an exception and either + * {@link IHANotifyReleaseTimeRequest} message that was <code>null</code> + * and thus could not be processed or a failure to send that message back to + * the leader. + * + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/677" > HA + * deadlock under UPDATE + QUERY </a> + * + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/673" > DGC + * in release time consensus protocol causes native thread leak in + * HAJournalServer at each commit </a> + */ + public void testStartABC_releaseTimeConsensusProtocol_clockSkew() + throws Exception { + + // Enforce the join order. + final ABC startup = new ABC(true /*sequential*/); + + final long token = awaitFullyMetQuorum(); + + // Should be one commit point. + awaitCommitCounter(1L, startup.serverA, startup.serverB, + startup.serverC); + + /* + * Setup B with a significant clock skew to force an error during the + * GatherTask. 
+ */ + ((HAGlueTest) startup.serverB).setNextTimestamp(10L); + + try { + + // Simple transaction. + simpleTransaction(); + + } catch (Throwable t) { + /* + * TODO This test is currently failing because the consensus + * releaseTime protocol will fail if one of the joined services + * reports an error. The protocol should be robust to an error and + * move forward if a consensus can be formed. If a consensus can not + * be formed (due to some curable error), then any queries running + * on that service should break (force a service leave). Else, if + * the remaining services were to advance the release time since + * otherwise the service could not get through another releaseTime + * consensus protocol exchange successfully if it is reading on a + * commit point that has been released by the other services. + */ + if (!InnerCause.isInnerCause(t, BrokenBarrierException.class)) { + /* + * Wrong inner cause. + */ + fail("Expecting " + BrokenBarrierException.class, t); + } + + } + + // Should be one commit point. + awaitCommitCounter(1L, startup.serverA, startup.serverB, + startup.serverC); + + final long token1 = awaitFullyMetQuorum(); + + /* + * Should have formed a new quorum (each service should have done a + * rejected commit, forced a service leave, and then cured that error + * through seek consensus). + */ + assertEquals(token + 1, token1); + +// // Verify quorum is unchanged. +// assertEquals(token, quorum.token()); +// +// // Should be two commit points on {A,C]. +// awaitCommitCounter(2L, startup.serverA, startup.serverC); +// +// // Should be ONE commit points on {B}. +// awaitCommitCounter(1L, startup.serverB); +// +// /* +// * We use a simple transaction to force B to notice that it missed a +// * commit. B will notice that it did not join in the 2-phase commit when +// * the next live write cache block flows through the pipeline and it is +// * associated with a commitCounter that is GT the commitCounter which B +// * is expecting. 
That will force B into an Error state. From the Error +// * state, it will then resync and re-join the met quorum. +// */ +// simpleTransaction(); +// +// /* +// * The pipeline should be reordered. B will do a service leave, then +// * enter seek consensus, and then re-enter the pipeline. +// */ +// awaitPipeline(new HAGlue[] { startup.serverA, startup.serverC, +// startup.serverB }); +// +// /* +// * There should be three commit points on {A,C,B} (note that this assert +// * does not pay attention to the pipeline order). +// */ +// awaitCommitCounter(3L, startup.serverA, startup.serverC, +// startup.serverB); +// +// // B should be a follower again. +// awaitHAStatus(startup.serverB, HAStatusEnum.Follower); +// +// // quorum token is unchanged. +// assertEquals(token, quorum.token()); + + } + } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |