From: <tho...@us...> - 2013-10-11 15:05:25
|
Revision: 7448 http://bigdata.svn.sourceforge.net/bigdata/?rev=7448&view=rev Author: thompsonbry Date: 2013-10-11 15:05:17 +0000 (Fri, 11 Oct 2013) Log Message: ----------- Added a pre-condition test to sendHAStore() and sendHALog() that verifies that this service is the leader. The invariant is that the quorum remains met on the same token; therefore, the invariant also verifies that the service remains the leader. ZKQuorumImpl now generates a QUORUM_DISCONNECTED event. This event can now be monitored by the invariant listener (TODO). AbstractQuorum was modified to expose some methods (sendEvent() and launderThrowable()) to ZKQuorumImpl. AbstractQuorum.client was made volatile, but getClient() and getClientAsMember() still use the lock (we will change that next). - http://sourceforge.net/apps/trac/bigdata/ticket/718 (HAJournalServer needs to handle ZK client connection loss) - http://sourceforge.net/apps/trac/bigdata/ticket/723 (HA asynchronous tasks must be canceled when invariants are changed) Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/concurrent/FutureTaskInvariantMon.java branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/quorum/QuorumEventEnum.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/concurrent/FutureTaskInvariantMon.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/concurrent/FutureTaskInvariantMon.java 2013-10-11 13:50:11 UTC (rev 7447) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/concurrent/FutureTaskInvariantMon.java 2013-10-11 15:05:17 UTC (rev 7448) @@ -64,6 +64,10 @@ private static final Logger log = Logger.getLogger(FutureTaskInvariantMon.class); private final Quorum<HAGlue, 
QuorumService<HAGlue>> m_quorum; + /** + * The quorum token on entry. + */ + private final long token; private final List<QuorumEventInvariant> m_triggers = new CopyOnWriteArrayList<QuorumEventInvariant>(); @@ -77,6 +81,8 @@ m_quorum = quorum; + token = quorum.token(); + } public FutureTaskInvariantMon(final Runnable runnable, final T result, @@ -89,6 +95,8 @@ m_quorum = quorum; + token = quorum.token(); + } /** @@ -103,17 +111,25 @@ * Hook to manage listener registration and establish invariants. */ @Override - public void run() { - m_quorum.addListener(this); - try { - establishInvariants(); - - super.run(); - } finally { - m_quorum.removeListener(this); - } - } + public void run() { + boolean didStart = false; + m_quorum.addListener(this); + try { + establishInvariants(); + didStart = true; + super.run(); + } finally { + m_quorum.removeListener(this); + if (!didStart) { + /* + * Guarantee cancelled unless run() invoked. + */ + cancel(true/* mayInterruptIfRunning */); + } + } + } + /** * Establish an invariant that the specified service is a member of the * quorum. @@ -176,7 +192,8 @@ } /** - * Establish an invariant that the quorum is met. + * Establish an invariant that the quorum is met and remains met on the same + * token. */ public void assertQuorumMet() { m_triggers.add(new QuorumEventInvariant(QuorumEventEnum.QUORUM_BROKE, @@ -185,6 +202,8 @@ // now check that quorum is met and break if not if (!m_quorum.isQuorumMet()) broken(); + if (m_quorum.token() != token) + broken(); } /** Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java 2013-10-11 13:50:11 UTC (rev 7447) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java 2013-10-11 15:05:17 UTC (rev 7448) @@ -292,10 +292,15 @@ /** * The {@link QuorumClient}. 
+ * <p> + * Note: This is volatile to allow visibility without holding the + * {@link #lock}. The field is only modified in {@link #start(QuorumClient)} + * and {@link #terminate()}, and those methods use the {@link #lock} to + * impose an appropriate ordering over events. * * @see #start(QuorumClient) */ - private C client; + private volatile C client; /** * An object which watches the distributed state of the quorum and informs @@ -718,7 +723,8 @@ public C getClient() { lock.lock(); try { - if (this.client == null) + final C client = this.client; + if (client == null) throw new IllegalStateException(); return client; } finally { @@ -729,7 +735,8 @@ public QuorumMember<S> getMember() { lock.lock(); try { - if (this.client == null) + final C client = this.client; + if (client == null) throw new IllegalStateException(); if (client instanceof QuorumMember<?>) { return (QuorumMember<S>) client; @@ -754,6 +761,8 @@ */ private QuorumMember<S> getClientAsMember() { + final C client = this.client; + if (client instanceof QuorumMember<?>) { return (QuorumMember<S>) client; @@ -3262,7 +3271,7 @@ * @param e * The event. */ - private void sendEvent(final QuorumEvent e) { + protected void sendEvent(final QuorumEvent e) { if (log.isTraceEnabled()) log.trace("" + e); if (sendSynchronous) { @@ -3423,7 +3432,7 @@ * @param t * The throwable. 
*/ - private void launderThrowable(final Throwable t) { + protected void launderThrowable(final Throwable t) { if (InnerCause.isInnerCause(t, InterruptedException.class)) { Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/quorum/QuorumEventEnum.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/quorum/QuorumEventEnum.java 2013-10-11 13:50:11 UTC (rev 7447) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/quorum/QuorumEventEnum.java 2013-10-11 15:05:17 UTC (rev 7448) @@ -96,6 +96,11 @@ /** * Event generated when a quorum breaks (aka when the token is cleared). */ - QUORUM_BROKE; + QUORUM_BROKE, + /** + * Event generated when a service becomes disconnected from a remote quorum + * (such as a zookeeper ensemble). + */ + QUORUM_DISCONNECTED; } Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java 2013-10-11 13:50:11 UTC (rev 7447) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java 2013-10-11 15:05:17 UTC (rev 7448) @@ -896,6 +896,9 @@ // The commit counter of the desired closing root block. final long commitCounter = req.getCommitCounter(); + // Note the token on entry. + final long token = getQuorum().token(); + /* * Open the HALog file. If it exists, then we will run a task to * send it along the pipeline. @@ -928,16 +931,25 @@ isLive = r.isLive(); // Task sends an HALog file along the pipeline. 
- ft = new FutureTaskInvariantMon<Void>(new SendHALogTask(req, r), getQuorum()) { + ft = new FutureTaskInvariantMon<Void>(new SendHALogTask( + req, r), getQuorum()) { - @Override - protected void establishInvariants() { - assertQuorumMet(); - assertJoined(getServiceId()); - assertMember(req.getServiceId()); - assertInPipeline(req.getServiceId()); - } - + @Override + protected void establishInvariants() { + assertQuorumMet(); + assertJoined(getServiceId()); + assertMember(req.getServiceId()); + assertInPipeline(req.getServiceId()); + /* + * Note: This is a pre-condition, not an invariant. + * We verify on entry that this service is the + * leader. The invariant is that the quorum remains + * met on the current token, which is handled by + * assertQuorumMet(). + */ + getQuorum().assertLeader(token); + } + }; // Run task. @@ -1126,18 +1138,28 @@ if (haLog.isDebugEnabled()) haLog.debug("req=" + req); + // Note the token on entry. + final long token = getQuorum().token(); + // Task sends an HALog file along the pipeline. final FutureTask<IHASendStoreResponse> ft = new FutureTaskInvariantMon<IHASendStoreResponse>( new SendStoreTask(req), getQuorum()) { - @Override - protected void establishInvariants() { - assertQuorumMet(); - assertJoined(getServiceId()); - assertMember(req.getServiceId()); - assertInPipeline(req.getServiceId()); - } - + @Override + protected void establishInvariants() { + assertQuorumMet(); + assertJoined(getServiceId()); + assertMember(req.getServiceId()); + assertInPipeline(req.getServiceId()); + /* + * Note: This is a pre-condition, not an invariant. We + * verify on entry that this service is the leader. The + * invariant is that the quorum remains met on the current + * token, which is handled by assertQuorumMet(). + */ + getQuorum().assertLeader(token); + } + }; // Run task. 
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java 2013-10-11 13:50:11 UTC (rev 7447) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java 2013-10-11 15:05:17 UTC (rev 7448) @@ -60,6 +60,7 @@ import com.bigdata.quorum.Quorum; import com.bigdata.quorum.QuorumActor; import com.bigdata.quorum.QuorumClient; +import com.bigdata.quorum.QuorumEventEnum; import com.bigdata.quorum.QuorumException; import com.bigdata.quorum.QuorumMember; import com.bigdata.quorum.QuorumWatcher; @@ -1457,12 +1458,12 @@ if (client != null) { try { client.disconnected(); - } catch (RuntimeException ex) { - log.error(ex); - } catch (Exception ex) { - log.error(ex); + } catch (Exception t) { + launderThrowable(t); } } + sendEvent(new E(QuorumEventEnum.QUORUM_DISCONNECTED, + lastValidToken(), token(), null/* serviceId */)); } /** This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |