From: <tho...@us...> - 2013-10-31 17:04:42
Revision: 7505
          http://bigdata.svn.sourceforge.net/bigdata/?rev=7505&view=rev
Author:   thompsonbry
Date:     2013-10-31 17:04:33 +0000 (Thu, 31 Oct 2013)

Log Message:
-----------
The issue with the withheld version of QuorumCommitImpl has been tracked down to the non-blocking semantics of HAGlue.getRootBlock(). I have introduced a blocking version of the IHARootBlockRequest message for this method. When that blocking version is used from within awaitCommitCounter() in the test suite, the previously failing tests run green. Thus, this was a data race in the test suite itself.

AbstractServer: clean up dead code.

HAJournalServer: clean up dead code.

AbstractHA3BackupTestCase: clean up imports.

AbstractHAJournalServerTestCase: modify awaitCommitCounter() to use the blocking version of HAGlue.getRootBlock(). This fixes a test suite problem where the increased concurrency in the 2-phase PREPARE, COMMIT, and ABORT protocols could cause test suite failures.

QuorumCommitImpl: increased parallelism. The PREPARE, COMMIT, and ABORT behaviors are now executed in parallel on the followers and the leader. Simplified the code patterns for error handling.

(I)HARootBlockRequest: added a boolean parameter for blocking versus non-blocking requests.

AbstractJournal: added support for blocking versus non-blocking requests.

See #760 (commit2Phase code review).

Modified Paths:
--------------
    branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumCommitImpl.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HARootBlockRequest.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHARootBlockRequest.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3BackupTestCase.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHAJournalServerTestCase.java

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumCommitImpl.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumCommitImpl.java	2013-10-31 14:53:27 UTC (rev 7504)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumCommitImpl.java	2013-10-31 17:04:33 UTC (rev 7505)
@@ -28,10 +28,11 @@ import java.util.LinkedList; import java.util.List; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -43,11 +44,14 @@ import com.bigdata.ha.msg.IHA2PhaseAbortMessage; import com.bigdata.ha.msg.IHA2PhaseCommitMessage; import com.bigdata.ha.msg.IHA2PhasePrepareMessage; +import com.bigdata.ha.msg.IHAMessage; import com.bigdata.journal.IRootBlockView; import com.bigdata.quorum.Quorum; import com.bigdata.quorum.QuorumMember; import com.bigdata.quorum.QuorumStateChangeListener; import com.bigdata.quorum.QuorumStateChangeListenerBase; +import com.bigdata.service.proxy.ThickFuture; +import com.bigdata.util.InnerCause; import
com.bigdata.util.concurrent.ExecutionExceptions; /** @@ -60,50 +64,73 @@ static private transient final Logger log = Logger .getLogger(QuorumCommitImpl.class); - protected final QuorumMember<S> member; - - /** - * The downstream service in the write pipeline. - */ - protected volatile UUID downStreamId = null; + private final QuorumMember<S> member; + private final ExecutorService executorService; public QuorumCommitImpl(final QuorumMember<S> member) { + + if (member == null) + throw new IllegalArgumentException(); this.member = member; + this.executorService = member.getExecutor(); + } - protected Quorum<?, ?> getQuorum() { + private Quorum<?, ?> getQuorum() { return member.getQuorum(); } - protected HACommitGlue getService(final UUID serviceId) { + private HACommitGlue getService(final UUID serviceId) { return member.getService(serviceId); } - + /** * Cancel the requests on the remote services (RMI). This is a best effort * implementation. Any RMI related errors are trapped and ignored in order * to be robust to failures in RMI when we try to cancel the futures. + * <p> + * NOte: This is not being done in parallel. However, due to a DGC thread + * leak issue, we now use {@link ThickFuture}s. Thus, the tasks that are + * being cancelled are all local tasks running on the + * {@link #executorService}. If that local task is doing an RMI, then + * cancelling it will cause an interrupt in the NIO request. */ - protected <F extends Future<T>, T> void cancelRemoteFutures( - final List<F> remoteFutures) { + private <F extends Future<T>, T> void cancelFutures(final List<F> futures) { if (log.isInfoEnabled()) log.info(""); - for (F rf : remoteFutures) { + for (F f : futures) { + if (f == null) { + + continue; + + } + try { + + if (!f.isDone()) { + + f.cancel(true/* mayInterruptIfRunning */); - rf.cancel(true/* mayInterruptIfRunning */); + } } catch (Throwable t) { + + if (InnerCause.isInnerCause(t, InterruptedException.class)) { + // Propagate interrupt. + Thread.currentThread().interrupt(); + + } + // ignored (to be robust). } @@ -139,6 +166,7 @@ * from the prepare message. This metadata is used to decide how the service * will handle the prepare, commit, and abort messages. */ + @Override public PrepareResponse prepare2Phase(final PrepareRequest req) throws InterruptedException, IOException { @@ -150,9 +178,6 @@ final UUID[] joinedServiceIds = req.getPrepareAndNonJoinedServices() .getJoinedServiceIds(); -// final Set<UUID> nonJoinedPipelineServiceIds = req -// .getNonJoinedPipelineServiceIds(); - final long timeout = req.getTimeout(); final TimeUnit unit = req.getUnit(); @@ -172,35 +197,36 @@ final long begin = System.nanoTime(); final long nanos = unit.toNanos(timeout); long remaining = nanos; - + /* - * The leader is a local service. The followers and other service in the - * pipeline (but not yet joined) are remote services. + * Random access list of futures. + * + * Note: These are *local* Futures. Except for the leader, the Future is + * for a task that submits an RMI request. This allows us to run the + * PREPARE in parallel for less total latency. On the leader, the task + * is run in the caller's thread (on the leader, the caller holds a lock + * that we need so we have to execute the behavior in the caller's + * thread). + * + * Note: If a follower was joined as of the atomic decision point, but + * did not *participate* in the GATHER protocol, then we still send it + * the PREPARE message. 
*/ + final ArrayList<Future<Boolean>> localFutures = new ArrayList<Future<Boolean>>( + joinedServiceIds.length); - // #of remote followers (joined services, excluding the leader). - final int nfollowers = (joinedServiceIds.length - 1); + try { -// // #of non-joined services in the pipeline. -// final int nNonJoinedPipelineServices = nonJoinedPipelineServiceIds -// .size(); + // #of remote followers (joined services, excluding the leader). + final int nfollowers = joinedServiceIds.length - 1; - // #of remote services (followers plus others in the pipeline). - final int remoteServiceCount = nfollowers;// + nNonJoinedPipelineServices; + for (int i = 0; i <= nfollowers; i++) { - // Random access list of futures. - final ArrayList<Future<Boolean>> remoteFutures = new ArrayList<Future<Boolean>>( - remoteServiceCount); + // Pre-size to ensure sufficient room for set(i,foo). + localFutures.add(null); - for (int i = 0; i <= remoteServiceCount; i++) { - - // Pre-size to ensure sufficient room for set(i,foo). - remoteFutures.add(null); - - } - - try { - + } + // Verify the quorum is valid. member.assertLeader(token); @@ -240,64 +266,20 @@ rootBlock, timeout, unit); /* - * Runnable which will execute this message on the - * remote service. - * - * FIXME Because async futures cause DGC native thread - * leaks this is no longer running the prepare - * asynchronously on the followers. Change the code - * here, and in commit2Phase and abort2Phase to use - * multiple threads to run the tasks on the followers. + * Submit task which will execute this message on the + * remote service. We will await this task below. */ - - final HACommitGlue service = getService(serviceId); - - Future<Boolean> rf = null; - try { - // RMI. - rf = service.prepare2Phase(msgForJoinedService); - } catch (final Throwable t) { - // If anything goes wrong, wrap up exception as Future. - final FutureTask<Boolean> ft = new FutureTask<Boolean>(new Runnable() { - public void run() { - throw new RuntimeException(t); - } - }, Boolean.FALSE); - rf = ft; - ft.run(); // evaluate future. - } + final Future<Boolean> rf = executorService + .submit(new PrepareMessageTask(serviceId, + msgForJoinedService)); // add to list of futures we will check. - remoteFutures.set(i, rf); + localFutures.set(i, rf); } } -// // Next, message the pipeline services NOT met with the quorum. -// { -// -// // message for non-joined services. -// final IHA2PhasePrepareMessage msg = new HA2PhasePrepareMessage( -// false/* isJoinedService */, rootBlock, timeout, unit); -// -// for (UUID serviceId : nonJoinedPipelineServiceIds) { -// -// /* -// * Runnable which will execute this message on the -// * remote service. -// */ -// final Future<Boolean> rf = getService(serviceId) -// .prepare2Phase(msg); -// -// // add to list of futures we will check. -// remoteFutures.set(i, rf); -// -// i++; -// -// } -// } - /* * Finally, run the operation on the leader using local method * call (non-RMI) in the caller's thread to avoid deadlock. @@ -324,7 +306,7 @@ final Future<Boolean> f = leader .prepare2Phase(msgForJoinedService); - remoteFutures.set(0/* index */, f); + localFutures.set(0/* index */, f); } @@ -334,36 +316,24 @@ * Check futures for all services that were messaged. 
*/ int nyes = 0; - assert remoteFutures.size() == remoteServiceCount + 1; - final boolean[] votes = new boolean[remoteServiceCount + 1]; - for (int i = 0; i <= remoteServiceCount; i++) { - final Future<Boolean> rf = remoteFutures.get(i); - if (rf == null) + final boolean[] votes = new boolean[1 + nfollowers]; + for (int i = 0; i <= nfollowers; i++) { + final Future<Boolean> ft = localFutures.get(i); + if (ft == null) throw new AssertionError("null @ index=" + i); - boolean done = false; try { remaining = nanos - (System.nanoTime() - begin); - final boolean vote = rf + final boolean vote = ft .get(remaining, TimeUnit.NANOSECONDS); votes[i] = vote; - if (i < joinedServiceIds.length) { - // Only the leader and the followers get a vote. - nyes += vote ? 1 : 0; - } else { - // non-joined pipeline service. vote does not count. - if (!vote) { - log.warn("Non-joined pipeline service will not prepare"); - } - } - done = true; + // Only the leader and the followers get a vote. + nyes += vote ? 1 : 0; } catch (CancellationException ex) { // This Future was cancelled. log.error(ex, ex); - done = true; // CancellationException indicates isDone(). } catch (TimeoutException ex) { // Timeout on this Future. log.error(ex, ex); - done = false; } catch (ExecutionException ex) { /* * Note: prepare2Phase() is throwing exceptions if @@ -373,7 +343,6 @@ * service when attempting to perform the RMI. */ log.error(ex, ex); - done = true; // ExecutionException indicates isDone(). } catch (RuntimeException ex) { /* * Note: ClientFuture.get() can throw a RuntimeException if @@ -382,14 +351,8 @@ */ log.error(ex, ex); } finally { - if (!done) { - // Cancel the request on the remote service (RMI). - try { - rf.cancel(true/* mayInterruptIfRunning */); - } catch (Throwable t) { - // ignored. - } - } + // Note: cancelling a *local* Future wrapping an RMI. + ft.cancel(true/*mayInterruptIfRunning*/); } } @@ -415,30 +378,19 @@ return new PrepareResponse(k, nyes, willCommit, votes); } finally { - /* - * Ensure that all futures are cancelled. - */ - for (Future<Boolean> rf : remoteFutures) { - if (rf == null) // ignore empty slots. - continue; - if (!rf.isDone()) { - // Cancel the request on the remote service (RMI). - try { - rf.cancel(true/* mayInterruptIfRunning */); - } catch (Throwable t) { - // ignored. - } - } - } + + cancelFutures(localFutures); + } } - public void commit2Phase(final CommitRequest req) throws IOException, + @Override + public void commit2Phase(final CommitRequest commitRequest) throws IOException, InterruptedException { if (log.isInfoEnabled()) - log.info("req=" + req); + log.info("req=" + commitRequest); /* * To minimize latency, we first submit the futures for the other @@ -460,31 +412,28 @@ * atomic decision point concerning such things in commitNow().] 
*/ - final PrepareRequest preq = req.getPrepareRequest(); + final PrepareRequest prepareRequest = commitRequest.getPrepareRequest(); - final UUID[] joinedServiceIds = preq.getPrepareAndNonJoinedServices() + final UUID[] joinedServiceIds = prepareRequest.getPrepareAndNonJoinedServices() .getJoinedServiceIds(); -// final Set<UUID> nonJoinedPipelineServiceIds = preq -// .getNonJoinedPipelineServiceIds(); - - final long token = preq.getRootBlock().getQuorumToken(); + final long token = prepareRequest.getRootBlock().getQuorumToken(); - final long commitTime = preq.getRootBlock().getLastCommitTime(); + final long commitTime = prepareRequest.getRootBlock().getLastCommitTime(); - final PrepareResponse presp = req.getPrepareResponse(); + final PrepareResponse prepareResponse = commitRequest.getPrepareResponse(); // true iff we have a full complement of services that vote YES for this // commit. - final boolean didAllServicesPrepare = presp.getYesCount() == presp + final boolean didAllServicesPrepare = prepareResponse.getYesCount() == prepareResponse .replicationFactor(); - member.assertLeader(token); - - final List<Future<Void>> remoteFutures = new LinkedList<Future<Void>>(); + final List<Future<Void>> localFutures = new LinkedList<Future<Void>>(); try { + member.assertLeader(token); + final IHA2PhaseCommitMessage msgJoinedService = new HA2PhaseCommitMessage( true/* isJoinedService */, commitTime, didAllServicesPrepare); @@ -492,7 +441,7 @@ final UUID serviceId = joinedServiceIds[i]; - if (!presp.getVote(i)) { + if (!prepareResponse.getVote(i)) { // Skip services that did not vote YES in PREPARE. continue; @@ -500,38 +449,18 @@ } /* - * Runnable which will execute this message on the remote - * service. + * Submit task on local executor. The task will do an RMI to the + * remote service. */ - final Future<Void> rf = getService(serviceId).commit2Phase( - msgJoinedService); + final Future<Void> rf = executorService + .submit(new CommitMessageTask(serviceId, + msgJoinedService)); // add to list of futures we will check. - remoteFutures.add(rf); + localFutures.add(rf); } -// if (!nonJoinedPipelineServiceIds.isEmpty()) { -// -// final IHA2PhaseCommitMessage msgNonJoinedService = new HA2PhaseCommitMessage( -// false/* isJoinedService */, commitTime); -// -// for (UUID serviceId : nonJoinedPipelineServiceIds) { -// -// /* -// * Runnable which will execute this message on the remote -// * service. -// */ -// final Future<Void> rf = getService(serviceId).commit2Phase( -// msgNonJoinedService); -// -// // add to list of futures we will check. -// remoteFutures.add(rf); -// -// } -// -// } - { /* * Run the operation on the leader using local method call in @@ -542,7 +471,7 @@ final Future<Void> f = leader.commit2Phase(msgJoinedService); - remoteFutures.add(f); + localFutures.add(f); } @@ -550,11 +479,9 @@ * Check the futures for the other services in the quorum. */ final List<Throwable> causes = new LinkedList<Throwable>(); - for (Future<Void> rf : remoteFutures) { - boolean done = false; + for (Future<Void> ft : localFutures) { try { - rf.get(); // TODO Timeout to await followers in commit2Phase(). - done = true; + ft.get(); // FIXME Timeout to await followers in commit2Phase(). // } catch (TimeoutException ex) { // // Timeout on this Future. // log.error(ex, ex); @@ -564,11 +491,9 @@ // Future was cancelled. log.error(ex, ex); causes.add(ex); - done = true; // Future is done since cancelled. 
} catch (ExecutionException ex) { log.error(ex, ex); causes.add(ex); - done = true; // Note: ExecutionException indicates isDone(). } catch (RuntimeException ex) { /* * Note: ClientFuture.get() can throw a RuntimeException @@ -578,14 +503,8 @@ log.error(ex, ex); causes.add(ex); } finally { - if (!done) { - // Cancel the request on the remote service (RMI). - try { - rf.cancel(true/* mayInterruptIfRunning */); - } catch (Throwable t) { - // ignored. - } - } + // Note: cancelling a *local* Future wrapping an RMI. + ft.cancel(true/* mayInterruptIfRunning */); } } @@ -593,8 +512,6 @@ * If there were any errors, then throw an exception listing them. */ if (!causes.isEmpty()) { - // Cancel remote futures. - cancelRemoteFutures(remoteFutures); // Throw exception back to the leader. if (causes.size() == 1) throw new RuntimeException(causes.get(0)); @@ -603,28 +520,22 @@ } } finally { - /* - * Ensure that all futures are cancelled. - */ - for (Future<Void> rf : remoteFutures) { - if (!rf.isDone()) { - // Cancel the request on the remote service (RMI). - try { - rf.cancel(true/* mayInterruptIfRunning */); - } catch (Throwable t) { - // ignored. - } - } - } + + // Ensure that all futures are cancelled. + cancelFutures(localFutures); + } } /** + * {@inheritDoc} + * * FIXME Only issue abort to services that voted YES in prepare? [We have * that information in commitNow(), but we do not have the atomic set of * joined services in AbstractJournal.abort())]. */ + @Override public void abort2Phase(final long token) throws IOException, InterruptedException { @@ -632,14 +543,6 @@ log.info("token=" + token); /* - * To minimize latency, we first submit the futures for the other - * services and then do f.run() on the leader. This will allow the other - * services to commit concurrently with the leader's IO. - */ - - final List<Future<Void>> remoteFutures = new LinkedList<Future<Void>>(); - - /* * For services (other than the leader) in the quorum, submit the * RunnableFutures to an Executor. */ @@ -649,6 +552,12 @@ final IHA2PhaseAbortMessage msg = new HA2PhaseAbortMessage(token); + /* + * To minimize latency, we first submit the futures for the other + * services and then do f.run() on the leader. + */ + final List<Future<Void>> localFutures = new LinkedList<Future<Void>>(); + try { for (int i = 1; i < joinedServiceIds.length; i++) { @@ -656,13 +565,14 @@ final UUID serviceId = joinedServiceIds[i]; /* - * Runnable which will execute this message on the remote - * service. + * Submit task on local executor. The task will do an RMI to the + * remote service. */ - final Future<Void> rf = getService(serviceId).abort2Phase(msg); + final Future<Void> rf = executorService + .submit(new AbortMessageTask(serviceId, msg)); // add to list of futures we will check. - remoteFutures.add(rf); + localFutures.add(rf); } @@ -674,25 +584,22 @@ member.assertLeader(token); final S leader = member.getService(); final Future<Void> f = leader.abort2Phase(msg); - remoteFutures.add(f); + localFutures.add(f); } /* * Check the futures for the other services in the quorum. */ final List<Throwable> causes = new LinkedList<Throwable>(); - for (Future<Void> rf : remoteFutures) { - boolean done = false; + for (Future<Void> ft : localFutures) { try { - rf.get(); - done = true; + ft.get(); // TODO Timeout for abort? } catch (InterruptedException ex) { log.error(ex, ex); causes.add(ex); } catch (ExecutionException ex) { log.error(ex, ex); causes.add(ex); - done = true; // Note: ExecutionException indicates isDone(). 
} catch (RuntimeException ex) { /* * Note: ClientFuture.get() can throw a RuntimeException @@ -702,14 +609,8 @@ log.error(ex, ex); causes.add(ex); } finally { - if (!done) { - // Cancel the request on the remote service (RMI). - try { - rf.cancel(true/* mayInterruptIfRunning */); - } catch (Throwable t) { - // ignored. - } - } + // Note: cancelling a *local* Future wrapping an RMI. + ft.cancel(true/* mayInterruptIfRunning */); } } @@ -717,11 +618,10 @@ * If there were any errors, then throw an exception listing them. * * TODO But only throw an exception for the joined services. - * Non-joined services, we just long an error. + * Non-joined services, we just long an error (or simply do not tell + * them to do an abort()). */ if (!causes.isEmpty()) { - // Cancel remote futures. - cancelRemoteFutures(remoteFutures); // Throw exception back to the leader. if (causes.size() == 1) throw new RuntimeException(causes.get(0)); @@ -730,22 +630,133 @@ } } finally { + + // Ensure that all futures are cancelled. + cancelFutures(localFutures); + + } + + } + + /** + * Helper class submits the RMI for a PREPARE, COMMIT, or ABORT message. + * This is used to execute the different requests in parallel on a local + * executor service. + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + */ + private abstract class AbstractMessageTask<T, M extends IHAMessage> + implements Callable<T> { + + private final UUID serviceId; + protected final M msg; + + public AbstractMessageTask(final UUID serviceId, final M msg) { + + this.serviceId = serviceId; + + this.msg = msg; + + } + + @Override + final public T call() throws Exception { + /* - * Ensure that all futures are cancelled. + * Note: This code MAY be interrupted at any point if the Future for + * the task is cancelled. If it is interrupted during the RMI, then + * the expectation is that the NIO will be interrupted in a timely + * manner throwing back some sort of IOException indicating the + * asynchronous close of the IO channel or cancel of the RMI. */ - for (Future<Void> rf : remoteFutures) { - if (!rf.isDone()) { - // Cancel the request on the remote service (RMI). - try { - rf.cancel(true/* mayInterruptIfRunning */); - } catch (Throwable t) { - // ignored. - } - } + + // Resolve proxy for remote service. + final HACommitGlue service = getService(serviceId); + + // RMI. + final Future<T> ft = doRMI(service); + + try { + + /* + * Await the inner Future for the RMI. + * + * Note: In fact, this is a ThickFuture so it is already done by + * the time the RMI returns. 
+ */ + + return ft.get(); + + } finally { + + ft.cancel(true/* mayInterruptIfRunning */); + } } + abstract protected Future<T> doRMI(final HACommitGlue service) + throws IOException; + } + private class PrepareMessageTask extends + AbstractMessageTask<Boolean, IHA2PhasePrepareMessage> { + + public PrepareMessageTask(final UUID serviceId, + final IHA2PhasePrepareMessage msg) { + + super(serviceId, msg); + + } + + @Override + protected Future<Boolean> doRMI(final HACommitGlue service) + throws IOException { + + return service.prepare2Phase(msg); + + } + + } + + private class CommitMessageTask extends + AbstractMessageTask<Void, IHA2PhaseCommitMessage> { + + public CommitMessageTask(final UUID serviceId, + final IHA2PhaseCommitMessage msg) { + + super(serviceId, msg); + + } + + @Override + protected Future<Void> doRMI(final HACommitGlue service) + throws IOException { + + return service.commit2Phase(msg); + } + + } + + private class AbortMessageTask extends + AbstractMessageTask<Void, IHA2PhaseAbortMessage> { + + public AbortMessageTask(final UUID serviceId, + final IHA2PhaseAbortMessage msg) { + + super(serviceId, msg); + + } + + @Override + protected Future<Void> doRMI(final HACommitGlue service) + throws IOException { + + return service.abort2Phase(msg); + } + + } + } Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HARootBlockRequest.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HARootBlockRequest.java 2013-10-31 14:53:27 UTC (rev 7504) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HARootBlockRequest.java 2013-10-31 17:04:33 UTC (rev 7505) @@ -32,11 +32,33 @@ private static final long serialVersionUID = 1L; private final UUID storeUUID; + private final boolean isNonBlocking; + /** + * Create a non-blocking request (this is the historical behavior). + * + * @param storeUUID + * The store UUID (optional). + */ public HARootBlockRequest(final UUID storeUUID) { + this(storeUUID, true/* isNonBlocking */); + + } + + /** + * + * @param storeUUID + * The store UUID (optional). + * @param isNonBlocking + * <code>true</code> iff the request should be non-blocking. + */ + public HARootBlockRequest(final UUID storeUUID,final boolean isNonBlocking) { + // Note: Optional. this.storeUUID = storeUUID; + + this.isNonBlocking = isNonBlocking; } @@ -47,4 +69,11 @@ } + @Override + public boolean isNonBlocking() { + + return isNonBlocking; + + } + } Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHARootBlockRequest.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHARootBlockRequest.java 2013-10-31 14:53:27 UTC (rev 7504) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHARootBlockRequest.java 2013-10-31 17:04:33 UTC (rev 7505) @@ -39,4 +39,18 @@ */ UUID getStoreUUID(); + /** + * When <code>true</code> the request should be non-blocking. Otherwise the + * request should obtain the lock that guards the update of the root block + * in the commit protocol such that the caller can not observe a root block + * that has been updated but where the commit protocol is still in its + * critical section. + * <p> + * Note: The non-blocking form of the request is used in some context where + * the avoidence of a deadlock is necessary. 
The blocking form is used in + * some contexts where we need to await a specific commit point on the + * service (typically under test suite control). + */ + boolean isNonBlocking(); + } Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-10-31 14:53:27 UTC (rev 7504) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-10-31 17:04:33 UTC (rev 7505) @@ -7842,9 +7842,34 @@ } - return new HARootBlockResponse( - AbstractJournal.this.getRootBlockView()); + if (msg.isNonBlocking()) { + // Non-blocking code path. + + return new HARootBlockResponse( + AbstractJournal.this.getRootBlockView()); + + } else { + + // Blocking code path. + + final ReadLock lock = _fieldReadWriteLock.readLock(); + + lock.lock(); + + try { + + return new HARootBlockResponse( + AbstractJournal.this.getRootBlockView()); + + } finally { + + lock.unlock(); + + } + + } + } // @Override Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java 2013-10-31 14:53:27 UTC (rev 7504) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java 2013-10-31 17:04:33 UTC (rev 7505) @@ -260,6 +260,11 @@ private JoinManager joinManager; /** + * Used to suppor the {@link #joinManager}. + */ + private volatile LookupDiscoveryManager lookupDiscoveryManager = null; + + /** * The {@link Configuration} read based on the args[] provided when the * server is started. */ @@ -399,24 +404,6 @@ } -// /** -// * An object used to manage jini service registrar discovery. -// */ -// public LookupDiscoveryManager getDiscoveryManagement() { -// -// return lookupDiscoveryManager; -// -// } -// -// /** -// * An object used to lookup services using the discovered service registars. -// */ -// public ServiceDiscoveryManager getServiceDiscoveryManager() { -// -// return serviceDiscoveryManager; -// -// } - /** * The {@link HAClient}. */ @@ -445,6 +432,7 @@ /** * Signals anyone waiting on {@link #discoveryEvent}. */ + @Override public void discarded(final DiscoveryEvent e) { try { @@ -472,6 +460,7 @@ /** * Signals anyone waiting on {@link #discoveryEvent}. */ + @Override public void discovered(final DiscoveryEvent e) { try { @@ -895,106 +884,6 @@ throw new AssertionError();// keeps compiler happy. } -// Note: Moved HAClient.connect() into quorumService.start(). -// final HAConnection ctx; -// try { -// -// // Create client. -// haClient = new HAClient(args); -// -// // Connect. -// ctx = haClient.connect(); -// -//// /* -//// * Note: This class will perform multicast discovery if ALL_GROUPS -//// * is specified and otherwise requires you to specify one or more -//// * unicast locators (URIs of hosts running discovery services). As -//// * an alternative, you can use LookupDiscovery, which always does -//// * multicast discovery. 
-//// */ -//// lookupDiscoveryManager = new LookupDiscoveryManager( -//// jiniClientConfig.groups, jiniClientConfig.locators, -//// this /* DiscoveryListener */, config); -//// -//// /* -//// * Setup a helper class that will be notified as services join or -//// * leave the various registrars to which the data server is -//// * listening. -//// */ -//// try { -//// -//// serviceDiscoveryManager = new ServiceDiscoveryManager( -//// lookupDiscoveryManager, new LeaseRenewalManager(), -//// config); -//// -//// } catch (IOException ex) { -//// -//// throw new RuntimeException( -//// "Could not initiate service discovery manager", ex); -//// -//// } -//// -//// } catch (IOException ex) { -//// -//// fatal("Could not setup discovery", ex); -//// throw new AssertionError();// keep the compiler happy. -//// -// } catch (ConfigurationException ex) { -// -// fatal("Configuration error: " + ex, ex); -// -// throw new AssertionError();// keep the compiler happy. -// -// } catch(Throwable ex) { -// -// fatal("Could not connect: " + ex, ex); -// -// throw new AssertionError();// keep the compiler happy. -// -// } - - // Note: Moved newService() call into AbstractServer.run(). -// /* -// * Create the service object. -// */ -// try { -// -// /* -// * Note: By creating the service object here rather than outside of -// * the constructor we potentially create problems for subclasses of -// * AbstractServer since their own constructor will not have been -// * executed yet. -// * -// * Some of those problems are worked around using a JiniClient to -// * handle all aspects of service discovery (how this service locates -// * the other services in the federation). -// * -// * Note: If you explicitly assign values to those clients when the -// * fields are declared, e.g., [timestampServiceClient=null] then the -// * ctor will overwrite the values set by [newService] since it is -// * running before those initializations are performed. This is -// * really crufty, may be JVM dependent, and needs to be refactored -// * to avoid this subclass ctor init problem. -// */ -// -// if (log.isInfoEnabled()) -// log.info("Creating service impl..."); -// -// // init. -// impl = newService(config); -// -// if (log.isInfoEnabled()) -// log.info("Service impl is " + impl); -// -// } catch(Exception ex) { -// -// fatal("Could not start service: "+this, ex); -// throw new AssertionError();// keeps compiler happy. -// } - -// // Export the service proxy. -// exportProxy(haClient, impl); - } /** @@ -1157,7 +1046,6 @@ } } - private volatile LookupDiscoveryManager lookupDiscoveryManager = null; /** * Await discovery of at least one {@link ServiceRegistrar}. @@ -1420,6 +1308,7 @@ * @param serviceID * The assigned {@link ServiceID}. */ + @Override synchronized public void serviceIDNotify(final ServiceID serviceID) { if (serviceID == null) @@ -1550,6 +1439,7 @@ * Note: This is only invoked if the automatic lease renewal by the lease * manager is denied by the service registrar. 
*/ + @Override public void notify(final LeaseRenewalEvent event) { log.warn("Lease could not be renewed: " + this + " : " + event); @@ -1922,43 +1812,7 @@ } } - -// if (serviceDiscoveryManager != null) { -// -// serviceDiscoveryManager.terminate(); -// -// serviceDiscoveryManager = null; -// -// } -// -// if (lookupDiscoveryManager != null) { -// -// lookupDiscoveryManager.terminate(); -// -// lookupDiscoveryManager = null; -// -// } -// if (client != null) { -// -// if(client.isConnected()) { -// -// /* -// * Note: This will close the zookeeper client and that will -// * cause the ephemeral znode for the service to be removed. -// */ -// -//// if (log.isInfoEnabled()) -//// log.info("Disconnecting from federation"); -// -// client.disconnect(true/* immediateShutdown */); -// -// } -// -// client = null; -// -// } - } /** @@ -2181,7 +2035,8 @@ return new FileFilter() { - public boolean accept(File pathname) { + @Override + public boolean accept(final File pathname) { return false; Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2013-10-31 14:53:27 UTC (rev 7504) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2013-10-31 17:04:33 UTC (rev 7505) @@ -441,11 +441,6 @@ } -// /** -// * Caching discovery client for the {@link HAGlue} services. -// */ -// private HAJournalDiscoveryClient discoveryClient; - /** * The journal. */ @@ -458,10 +453,6 @@ */ private boolean onelineDisasterRecovery; -// private ZookeeperClientConfig zkClientConfig; - -// private ZooKeeperAccessor zka; - /** * An executor used to handle events that were received in the zk watcher * event thread. We can not take actions that could block in the watcher @@ -560,15 +551,6 @@ */ Operator; } - -// /** -// * Caching discovery client for the {@link HAGlue} services. -// */ -// public HAJournalDiscoveryClient getDiscoveryClient() { -// -// return discoveryClient; -// -// } public HAJournalServer(final String[] args, final LifeCycle lifeCycle) { @@ -603,14 +585,6 @@ @Override protected void terminate() { -// if (discoveryClient != null) { -// -// discoveryClient.terminate(); -// -// discoveryClient = null; -// -// } - super.terminate(); } @@ -663,49 +637,6 @@ if (log.isInfoEnabled()) log.info("Creating service impl..."); -// /* -// * Verify discovery of at least one ServiceRegistrar. -// */ -// getHAClient().getConnection().awaitServiceRegistrars(10/* timeout */, -// TimeUnit.SECONDS); - -// { -// final long begin = System.currentTimeMillis(); -// -// ServiceRegistrar[] registrars = null; -// -// long elapsed = 0; -// -// while ((registrars == null || registrars.length == 0) -// && elapsed < TimeUnit.SECONDS.toMillis(10)) { -// -// registrars = getHAClient().getConnection() -// .getDiscoveryManagement().getRegistrars(); -// -// Thread.sleep(100/* ms */); -// -// elapsed = System.currentTimeMillis() - begin; -// -// } -// -// if (registrars == null || registrars.length == 0) { -// -// throw new RuntimeException( -// "Could not discover ServiceRegistrar(s)"); -// -// } -// -// if (log.isInfoEnabled()) { -// log.info("Found " + registrars.length + " service registrars"); -// } -// -// } - -// // Setup discovery for HAGlue clients. 
-// discoveryClient = new HAJournalDiscoveryClient( -// getServiceDiscoveryManager(), -// null/* serviceDiscoveryListener */, cacheMissTimeout); - // Jini/River ServiceID. final ServiceID serviceID = getServiceID(); @@ -727,7 +658,6 @@ * Setup the Quorum / HAJournal. */ -// zkClientConfig = new ZookeeperClientConfig(config); final ZookeeperClientConfig zkClientConfig = getHAClient() .getZookeeperClientConfig(); @@ -757,56 +687,9 @@ /* * Zookeeper quorum. */ - final Quorum<HAGlue, QuorumService<HAGlue>> quorum; - { + final Quorum<HAGlue, QuorumService<HAGlue>> quorum = (Quorum) new ZKQuorumImpl<HAGlue, HAQuorumService<HAGlue, HAJournal>>( + replicationFactor); -// final List<ACL> acl = zkClientConfig.acl; -// -// final ZooKeeperAccessor zka = getHAClient().getConnection() -// .getZookeeperAccessor(); -// -// if (!zka.awaitZookeeperConnected(10, TimeUnit.SECONDS)) { -// -// throw new RuntimeException("Could not connect to zk"); -// -// } -// -// if (log.isInfoEnabled()) { -// log.info("Connected to zookeeper"); -// } -// -// /* -// * Ensure key znodes exist. -// */ -// try { -// zka.getZookeeper() -// .create(zkClientConfig.zroot, -// new byte[] {/* data */}, acl, -// CreateMode.PERSISTENT); -// } catch (NodeExistsException ex) { -// // ignore. -// } -// try { -// zka.getZookeeper() -// .create(logicalServiceZPathPrefix, -// new byte[] {/* data */}, acl, -// CreateMode.PERSISTENT); -// } catch (NodeExistsException ex) { -// // ignore. -// } -// try { -// zka.getZookeeper() -// .create(logicalServiceZPath, -// new byte[] {/* data */}, acl, -// CreateMode.PERSISTENT); -// } catch (NodeExistsException ex) { -// // ignore. -// } - - quorum = (Quorum) new ZKQuorumImpl<HAGlue, HAQuorumService<HAGlue, HAJournal>>( - replicationFactor);// , zka, acl); - } - // The HAJournal. this.journal = newHAJournal(this, config, quorum); @@ -822,19 +705,15 @@ // Setup the quorum client (aka quorum service). quorumService = newQuorumService(logicalServiceZPath, serviceUUID, haGlueService, journal); - -// // wrap the external interface, exposing administrative functions. -// final AdministrableHAGlueService administrableService = new AdministrableHAGlueService( -// this, haGlueService); -// -// // return that wrapped interface. -// return administrableService; /* - * Return that object. This will get proxied. If we wrap it with a - * delegation pattern here, then RMI methods on a subclass of - * HAGlueService will not be visible on the exported proxy. + * Return our external interface object. This object will get proxied. + * + * Note: If we wrap that object with a delegation pattern, then RMI + * methods on a subclass of HAGlueService will not be visible on the + * exported proxy. */ + return haGlueService; } @@ -924,25 +803,9 @@ // Start the NSS. startNSS(); -// // Setup listener that logs quorum events @ TRACE. -// journal.getQuorum().addListener(new QuorumListener() { -// @Override -// public void notify(final QuorumEvent e) { -// if (log.isTraceEnabled()) -// log.trace(e); -// } -// }); - -// // Setup the quorum client (aka quorum service). -// quorumService = newQuorumService(logicalServiceZPath, serviceUUID, -// haGlueService, journal); - // Start the quorum. journal.getQuorum().start(quorumService); -// // Enter a run state for the HAJournalServer. -// quorumService.enterRunState(quorumService.new RestoreTask()); - } /** @@ -985,37 +848,6 @@ if (log.isInfoEnabled()) log.info("destroy=" + destroy); - // Note: Moved to quorumService.terminate(). 
-// if (quorumService != null) { -// -// /* -// * FIXME SHUTDOWN: What if we are already running a ShutdownTask? We -// * should just submit a ShutdownTask here and let it work this out. -// */ -// -// /* -// * Ensure that the HAQuorumService will not attempt to cure any -// * serviceLeave or related actions. -// * -// * TODO SHUTDOWN: If we properly enter a ShutdownTask run state then -// * we would not have to do this since it will already be in the -// * Shutdown runstate. -// */ -// quorumService.runStateRef.set(RunStateEnum.Shutdown); -// -// /* -// * Terminate any running task. -// */ -// final FutureTask<?> ft = quorumService.runStateFutureRef.get(); -// -// if (ft != null) { -// -// ft.cancel(true/* mayInterruptIfRunning */); -// -// } -// -// } - final HAJournal tjournal = journal; final Quorum<HAGlue, QuorumService<HAGlue>> quorum = tjournal == null ? null @@ -1042,26 +874,7 @@ * that method.] */ quorum.terminate(); - - // Note: handled by HAClient.disconnect(). - // TODO Should we do that disconnect here? -// /* -// * Close our zookeeper connection, invalidating all ephemeral -// * znodes for this service. -// * -// * Note: This provides a decisive mechanism for removing this -// * service from the joined services, the pipeline, withdrawing -// * its vote, and removing it as a quorum member. -// */ -// if (haLog.isInfoEnabled()) -// haLog.warn("FORCING UNCURABLE ZOOKEEPER DISCONNECT"); -// -// if (zka != null) { -// -// zka.close(); -// -// } - + } catch (Throwable t) { log.error(t, t); @@ -1215,6 +1028,7 @@ } + @Override final public T call() throws Exception { /* @@ -1504,11 +1318,6 @@ @Override protected long getRetrySendTimeoutNanos() { -// final ZooKeeperAccessor zka = journal -// .getHAClient() -// .getConnection() -// .getZookeeperAccessor(); - final ZooKeeper zk = getZooKeeper(); int negotiatedSessionTimeoutMillis; // try { @@ -2206,21 +2015,6 @@ log.warn("Will attempt SERVICE LEAVE"); getActor().serviceLeave(); // Just once(!) -// while (true) { -// try { -// getActor().serviceLeave(); -// break; -// } catch (RuntimeException re) { -// if (InnerCause.isInnerCause(re, -// KeeperException.class)) { -// // Do not retry in a tight loop. -// Thread.sleep(250/* ms */); -// // Retry. -// continue; -// } -// throw re; -// } -// } /** * Dispatch Events before entering SeekConsensus! Otherwise @@ -2743,10 +2537,6 @@ while (r.hasMoreBuffers()) { -// // IHABufferStrategy -// final IHABufferStrategy strategy = journal -// .getBufferStrategy(); - // get message and fill write cache buffer (unless // WORM). final IHAWriteMessage msg = r.processNextBuffer(buf @@ -2815,6 +2605,7 @@ } + @Override protected Void doRun() throws Exception { /* @@ -3004,6 +2795,7 @@ * * @throws Exception */ + @Override protected Void doRun() throws Exception { // // Wait for the token to be set, root blocks to be valid. @@ -4106,6 +3898,7 @@ txs.runWithBarrierLock(new Runnable() { + @Override public void run() { // Verify that the quorum is valid. @@ -4765,168 +4558,8 @@ // Wait for the HAJournalServer to terminate. server.run(); -// try { -// final Server tmp = server.jettyServer; -// if (tmp != null) { -// // Wait for the jetty server to terminate. -// tmp.join(); -// } -// } catch (InterruptedException e) { -// log.warn(e); -// } - System.exit(0); } - -// /** -// * Adds jini administration interfaces to the basic {@link HAGlue} interface -// * exposed by the {@link HAJournal}. 
-// * -// * @see HAJournal.HAGlueService -// * -// * @author <a href="mailto:tho...@us...">Bryan -// * Thompson</a> -// */ -// public static class AdministrableHAGlueService extends HAGlueDelegate -// implements RemoteAdministrable, RemoteDestroyAdmin { -// -// final protected HAJournalServer server; -// -// public AdministrableHAGlueService(final HAJournalServer server, -// final HAGlue service) { -// -// super(service); -// -// this.server = server; -// -// } -// -//// /** -//// * Returns an object that implements whatever administration interfaces -//// * are appropriate for the particular service. -//// * -//// * @return an object that implements whatever administration interfaces -//// * are appropriate for the particular service. -//// */ -//// public Object getAdmin() throws RemoteException { -//// -//// if (log.isInfoEnabled()) -//// log.info("serviceID=" + server.getServiceID()); -//// -//// return server.proxy; -//// -//// } -// -//// /** -//// * Sets up the {@link MDC} logging context. You should do this on every -//// * client facing point of entry and then call -//// * {@link #clearLoggingContext()} in a <code>finally</code> clause. You -//// * can extend this method to add additional context. -//// * <p> -//// * This implementation adds the following parameters to the {@link MDC}. -//// * <dl> -//// * <dt>serviceName</dt> -//// * <dd>The serviceName is typically a configuration property for the -//// * service. This datum can be injected into log messages using -//// * <em>%X{serviceName}</em> in your log4j pattern layout.</dd> -//// * <dt>serviceUUID</dt> -//// * <dd>The serviceUUID is, in general, assigned asynchronously by the -//// * service registrar. Once the serviceUUID becomes available it will be -//// * added to the {@link MDC}. This datum can be injected into log -//// * messages using <em>%X{serviceUUID}</em> in your log4j pattern layout. -//// * </dd> -//// * <dt>hostname</dt> ... [truncated message content] |
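Editor's note: the core of the QuorumCommitImpl change above is the switch from issuing the PREPARE/COMMIT/ABORT RMIs inline to wrapping each RMI in a task submitted to a local executor, so the messages fan out to the followers in parallel and only local Futures ever need to be cancelled. The following is a minimal, self-contained sketch of that pattern, not the committed code: RemoteService, vote(), and prepareAll() are hypothetical stand-ins for HACommitGlue, prepare2Phase(), and the real PREPARE driver, which additionally asserts leadership, records per-service votes, and runs the leader's own operation in the caller's thread.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ParallelPrepareSketch {

    /** Hypothetical stand-in for the HACommitGlue RMI interface. */
    interface RemoteService {
        boolean vote() throws Exception; // e.g., prepare2Phase(msg)
    }

    /**
     * Fan the request out to all services in parallel, then await the
     * local futures and count the YES votes. Cancelling a local Future
     * interrupts the wrapped RMI (cf. cancelFutures() in the commit).
     */
    static int prepareAll(final List<RemoteService> services,
            final long timeout, final TimeUnit unit)
            throws InterruptedException {

        final ExecutorService executor = Executors.newCachedThreadPool();
        final List<Future<Boolean>> localFutures =
                new ArrayList<Future<Boolean>>(services.size());
        int nyes = 0;
        try {
            // Submit every request first so the RMIs run concurrently.
            for (final RemoteService s : services) {
                localFutures.add(executor.submit(new Callable<Boolean>() {
                    @Override
                    public Boolean call() throws Exception {
                        return s.vote(); // RMI runs inside the local task.
                    }
                }));
            }
            // Await each local future, charging it against the deadline.
            final long begin = System.nanoTime();
            final long nanos = unit.toNanos(timeout);
            for (Future<Boolean> f : localFutures) {
                try {
                    final long remaining = nanos - (System.nanoTime() - begin);
                    if (f.get(remaining, TimeUnit.NANOSECONDS))
                        nyes++;
                } catch (InterruptedException ex) {
                    throw ex; // propagate the interrupt.
                } catch (Exception ex) {
                    // Timeout, cancellation, or remote failure: no YES vote.
                } finally {
                    // Note: cancelling a *local* Future wrapping an RMI.
                    f.cancel(true/* mayInterruptIfRunning */);
                }
            }
            return nyes;
        } finally {
            executor.shutdownNow(); // best-effort cancel of any stragglers.
        }
    }
}

Because the remote calls return ThickFutures (already done when the RMI returns), the only futures awaited and cancelled here are local ones, which is what eliminates the DGC native thread leak mentioned in the commit.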
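The AbstractJournal side of the change is a read/write-lock gate: the commit protocol holds the write lock across its critical section, so a blocking reader cannot observe a root block that has been installed but not yet fully committed. A minimal sketch under assumed names, with a hypothetical RootBlock holder standing in for IRootBlockView and the journal's internal _fieldReadWriteLock:

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class RootBlockReaderSketch {

    /** Hypothetical stand-in for IRootBlockView. */
    static final class RootBlock {
        final long commitCounter;
        RootBlock(final long commitCounter) {
            this.commitCounter = commitCounter;
        }
    }

    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private volatile RootBlock current = new RootBlock(0L);

    RootBlock getRootBlock(final boolean isNonBlocking) {
        if (isNonBlocking) {
            // Historical behavior: may observe a root block whose commit
            // protocol is still inside its critical section.
            return current;
        }
        // Blocking path: take the read lock whose write side is held by
        // the commit protocol for the duration of the critical section.
        lock.readLock().lock();
        try {
            return current;
        } finally {
            lock.readLock().unlock();
        }
    }
}

The non-blocking form remains the default because some callers invoke getRootBlock() from contexts where blocking on the commit lock could deadlock, as the IHARootBlockRequest javadoc in the diff explains.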
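Finally, the test-suite fix itself: awaitCommitCounter() previously used the non-blocking request and could race the tail of the 2-phase commit. A sketch of the repaired helper, assuming a hypothetical Journal stand-in for HAGlue; in the actual commit the flag travels inside the (I)HARootBlockRequest message rather than as a method argument, and the real helper asserts the expected counter rather than polling indefinitely.

class AwaitCommitCounterSketch {

    /** Hypothetical stand-in for the HAGlue remote interface. */
    interface Journal {
        long getCommitCounter(boolean isNonBlocking);
    }

    static void awaitCommitCounter(final Journal journal, final long expected)
            throws InterruptedException {
        while (true) {
            // The blocking form cannot observe a half-committed root block,
            // so seeing counter >= expected means the commit fully completed.
            final long actual = journal.getCommitCounter(false/* isNonBlocking */);
            if (actual >= expected)
                return;
            Thread.sleep(50/* ms */);
        }
    }
}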