From: <tho...@us...> - 2012-09-09 15:40:48
Revision: 6552
          http://bigdata.svn.sourceforge.net/bigdata/?rev=6552&view=rev
Author:   thompsonbry
Date:     2012-09-09 15:40:41 +0000 (Sun, 09 Sep 2012)

Log Message:
-----------
Resolved problem where a restart of a follower after a 2-phase commit would not result in a quorum meet. The root cause was that the leader was failing to recast its vote for its (then current) last commit time following a quorum break. Some bugs in AbstractQuorum and the test suite were also identified. With this commit, you can now restart the leader and/or the follower and the quorum will meet.

{{{
(*) Found a bug in QuorumActorBase.conditionalCastVote() where it would return immediately if the service had cast ANY vote. The code will now verify that the service has cast the desired vote before returning. If any other vote was cast, then that vote will be withdrawn and a new vote cast.

(*) Found a bug in AbstractQuorumTestCase where it was not computing the remaining nanoseconds correctly when awaiting a condition to succeed. The same bug was present in the zk quorum test suite. I modified the zk version to invoke the fixed version on AbstractQuorumTestCase.

(*) Modified QuorumWatcherBase.clearToken() to invoke withdrawVote() rather than serviceLeave() if the service was joined. The quorum CI tests are now green.

(*) Further modified QuorumWatcherBase.clearToken() to invoke castVote(lastCommitTime) if the QuorumMember is a QuorumService and moved the call to conditionalAddPipeline() into conditionalCastVote().

You can now stop the follower and the leader will recast its vote for its then current last commit time while remaining in the pipeline (in fact, it is not leaving the pipeline at all during that transition, which is nicer than having it leave and reenter the pipeline). When the follower restarts, the quorum meets. Bingo.

If you instead stop the leader and then restart it, the quorum again meets.
However, the two services have now switched roles (because they are in a different pipeline order) and B has become the leader. Again, that is exactly right.
}}}

Modified Paths:
--------------
    branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/HACommitGlue.java
    branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumCommitImpl.java
    branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumService.java
    branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java
    branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java
    branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java
    branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/quorum/QuorumActor.java
    branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/quorum/AbstractQuorumTestCase.java
    branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/quorum/TestAll.java
    branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/quorum/TestHA3QuorumSemantics.java
    branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java
    branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java
    branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/test/com/bigdata/quorum/zk/AbstractZkQuorumTestCase.java
    branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/test/com/bigdata/quorum/zk/TestAll.java

Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/HACommitGlue.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/HACommitGlue.java 2012-09-09 13:08:27 UTC (rev 6551) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/HACommitGlue.java 2012-09-09 15:40:41 UTC (rev 6552) @@ -32,7 +32,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import com.bigdata.concurrent.TimeoutException; import
com.bigdata.journal.AbstractJournal; /** Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumCommitImpl.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumCommitImpl.java 2012-09-09 13:08:27 UTC (rev 6551) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumCommitImpl.java 2012-09-09 15:40:41 UTC (rev 6552) @@ -53,7 +53,7 @@ return member.getService(serviceId); } - + /** * Cancel the requests on the remote services (RMI). This is a best effort * implementation. Any RMI related errors are trapped and ignored in order Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumService.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumService.java 2012-09-09 13:08:27 UTC (rev 6551) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumService.java 2012-09-09 15:40:41 UTC (rev 6552) @@ -58,4 +58,10 @@ public interface QuorumService<S extends HAGlue> extends QuorumMember<S>, QuorumRead<S>, QuorumCommit<S>, QuorumPipeline<S> { + /** + * Return the lastCommitTime for this service (based on its current root + * block). 
+ */ + long getLastCommitTime(); + } Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java 2012-09-09 13:08:27 UTC (rev 6551) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java 2012-09-09 15:40:41 UTC (rev 6552) @@ -117,6 +117,7 @@ } + @Override public S getService() { return service; @@ -139,6 +140,7 @@ } + @Override public Executor getExecutor() { return getLocalService().getExecutorService(); @@ -154,18 +156,21 @@ * QuorumPipeline */ + @Override public HAReceiveService<HAWriteMessage> getHAReceiveService() { return pipelineImpl.getHAReceiveService(); } + @Override public HASendService getHASendService() { return pipelineImpl.getHASendService(); } + @Override public Future<Void> receiveAndReplicate(HAWriteMessage msg) throws IOException { @@ -173,6 +178,7 @@ } + @Override public Future<Void> replicate(HAWriteMessage msg, ByteBuffer b) throws IOException { @@ -201,6 +207,7 @@ * QuorumCommit. 
*/ + @Override public void abort2Phase(final long token) throws IOException, InterruptedException { @@ -208,6 +215,7 @@ } + @Override public void commit2Phase(final long token, final long commitTime) throws IOException, InterruptedException { @@ -215,6 +223,7 @@ } + @Override public int prepare2Phase(final boolean isRootBlock0, final IRootBlockView rootBlock, final long timeout, final TimeUnit unit) throws InterruptedException, TimeoutException, @@ -224,10 +233,20 @@ } + @Override + public long getLastCommitTime() { + + final L localService = getLocalService(); + + return localService.getLastCommitTime(); + + } + /* * QuorumRead */ + @Override public byte[] readFromQuorum(UUID storeId, long addr) throws InterruptedException, IOException { Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2012-09-09 13:08:27 UTC (rev 6551) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2012-09-09 15:40:41 UTC (rev 6552) @@ -4646,6 +4646,12 @@ // local abort (no quorum, so we can do 2-phase abort). _abort(); + /* + * Note: We can not re-cast our vote until our last vote is + * widthdrawn. That is currently done by QuorumWatcherBase. So, + * we have to wait until we observe that to cast a new vote. 
+ */ + } else if (didMeet) { quorumToken = newValue; Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java 2012-09-09 13:08:27 UTC (rev 6551) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java 2012-09-09 15:40:41 UTC (rev 6552) @@ -51,6 +51,7 @@ import com.bigdata.ha.HAGlue; import com.bigdata.ha.HAPipelineGlue; +import com.bigdata.ha.QuorumService; import com.bigdata.util.InnerCause; import com.bigdata.util.concurrent.DaemonThreadFactory; @@ -1291,14 +1292,13 @@ if (!members.contains(serviceId)) throw new QuorumException(ERR_NOT_MEMBER + serviceId); /* - * FIXME This has been modified to automatically add the service - * back to the pipeline, which we need to do following a service - * leave or quorum break. However, this conflicts with the - * pre-/post- conditions declared in QuorumActor. + * Note: This has been modified to automatically add the service + * to the pipeline and the javadoc for the pre-/post- conditions + * declared in QuorumActor has been updated (9/9/2012). The + * change is inside of conditionalCastVoteImpl(). */ // if (!pipeline.contains(serviceId)) // throw new QuorumException(ERR_NOT_PIPELINE + serviceId); - conditionalPipelineAddImpl(); conditionalCastVoteImpl(lastCommitTime); } catch(InterruptedException e) { // propagate the interrupt. @@ -1513,16 +1513,24 @@ private void conditionalCastVoteImpl(final long lastCommitTime) throws InterruptedException { - final Set<UUID> tmp = votes.get(lastCommitTime); - if (tmp != null && tmp.contains(serviceId)) { - // The service has already cast this vote. - return; + final Set<UUID> votesForCommitTime = votes.get(lastCommitTime); + if (votesForCommitTime != null + && votesForCommitTime.contains(serviceId)) { + // The service has already cast *a* vote. 
+ final Long lastCommitTime2 = getCastVote(serviceId); + if (lastCommitTime2 != null + && lastCommitTime2.longValue() == lastCommitTime) { + // The service has already cast *this* vote. + return; + } } if (log.isDebugEnabled()) log.debug("serviceId=" + serviceId + ",lastCommitTime=" + lastCommitTime); // Withdraw any existing vote by this service. conditionalWithdrawVoteImpl(); + // Ensure part of the pipeline. + conditionalPipelineAddImpl(); // Cast a vote. doCastVote(lastCommitTime); Long t = null; @@ -1539,7 +1547,9 @@ while (getCastVote(serviceId) != null) { votesChange.await(); } - } + if (log.isDebugEnabled()) + log.debug("withdrew vote: serviceId=" + serviceId + + ",lastCommitTime=" + lastCommitTime); } } private void conditionalPipelineAddImpl() throws InterruptedException { @@ -2276,17 +2286,18 @@ lock.lock(); try { // Look for a set of votes for that lastCommitTime. - LinkedHashSet<UUID> tmp = votes.get(lastCommitTime); - if (tmp == null) { + LinkedHashSet<UUID> votesForCommitTime = votes + .get(lastCommitTime); + if (votesForCommitTime == null) { // None found, so create an empty set now. - tmp = new LinkedHashSet<UUID>(); + votesForCommitTime = new LinkedHashSet<UUID>(); // And add it to the map. - votes.put(lastCommitTime, tmp); + votes.put(lastCommitTime, votesForCommitTime); } - if (tmp.add(serviceId)) { + if (votesForCommitTime.add(serviceId)) { // The service cast its vote. 
votesChange.signalAll(); - final int nvotes = tmp.size(); + final int nvotes = votesForCommitTime.size(); if (log.isInfoEnabled()) log.info("serviceId=" + serviceId.toString() + ", lastCommitTime=" + lastCommitTime @@ -2315,7 +2326,7 @@ } if (client != null) { final UUID clientId = client.getServiceId(); - final UUID[] voteOrder = tmp.toArray(new UUID[0]); + final UUID[] voteOrder = votesForCommitTime.toArray(new UUID[0]); if (nvotes == kmeet && clientId.equals(voteOrder[0])) { /* @@ -2400,13 +2411,13 @@ while (itr.hasNext()) { final Map.Entry<Long, LinkedHashSet<UUID>> entry = itr .next(); - final Set<UUID> votes = entry.getValue(); - if (votes.remove(serviceId)) { + final Set<UUID> votesForCommitTime = entry.getValue(); + if (votesForCommitTime.remove(serviceId)) { // The vote was withdrawn. votesChange.signalAll(); sendEvent(new E(QuorumEventEnum.WITHDRAW_VOTE, lastValidToken, token, serviceId)); - if (votes.size() + 1 == kmeet) { + if (votesForCommitTime.size() + 1 == kmeet) { final QuorumMember<S> client = getClientAsMember(); if (client != null) { // Tell the client that the consensus was lost. @@ -2421,7 +2432,7 @@ if (log.isInfoEnabled()) log.info("serviceId=" + serviceId + ", lastCommitTime=" + entry.getKey()); - if (votes.isEmpty()) { + if (votesForCommitTime.isEmpty()) { // remove map entry with no votes cast. itr.remove(); } @@ -2619,15 +2630,61 @@ } sendEvent(new E(QuorumEventEnum.QUORUM_BROKE, lastValidToken, token, null/* serviceId */)); - if (client != null) { - final UUID clientId = client.getServiceId(); - if(joined.contains(clientId)) { - // If our client is joined, then force serviceLeave. -// new Thread() {public void run() {actor.serviceLeave();}}.start(); - doAction(new Runnable() {public void run() {actor.serviceLeave();}}); - } +/* + * Note: Replacing this code with the logic below fixes a problem where a leader + * was failing to update its lastCommitTime after a quorum break caused by + * a follower that was halted. 
The quorum could not meet after the follower + * was restarted because the leader had not voted for a lastCommitTime. The + * code below addresses that explicitly as long as the QuorumMember is a + * QuorumService. + */ +// if (client != null) { +// final UUID clientId = client.getServiceId(); +// if(joined.contains(clientId)) { +// // If our client is joined, then force serviceLeave. +//// new Thread() {public void run() {actor.serviceLeave();}}.start(); +// doAction(new Runnable() {public void run() {actor.serviceLeave();}}); +// } +// } + if (client != null) { + final UUID clientId = client.getServiceId(); + if (joined.contains(clientId)) { + final QuorumMember<S> member = getMember(); + if (member instanceof QuorumService) { + /* + * Set the last commit time. + * + * Note: After a quorum break, a service MUST + * recast its vote for it's then-current + * lastCommitTime. If it fails to do this, then + * it will be impossible for a consensus to form + * around the then current lastCommitTimes for + * the services. It appears to be quite + * difficult for the service to handle this + * itself since it can not easily recognize when + * it's old vote has been widthdrawn. Therefore, + * the logic to do this has been moved into the + * QuorumWatcherBase. + */ + final long lastCommitTime = ((QuorumService<?>) member) + .getLastCommitTime(); + doAction(new Runnable() { + public void run() { + // recast our vote. + actor.castVote(lastCommitTime); + } + }); + } else { + // just withdraw the vote. 
+ doAction(new Runnable() { + public void run() { + actor.withdrawVote(); + } + }); } } + } + } } finally { lock.unlock(); } Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/quorum/QuorumActor.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/quorum/QuorumActor.java 2012-09-09 13:08:27 UTC (rev 6551) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/quorum/QuorumActor.java 2012-09-09 15:40:41 UTC (rev 6552) @@ -54,7 +54,7 @@ * <dt>{@link #pipelineAdd()}</dt> * <dd>member.</dd> * <dt>{@link #castVote(long)}</dt> - * <dd>member, pipeline.</dd> + * <dd>member (service implicitly joins pipeline if not present).</dd> * <dt>{@link #serviceJoin()}</dt> * <dd>member, pipeline, consensus around cast vote, predecessor in the vote * order is joined</dd> @@ -129,12 +129,13 @@ void pipelineRemove(); /** - * Cast a vote on the behalf of the associated service. If the service has - * already voted for some other lastCommitTime, then that vote is withdrawn - * before the new vote is cast. Services do not withdraw their cast votes - * until a quorum breaks and a new consensus needs to be established. When - * it does, then need to consult their root blocks and vote their then - * current lastCommitTime. + * Cast a vote on the behalf of the associated service. If the service is + * not part of the pipeline, then it is implicitly added to the pipeline. If + * the service has already voted for some other lastCommitTime, then that + * vote is withdrawn before the new vote is cast. Services do not withdraw + * their cast votes until a quorum breaks and a new consensus needs to be + * established. When it does, then need to consult their root blocks and + * vote their then current lastCommitTime. * <p> * When a service needs to re-synchronize with a quorum, it initially votes * its current lastCommitTime. 
Once the service is receiving writes from the Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/quorum/AbstractQuorumTestCase.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/quorum/AbstractQuorumTestCase.java 2012-09-09 13:08:27 UTC (rev 6551) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/quorum/AbstractQuorumTestCase.java 2012-09-09 15:40:41 UTC (rev 6552) @@ -27,14 +27,18 @@ package com.bigdata.quorum; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.UUID; import java.util.concurrent.TimeUnit; import junit.framework.AssertionFailedError; import junit.framework.TestCase2; import com.bigdata.quorum.MockQuorumFixture.MockQuorum; +import com.bigdata.quorum.MockQuorumFixture.MockQuorum.MockQuorumActor; import com.bigdata.quorum.MockQuorumFixture.MockQuorumMember; -import com.bigdata.quorum.MockQuorumFixture.MockQuorum.MockQuorumActor; /** * Abstract base class for testing using a {@link MockQuorumFixture}. @@ -174,37 +178,41 @@ static public void assertCondition(final Runnable cond, final long timeout, final TimeUnit units) { final long begin = System.nanoTime(); - long nanos = units.toNanos(timeout); - // remaining -= (now - begin) [aka elapsed] - nanos -= System.nanoTime() - begin; + final long nanos = units.toNanos(timeout); + long remaining = nanos; + // remaining = nanos - (now - begin) [aka elapsed] + remaining = nanos - (System.nanoTime() - begin); while (true) { - AssertionFailedError cause = null; try { // try the condition cond.run(); // success. return; - } catch (AssertionFailedError e) { - nanos -= System.nanoTime() - begin; - if (nanos < 0) { + } catch (final AssertionFailedError e) { + remaining = nanos - (System.nanoTime() - begin); + if (remaining < 0) { // Timeout - rethrow the failed assertion. 
throw e; } - cause = e; + // Sleep up to 10ms or the remaining nanos, which ever is less. + final int millis = (int) Math.min( + TimeUnit.NANOSECONDS.toMillis(remaining), 10); + if (millis > 0) { + // sleep and retry. + try { + Thread.sleep(millis); + } catch (InterruptedException e1) { + // propagate the interrupt. + Thread.currentThread().interrupt(); + return; + } + remaining = nanos - (System.nanoTime() - begin); + if (remaining < 0) { + // Timeout - rethrow the failed assertion. + throw e; + } + } } - // Sleep up to 10ms or the remaining nanos, which ever is less. - final int millis = (int) Math.min(TimeUnit.NANOSECONDS - .toMillis(nanos), 10); - if (log.isInfoEnabled()) - log.info("Will retry: millis=" + millis + ", cause=" + cause); - // sleep and retry. - try { - Thread.sleep(millis); - } catch (InterruptedException e1) { - // propagate the interrupt. - Thread.currentThread().interrupt(); - return; - } } } @@ -225,5 +233,49 @@ assertCondition(cond, 5, TimeUnit.SECONDS); } + + /** + * Helper method provides nice rendering of a votes snapshot. + * <p> + * Note: The snapshot uses a {@link UUID}[] rather than a collection for + * each <code>lastCommitTime</code> key. However, by default toString() for + * an array does not provide a nice rendering. + * + * @param votes + * The votes. + * @return The human readable representation. + */ + public static String toString(final Map<Long, UUID[]> votes) { + + // put things into a ordered Collection. toString() for the Collection is nice. 
+ final Map<Long, LinkedHashSet<UUID>> m = new LinkedHashMap<Long, LinkedHashSet<UUID>>(); + + for(Map.Entry<Long,UUID[]> e : votes.entrySet()) { + + final Long commitTime = e.getKey(); + + final UUID[] a = e.getValue(); + + LinkedHashSet<UUID> votesForCommitTime = m.get(commitTime); + + if(votesForCommitTime == null) { + + votesForCommitTime = new LinkedHashSet<UUID>(); + + m.put(commitTime, votesForCommitTime); + + } + + for (UUID uuid : a) { + + votesForCommitTime.add(uuid); + + } + + } + + return m.toString(); + + } } Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/quorum/TestAll.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/quorum/TestAll.java 2012-09-09 13:08:27 UTC (rev 6551) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/quorum/TestAll.java 2012-09-09 15:40:41 UTC (rev 6552) @@ -103,7 +103,11 @@ */ suite.addTestSuite(TestHA3QuorumSemantics.class); - suite.addTest(StressTestHA3.suite()); + /* + * Run the test HA3 suite a bunch of times. + */ + suite.addTest(StressTestHA3.suite()); + } return suite; Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/quorum/TestHA3QuorumSemantics.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/quorum/TestHA3QuorumSemantics.java 2012-09-09 13:08:27 UTC (rev 6551) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/quorum/TestHA3QuorumSemantics.java 2012-09-09 15:40:41 UTC (rev 6552) @@ -27,6 +27,7 @@ package com.bigdata.quorum; +import java.util.Map; import java.util.UUID; import com.bigdata.quorum.MockQuorumFixture.MockQuorumMember; @@ -388,7 +389,8 @@ /* * Should be two timestamps for which services have voted (but we can * only. check the one that enacted the change since that is the only - * one for which the update is guaranteed to be visible). 
+ * one for which the update is guaranteed to be visible without awaiting + * the Condition). */ // assertEquals(2, quorum0.getVotes().size()); assertEquals(2, quorum1.getVotes().size()); @@ -403,8 +405,8 @@ // wait for quorums to meet (visibility guarantee). final long token1 = quorum0.awaitQuorum(); - quorum1.awaitQuorum(); - quorum2.awaitQuorum(); + assertEquals(token1, quorum1.awaitQuorum()); + assertEquals(token1, quorum2.awaitQuorum()); // The last consensus timestamp should have been updated for all quorum members. assertEquals(lastCommitTime1, client0.lastConsensusValue); @@ -478,9 +480,12 @@ assertEquals(-1L, client2.lastConsensusValue); // Should be no timestamps for which services have voted. - assertEquals(0, quorum0.getVotes().size()); - assertEquals(0, quorum1.getVotes().size()); - assertEquals(0, quorum2.getVotes().size()); + final Map<Long, UUID[]> votes0 = quorum0.getVotes(); + final Map<Long, UUID[]> votes1 = quorum1.getVotes(); + final Map<Long, UUID[]> votes2 = quorum2.getVotes(); + assertEquals(AbstractQuorumTestCase.toString(votes0), 0, votes0.size()); + assertEquals(AbstractQuorumTestCase.toString(votes1), 0, votes1.size()); + assertEquals(AbstractQuorumTestCase.toString(votes2), 0, votes2.size()); // Verify the specific services voting for each timestamp. 
assertEquals(null, quorum0.getVotes().get(lastCommitTime1)); Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2012-09-09 13:08:27 UTC (rev 6551) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2012-09-09 15:40:41 UTC (rev 6552) @@ -363,7 +363,7 @@ @Override public void notify(final QuorumEvent e) { - System.err.println("QuorumEvent: "+e); + System.err.println("QuorumEvent: " + e);// FIXME remove logger. switch(e.getEventType()) { case CAST_VOTE: break; @@ -411,8 +411,7 @@ break; case SERVICE_LEAVE: break; - case WITHDRAW_VOTE: - break; + case WITHDRAW_VOTE: } } }); Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java 2012-09-09 13:08:27 UTC (rev 6551) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java 2012-09-09 15:40:41 UTC (rev 6552) @@ -512,6 +512,9 @@ // zpath for the lastCommitTime. final String lastCommitTimeZPath = logicalServiceId + "/" + QUORUM + "/" + QUORUM_VOTES + "/" + lastCommitTime; + if (log.isInfoEnabled()) + log.info("lastCommitTime=" + lastCommitTime + + ", lastCommitTimeZPath=" + lastCommitTimeZPath); // get a valid zookeeper connection object. final ZooKeeper zk; try { @@ -587,6 +590,8 @@ // zpath for votes. final String votesZPath = logicalServiceId + "/" + QUORUM + "/" + QUORUM_VOTES; + if (log.isInfoEnabled()) + log.info("votesZPath=" + votesZPath); // get a valid zookeeper connection object. 
final ZooKeeper zk; try { @@ -1888,12 +1893,12 @@ } @Override - protected void add(UUID serviceId) { + protected void add(final UUID serviceId) { castVote(serviceId, lastCommitTime); } @Override - protected void remove(UUID serviceId) { + protected void remove(final UUID serviceId) { withdrawVote(serviceId); } Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/test/com/bigdata/quorum/zk/AbstractZkQuorumTestCase.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/test/com/bigdata/quorum/zk/AbstractZkQuorumTestCase.java 2012-09-09 13:08:27 UTC (rev 6551) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/test/com/bigdata/quorum/zk/AbstractZkQuorumTestCase.java 2012-09-09 15:40:41 UTC (rev 6552) @@ -32,17 +32,12 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; -import junit.framework.AssertionFailedError; - import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs.Ids; -import com.bigdata.quorum.AbstractQuorum; +import com.bigdata.quorum.AbstractQuorumTestCase; import com.bigdata.quorum.MockQuorumFixture; -import com.bigdata.quorum.Quorum; import com.bigdata.quorum.QuorumActor; -import com.bigdata.quorum.QuorumWatcher; -import com.bigdata.quorum.MockQuorumFixture.MockQuorum; import com.bigdata.zookeeper.AbstractZooTestCase; import com.bigdata.zookeeper.ZooKeeperAccessor; @@ -137,102 +132,17 @@ super.tearDown(); } - /** - * Wait up to a timeout until some condition succeeds. - * <p> - * Whenever more than one {@link AbstractQuorum} is under test there will be - * concurrent indeterminism concerning the precise ordering and timing as - * updates propagate from the {@link AbstractQuorum} which takes some action - * (castVote(), pipelineAdd(), etc.) to the other quorums attached to the - * same {@link MockQuorumFixture}. 
This uncertainty about the ordering and - * timing state changes is not dissimilar from the uncertainty we face in a - * real distributed system. - * <p> - * While there are times when this uncertainty does not affect the behavior - * of the tests, there are other times when we must have a guarantee that a - * specific vote order or pipeline order was established. For those cases, - * this method may be used to await an arbitrary condition. This method - * simply retries until the condition becomes true, sleeping a little after - * each failure. - * <p> - * Actions executed in the main thread of the unit test will directly update - * the internal state of the {@link MockQuorumFixture}, which is shared - * across the {@link MockQuorum}s. However, uncertainty about ordering can - * arise as a result of the interleaving of the actions taken by the - * {@link QuorumWatcher}s in response to both top-level actions and actions - * taken by other {@link QuorumWatcher}s. For example, the vote order or the - * pipeline order are fully determined based on sequence such as the - * following: - * - * <pre> - * actor0.pipelineAdd(); - * actor2.pipelineAdd(); - * actor1.pipelineAdd(); - * </pre> - * - * When in doubt, or when a unit test displays stochastic behavior, you can - * use this method to wait until the quorum state has been correctly - * replicated to the {@link Quorum}s under test. - * - * @param cond - * The condition, which must throw an - * {@link AssertionFailedError} if it does not succeed. - * @param timeout - * The timeout. - * @param unit - * - * @throws AssertionFailedError - * if the condition does not succeed within the timeout. 
- */ - public void assertCondition(final Runnable cond, final long timeout, + protected void assertCondition(final Runnable cond, final long timeout, final TimeUnit units) { - final long begin = System.nanoTime(); - long nanos = units.toNanos(timeout); - // remaining -= (now - begin) [aka elapsed] - nanos -= System.nanoTime() - begin; - while (true) { - try { - // try the condition - cond.run(); - // success. - return; - } catch (AssertionFailedError e) { - nanos -= System.nanoTime() - begin; - if (nanos < 0) { - // Timeout - rethrow the failed assertion. - throw e; - } - } - // sleep and retry. - try { - // sleep up to 10ms or nanos, which ever is less. - Thread - .sleep(Math.min(TimeUnit.NANOSECONDS.toMillis(nanos), - 10)); - } catch (InterruptedException e1) { - // propagate the interrupt. - Thread.currentThread().interrupt(); - return; - } - } + + AbstractQuorumTestCase.assertCondition(cond, timeout, units); + } - /** - * Waits up to 5 seconds for the condition to succeed. - * - * @param cond - * The condition, which must throw an - * {@link AssertionFailedError} if it does not succeed. - * - * @throws AssertionFailedError - * if the condition does not succeed within the timeout. - * - * @see #assertCondition(Runnable, long, TimeUnit) - */ - public void assertCondition(final Runnable cond) { + protected void assertCondition(final Runnable cond) { - assertCondition(cond, 5, TimeUnit.SECONDS); + AbstractQuorumTestCase.assertCondition(cond, 5, TimeUnit.SECONDS); } - + } Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/test/com/bigdata/quorum/zk/TestAll.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/test/com/bigdata/quorum/zk/TestAll.java 2012-09-09 13:08:27 UTC (rev 6551) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/test/com/bigdata/quorum/zk/TestAll.java 2012-09-09 15:40:41 UTC (rev 6552) @@ -76,8 +76,8 @@ // unit tests for a singleton quorum. 
suite.addTestSuite(TestZkSingletonQuorumSemantics.class); - // FIXME Enable HA test suite again: unit tests for a highly available quorum. -// suite.addTestSuite(TestZkHA3QuorumSemantics.class); + // unit tests for a highly available quorum. + suite.addTestSuite(TestZkHA3QuorumSemantics.class); return suite;
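
Note on the assertCondition() fix: the log message says the remaining nanoseconds were not computed correctly. In the old loop, each retry subtracted the total elapsed time from a budget that earlier retries had already reduced, so the timeout could expire early. The following is only a standalone sketch of that arithmetic, not the committed code. The class RemainingTimeSketch and its two methods are hypothetical names, and the clock samples are passed in as an array (an assumption made so that the result is deterministic).

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical model of the timeout accounting in assertCondition(). */
final class RemainingTimeSketch {

    /**
     * Old form: "nanos -= now - begin" on every retry. The elapsed time is
     * measured from begin each time, so it is subtracted again and again.
     */
    static long buggyRemaining(final long budgetNanos, final long beginNanos,
            final long[] samples) {
        long nanos = budgetNanos;
        for (long now : samples) {
            nanos -= now - beginNanos;
        }
        return nanos;
    }

    /**
     * Fixed form: "remaining = nanos - (now - begin)". The original budget
     * is never modified, so each retry sees the true remaining time.
     */
    static long fixedRemaining(final long budgetNanos, final long beginNanos,
            final long[] samples) {
        long remaining = budgetNanos;
        for (long now : samples) {
            remaining = budgetNanos - (now - beginNanos);
        }
        return remaining;
    }

    public static void main(final String[] args) {
        final long budget = TimeUnit.MILLISECONDS.toNanos(100);
        // Pretend the clock was sampled 10ms, 20ms and 30ms after begin (0).
        final long[] samples = {
                TimeUnit.MILLISECONDS.toNanos(10),
                TimeUnit.MILLISECONDS.toNanos(20),
                TimeUnit.MILLISECONDS.toNanos(30) };
        System.out.println("buggy="
                + TimeUnit.NANOSECONDS.toMillis(buggyRemaining(budget, 0L, samples))
                + "ms fixed="
                + TimeUnit.NANOSECONDS.toMillis(fixedRemaining(budget, 0L, samples))
                + "ms");
    }
}
```

With a 100ms budget and samples taken at 10ms, 20ms and 30ms, the old form leaves 40ms while the recomputed form leaves 70ms, which is the actual time left.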
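
Note on the conditionalCastVote() change: per the log message, the method should return early only when the service has already cast the desired vote, and should otherwise withdraw any other vote before casting the new one. The following is a minimal single-threaded sketch of that rule, assuming a plain in-memory vote table. VoteTableSketch is a hypothetical class and its bodies are illustrative only. The real AbstractQuorum works under a lock, waits on a Condition, and goes through the QuorumActor and QuorumWatcher, none of which is modeled here.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.UUID;

/** Hypothetical in-memory model of the votes table (lastCommitTime -> voters). */
final class VoteTableSketch {

    private final Map<Long, LinkedHashSet<UUID>> votes = new LinkedHashMap<>();

    /** Return the lastCommitTime the service voted for, or null if none. */
    Long getCastVote(final UUID serviceId) {
        for (Map.Entry<Long, LinkedHashSet<UUID>> e : votes.entrySet()) {
            if (e.getValue().contains(serviceId)) {
                return e.getKey();
            }
        }
        return null;
    }

    /** Withdraw whatever vote the service has cast (no-op if none). */
    void withdrawVote(final UUID serviceId) {
        final Iterator<Map.Entry<Long, LinkedHashSet<UUID>>> itr = votes
                .entrySet().iterator();
        while (itr.hasNext()) {
            final LinkedHashSet<UUID> voters = itr.next().getValue();
            if (voters.remove(serviceId) && voters.isEmpty()) {
                // remove map entry with no votes cast.
                itr.remove();
            }
        }
    }

    /**
     * Cast the vote unless this exact vote is already cast.
     *
     * @return true iff a new vote was cast.
     */
    boolean conditionalCastVote(final UUID serviceId, final long lastCommitTime) {
        final Long current = getCastVote(serviceId);
        if (current != null && current.longValue() == lastCommitTime) {
            // The service has already cast *this* vote.
            return false;
        }
        // Some other vote (or none) is cast: withdraw, then cast the new one.
        withdrawVote(serviceId);
        votes.computeIfAbsent(lastCommitTime, k -> new LinkedHashSet<>())
                .add(serviceId);
        return true;
    }

    /** Number of distinct lastCommitTime values that currently have votes. */
    int distinctCommitTimes() {
        return votes.size();
    }

    public static void main(final String[] args) {
        final VoteTableSketch table = new VoteTableSketch();
        final UUID leader = UUID.randomUUID();
        table.conditionalCastVote(leader, 1L);
        // After a quorum break the leader recasts for its then-current time.
        table.conditionalCastVote(leader, 2L);
        System.out.println("vote=" + table.getCastVote(leader)
                + ", distinctCommitTimes=" + table.distinctCommitTimes());
    }
}
```

The point of the sketch is the early-return test: checking only that some vote exists would leave a stale vote in place after a quorum break, which matches the failure described in the log message.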