|
From: <tho...@us...> - 2013-11-01 21:10:53
|
Revision: 7507
http://bigdata.svn.sourceforge.net/bigdata/?rev=7507&view=rev
Author: thompsonbry
Date: 2013-11-01 21:10:45 +0000 (Fri, 01 Nov 2013)
Log Message:
-----------
Continued work on #760 (2-phase commit semantics review)
I have analyzed the root cause of the override test in which B is told to reject the commit. The test actually relies on an exception being thrown back when the commit2Phase message is received. Thus, B is not actually encountering an error when handling that message and is not being transitioned automatically to the ERROR state.
I have further cleaned up the 2-phase commit logic in both QuorumCommitImpl and AbstractJournal. The COMMIT phase of the 2-phase commit is now rejected IFF we lack a majority of services that successfully executed the COMMIT (and for which the leader has received notice of that success). If the COMMIT phase fails, the leader will fail over and the transaction will be failed. (If the PREPARE phase fails, the transaction is failed, but the leader does not fail over.)
I am now encountering a problem with this test where B goes into the Operator state because it is looking for an HALog that does not exist. I had observed this in the recent longevity tests. Now we have a unit test that can recreate the problem, which is great.
http://192.168.1.133:8091 : is not joined, pipelineOrder=2, writePipelineAddr=localhost/127.0.0.1:9091, service=other, extendedRunState={server=Running, quorumService=Operator @ 1, haReady=-1, haStatus=NotReady, serviceId=e832b5c0-6c9b-444d-8946-514eacc5e329, now=1383332021358, msg=[HALog not available: commitCounter=2]}
QuorumCommitImpl.commit2Phase() was modified to return a CommitResponse object that summarizes the outcome of the COMMIT on each service. This makes it possible to write more flexible behaviors in AbstractJournal.commitNow().
Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumCommit.java
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumCommitImpl.java
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java
Added Paths:
-----------
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/CommitResponse.java
Added: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/CommitResponse.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/CommitResponse.java (rev 0)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/CommitResponse.java 2013-11-01 21:10:45 UTC (rev 7507)
@@ -0,0 +1,150 @@
+/**
+
+Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved.
+
+Contact:
+ SYSTAP, LLC
+ 4501 Tower Road
+ Greensboro, NC 27410
+ lic...@bi...
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+/*
+ * Created on Nov 1, 2013
+ */
+package com.bigdata.ha;
+
+import java.util.ArrayList;
+
+import com.bigdata.util.concurrent.ExecutionExceptions;
+
+/**
+ * Response for a 2-phase commit.
+ * <p>
+ * Summarizes the outcome of the COMMIT message for each service that was
+ * joined with the met quorum. Entries are indexed in service join order
+ * (which is also the PREPARE vote order) and the leader is always at index
+ * zero. A follower that did not vote YES for the PREPARE is never sent the
+ * COMMIT message, so its slot is counted neither as a success nor as a
+ * failure.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ */
+public class CommitResponse {
+
+    /**
+     * The COMMIT message.
+     */
+    private final CommitRequest req;
+
+    /**
+     * The root cause exceptions for any errors encountered when instructing
+     * the services to execute the COMMIT message. The indices into this
+     * collection are correlated with the service join order and the PREPARE
+     * vote order. The leader is always at index zero. A <code>null</code>
+     * entry means that no error was reported for that service: either the
+     * COMMIT succeeded there or the COMMIT was never issued to it.
+     */
+    private final ArrayList<Throwable> causes;
+
+    /**
+     * The number of COMMIT messages that were issued and are known to have
+     * been processed successfully.
+     */
+    private final int nok;
+
+    /**
+     * The number of COMMIT messages that were issued and which failed.
+     */
+    private final int nfail;
+
+    /**
+     * @param req
+     *            The COMMIT request.
+     * @param causes
+     *            One entry per joined service (join order, leader first).
+     *            Each entry is the root cause of the COMMIT failure for that
+     *            service -or- <code>null</code> if no error was reported. The
+     *            list is copied.
+     */
+    public CommitResponse(final CommitRequest req,
+            final ArrayList<Throwable> causes) {
+
+        this.req = req;
+
+        // Snapshot the list so later changes by the caller can not make the
+        // counts below disagree with getCause(i).
+        this.causes = new ArrayList<Throwable>(causes);
+
+        int nok = 0, nfail = 0;
+
+        for (int i = 0; i < this.causes.size(); i++) {
+
+            final Throwable t = this.causes.get(i);
+
+            if (t != null) {
+
+                nfail++; // request issued and failed.
+
+            } else if (wasIssued(req, i)) {
+
+                nok++; // request issued and was Ok.
+
+            }
+
+            // else: COMMIT was never sent to this service (it did not vote
+            // YES), so the null entry is NOT evidence of a successful COMMIT.
+
+        }
+
+        this.nok = nok;
+        this.nfail = nfail;
+
+    }
+
+    /**
+     * Return <code>true</code> if the COMMIT message was sent to the ith
+     * service. Followers are only sent the COMMIT if they voted YES for the
+     * PREPARE. The leader (index zero) is always treated as issued.
+     * <p>
+     * NOTE(review): treating the leader as always issued preserves the prior
+     * counting for index zero; confirm the leader always receives the COMMIT.
+     */
+    private static boolean wasIssued(final CommitRequest req, final int i) {
+
+        if (i == 0 || req == null) {
+
+            // Leader, or no request to consult: count the slot as issued.
+            return true;
+
+        }
+
+        return req.getPrepareResponse().getVote(i);
+
+    }
+
+    /**
+     * Return <code>true</code> iff no COMMIT error was reported for the
+     * leader (index zero).
+     */
+    public boolean isLeaderOk() {
+
+        return causes.get(0) == null;
+
+    }
+
+    /**
+     * Number of COMMIT messages that were generated and succeeded.
+     */
+    public int getNOk() {
+
+        return nok;
+
+    }
+
+    /**
+     * Number of COMMIT messages that were generated and failed.
+     */
+    public int getNFail() {
+
+        return nfail;
+
+    }
+
+    /**
+     * Return the root cause for the ith service -or- <code>null</code> if the
+     * COMMIT did not produce an exception for that service.
+     */
+    public Throwable getCause(final int i) {
+
+        return causes.get(i);
+
+    }
+
+    /**
+     * Throw out the exception(s).
+     * <p>
+     * Note: This method is guaranteed to not return normally!
+     *
+     * @throws Exception
+     *             if one or more services that voted YES failed the COMMIT.
+     *
+     * @throws IllegalStateException
+     *             if no service reported a COMMIT failure.
+     */
+    public void throwCauses() throws Exception {
+
+        // Only the non-null entries are actual failures. The list is
+        // pre-sized with one slot per joined service, so it is never empty.
+        final ArrayList<Throwable> failures = new ArrayList<Throwable>();
+
+        for (Throwable t : causes) {
+
+            if (t != null)
+                failures.add(t);
+
+        }
+
+        if (failures.isEmpty()) {
+
+            // There were no errors.
+            throw new IllegalStateException();
+
+        }
+
+        // Throw exception back to the leader.
+        if (failures.size() == 1)
+            throw new Exception(failures.get(0));
+
+        final int k = req.getPrepareResponse().replicationFactor();
+
+        throw new Exception("replicationFactor=" + k + ", nok=" + nok
+                + ", nfail=" + nfail, new ExecutionExceptions(failures));
+
+    }
+
+}
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumCommit.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumCommit.java 2013-11-01 13:34:10 UTC (rev 7506)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumCommit.java 2013-11-01 21:10:45 UTC (rev 7507)
@@ -28,10 +28,8 @@
package com.bigdata.ha;
import java.io.IOException;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import com.bigdata.journal.IRootBlockView;
import com.bigdata.quorum.Quorum;
/**
@@ -64,12 +62,13 @@
/**
* Used by the leader to send a message to each joined service in the quorum
* telling it to commit using the root block from the corresponding
- * {@link #prepare2Phase(IRootBlockView, long, TimeUnit) prepare} message.
- * The commit MAY NOT go forward unless both the current quorum token and
- * the lastCommitTime on this message agree with the quorum token and
+ * {@link #prepare2Phase(PrepareRequest) prepare} message. The commit MAY
+ * NOT go forward unless both the current quorum token and the
+ * lastCommitTime on this message agree with the quorum token and
* lastCommitTime in the root block from the last "prepare" message.
*/
- void commit2Phase(CommitRequest req) throws IOException, InterruptedException;
+ CommitResponse commit2Phase(CommitRequest req) throws IOException,
+ InterruptedException;
/**
* Used by the leader to send a message to each service joined with the
@@ -80,6 +79,6 @@
* @param token
* The quorum token.
*/
- void abort2Phase(final long token) throws IOException, InterruptedException;
+ void abort2Phase(long token) throws IOException, InterruptedException;
}
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumCommitImpl.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumCommitImpl.java 2013-11-01 13:34:10 UTC (rev 7506)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumCommitImpl.java 2013-11-01 21:10:45 UTC (rev 7507)
@@ -386,8 +386,8 @@
}
@Override
- public void commit2Phase(final CommitRequest commitRequest) throws IOException,
- InterruptedException {
+ public CommitResponse commit2Phase(final CommitRequest commitRequest)
+ throws IOException, InterruptedException {
if (log.isInfoEnabled())
log.info("req=" + commitRequest);
@@ -428,10 +428,26 @@
final boolean didAllServicesPrepare = prepareResponse.getYesCount() == prepareResponse
.replicationFactor();
- final List<Future<Void>> localFutures = new LinkedList<Future<Void>>();
-
+ /*
+ * Note: These entries are in service join order. The first entry is
+ * always the leader. If a service did not vote YES for the PREPARE
+ * then it will have a null entry in this list.
+ */
+ final ArrayList<Future<Void>> localFutures = new ArrayList<Future<Void>>(
+ joinedServiceIds.length);
+
+ final ArrayList<Throwable> causes = new ArrayList<Throwable>();
+
try {
+ for (int i = 0; i < joinedServiceIds.length; i++) {
+
+ // Pre-size to ensure sufficient room for set(i,foo).
+ localFutures.add(null);
+ causes.add(null);
+
+ }
+
member.assertLeader(token);
final IHA2PhaseCommitMessage msgJoinedService = new HA2PhaseCommitMessage(
@@ -439,8 +455,6 @@
for (int i = 1; i < joinedServiceIds.length; i++) {
- final UUID serviceId = joinedServiceIds[i];
-
if (!prepareResponse.getVote(i)) {
// Skip services that did not vote YES in PREPARE.
@@ -448,6 +462,8 @@
}
+ final UUID serviceId = joinedServiceIds[i];
+
/*
* Submit task on local executor. The task will do an RMI to the
* remote service.
@@ -457,7 +473,7 @@
msgJoinedService));
// add to list of futures we will check.
- localFutures.add(rf);
+ localFutures.set(i, rf);
}
@@ -471,37 +487,38 @@
final Future<Void> f = leader.commit2Phase(msgJoinedService);
- localFutures.add(f);
+ localFutures.set(0/* leader */, f);
}
/*
* Check the futures for the other services in the quorum.
*/
- final List<Throwable> causes = new LinkedList<Throwable>();
- for (Future<Void> ft : localFutures) {
+ for (int i = 0; i < joinedServiceIds.length; i++) {
+ final Future<Void> ft = localFutures.get(i);
+ if (ft == null)
+ continue;
try {
ft.get(); // FIXME Timeout to await followers in commit2Phase().
// } catch (TimeoutException ex) {
// // Timeout on this Future.
// log.error(ex, ex);
-// causes.add(ex);
-// done = false;
+// causes.set(i, ex);
} catch (CancellationException ex) {
// Future was cancelled.
log.error(ex, ex);
- causes.add(ex);
+ causes.set(i, ex);
} catch (ExecutionException ex) {
log.error(ex, ex);
- causes.add(ex);
+ causes.set(i, ex);
} catch (RuntimeException ex) {
/*
- * Note: ClientFuture.get() can throw a RuntimeException
- * if there is a problem with the RMI call. In this case
- * we do not know whether the Future is done.
+ * Note: ClientFuture.get() can throw a RuntimeException if
+ * there is a problem with the RMI call. In this case we do
+ * not know whether the Future is done.
*/
log.error(ex, ex);
- causes.add(ex);
+ causes.set(i, ex);
} finally {
// Note: cancelling a *local* Future wrapping an RMI.
ft.cancel(true/* mayInterruptIfRunning */);
@@ -511,13 +528,7 @@
/*
* If there were any errors, then throw an exception listing them.
*/
- if (!causes.isEmpty()) {
- // Throw exception back to the leader.
- if (causes.size() == 1)
- throw new RuntimeException(causes.get(0));
- throw new RuntimeException("remote errors: nfailures="
- + causes.size(), new ExecutionExceptions(causes));
- }
+ return new CommitResponse(commitRequest, causes);
} finally {
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java 2013-11-01 13:34:10 UTC (rev 7506)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java 2013-11-01 21:10:45 UTC (rev 7507)
@@ -314,10 +314,10 @@
}
@Override
- public void commit2Phase(final CommitRequest req) throws IOException,
+ public CommitResponse commit2Phase(final CommitRequest req) throws IOException,
InterruptedException {
- commitImpl.commit2Phase(req);
+ return commitImpl.commit2Phase(req);
}
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-11-01 13:34:10 UTC (rev 7506)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-11-01 21:10:45 UTC (rev 7507)
@@ -96,6 +96,7 @@
import com.bigdata.counters.CounterSet;
import com.bigdata.counters.Instrument;
import com.bigdata.ha.CommitRequest;
+import com.bigdata.ha.CommitResponse;
import com.bigdata.ha.HAGlue;
import com.bigdata.ha.HAStatusEnum;
import com.bigdata.ha.HATXSGlue;
@@ -3527,6 +3528,26 @@
/**
* HA mode commit (2-phase commit).
+ */
+        private void commitHA() {
+
+            try {
+
+                // PREPARE: if this does not succeed, it issues abort2Phase()
+                // to the joined services before the exception propagates.
+                prepare2Phase();
+
+                // COMMIT: if this does not succeed, the leader enters the
+                // error state before the exception propagates.
+                commit2Phase();
+
+            } catch (Exception e) {
+
+                if (e instanceof InterruptedException) {
+                    // Throwing InterruptedException cleared the interrupt
+                    // status. Restore it so that callers can still observe
+                    // the interrupt after the exception is laundered below.
+                    Thread.currentThread().interrupt();
+                }
+
+                // launder throwable.
+                throw new RuntimeException(e);
+
+            }
+
+        }
+
+ /**
+ * PREPARE
* <p>
* Note: We need to make an atomic decision here regarding whether a
* service is joined with the met quorum or not. This information will
@@ -3541,12 +3562,37 @@
* metadata for the znode that is the parent of the joined services.
* However, we would need an expanded interface to get that metadata
* from zookeeper out of the Quorum.
+ *
+ * @throws IOException
+ * @throws TimeoutException
+ * @throws InterruptedException
*/
- private void commitHA() {
-
+ private void prepare2Phase() throws InterruptedException,
+ TimeoutException, IOException {
+
+ boolean didPrepare = false;
try {
- if(!prepare2Phase()) {
+ // Atomic decision point for joined vs non-joined services.
+ prepareJoinedAndNonJoinedServices = new JoinedAndNonJoinedServices(
+ quorum);
+
+ prepareRequest = new PrepareRequest(//
+ consensusReleaseTime,//
+ gatherJoinedAndNonJoinedServices,//
+ prepareJoinedAndNonJoinedServices,//
+ newRootBlock,//
+ quorumService.getPrepareTimeout(), // timeout
+ TimeUnit.MILLISECONDS//
+ );
+
+ // issue prepare request.
+ prepareResponse = quorumService.prepare2Phase(prepareRequest);
+
+ if (haLog.isInfoEnabled())
+ haLog.info(prepareResponse.toString());
+
+ if (!prepareResponse.willCommit()) {
// PREPARE rejected.
throw new QuorumException("PREPARE rejected: nyes="
@@ -3554,15 +3600,12 @@
+ prepareResponse.replicationFactor());
}
+ didPrepare = true;
- commitRequest = new CommitRequest(prepareRequest,
- prepareResponse);
+ } finally {
- quorumService.commit2Phase(commitRequest);
+ if (!didPrepare) {
- } catch (Throwable e) {
-
- try {
/*
* Something went wrong. Any services that were in the
* pipeline could have a dirty write set. Services that
@@ -3581,13 +3624,74 @@
* breaks, then the services will move into the Error state
* and will do a local abort as part of that transition.
*/
- quorumService.abort2Phase(commitToken);
- } catch (Throwable t) {
- log.warn(t, t);
+
+ try {
+ quorumService.abort2Phase(commitToken);
+ } catch (Throwable t) {
+ log.warn(t, t);
+ }
+
}
- if (commitRequest != null) {
+ }
+
+ }
+ // Fields set by the method above.
+ private IJoinedAndNonJoinedServices prepareJoinedAndNonJoinedServices;
+ private PrepareRequest prepareRequest;
+ private PrepareResponse prepareResponse;
+
+ /**
+ * COMMIT.
+ *
+ * Pre-condition: PREPARE was successful on a majority of the services.
+ */
+ private void commit2Phase() throws Exception {
+
+ boolean didCommit = false;
+ try {
+
+ /*
+ * Prepare was successful. COMMIT message has been formed. We
+ * will now commit.
+ *
+ * Note: The overall commit will fail unless we can prove that a
+ * majority of the services successfully committed.
+ */
+
+ commitRequest = new CommitRequest(prepareRequest,
+ prepareResponse);
+
+ commitResponse = quorumService.commit2Phase(commitRequest);
+
+ if (!store.quorum.isQuorum(commitResponse.getNOk())) {
+
/*
+ * Fail the commit.
+ *
+ * Note: An insufficient number of services were able to
+ * COMMIT successfully.
+ *
+ * Note: It is possible that a commit could be failed here
+ * when the commit is in fact stable on a majority of
+ * services. For example, with k=3 and 2 services running if
+ * both of them correctly update their root blocks but we
+ * lose network connectivity to the follower before the RMI
+ * returns, then we will fail the commit.
+ */
+
+ // Note: Guaranteed to not return normally!
+ commitResponse.throwCauses();
+
+ }
+
+ didCommit = true;
+
+ } finally {
+
+ if (!didCommit) {
+
+ /*
* The quorum voted to commit, but something went wrong.
*
* This forces the leader to fail over. The quorum can then
@@ -3614,55 +3718,18 @@
* another such that it will apply those HALog files on
* restart and form a consensus with the other services.
*/
+
quorumService.enterErrorState();
+
}
-
- // Re-throw the root cause exception.
- throw new RuntimeException(e);
}
}
// Fields set by the method above.
private CommitRequest commitRequest;
+ private CommitResponse commitResponse;
- /**
- * Return <code>true</code> iff the 2-phase PREPARE votes to COMMIT.
- *
- * @throws IOException
- * @throws TimeoutException
- * @throws InterruptedException
- */
- private boolean prepare2Phase() throws InterruptedException,
- TimeoutException, IOException {
-
- // Atomic decision point for joined vs non-joined services.
- prepareJoinedAndNonJoinedServices = new JoinedAndNonJoinedServices(
- quorum);
-
- prepareRequest = new PrepareRequest(//
- consensusReleaseTime,//
- gatherJoinedAndNonJoinedServices,//
- prepareJoinedAndNonJoinedServices,//
- newRootBlock,//
- quorumService.getPrepareTimeout(), // timeout
- TimeUnit.MILLISECONDS//
- );
-
- // issue prepare request.
- prepareResponse = quorumService.prepare2Phase(prepareRequest);
-
- if (haLog.isInfoEnabled())
- haLog.info(prepareResponse.toString());
-
- return prepareResponse.willCommit();
-
- }
- // Fields set by the method above.
- private IJoinedAndNonJoinedServices prepareJoinedAndNonJoinedServices;
- private PrepareRequest prepareRequest;
- private PrepareResponse prepareResponse;
-
} // class CommitState.
/**
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java 2013-11-01 13:34:10 UTC (rev 7506)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java 2013-11-01 21:10:45 UTC (rev 7507)
@@ -494,7 +494,7 @@
*
* TODO Consider leader failure scenarios in this test suite, not just
* scenarios where B fails. We MUST also cover failures of C (the 2nd
- * follower). We should also cover scenariors where the quorum is barely met
+ * follower). We should also cover scenarios where the quorum is barely met
* and a single failure causes a rejected commit (local decision) or 2-phase
* abort (joined services in joint agreement).
*
@@ -515,7 +515,17 @@
awaitCommitCounter(1L, startup.serverA, startup.serverB,
startup.serverC);
- // Setup B to fail the "COMMIT" message.
+ /*
+ * Set up B to fail the "COMMIT" message (specifically, it will throw
+ * back an exception rather than executing the commit).
+ *
+ * FIXME We need to cause B to actually fail the commit such that it
+ * enters the ERROR state. This is only causing the RMI to be rejected
+ * so B is not being failed out of the pipeline. Thus, B will remain
+ * joined with the met quorum (but at the wrong commit point) until we
+ * send down another replicated write. At that point B will notice that
+ * it is out of whack and enter the ERROR state.
+ */
((HAGlueTest) startup.serverB)
.failNext("commit2Phase",
new Class[] { IHA2PhaseCommitMessage.class },
@@ -543,13 +553,24 @@
// Should be two commit points on {A,C].
awaitCommitCounter(2L, startup.serverA, startup.serverC);
+ // Just one commit point on B.
+ awaitCommitCounter(1L, startup.serverB);
+
+ // B is still a follower.
+ awaitHAStatus(startup.serverB, HAStatusEnum.Follower);
+
/*
* B should go into an ERROR state and then into SeekConsensus and
* from there to RESYNC and finally back to RunMet. We can not
* reliably observe the intervening states. So what we really need
* to do is watch for B to move to the end of the pipeline and catch
* up to the same commit point.
+ *
+ * FIXME This is forcing B into an error state to simulate what
+ * would happen if B had encountered an error during the 2-phase
+ * commit above.
*/
+ ((HAGlueTest)startup.serverB).enterErrorState();
/*
* The pipeline should be reordered. B will do a service leave, then
@@ -558,6 +579,8 @@
awaitPipeline(new HAGlue[] { startup.serverA, startup.serverC,
startup.serverB });
+ awaitFullyMetQuorum();
+
/*
* There should be two commit points on {A,C,B} (note that this
* assert does not pay attention to the pipeline order).
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|