From: <tho...@us...> - 2013-12-09 15:13:39
|
Revision: 7620 http://bigdata.svn.sourceforge.net/bigdata/?rev=7620&view=rev Author: thompsonbry Date: 2013-12-09 15:13:30 +0000 (Mon, 09 Dec 2013) Log Message: ----------- Commit includes the changes to allow the leader to force another service out of the quorum. This is to support the socket resynchronization protocol - see #779. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/quorum/QuorumActor.java branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/quorum/TestHA3QuorumSemantics.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/quorum/zk/TestZkHA3QuorumSemantics.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java 2013-12-09 15:11:10 UTC (rev 7619) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java 2013-12-09 15:13:30 UTC (rev 7620) @@ -2289,17 +2289,80 @@ abstract protected void doMemberAdd(); - abstract protected void doMemberRemove(); + final protected void doMemberRemove() { + doMemberRemove(serviceId); + } + abstract protected void doMemberRemove(UUID serviceId); + abstract protected void doCastVote(long lastCommitTime); - abstract protected void doWithdrawVote(); + final protected void doWithdrawVote() { + doWithdrawVote(serviceId); + } + abstract protected void doWithdrawVote(UUID serviceId); + abstract protected void doPipelineAdd(); - abstract protected void doPipelineRemove(); + final protected void doPipelineRemove() { + doPipelineRemove(serviceId); + } + abstract 
protected void doPipelineRemove(UUID serviceId); + + abstract protected void doServiceJoin(); + + final protected void doServiceLeave() { + doServiceLeave(serviceId); + } + + abstract protected void doServiceLeave(UUID serviceId); + + abstract protected void doSetToken(long newToken); + +// abstract protected void doSetLastValidToken(long newToken); +// +// abstract protected void doSetToken(); + + abstract protected void doClearToken(); + + /** + * {@inheritDoc} + * <p> + * Note: This implements an unconditional remove of the specified + * service. It is intended to force a different service out of the + * pipeline. This code deliberately takes this action unconditionally + * and does NOT await the requested state change. + * <p> + * Note: This code could potentially cause the remote service to + * deadlock in one of the conditionalXXX() methods if it is concurrently + * attempting to execute quorum action on itself. If this problem is + * observed, we should add a timeout to the conditionalXXX() methods + * that will force them to fail rather than block forever. This will + * then force the service into an error state if its QuorumActor can not + * carry out the requested action within a specified timeout. 
+ * + * @throws InterruptedException + */ + @Override + final public void forceRemoveService(final UUID psid) + throws InterruptedException { + lock.lockInterruptibly(); + try { + log.warn("Forcing remove of service" + ": thisService=" + + serviceId + ", otherServiceId=" + psid); + doMemberRemove(psid); + doWithdrawVote(psid); + doPipelineRemove(psid); + doServiceLeave(psid); + } finally { + lock.unlock(); + } + } + + /** * Invoked when our client will become the leader to (a) reorganize the * write pipeline such that our client is the first service in the write * pipeline (the leader MUST be the first service in the write @@ -2394,18 +2457,6 @@ return modified; } - abstract protected void doServiceJoin(); - - abstract protected void doServiceLeave(); - - abstract protected void doSetToken(long newToken); - -// abstract protected void doSetLastValidToken(long newToken); -// -// abstract protected void doSetToken(); - - abstract protected void doClearToken(); - } /** Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/quorum/QuorumActor.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/quorum/QuorumActor.java 2013-12-09 15:11:10 UTC (rev 7619) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/quorum/QuorumActor.java 2013-12-09 15:13:30 UTC (rev 7620) @@ -205,4 +205,22 @@ */ void clearToken(); + /** + * Remove the service from the quorum. This should be called when a problem + * with the service is reported to the quorum leader, for example as a + * result of a failed RMI request or failed socket level write replication. + * Such errors arise either from network connectivity or service death. + * These problems will generally be cured, but the heartbeat timeout to cure + * the problem can cause write replication to block. This method may be used + * to force the timely reordering of the pipeline in order to work around + * the replication problem. 
This is not a permanent disabling of the service + * - the service may be restarted or may recover and reenter the quorum at + * any time. + * + * @param serviceId + * The UUID of the service to be removed. + * @throws InterruptedException + */ + public void forceRemoveService(UUID serviceId) throws InterruptedException; + } Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java 2013-12-09 15:11:10 UTC (rev 7619) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java 2013-12-09 15:13:30 UTC (rev 7620) @@ -846,32 +846,20 @@ fixture.memberAdd(serviceId); } - protected void doMemberRemove() { - fixture.memberRemove(serviceId); - } - protected void doCastVote(final long lastCommitTime) { fixture.castVote(serviceId, lastCommitTime); } - protected void doWithdrawVote() { - fixture.withdrawVote(serviceId); - } - protected void doPipelineAdd() { fixture.pipelineAdd(serviceId); } - protected void doPipelineRemove() { - fixture.pipelineRemove(serviceId); - } - protected void doServiceJoin() { fixture.serviceJoin(serviceId); } - protected void doServiceLeave() { - fixture.serviceLeave(serviceId); + protected void doServiceLeave(final UUID service) { + fixture.serviceLeave(service); } protected void doSetToken(final long newToken) { @@ -890,6 +878,21 @@ fixture.clearToken(); } + @Override + protected void doMemberRemove(UUID service) { + fixture.memberRemove(service); + } + + @Override + protected void doWithdrawVote(UUID service) { + fixture.withdrawVote(service); + } + + @Override + protected void doPipelineRemove(UUID service) { + fixture.pipelineRemove(service); + } + // /** // * {@inheritDoc} // * <p> Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/quorum/TestHA3QuorumSemantics.java 
=================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/quorum/TestHA3QuorumSemantics.java 2013-12-09 15:11:10 UTC (rev 7619) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/quorum/TestHA3QuorumSemantics.java 2013-12-09 15:13:30 UTC (rev 7620) @@ -1102,6 +1102,586 @@ } + + public void test_serviceJoin3_simpleForceRemove() throws InterruptedException { + + final Quorum<?, ?> quorum0 = quorums[0]; + final MockQuorumMember<?> client0 = clients[0]; + final QuorumActor<?, ?> actor0 = actors[0]; + final UUID serviceId0 = client0.getServiceId(); + + final Quorum<?, ?> quorum1 = quorums[1]; + final MockQuorumMember<?> client1 = clients[1]; + final QuorumActor<?, ?> actor1 = actors[1]; + final UUID serviceId1 = client1.getServiceId(); + + final Quorum<?, ?> quorum2 = quorums[2]; + final MockQuorumMember<?> client2 = clients[2]; + final QuorumActor<?,?> actor2 = actors[2]; + final UUID serviceId2 = client2.getServiceId(); + +// final long lastCommitTime1 = 0L; +// +// final long lastCommitTime2 = 2L; + + final long lastCommitTime = 0L; + + // declare the services as quorum members. + actor0.memberAdd(); + actor1.memberAdd(); + actor2.memberAdd(); + fixture.awaitDeque(); + assertCondition(new Runnable() { + public void run() { + assertEquals(3, quorum0.getMembers().length); + assertEquals(3, quorum1.getMembers().length); + assertEquals(3, quorum2.getMembers().length); + } + }); + + /* + * Have the services join the pipeline. 
+ */ + actor0.pipelineAdd(); + actor1.pipelineAdd(); + actor2.pipelineAdd(); + fixture.awaitDeque(); + + assertCondition(new Runnable() { + public void run() { + assertEquals(new UUID[] { serviceId0, serviceId1, serviceId2 }, + quorum0.getPipeline()); + assertEquals(new UUID[] { serviceId0, serviceId1, serviceId2 }, + quorum1.getPipeline()); + assertEquals(new UUID[] { serviceId0, serviceId1, serviceId2 }, + quorum2.getPipeline()); + } + }); + + /* + * Have two services cast a vote for a lastCommitTime. This will cause + * the quorum to meet. + */ + final long token1; + { + + actor0.castVote(lastCommitTime); + actor1.castVote(lastCommitTime); + fixture.awaitDeque(); + + // validate the token was assigned (must wait for meet). + token1 = quorum0.awaitQuorum(); + assertEquals(Quorum.NO_QUORUM + 1, token1); + assertEquals(Quorum.NO_QUORUM + 1, quorum0.token()); + assertEquals(Quorum.NO_QUORUM + 1, quorum0.lastValidToken()); + assertTrue(quorum0.isQuorumMet()); + // wait for meet for other clients. + assertEquals(token1, quorum1.awaitQuorum()); + assertEquals(token1, quorum2.awaitQuorum()); + assertEquals(token1, quorum1.lastValidToken()); + assertEquals(token1, quorum2.lastValidToken()); + + assertCondition(new Runnable() { + public void run() { + // services have voted for a single lastCommitTime. + assertEquals(1, quorum0.getVotes().size()); + assertEquals(1, quorum1.getVotes().size()); + assertEquals(1, quorum2.getVotes().size()); + + // verify the vote order. + assertEquals(new UUID[] { serviceId0, serviceId1 }, quorum0 + .getVotes().get(lastCommitTime)); + + // verify the consensus was updated. + assertEquals(lastCommitTime, client0.lastConsensusValue); + assertEquals(lastCommitTime, client1.lastConsensusValue); + assertEquals(lastCommitTime, client2.lastConsensusValue); + + /* + * Service join in the same order in which they cast their + * votes. 
+ */ + assertEquals(new UUID[] { serviceId0, serviceId1 }, quorum0 + .getJoined()); + assertEquals(new UUID[] { serviceId0, serviceId1 }, quorum1 + .getJoined()); + assertEquals(new UUID[] { serviceId0, serviceId1 }, quorum2 + .getJoined()); + + // The pipeline order is the same as the vote order. + assertEquals(new UUID[] { serviceId0, serviceId1, + serviceId2 }, quorum0.getPipeline()); + assertEquals(new UUID[] { serviceId0, serviceId1, + serviceId2 }, quorum1.getPipeline()); + assertEquals(new UUID[] { serviceId0, serviceId1, + serviceId2 }, quorum2.getPipeline()); + } + }); + } + + /* + * Cast the last vote and verify that the last service joins. + * + * Note: The last service should join immediately since it does not have + * to do any validation when it joins. + */ + { + actor2.castVote(lastCommitTime); + fixture.awaitDeque(); + + assertCondition(new Runnable() { + public void run() { + // services have voted for a single lastCommitTime. + assertEquals(1, quorum0.getVotes().size()); + + // verify the vote order. + assertEquals(new UUID[] { serviceId0, serviceId1, serviceId2 }, + quorum0.getVotes().get(lastCommitTime)); + + // verify the consensus was NOT updated. + assertEquals(lastCommitTime, client0.lastConsensusValue); + assertEquals(lastCommitTime, client1.lastConsensusValue); + assertEquals(lastCommitTime, client2.lastConsensusValue); + + // Service join in the same order in which they cast their votes. + assertEquals(new UUID[] { serviceId0, serviceId1, + serviceId2 }, quorum0.getJoined()); + assertEquals(new UUID[] { serviceId0, serviceId1, + serviceId2 }, quorum1.getJoined()); + assertEquals(new UUID[] { serviceId0, serviceId1, + serviceId2 }, quorum2.getJoined()); + } + }); + + // validate the token was NOT updated. 
+ assertEquals(token1, quorum0.lastValidToken()); + assertEquals(token1, quorum1.lastValidToken()); + assertEquals(token1, quorum2.lastValidToken()); + assertEquals(token1, quorum0.token()); + assertEquals(token1, quorum1.token()); + assertEquals(token1, quorum2.token()); + assertTrue(quorum0.isQuorumMet()); + assertTrue(quorum1.isQuorumMet()); + assertTrue(quorum2.isQuorumMet()); + + // The pipeline order is the same as the vote order. + assertEquals(new UUID[] { serviceId0, serviceId1, serviceId2 }, + quorum0.getPipeline()); + assertEquals(new UUID[] { serviceId0, serviceId1, serviceId2 }, + quorum1.getPipeline()); + assertEquals(new UUID[] { serviceId0, serviceId1, serviceId2 }, + quorum2.getPipeline()); + } + + /* + * Follower leave/join test. + */ + { + + /* + * Fail the first follower. This will not cause a quorum break since + * there are still (k+1)/2 services in the quorum. + */ + // actor1.serviceLeave(); + actor0.forceRemoveService(actor1.getServiceId()); + fixture.awaitDeque(); + + assertCondition(new Runnable() { + public void run() { + // services have voted for a single lastCommitTime. + assertEquals(1, quorum0.getVotes().size()); + + // verify the vote order. + assertEquals(new UUID[] { serviceId0, serviceId2 }, quorum0 + .getVotes().get(lastCommitTime)); + + // verify the consensus was NOT updated. + assertEquals(lastCommitTime, client0.lastConsensusValue); + assertEquals(lastCommitTime, client1.lastConsensusValue); + assertEquals(lastCommitTime, client2.lastConsensusValue); + + /* + * Service join in the same order in which they cast their + * votes. + */ + assertEquals(new UUID[] { serviceId0, serviceId2 }, quorum0 + .getJoined()); + assertEquals(new UUID[] { serviceId0, serviceId2 }, quorum1 + .getJoined()); + assertEquals(new UUID[] { serviceId0, serviceId2 }, quorum2 + .getJoined()); + } + }); + + // validate the token was NOT updated. 
+ assertEquals(token1, quorum0.lastValidToken()); + assertEquals(token1, quorum1.lastValidToken()); + assertEquals(token1, quorum2.lastValidToken()); + assertEquals(token1, quorum0.token()); + assertEquals(token1, quorum1.token()); + assertEquals(token1, quorum2.token()); + assertTrue(quorum0.isQuorumMet()); + assertTrue(quorum1.isQuorumMet()); + assertTrue(quorum2.isQuorumMet()); + + assertCondition(new Runnable() { + public void run() { + // The pipeline order is the same as the vote order. + assertEquals(new UUID[] { serviceId0, serviceId2 }, quorum0 + .getPipeline()); + assertEquals(new UUID[] { serviceId0, serviceId2 }, quorum1 + .getPipeline()); + assertEquals(new UUID[] { serviceId0, serviceId2 }, quorum2 + .getPipeline()); + } + }); + + /* + * Rejoin the service. + */ + actor1.memberAdd(); + fixture.awaitDeque(); + actor1.pipelineAdd(); + fixture.awaitDeque(); + actor1.castVote(lastCommitTime); + fixture.awaitDeque(); + + assertCondition(new Runnable() { + public void run() { + + // services have voted for a single lastCommitTime. + assertEquals(1, quorum0.getVotes().size()); + assertEquals(1, quorum1.getVotes().size()); + assertEquals(1, quorum2.getVotes().size()); + + // verify the vote order. + assertEquals(new UUID[] { serviceId0, serviceId2, + serviceId1 }, quorum0.getVotes() + .get(lastCommitTime)); + assertEquals(new UUID[] { serviceId0, serviceId2, + serviceId1 }, quorum1.getVotes() + .get(lastCommitTime)); + assertEquals(new UUID[] { serviceId0, serviceId2, + serviceId1 }, quorum2.getVotes() + .get(lastCommitTime)); + + // verify the consensus was NOT updated. + assertEquals(lastCommitTime, client0.lastConsensusValue); + assertEquals(lastCommitTime, client1.lastConsensusValue); + assertEquals(lastCommitTime, client2.lastConsensusValue); + + // Service join in the same order in which they cast their + // votes. 
+ assertEquals(new UUID[] { serviceId0, serviceId2, + serviceId1 }, quorum0.getJoined()); + assertEquals(new UUID[] { serviceId0, serviceId2, + serviceId1 }, quorum1.getJoined()); + assertEquals(new UUID[] { serviceId0, serviceId2, + serviceId1 }, quorum2.getJoined()); + } + }); + + // validate the token was NOT updated. + assertEquals(token1, quorum0.lastValidToken()); + assertEquals(token1, quorum1.lastValidToken()); + assertEquals(token1, quorum2.lastValidToken()); + assertEquals(token1, quorum0.token()); + assertEquals(token1, quorum1.token()); + assertEquals(token1, quorum2.token()); + assertTrue(quorum0.isQuorumMet()); + assertTrue(quorum1.isQuorumMet()); + assertTrue(quorum2.isQuorumMet()); + + // The pipeline order is the same as the vote order. + assertCondition(new Runnable() { + public void run() { + assertEquals(new UUID[] { serviceId0, serviceId2, serviceId1 }, + quorum0.getPipeline()); + assertEquals(new UUID[] { serviceId0, serviceId2, serviceId1 }, + quorum1.getPipeline()); + assertEquals(new UUID[] { serviceId0, serviceId2, serviceId1 }, + quorum2.getPipeline()); + } + }); + + } + + /* + * Leader leave test. + * + * This forces the quorum leader to do a serviceLeave(), which causes a + * quorum break. All joined services should have left. Their votes were + * withdrawn when they left and they were removed from the pipeline as + * well. + */ + { + + actor0.serviceLeave(); + fixture.awaitDeque(); + + // the votes were withdrawn. + assertCondition(new Runnable() { + public void run() { + assertEquals(0, quorum0.getVotes().size()); + assertEquals(0, quorum1.getVotes().size()); + assertEquals(0, quorum2.getVotes().size()); + } + }); + + assertCondition(new Runnable() { + public void run() { + // the consensus was cleared. + assertEquals(-1L, client0.lastConsensusValue); + assertEquals(-1L, client1.lastConsensusValue); + assertEquals(-1L, client2.lastConsensusValue); + + // No one is joined. 
+ assertEquals(new UUID[] {}, quorum0.getJoined()); + assertEquals(new UUID[] {}, quorum1.getJoined()); + assertEquals(new UUID[] {}, quorum2.getJoined()); + + // validate the token was cleared (lastValidToken is + // unchanged). + assertEquals(token1, quorum0.lastValidToken()); + assertEquals(token1, quorum1.lastValidToken()); + assertEquals(token1, quorum2.lastValidToken()); + assertEquals(Quorum.NO_QUORUM, quorum0.token()); + assertEquals(Quorum.NO_QUORUM, quorum1.token()); + assertEquals(Quorum.NO_QUORUM, quorum2.token()); + assertFalse(quorum0.isQuorumMet()); + assertFalse(quorum1.isQuorumMet()); + assertFalse(quorum2.isQuorumMet()); + + // No one is in the pipeline. + assertEquals(new UUID[] {}, quorum0.getPipeline()); + assertEquals(new UUID[] {}, quorum1.getPipeline()); + assertEquals(new UUID[] {}, quorum2.getPipeline()); + } + }); + + } + + /* + * Heal the quorum by rejoining all of the services. + */ + final long token2; + { + + actor0.pipelineAdd(); + actor1.pipelineAdd(); + actor2.pipelineAdd(); + fixture.awaitDeque(); + + actor0.castVote(lastCommitTime); + actor1.castVote(lastCommitTime); + actor2.castVote(lastCommitTime); + fixture.awaitDeque(); + + assertCondition(new Runnable() { + public void run() { + + // services have voted for a single lastCommitTime. + assertEquals(1,quorum0.getVotes().size()); + assertEquals(1,quorum1.getVotes().size()); + assertEquals(1,quorum2.getVotes().size()); + + // verify the vote order. + assertEquals(new UUID[] { serviceId0, serviceId1, + serviceId2 }, quorum0.getVotes() + .get(lastCommitTime)); + assertEquals(new UUID[] { serviceId0, serviceId1, + serviceId2 }, quorum1.getVotes() + .get(lastCommitTime)); + assertEquals(new UUID[] { serviceId0, serviceId1, + serviceId2 }, quorum2.getVotes() + .get(lastCommitTime)); + + // verify the consensus was updated. 
+ assertEquals(lastCommitTime, client0.lastConsensusValue); + assertEquals(lastCommitTime, client1.lastConsensusValue); + assertEquals(lastCommitTime, client2.lastConsensusValue); + + /* + * Service join in the same order in which they cast their + * votes. + */ + assertEquals(new UUID[] { serviceId0, serviceId1, + serviceId2 }, quorum0.getJoined()); + assertEquals(new UUID[] { serviceId0, serviceId1, + serviceId2 }, quorum1.getJoined()); + assertEquals(new UUID[] { serviceId0, serviceId1, + serviceId2 }, quorum2.getJoined()); + } + }); + + // validate the token was updated. + token2 = quorum0.awaitQuorum(); + assertEquals(token2,quorum1.awaitQuorum()); + assertEquals(token2,quorum2.awaitQuorum()); + assertEquals(token2, quorum0.lastValidToken()); + assertEquals(token2, quorum1.lastValidToken()); + assertEquals(token2, quorum2.lastValidToken()); + assertEquals(token2, quorum0.token()); + assertEquals(token2, quorum1.token()); + assertEquals(token2, quorum2.token()); + assertTrue(quorum0.isQuorumMet()); + assertTrue(quorum1.isQuorumMet()); + assertTrue(quorum2.isQuorumMet()); + + // The pipeline order is the same as the vote order. + assertCondition(new Runnable() { + public void run() { + assertEquals(new UUID[] { serviceId0, serviceId1, + serviceId2 }, quorum0.getPipeline()); + assertEquals(new UUID[] { serviceId0, serviceId1, + serviceId2 }, quorum1.getPipeline()); + assertEquals(new UUID[] { serviceId0, serviceId1, + serviceId2 }, quorum2.getPipeline()); + } + }); + + } + + /* + * Cause the quorum to break by failing both of the followers. + */ + { + + /* + * Fail one follower. The quorum should not break. + */ + // actor2.serviceLeave(); + actor0.forceRemoveService(actor2.getServiceId()); + fixture.awaitDeque(); + + assertCondition(new Runnable() { + public void run() { + // services have voted for a single lastCommitTime. 
+ assertEquals(1, quorum0.getVotes().size()); + assertEquals(1, quorum1.getVotes().size()); + assertEquals(1, quorum2.getVotes().size()); + // verify the vote order. + assertEquals(new UUID[] { serviceId0, serviceId1 }, quorum0 + .getVotes().get(lastCommitTime)); + assertEquals(new UUID[] { serviceId0, serviceId1 }, quorum1 + .getVotes().get(lastCommitTime)); + assertEquals(new UUID[] { serviceId0, serviceId1 }, quorum2 + .getVotes().get(lastCommitTime)); + // verify the consensus was NOT updated. + assertEquals(lastCommitTime, client0.lastConsensusValue); + assertEquals(lastCommitTime, client1.lastConsensusValue); + assertEquals(lastCommitTime, client2.lastConsensusValue); + } + }); + + /* + * Service join in the same order in which they cast their votes. + */ + assertCondition(new Runnable() { + public void run() { + assertEquals(new UUID[] { serviceId0, serviceId1 }, quorum0 + .getJoined()); + assertEquals(new UUID[] { serviceId0, serviceId1 }, quorum1 + .getJoined()); + assertEquals(new UUID[] { serviceId0, serviceId1 }, quorum2 + .getJoined()); + } + }); + + // validate the token was NOT updated. + assertEquals(token2, quorum0.lastValidToken()); + assertEquals(token2, quorum1.lastValidToken()); + assertEquals(token2, quorum2.lastValidToken()); + assertEquals(token2, quorum0.token()); + assertEquals(token2, quorum1.token()); + assertEquals(token2, quorum2.token()); + assertTrue(quorum0.isQuorumMet()); + assertTrue(quorum1.isQuorumMet()); + assertTrue(quorum2.isQuorumMet()); + + // The pipeline order is the same as the vote order. + assertCondition(new Runnable() { + public void run() { + assertEquals(new UUID[] { serviceId0, serviceId1 }, quorum0 + .getPipeline()); + assertEquals(new UUID[] { serviceId0, serviceId1 }, quorum1 + .getPipeline()); + assertEquals(new UUID[] { serviceId0, serviceId1 }, quorum2 + .getPipeline()); + } + }); + + /* + * Fail the remaining follower. The quorum will break. 
+ */ + // actor1.serviceLeave(); + actor0.forceRemoveService(actor1.getServiceId()); + fixture.awaitDeque(); + + assertCondition(new Runnable() { + public void run() { + // Services have voted for a single lastCommitTime. + assertEquals(0, quorum0.getVotes().size()); + /** + * TODO The assert above occasionally fails with this trace. + * + * <pre> + * junit.framework.AssertionFailedError: expected:<0> but was:<1> + * at junit.framework.Assert.fail(Assert.java:47) + * at junit.framework.Assert.failNotEquals(Assert.java:282) + * at junit.framework.Assert.assertEquals(Assert.java:64) + * at junit.framework.Assert.assertEquals(Assert.java:201) + * at junit.framework.Assert.assertEquals(Assert.java:207) + * at com.bigdata.quorum.TestHA3QuorumSemantics$19.run(TestHA3QuorumSemantics.java:1034) + * at com.bigdata.quorum.AbstractQuorumTestCase.assertCondition(AbstractQuorumTestCase.java:184) + * at com.bigdata.quorum.AbstractQuorumTestCase.assertCondition(AbstractQuorumTestCase.java:225) + * at com.bigdata.quorum.TestHA3QuorumSemantics.test_serviceJoin3_simple(TestHA3QuorumSemantics.java:1031) + * </pre> + */ + + // verify the vote order. + assertEquals(null, quorum0.getVotes().get(lastCommitTime)); + + // verify the consensus was cleared. + assertEquals(-1L, client0.lastConsensusValue); + assertEquals(-1L, client1.lastConsensusValue); + assertEquals(-1L, client2.lastConsensusValue); + + // no services are joined. + assertEquals(new UUID[] {}, quorum0.getJoined()); + assertEquals(new UUID[] {}, quorum1.getJoined()); + assertEquals(new UUID[] {}, quorum2.getJoined()); + } + }); + + quorum0.awaitBreak(); + quorum1.awaitBreak(); + quorum2.awaitBreak(); + + // validate the token was cleared. 
+ assertEquals(token2, quorum0.lastValidToken()); + assertEquals(token2, quorum1.lastValidToken()); + assertEquals(token2, quorum2.lastValidToken()); + assertEquals(Quorum.NO_QUORUM, quorum0.token()); + assertEquals(Quorum.NO_QUORUM, quorum1.token()); + assertEquals(Quorum.NO_QUORUM, quorum2.token()); + assertFalse(quorum0.isQuorumMet()); + assertFalse(quorum1.isQuorumMet()); + assertFalse(quorum2.isQuorumMet()); + + assertCondition(new Runnable() { + public void run() { + // Service leaves forced pipeline leaves. + assertEquals(new UUID[] {}, quorum0.getPipeline()); + assertEquals(new UUID[] {}, quorum1.getPipeline()); + assertEquals(new UUID[] {}, quorum2.getPipeline()); + } + }); + + } + + } + /** * Unit tests for pipeline reorganization when the leader is elected. This * tests the automatic reorganization of the pipeline order where the Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java 2013-12-09 15:11:10 UTC (rev 7619) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java 2013-12-09 15:13:30 UTC (rev 7620) @@ -327,7 +327,7 @@ } @Override - protected void doMemberRemove() { + protected void doMemberRemove(final UUID service) { // get a valid zookeeper connection object. final ZooKeeper zk; try { @@ -340,7 +340,7 @@ try { zk.delete(logicalServiceId + "/" + QUORUM + "/" + QUORUM_MEMBER + "/" + QUORUM_MEMBER_PREFIX - + serviceIdStr, -1/* anyVersion */); + + service.toString(), -1/* anyVersion */); } catch (NoNodeException e) { // ignore. } catch (KeeperException e) { @@ -414,7 +414,7 @@ } @Override - protected void doPipelineRemove() { + protected void doPipelineRemove(final UUID service) { // get a valid zookeeper connection object. 
final ZooKeeper zk; try { @@ -446,7 +446,7 @@ } final QuorumServiceState state = (QuorumServiceState) SerializerUtil .deserialize(b); - if (serviceId.equals(state.serviceUUID())) { + if (service.equals(state.serviceUUID())) { zk.delete(zpath + "/" + s, -1/* anyVersion */); return; } @@ -636,7 +636,7 @@ * handles a concurrent delete by a simple retry loop. */ @Override - protected void doWithdrawVote() { + protected void doWithdrawVote(final UUID service) { // zpath for votes. final String votesZPath = getVotesZPath(); if (log.isInfoEnabled()) @@ -724,7 +724,7 @@ Thread.currentThread().interrupt(); return; } - if (serviceId.equals(state.serviceUUID())) { + if (service.equals(state.serviceUUID())) { // found our vote. try { // delete our vote. @@ -761,7 +761,7 @@ } // done. if (log.isInfoEnabled()) - log.info("withdrawn: serviceId=" + serviceIdStr + log.info("withdrawn: serviceId=" + service.toString() + ", lastCommitTime=" + lastCommitTime); return; } catch (NoNodeException e) { @@ -836,7 +836,7 @@ } @Override - protected void doServiceLeave() { + protected void doServiceLeave(final UUID service) { // get a valid zookeeper connection object. final ZooKeeper zk; try { @@ -871,7 +871,7 @@ } final QuorumServiceState state = (QuorumServiceState) SerializerUtil .deserialize(b); - if (serviceId.equals(state.serviceUUID())) { + if (service.equals(state.serviceUUID())) { // Found this service. 
zk.delete(zpath + "/" + s, -1/* anyVersion */); return; Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/quorum/zk/TestZkHA3QuorumSemantics.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/quorum/zk/TestZkHA3QuorumSemantics.java 2013-12-09 15:11:10 UTC (rev 7619) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/quorum/zk/TestZkHA3QuorumSemantics.java 2013-12-09 15:13:30 UTC (rev 7620) @@ -1074,6 +1074,561 @@ } /** + * Unit test of {@link QuorumActor#forceRemoveService(UUID)}. + */ + public void test_serviceJoin3_simpleForceRemove() throws InterruptedException { + + final Quorum<?, ?> quorum0 = quorums[0]; + final MockQuorumMember<?> client0 = clients[0]; + final QuorumActor<?, ?> actor0 = actors[0]; + final UUID serviceId0 = client0.getServiceId(); + + final Quorum<?, ?> quorum1 = quorums[1]; + final MockQuorumMember<?> client1 = clients[1]; + final QuorumActor<?, ?> actor1 = actors[1]; + final UUID serviceId1 = client1.getServiceId(); + + final Quorum<?, ?> quorum2 = quorums[2]; + final MockQuorumMember<?> client2 = clients[2]; + final QuorumActor<?,?> actor2 = actors[2]; + final UUID serviceId2 = client2.getServiceId(); + +// final long lastCommitTime1 = 0L; +// +// final long lastCommitTime2 = 2L; + + final long lastCommitTime = 0L; + + // declare the services as a quorum members. + actor0.memberAdd(); + actor1.memberAdd(); + actor2.memberAdd(); + // fixture.awaitDeque(); + assertCondition(new Runnable() { + public void run() { + assertEquals(3, quorum0.getMembers().length); + assertEquals(3, quorum1.getMembers().length); + assertEquals(3, quorum2.getMembers().length); + } + }); + + /* + * Have the services join the pipeline. 
+ */ + actor0.pipelineAdd(); + actor1.pipelineAdd(); + actor2.pipelineAdd(); + // fixture.awaitDeque(); + + assertCondition(new Runnable() { + public void run() { + assertEquals(new UUID[] { serviceId0, serviceId1, serviceId2 }, + quorum0.getPipeline()); + assertEquals(new UUID[] { serviceId0, serviceId1, serviceId2 }, + quorum1.getPipeline()); + assertEquals(new UUID[] { serviceId0, serviceId1, serviceId2 }, + quorum2.getPipeline()); + } + }); + + /* + * Have two services cast a vote for a lastCommitTime. This will cause + * the quorum to meet. + */ + final long token1; + { + + actor0.castVote(lastCommitTime); + actor1.castVote(lastCommitTime); + // fixture.awaitDeque(); + + // validate the token was assigned (must wait for meet). + token1 = quorum0.awaitQuorum(); + assertEquals(Quorum.NO_QUORUM + 1, token1); + assertEquals(Quorum.NO_QUORUM + 1, quorum0.token()); + assertEquals(Quorum.NO_QUORUM + 1, quorum0.lastValidToken()); + assertTrue(quorum0.isQuorumMet()); + // wait for meet for other clients. + assertEquals(token1, quorum1.awaitQuorum()); + assertEquals(token1, quorum2.awaitQuorum()); + assertEquals(token1, quorum1.lastValidToken()); + assertEquals(token1, quorum2.lastValidToken()); + + assertCondition(new Runnable() { + public void run() { + // services have voted for a single lastCommitTime. + assertEquals(1, quorum0.getVotes().size()); + + // verify the vote order. + assertEquals(new UUID[] { serviceId0, serviceId1 }, quorum0 + .getVotes().get(lastCommitTime)); + + // verify the consensus was updated. + assertEquals(lastCommitTime, client0.lastConsensusValue); + assertEquals(lastCommitTime, client1.lastConsensusValue); + assertEquals(lastCommitTime, client2.lastConsensusValue); + + /* + * Service join in the same order in which they cast their + * votes. 
+ */ + assertEquals(new UUID[] { serviceId0, serviceId1 }, quorum0 + .getJoined()); + assertEquals(new UUID[] { serviceId0, serviceId1 }, quorum1 + .getJoined()); + assertEquals(new UUID[] { serviceId0, serviceId1 }, quorum2 + .getJoined()); + + // The pipeline order is the same as the vote order. + assertEquals(new UUID[] { serviceId0, serviceId1, + serviceId2 }, quorum0.getPipeline()); + assertEquals(new UUID[] { serviceId0, serviceId1, + serviceId2 }, quorum1.getPipeline()); + assertEquals(new UUID[] { serviceId0, serviceId1, + serviceId2 }, quorum2.getPipeline()); + } + }); + } + + /* + * Cast the last vote and verify that the last service joins. + * + * Note: The last service should join immediately since it does not have + * to do any validation when it joins. + */ + { + actor2.castVote(lastCommitTime); + // fixture.awaitDeque(); + + assertCondition(new Runnable() { + public void run() { + // services have voted for a single lastCommitTime. + assertEquals(1, quorum0.getVotes().size()); + + // verify the vote order. + assertEquals(new UUID[] { serviceId0, serviceId1, serviceId2 }, + quorum0.getVotes().get(lastCommitTime)); + + // verify the consensus was NOT updated. + assertEquals(lastCommitTime, client0.lastConsensusValue); + assertEquals(lastCommitTime, client1.lastConsensusValue); + assertEquals(lastCommitTime, client2.lastConsensusValue); + + // Service join in the same order in which they cast their votes. + assertEquals(new UUID[] { serviceId0, serviceId1, + serviceId2 }, quorum0.getJoined()); + assertEquals(new UUID[] { serviceId0, serviceId1, + serviceId2 }, quorum1.getJoined()); + assertEquals(new UUID[] { serviceId0, serviceId1, + serviceId2 }, quorum2.getJoined()); + } + }); + + // validate the token was NOT updated. 
+ assertEquals(token1, quorum0.lastValidToken()); + assertEquals(token1, quorum1.lastValidToken()); + assertEquals(token1, quorum2.lastValidToken()); + assertEquals(token1, quorum0.token()); + assertEquals(token1, quorum1.token()); + assertEquals(token1, quorum2.token()); + assertTrue(quorum0.isQuorumMet()); + assertTrue(quorum1.isQuorumMet()); + assertTrue(quorum2.isQuorumMet()); + + // The pipeline order is the same as the vote order. + assertEquals(new UUID[] { serviceId0, serviceId1, serviceId2 }, + quorum0.getPipeline()); + assertEquals(new UUID[] { serviceId0, serviceId1, serviceId2 }, + quorum1.getPipeline()); + assertEquals(new UUID[] { serviceId0, serviceId1, serviceId2 }, + quorum2.getPipeline()); + } + + /* + * Follower leave/join test. + */ + { + + /* + * Fail the first follower. This will not cause a quorum break since + * there are still (k+1)/2 services in the quorum. + */ + actor2.forceRemoveService(actor1.getServiceId()); + + // actor1.serviceLeave(); + // fixture.awaitDeque(); + + // services have voted for a single lastCommitTime. + assertEquals(1, quorum0.getVotes().size()); + + // verify the vote order. + assertEquals(new UUID[] { serviceId0, serviceId2 }, quorum0 + .getVotes().get(lastCommitTime)); + + // verify the consensus was NOT updated. + assertEquals(lastCommitTime, client0.lastConsensusValue); + assertEquals(lastCommitTime, client1.lastConsensusValue); + assertEquals(lastCommitTime, client2.lastConsensusValue); + + /* + * Service join in the same order in which they cast their votes. + */ + assertCondition(new Runnable() { + public void run() { + assertEquals(new UUID[] { serviceId0, serviceId2 }, quorum0 + .getJoined()); + assertEquals(new UUID[] { serviceId0, serviceId2 }, quorum1 + .getJoined()); + assertEquals(new UUID[] { serviceId0, serviceId2 }, quorum2 + .getJoined()); + } + }); + + // validate the token was NOT updated. 
+ assertEquals(token1, quorum0.lastValidToken()); + assertEquals(token1, quorum1.lastValidToken()); + assertEquals(token1, quorum2.lastValidToken()); + assertEquals(token1, quorum0.token()); + assertEquals(token1, quorum1.token()); + assertEquals(token1, quorum2.token()); + assertTrue(quorum0.isQuorumMet()); + assertTrue(quorum1.isQuorumMet()); + assertTrue(quorum2.isQuorumMet()); + + // The pipeline order is the same as the vote order. + assertEquals(new UUID[] { serviceId0, serviceId2 }, quorum0 + .getPipeline()); + assertEquals(new UUID[] { serviceId0, serviceId2 }, quorum1 + .getPipeline()); + assertEquals(new UUID[] { serviceId0, serviceId2 }, quorum2 + .getPipeline()); + + /* + * Rejoin the service. + */ + actor1.memberAdd(); + actor1.pipelineAdd(); + // fixture.awaitDeque(); + actor1.castVote(lastCommitTime); + // fixture.awaitDeque(); + + assertCondition(new Runnable() { + public void run() { + // services have voted for a single lastCommitTime. + assertEquals(1, quorum0.getVotes().size()); + assertEquals(1, quorum1.getVotes().size()); + assertEquals(1, quorum2.getVotes().size()); + + // verify the vote order. + assertEquals(new UUID[] { serviceId0, serviceId2, + serviceId1 }, quorum0.getVotes() + .get(lastCommitTime)); + assertEquals(new UUID[] { serviceId0, serviceId2, + serviceId1 }, quorum1.getVotes() + .get(lastCommitTime)); + assertEquals(new UUID[] { serviceId0, serviceId2, + serviceId1 }, quorum2.getVotes() + .get(lastCommitTime)); + + // verify the consensus was NOT updated. + assertEquals(lastCommitTime, client0.lastConsensusValue); + assertEquals(lastCommitTime, client1.lastConsensusValue); + assertEquals(lastCommitTime, client2.lastConsensusValue); + + /* + * Service join in the same order in which they cast their votes. 
+ */ + assertEquals(new UUID[] { serviceId0, serviceId2, serviceId1 }, + quorum0.getJoined()); + assertEquals(new UUID[] { serviceId0, serviceId2, serviceId1 }, + quorum1.getJoined()); + assertEquals(new UUID[] { serviceId0, serviceId2, serviceId1 }, + quorum2.getJoined()); + } + }); + + // validate the token was NOT updated. + assertEquals(token1, quorum0.lastValidToken()); + assertEquals(token1, quorum1.lastValidToken()); + assertEquals(token1, quorum2.lastValidToken()); + assertEquals(token1, quorum0.token()); + assertEquals(token1, quorum1.token()); + assertEquals(token1, quorum2.token()); + assertTrue(quorum0.isQuorumMet()); + assertTrue(quorum1.isQuorumMet()); + assertTrue(quorum2.isQuorumMet()); + + // The pipeline order is the same as the vote order. + assertCondition(new Runnable() { + public void run() { + assertEquals(new UUID[] { serviceId0, serviceId2, serviceId1 }, + quorum0.getPipeline()); + assertEquals(new UUID[] { serviceId0, serviceId2, serviceId1 }, + quorum1.getPipeline()); + assertEquals(new UUID[] { serviceId0, serviceId2, serviceId1 }, + quorum2.getPipeline()); + } + }); + + } + + /* + * Leader leave test. + * + * This forces the quorum leader to do a serviceLeave(), which causes a + * quorum break. All joined services should have left. Their votes were + * withdrawn when they left and they were removed from the pipeline as + * well. + */ + { + + actor0.serviceLeave(); + // fixture.awaitDeque(); + + assertCondition(new Runnable() { + public void run() { + // the votes were withdrawn. + assertEquals(0, quorum0.getVotes().size()); + assertEquals(0, quorum1.getVotes().size()); + assertEquals(0, quorum2.getVotes().size()); + } + }); + + assertCondition(new Runnable() { + public void run() { + // the consensus was cleared. + assertEquals(-1L, client0.lastConsensusValue); + assertEquals(-1L, client1.lastConsensusValue); + assertEquals(-1L, client2.lastConsensusValue); + + // No one is joined. 
+ assertEquals(new UUID[] {}, quorum0.getJoined()); + assertEquals(new UUID[] {}, quorum1.getJoined()); + assertEquals(new UUID[] {}, quorum2.getJoined()); + + // validate the token was cleared (lastValidToken is + // unchanged). + assertEquals(token1, quorum0.lastValidToken()); + assertEquals(token1, quorum1.lastValidToken()); + assertEquals(token1, quorum2.lastValidToken()); + assertEquals(Quorum.NO_QUORUM, quorum0.token()); + assertEquals(Quorum.NO_QUORUM, quorum1.token()); + assertEquals(Quorum.NO_QUORUM, quorum2.token()); + assertFalse(quorum0.isQuorumMet()); + assertFalse(quorum1.isQuorumMet()); + assertFalse(quorum2.isQuorumMet()); + + // No one is in the pipeline. + assertEquals(new UUID[] {}, quorum0.getPipeline()); + assertEquals(new UUID[] {}, quorum1.getPipeline()); + assertEquals(new UUID[] {}, quorum2.getPipeline()); + } + }); + } + + /* + * Heal the quorum by rejoining all of the services. + */ + final long token2; + { + + actor0.pipelineAdd(); + actor1.pipelineAdd(); + actor2.pipelineAdd(); + // fixture.awaitDeque(); + + actor0.castVote(lastCommitTime); + actor1.castVote(lastCommitTime); + actor2.castVote(lastCommitTime); + // fixture.awaitDeque(); + + assertCondition(new Runnable() { + public void run() { + // services have voted for a single lastCommitTime. + assertEquals(1, quorum0.getVotes().size()); + assertEquals(1, quorum1.getVotes().size()); + assertEquals(1, quorum2.getVotes().size()); + + // verify the vote order. + assertEquals(new UUID[] { serviceId0, serviceId1, + serviceId2 }, quorum0.getVotes() + .get(lastCommitTime)); + assertEquals(new UUID[] { serviceId0, serviceId1, + serviceId2 }, quorum1.getVotes() + .get(lastCommitTime)); + assertEquals(new UUID[] { serviceId0, serviceId1, + serviceId2 }, quorum2.getVotes() + .get(lastCommitTime)); + + // verify the consensus was updated. 
+ assertEquals(lastCommitTime, client0.lastConsensusValue); + assertEquals(lastCommitTime, client1.lastConsensusValue); + assertEquals(lastCommitTime, client2.lastConsensusValue); + + // Service join in the same order in which they cast their + // votes. + assertEquals(new UUID[] { serviceId0, serviceId1, + serviceId2 }, quorum0.getJoined()); + assertEquals(new UUID[] { serviceId0, serviceId1, + serviceId2 }, quorum1.getJoined()); + assertEquals(new UUID[] { serviceId0, serviceId1, + serviceId2 }, quorum2.getJoined()); + } + }); + + // validate the token was updated. + token2 = quorum0.awaitQuorum(); + assertEquals(token2,quorum1.awaitQuorum()); + assertEquals(token2,quorum2.awaitQuorum()); + assertEquals(token2, quorum0.lastValidToken()); + assertEquals(token2, quorum1.lastValidToken()); + assertEquals(token2, quorum2.lastValidToken()); + assertEquals(token2, quorum0.token()); + assertEquals(token2, quorum1.token()); + assertEquals(token2, quorum2.token()); + assertTrue(quorum0.isQuorumMet()); + assertTrue(quorum1.isQuorumMet()); + assertTrue(quorum2.isQuorumMet()); + + // The pipeline order is the same as the vote order. + assertCondition(new Runnable() { + public void run() { + assertEquals(new UUID[] { serviceId0, serviceId1, + serviceId2 }, quorum0.getPipeline()); + assertEquals(new UUID[] { serviceId0, serviceId1, + serviceId2 }, quorum1.getPipeline()); + assertEquals(new UUID[] { serviceId0, serviceId1, + serviceId2 }, quorum2.getPipeline()); + } + }); + + } + + /* + * Cause the quorum to break by failing both of the followers. + */ + { + + /* + * Fail one follower. The quorum should not break. + */ + actor0.forceRemoveService(actor2.getServiceId()); + // actor2.serviceLeave(); + // fixture.awaitDeque(); + + assertCondition(new Runnable() { + public void run() { + // services have voted for a single lastCommitTime. 
+ assertEquals(1, quorum0.getVotes().size()); + assertEquals(1, quorum1.getVotes().size()); + assertEquals(1, quorum2.getVotes().size()); + // verify the vote order. + assertEquals(new UUID[] { serviceId0, serviceId1 }, quorum0 + .getVotes().get(lastCommitTime)); + assertEquals(new UUID[] { serviceId0, serviceId1 }, quorum1 + .getVotes().get(lastCommitTime)); + assertEquals(new UUID[] { serviceId0, serviceId1 }, quorum2 + .getVotes().get(lastCommitTime)); + // verify the consensus was NOT updated. + assertEquals(lastCommitTime, client0.lastConsensusValue); + assertEquals(lastCommitTime, client1.lastConsensusValue); + assertEquals(lastCommitTime, client2.lastConsensusValue); + } + }); + + /* + ... [truncated message content] |