From: <tho...@us...> - 2013-10-11 13:50:25
Revision: 7447
          http://bigdata.svn.sourceforge.net/bigdata/?rev=7447&view=rev
Author:   thompsonbry
Date:     2013-10-11 13:50:11 +0000 (Fri, 11 Oct 2013)
Log Message:
-----------
Checkpoint providing resolution for #723 (HA asynchronous tasks must be canceled when invariants are changed) and #718 (HAJournalServer needs to handle ZK client connection loss).

Some remaining todos have been identified in working these tickets:

For #723, we want to add an assertLeader() invariant and then use this in sendHALog() and sendHAStore() to guard an additional invariant.

Also for #723, we want to add an invariant that the zk client is connected. This will need to hook the QuorumClient.disconnected() method.

For #718, we are not aware of any problems. However, we need to provide more test coverage. We will add a bounce2() and bounce3() test. We will also try to parameterize the test suite for tests that currently do a service shutdown and restart with an enum indicating whether we should do a service shutdown, sudden kill, or dropZookeeper for the service(s) that get shut down in our failover test suite.

AbstractQuorum.interruptAll() should be invoked from the ErrorTask. This will interrupt any in-flight requests for the actor to accomplish a quorum state transition. If the actor attempts to cause two different state transitions, this can lead to a deadlock since one of the expected conditions might not be achieved. This situation has been observed before for AbstractQuorum.terminate().

AbstractQuorum.getClient() has been observed to block. It will be rewritten to use a non-blocking access method as this is currently causing contention that is not appropriate, e.g., for getExtendedRunState() versus doServiceLeave().

We will add an invariant listener to the digest utility to ensure that the locks are released if the client computing the digest is disconnected from zookeeper.

We have updated the zookeeper dependency to 3.4.5.

See #723 (HA asynchronous tasks must be canceled when invariants are changed)
See #718 (HAJournalServer needs to handle ZK client connection loss)

Modified Paths:
--------------
    branches/BIGDATA_RELEASE_1_3_0/.classpath
    branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/journal/Journal.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorumClient.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/quorum/QuorumClient.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/journal/ha/AbstractHAJournalTestCase.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/DumpLogDigests.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/SnapshotManager.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/zookeeper/ZooKeeperAccessor.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHAJournalServerTestCase.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java
    branches/BIGDATA_RELEASE_1_3_0/build.properties

Added Paths:
-----------
    branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/concurrent/FutureTaskInvariantMon.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/lib/apache/zookeeper-3.4.5.jar
    branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/InvariantTask.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3InvariantListener.java

Removed Paths:
-------------
    branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/lib/apache/zookeeper-3.3.3.jar

Modified: branches/BIGDATA_RELEASE_1_3_0/.classpath
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/.classpath 2013-10-10 17:18:39 UTC (rev 7446)
+++ branches/BIGDATA_RELEASE_1_3_0/.classpath 2013-10-11 13:50:11 UTC (rev 7447)
@@ -33,7 +33,7 @@
     <classpathentry kind="src" path="bigdata-gas/src/test"/>
     <classpathentry exported="true" kind="lib" path="bigdata/lib/dsi-utils-1.0.6-020610.jar"/>
     <classpathentry exported="true" kind="lib" path="bigdata/lib/lgpl-utils-1.0.6-020610.jar"/>
-    <classpathentry exported="true" kind="lib" path="bigdata-jini/lib/apache/zookeeper-3.3.3.jar"/>
+    <classpathentry kind="lib" path="bigdata-jini/lib/apache/zookeeper-3.4.5.jar"/>
     <classpathentry exported="true" kind="lib" path="bigdata/lib/jetty/jetty-continuation-7.2.2.v20101205.jar"/>
     <classpathentry exported="true" kind="lib" path="bigdata/lib/jetty/jetty-http-7.2.2.v20101205.jar"/>
     <classpathentry exported="true" kind="lib" path="bigdata/lib/jetty/jetty-io-7.2.2.v20101205.jar"/>

Added: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/concurrent/FutureTaskInvariantMon.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/concurrent/FutureTaskInvariantMon.java (rev 0)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/concurrent/FutureTaskInvariantMon.java 2013-10-11 13:50:11 UTC (rev 7447)
@@ -0,0 +1,302 @@
+/**
+
+Copyright (C) SYSTAP, LLC 2006-2007. All rights reserved.
+
+Contact:
+     SYSTAP, LLC
+     4501 Tower Road
+     Greensboro, NC 27410
+     lic...@bi...
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+package com.bigdata.concurrent;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+
+import org.apache.log4j.Logger;
+
+import com.bigdata.ha.HAGlue;
+import com.bigdata.ha.QuorumService;
+import com.bigdata.quorum.Quorum;
+import com.bigdata.quorum.QuorumEvent;
+import com.bigdata.quorum.QuorumEventEnum;
+import com.bigdata.quorum.QuorumListener;
+import com.bigdata.util.StackInfoReport;
+
+/**
+ * A {@link Future} that allows you to cancel a computation if an invariant is
+ * violated. This class is specifically designed to monitor quorum related
+ * invariants for HA.
+ * <p>
+ * Once an invariant is established, listening for the relevant quorum events
+ * commences and a check is made to verify that the invariant holds on entry.
+ * This pattern ensures there is no possibility of a missed event.
+ * <p>
+ * This {@link FutureTask} wrapper will return the value of the {@link Callable}
+ * (or the specified result for the {@link Runnable}) iff the task completes
+ * successfully without the invariant being violated.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ * @param <T>
+ *            The generic type of the future.
+ */
+public abstract class FutureTaskInvariantMon<T> extends FutureTaskMon<T>
+        implements QuorumListener {
+
+    private static final Logger log = Logger.getLogger(FutureTaskInvariantMon.class);
+
+    private final Quorum<HAGlue, QuorumService<HAGlue>> m_quorum;
+
+    private final List<QuorumEventInvariant> m_triggers = new CopyOnWriteArrayList<QuorumEventInvariant>();
+
+    public FutureTaskInvariantMon(final Callable<T> callable,
+            final Quorum<HAGlue, QuorumService<HAGlue>> quorum) {
+
+        super(callable);
+
+        if (quorum == null)
+            throw new IllegalArgumentException();
+
+        m_quorum = quorum;
+
+    }
+
+    public FutureTaskInvariantMon(final Runnable runnable, final T result,
+            Quorum<HAGlue, QuorumService<HAGlue>> quorum) {
+
+        super(runnable, result);
+
+        if (quorum == null)
+            throw new IllegalArgumentException();
+
+        m_quorum = quorum;
+
+    }
+
+    /**
+     * Concrete implementations use this callback hook to establish the
+     * invariants to be monitored.
+     */
+    abstract protected void establishInvariants();
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Hook to manage listener registration and establish invariants.
+     */
+    @Override
+    public void run() {
+        m_quorum.addListener(this);
+        try {
+            establishInvariants();
+
+            super.run();
+        } finally {
+            m_quorum.removeListener(this);
+        }
+    }
+
+    /**
+     * Establish an invariant that the specified service is a member of the
+     * quorum.
+     *
+     * @param serviceId
+     *            The service.
+     */
+    public void assertMember(final UUID serviceId) {
+        m_triggers.add(new QuorumEventInvariant(QuorumEventEnum.MEMBER_REMOVE,
+                serviceId));
+
+        // now check that already a member and break if not
+        assertMembership(m_quorum.getMembers(), serviceId);
+    }
+
+    /**
+     * Establish an invariant that the specified service is joined with the met
+     * quorum.
+     *
+     * @param serviceId
+     *            The service.
+     */
+    public void assertJoined(final UUID serviceId) {
+        m_triggers.add(new QuorumEventInvariant(QuorumEventEnum.SERVICE_LEAVE,
+                serviceId));
+
+        // now check that already joined and break if not
+        assertMembership(m_quorum.getJoined(), serviceId);
+    }
+
+    /**
+     * Establish an invariant that the specified service is a not joined with
+     * the met quorum.
+     *
+     * @param serviceId
+     *            The service.
+     */
+    public void assertNotJoined(final UUID serviceId) {
+        m_triggers.add(new QuorumEventInvariant(QuorumEventEnum.SERVICE_JOIN,
+                serviceId));
+
+        // now check not already joined and break if it is
+        if (isMember(m_quorum.getJoined(), serviceId))
+            broken();
+    }
+
+    /**
+     * Establish an invariant that the specified service is in the quorum
+     * pipeline.
+     *
+     * @param serviceId
+     *            The service.
+     */
+    public void assertInPipeline(final UUID serviceId) {
+        m_triggers.add(new QuorumEventInvariant(
+                QuorumEventEnum.PIPELINE_REMOVE, serviceId));
+
+        // now check that already in pipeline and break if not
+        assertMembership(m_quorum.getPipeline(), serviceId);
+    }
+
+    /**
+     * Establish an invariant that the quorum is met.
+     */
+    public void assertQuorumMet() {
+        m_triggers.add(new QuorumEventInvariant(QuorumEventEnum.QUORUM_BROKE,
+                null/* serviceId */));
+
+        // now check that quorum is met and break if not
+        if (!m_quorum.isQuorumMet())
+            broken();
+    }
+
+    /**
+     * Establish an invariant that the quorum is fully met.
+     */
+    public void assertQuorumFullyMet() {
+        // no-one must leave!
+        m_triggers.add(new QuorumEventInvariant(QuorumEventEnum.SERVICE_LEAVE,
+                null/* serviceId */));
+
+        // now check that quorum is fully met on the current token and break if not
+        if (!m_quorum.isQuorumFullyMet(m_quorum.token()))
+            broken();
+    }
+
+    private void assertMembership(final UUID[] members, final UUID serviceId) {
+        if (isMember(members, serviceId))
+            return;
+
+        broken();
+    }
+
+    private boolean isMember(final UUID[] members, final UUID serviceId) {
+        for (UUID member : members) {
+            if (member.equals(serviceId))
+                return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Any QuorumEvent must be checked to see if it matches an Invariant trigger
+     */
+    @Override
+    public void notify(final QuorumEvent e) {
+        boolean interrupt = false;
+
+        for (QuorumEventInvariant inv : m_triggers) {
+            if (inv.matches(e)) {
+                interrupt = true;
+                break;
+            }
+        }
+
+        // interrupt the call thread
+        if (interrupt) {
+            broken();
+        } else {
+            if (log.isDebugEnabled())
+                log.debug("Ignoring event: " + e);
+        }
+    }
+
+    private void broken() {
+        log.warn("BROKEN", new StackInfoReport());
+
+        cancel(true/*mayInterruptIfRunning*/);
+    }
+
+    @SuppressWarnings("serial")
+    private class QuorumEventInvariant implements QuorumEvent, Serializable {
+
+        private final QuorumEventEnum m_qe;
+        private final UUID m_sid;
+
+        /**
+         *
+         * @param qe
+         *            The {@link QuorumEvent} type (required).
+         * @param sid
+         *            The service {@link UUID} (optional). When <code>null</code>
+         *            the {@link QuorumEventEnum} will be matched for ANY service
+         *            {@link UUID}.
+         */
+        public QuorumEventInvariant(final QuorumEventEnum qe, final UUID sid) {
+            if (qe == null)
+                throw new IllegalArgumentException();
+            m_qe = qe;
+            m_sid = sid;
+        }
+
+        @Override
+        public QuorumEventEnum getEventType() {
+            return m_qe;
+        }
+
+        @Override
+        public long lastValidToken() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public long token() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public UUID getServiceId() {
+            return m_sid;
+        }
+
+        @Override
+        public long lastCommitTime() {
+            throw new UnsupportedOperationException();
+        }
+
+        public boolean matches(final QuorumEvent qe) {
+            return qe.getEventType() == m_qe
+                    && (m_sid == null || m_sid.equals(qe.getServiceId()));
+        }
+
+    }
+
+}

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-10-10 17:18:39 UTC (rev 7446)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-10-11 13:50:11 UTC (rev 7447)
@@ -1,2095 +1,2133 @@
-/**
-
-Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved.
-
-Contact:
-     SYSTAP, LLC
-     4501 Tower Road
-     Greensboro, NC 27410
-     lic...@bi...
-
-This program is free software; you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation; version 2 of the License.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
- -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -*/ -package com.bigdata.ha; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.util.Iterator; -import java.util.UUID; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.FutureTask; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RunnableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.log4j.Logger; - -import com.bigdata.ha.msg.HAWriteMessageBase; -import com.bigdata.ha.msg.IHALogRequest; -import com.bigdata.ha.msg.IHAMessage; -import com.bigdata.ha.msg.IHASyncRequest; -import com.bigdata.ha.msg.IHAWriteMessage; -import com.bigdata.ha.pipeline.HAReceiveService; -import com.bigdata.ha.pipeline.HAReceiveService.IHAReceiveCallback; -import com.bigdata.ha.pipeline.HASendService; -import com.bigdata.io.DirectBufferPool; -import com.bigdata.io.IBufferAccess; -import com.bigdata.quorum.QCE; -import com.bigdata.quorum.Quorum; -import com.bigdata.quorum.QuorumException; -import com.bigdata.quorum.QuorumMember; -import com.bigdata.quorum.QuorumStateChangeEvent; -import com.bigdata.quorum.QuorumStateChangeEventEnum; -import com.bigdata.quorum.QuorumStateChangeListener; -import com.bigdata.quorum.QuorumStateChangeListenerBase; -import com.bigdata.util.InnerCause; - -/** - * {@link QuorumPipeline} 
implementation. - * <p> - * The {@link QuorumMember} must pass along the "pipeline" messages, including: - * <ul> - * <li>{@link QuorumMember#pipelineAdd()}</li> - * <li>{@link QuorumMember#pipelineRemove()}</li> - * <li>{@link QuorumMember#pipelineChange(UUID, UUID)}</li> - * </ul> - * When a quorum is met, the <i>leader</i> is always first in the write pipeline - * since it is the node which receives writes from clients. When a service joins - * the write pipeline, it always does so at the end of the chain. Services may - * enter the write pipeline before joining a quorum in order to synchronize with - * the quorum. If a service in the middle of the chain leaves the pipeline, then - * the upstream node will reconfigure and retransmit the current cache block to - * its new downstream node. This prevent nodes which are "bouncing" during - * synchronization from causing write sets to be discarded. However, if the - * leader leaves the write pipeline, then the quorum is broken and the write set - * will be discarded. - * <p> - * Since the write pipeline is used to synchronize services trying to join the - * quorum as well as the replicate writes for services joined with the quorum, - * {@link HAReceiveService} may be live for a met quorum even though the - * {@link QuorumMember} on whose behalf this class is acting is not joined with - * the met quorum. - * - * <h3>Pipeline maintenance</h3> - * - * There are three broad categories which have to be handled: (1) leader leaves; - * (2) pipeline leader election; and (3) follower leaves. A leader leave causes - * the quorum to break, which will cause service leaves and pipeline leaves for - * all joined services. However, services must add themselves to the pipeline - * before they join the quorum and the pipeline will be reorganized if necessary - * when the quorum leader is elected. This will result in a - * {@link #pipelineElectedLeader()} event. 
A follower leave only causes the - * follower to leave the pipeline and results in a - * {@link #pipelineChange(UUID, UUID)} event. - * <p> - * There are two cases for a follower leave: (A) when the follower did not did - * not have a downstream node; and (B) when there is downstream node. For (B), - * the upstream node from the left follower should reconfigure for the new - * downstream node and retransmit the current cache block and the event should - * be otherwise unnoticed. - * <p> - * Handling a follower join requires us to synchronize the follower first which - * requires some more infrastructure and should be done as part of the HA - * synchronization test suite. - * <p> - * What follows is an example of how events will arrive for a quorum of three - * services: A, B, and C. - * - * <pre> - * A.getActor().pipelineAdd() => A.pipelineAdd() - * B.getActor().pipelineAdd() => B.pipelineAdd(); A.pipelineChange(null,B); - * C.getActor().pipelineAdd() => C.pipelineAdd(); B.pipelineChange(null,C); - * </pre> - * - * At this point the pipeline order is <code>[A,B,C]</code>. Notice that the - * {@link HASendService} for A is not established until the - * <code>A.pipelineChange(null,B)</code> sets B as the new downstream service - * for A. Likewise, B will not relay to C until it handles the - * <code>B.pipelineChange(null,C)</code> event. - * - * <p> - * - * Given the pipeline order <code>[A,B,C]</code>, if B were to leave, then the - * events would be: - * - * <pre> - * B.getActor().pipelineRemove() => B.pipelineRemove(); A.pipelineChange(B,C); - * </pre> - * - * and when this class handles the <code>A.pipelineChange(B,C)</code> event, it - * must update the {@link HAReceiveService} such that it now relays data to C. 
- * - * <p> - * - * On the other hand, given the pipeline order <code>[A,B,C]</code>, if C were - * to leave the events would be: - * - * <pre> - * C.getActor().pipelineRemove() => C.pipelineRemove(); B.pipelineChange(C,null); - * </pre> - * - * and when this class handles the <code>B.pipelineChange(C,null)</code> event, - * it must update the C's {@link HAReceiveService} such that it continues to - * receive data, but no longer relays data to a downstream service. - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @param <S> - * - * @see <a href="http://sourceforge.net/apps/trac/bigdata/ticket/681" > - * HAJournalServer deadlock: pipelineRemove() and getLeaderId() </a> - */ -abstract public class QuorumPipelineImpl<S extends HAPipelineGlue> /*extends - QuorumStateChangeListenerBase */implements QuorumPipeline<S>, - QuorumStateChangeListener { - - static private transient final Logger log = Logger - .getLogger(QuorumPipelineImpl.class); - - /** - * The timeouts for a sleep before the next retry. These timeouts are - * designed to allow some asynchronous processes to reconnect the - * {@link HASendService} and the {@link HAReceiveService}s in write pipeline - * such that a retransmit can succeed after a service has left the pipeline. - * Depending on the nature of the error (i.e., a transient network problem - * versus a pipeline reorganization), this can involve a number of zookeeper - * events. Hence the sleep latency is backed off through this array of - * values. - * - * TODO We do not want to induce too much latency here. It would be nice if - * we automatically retried after each relevant quorum event that might cure - * the problem as well as after a timeout. This would require a Condition - * that we await with a timeout and signaling the Condition if there are any - * relevant events (probably once we handle them locally). 
- */ - static protected final int RETRY_SLEEP[] = new int[] { 100, 200, 200, 500, 500, 1000 }; - - /** - * The {@link QuorumMember}. - */ - protected final QuorumMember<S> member; - - /** - * The service {@link UUID} for the {@link QuorumMember}. - */ - protected final UUID serviceId; - - /** - * Lock managing the various mutable aspects of the pipeline state. - */ - private final ReentrantLock lock = new ReentrantLock(); - - /** send service (iff this is the leader). */ - private HASendService sendService; - - /** - * The receive service (iff this is a follower in a met quorum). - */ - private HAReceiveService<HAMessageWrapper> receiveService; - - /** - * The buffer used to relay the data. This is only allocated for a - * follower. - */ - private IBufferAccess receiveBuffer; - - /** - * Cached metadata about the downstream service. - */ - private final AtomicReference<PipelineState<S>> pipelineStateRef = new AtomicReference<PipelineState<S>>(); - - /** - * Inner class does the actual work once to handle an event. - */ - private final InnerEventHandler innerEventHandler = new InnerEventHandler(); - - /** - * Core implementation of the handler for the various events. Always run - * while holding the {@link #lock}. - * - * @author <a href="mailto:tho...@us...">Bryan - * Thompson</a> - * - * @see <a href="http://sourceforge.net/apps/trac/bigdata/ticket/681" > - * HAJournalServer deadlock: pipelineRemove() and getLeaderId() </a> - */ - private final class InnerEventHandler extends QuorumStateChangeListenerBase { - - /** - * A queue of events that can only be handled when a write replication - * operation owns the {@link QuorumPipelineImpl#lock}. - * - * @see QuorumPipelineImpl#lock() - * @see #dispatchEvents() - */ - private final BlockingQueue<QuorumStateChangeEvent> queue = new LinkedBlockingQueue<QuorumStateChangeEvent>(); - - protected InnerEventHandler() { - - } - - /** - * Enqueue an event. - * - * @param e - * The event. 
- */ - private void queue(final QuorumStateChangeEvent e) { - - if (log.isInfoEnabled()) - log.info("Adding StateChange: " + e); - - queue.add(e); - - } - - /** - * Boolean controls whether or not event elision is used. See below. - * - * @see <a href="http://sourceforge.net/apps/trac/bigdata/ticket/681" > - * HAJournalServer deadlock: pipelineRemove() and getLeaderId() - * </a> - */ - static private final boolean s_eventElission = true; - - /** - * Event elission endeavours to ensure that events processed - * represent current state change. - * - * This is best explained with an example from its original usage - * in processing graphic events. Whilst a "button click" is a singular - * event and all button clicks should be processed, a "mouse move" event - * could be elided with the next "mouse move" event. Thus the move events - * (L1 -> L2) and (L2 -> L3) would elide to a single (L1 -> L3). - * - * In HA RMI calls can trigger event processing, whilst other threads monitor - * state changes - such as open sockets. Without elission, monitoring threads - * will observe unnecessary transitional state changes. HOWEVER, there remains - * a problem with this pattern of synchronization. - */ - private void elideEvents() { - - if (!s_eventElission) { - return; - } - - /* - * Check for event elission: check for PIPELINE_UPSTREAM and - * PIPELINE_CHANGE and remove earlier ones check for PIPELINE_ADD - * and PIPELINE_REMOVE pairings. 
- */ - final Iterator<QuorumStateChangeEvent> events = queue.iterator(); - QuorumStateChangeEvent uce = null; // UPSTREAM CHANGE - QuorumStateChangeEvent dce = null; // DOWNSTREAM CHANGE - QuorumStateChangeEvent add = null; // PIPELINE_ADD - - while (events.hasNext()) { - final QuorumStateChangeEvent tst = events.next(); - if (tst.getEventType() == QuorumStateChangeEventEnum.PIPELINE_UPSTREAM_CHANGE) { - if (uce != null) { - if (log.isDebugEnabled()) - log.debug("Elission removal of: " + uce); - queue.remove(uce); - } - uce = tst; - } else if (tst.getEventType() == QuorumStateChangeEventEnum.PIPELINE_CHANGE) { - if (dce != null) { - // replace 'from' of new state with 'from' of old - tst.getDownstreamOldAndNew()[0] = dce - .getDownstreamOldAndNew()[0]; - - if (log.isDebugEnabled()) - log.debug("Elission removal of: " + dce); - queue.remove(dce); - } - dce = tst; - } else if (tst.getEventType() == QuorumStateChangeEventEnum.PIPELINE_ADD) { - add = tst; - } else if (tst.getEventType() == QuorumStateChangeEventEnum.PIPELINE_REMOVE) { - if (add != null) { - if (log.isDebugEnabled()) { - log.debug("Elission removal of: " + add); - log.debug("Elission removal of: " + tst); - } - queue.remove(add); - queue.remove(tst); - add = null; - } - if (dce != null) { - if (log.isDebugEnabled()) - log.debug("Elission removal of: " + dce); - queue.remove(dce); - dce = null; - } - if (uce != null) { - if (log.isDebugEnabled()) - log.debug("Elission removal of: " + uce); - queue.remove(uce); - uce = null; - } - } - - } - - } // elideEvents() - - /** - * Dispatch any events in the {@link #queue}. - */ - private void dispatchEvents() { - - elideEvents(); - - QuorumStateChangeEvent e; - - // If an event is immediately available, dispatch it now. - while ((e = queue.poll()) != null) { - - if (log.isInfoEnabled()) - log.info("Dispatching: " + e); - - // An event is available. - innerEventHandler.dispatchEvent(e); - - } - - } - - /** - * Dispatch to the InnerEventHandler. 
- * - * @param e - * The event. - * - * @throws IllegalMonitorStateException - * if the caller does not own the {@link #lock}. - */ - private void dispatchEvent(final QuorumStateChangeEvent e) - throws IllegalMonitorStateException { - - if(!lock.isHeldByCurrentThread()) { - - /* - * The InnerEventHandler should be holding the outer lock. - */ - - throw new IllegalMonitorStateException(); - - } - - if (log.isInfoEnabled()) - log.info(e.toString()); - - switch (e.getEventType()) { - case CONSENSUS: - consensus(e.getLastCommitTimeConsensus()); - break; - case LOST_CONSENSUS: - lostConsensus(); - break; - case MEMBER_ADD: - memberAdd(); - break; - case MEMBER_REMOVE: - memberRemove(); - break; - case PIPELINE_ADD: - pipelineAdd(); - break; - case PIPELINE_CHANGE: { - final UUID[] a = e.getDownstreamOldAndNew(); - pipelineChange(a[0]/* oldDownStreamId */, a[1]/* newDownStreamId */); - break; - } - case PIPELINE_ELECTED_LEADER: - pipelineElectedLeader(); - break; - case PIPELINE_REMOVE: - pipelineRemove(); - break; - case PIPELINE_UPSTREAM_CHANGE: - pipelineUpstreamChange(); - break; - case QUORUM_BREAK: - quorumBreak(); - break; - case QUORUM_MEET: - quorumMeet(e.getToken(), e.getLeaderId()); - break; - case SERVICE_JOIN: - serviceJoin(); - break; - case SERVICE_LEAVE: - serviceLeave(); - break; - default: - throw new UnsupportedOperationException(e.getEventType().toString()); - } - } - -// @Override -// public void serviceLeave() { -// } -// -// @Override -// public void serviceJoin() { -// } -// -// /** -// * Extended to setup this service as a leader ({@link #setUpLeader()}), -// * or a relay ({@link #setUpReceiveAndRelay()}. 
-// */ -// @Override -// public void quorumMeet(final long token, final UUID leaderId) { -// super.quorumMeet(token, leaderId); -// lock.lock(); -// try { -// this.token = token; -// if(leaderId.equals(serviceId)) { -// setUpLeader(); -// } else if(member.isPipelineMember()) { -// setUpReceiveAndRelay(); -// } -// } finally { -// lock.unlock(); -// } -// } - -// @Override -// public void quorumBreak() { -// super.quorumBreak(); -// lock.lock(); -// try { -// tearDown(); -// } finally { -// lock.unlock(); -// } -// } - - /** - * {@inheritDoc} - * <p> - * This implementation sets up the {@link HASendService} or the - * {@link HAReceiveService} as appropriate depending on whether or not - * this service is the first in the pipeline order. - */ - @Override - public void pipelineAdd() { - if (log.isInfoEnabled()) - log.info(""); - super.pipelineAdd(); - lock.lock(); - try { - // The current pipeline order. - final UUID[] pipelineOrder = member.getQuorum().getPipeline(); - // The index of this service in the pipeline order. - final int index = getIndex(serviceId, pipelineOrder); - if (index == 0) { - setUpSendService(); - } else if (index > 0) { - setUpReceiveService(); - } - } finally { - lock.unlock(); - } - } - - @Override - public void pipelineElectedLeader() { - if (log.isInfoEnabled()) - log.info(""); - super.pipelineElectedLeader(); - lock.lock(); - try { - tearDown(); - setUpSendService(); - } finally { - lock.unlock(); - } - } - - /** - * {@inheritDoc} - * <p> - * This implementation tears down the {@link HASendService} or - * {@link HAReceiveService} associated with this service. 
- */ - @Override - public void pipelineRemove() { - if (log.isInfoEnabled()) - log.info(""); - super.pipelineRemove(); - lock.lock(); - try { - tearDown(); - } finally { - lock.unlock(); - } - } - - /** - * {@inheritDoc} - * <p> - * This implementation changes the target of the {@link HASendService} - * for the leader (or the {@link HAReceiveService} for a follower) to - * send (or relay) write cache blocks to the specified service. - */ - @Override - public void pipelineChange(final UUID oldDownStreamId, - final UUID newDownStreamId) { - super.pipelineChange(oldDownStreamId, newDownStreamId); - lock.lock(); - try { - // The address of the next service in the pipeline. - final InetSocketAddress addrNext = newDownStreamId == null ? null - : getAddrNext(newDownStreamId); - if (log.isInfoEnabled()) - log.info("oldDownStreamId=" + oldDownStreamId - + ",newDownStreamId=" + newDownStreamId - + ", addrNext=" + addrNext + ", sendService=" - + sendService + ", receiveService=" - + receiveService); - if (sendService != null) { - /* - * Terminate the existing connection (we were the first - * service in the pipeline). - */ - sendService.terminate(); - if (addrNext != null) { - if (log.isDebugEnabled()) - log.debug("sendService.start(): addrNext=" - + addrNext); - sendService.start(addrNext); - } - } else if (receiveService != null) { - /* - * Reconfigure the receive service to change how it is - * relaying (we were relaying, so the receiveService was - * running but not the sendService). - */ - if (log.isDebugEnabled()) - log.debug("receiveService.changeDownStream(): addrNext=" - + addrNext); - receiveService.changeDownStream(addrNext); - } - // populate and/or clear the cache. 
-                cachePipelineState(newDownStreamId);
-                if (log.isDebugEnabled())
-                    log.debug("pipelineChange - done.");
-            } finally {
-                lock.unlock();
-            }
-        }
-
-        @Override
-        public void pipelineUpstreamChange() {
-            super.pipelineUpstreamChange();
-            lock.lock();
-            try {
-                if (receiveService != null) {
-                    /*
-                     * Make sure that the receiveService closes out its client
-                     * connection with the old upstream service.
-                     */
-                    if (log.isInfoEnabled())
-                        log.info("receiveService=" + receiveService);
-                    receiveService.changeUpStream();
-                }
-            } finally {
-                lock.unlock();
-            }
-        }
-
-//        @Override
-//        public void memberRemove() {
-//        }
-//
-//        @Override
-//        public void memberAdd() {
-//        }
-//
-//        @Override
-//        public void lostConsensus() {
-//        }
-//
-//        @Override
-//        public void consensus(long lastCommitTime) {
-//        }
-
-        /**
-         * Request the {@link InetSocketAddress} of the write pipeline for a service
-         * (RMI).
-         *
-         * @param downStreamId
-         *            The service.
-         *
-         * @return It's {@link InetSocketAddress}
-         */
-        private InetSocketAddress getAddrNext(final UUID downStreamId) {
-
-            if (downStreamId == null)
-                return null;
-
-            final S service = member.getService(downStreamId);
-
-            try {
-
-                final InetSocketAddress addrNext = service.getWritePipelineAddr();
-
-                return addrNext;
-
-            } catch (IOException e) {
-
-                throw new RuntimeException(e);
-
-            }
-
-        }
-
-        /**
-         * Tear down any state associated with the {@link QuorumPipelineImpl}. This
-         * implementation tears down the send/receive service and releases the
-         * receive buffer.
-         */
-        private void tearDown() {
-            if (log.isInfoEnabled())
-                log.info("");
-            lock.lock();
-            try {
-                /*
-                 * Leader tear down.
-                 */
-                {
-                    if (sendService != null) {
-                        sendService.terminate();
-                        sendService = null;
-                    }
-                }
-                /*
-                 * Follower tear down.
-                 */
-                {
-                    if (receiveService != null) {
-                        receiveService.terminate();
-                        try {
-                            receiveService.awaitShutdown();
-                        } catch (InterruptedException e) {
-                            throw new RuntimeException(e);
-                        } finally {
-                            receiveService = null;
-                        }
-                    }
-                    if (receiveBuffer != null) {
-                        try {
-                            /*
-                             * Release the buffer back to the pool.
-                             */
-                            receiveBuffer.release();
-                        } catch (InterruptedException e) {
-                            throw new RuntimeException(e);
-                        } finally {
-                            receiveBuffer = null;
-                        }
-                    }
-                }
-                // clear cache.
-                pipelineStateRef.set(null);
-            } finally {
-                lock.unlock();
-            }
-        }
-
-        /**
-         * Populate or clear the {@link #pipelineState} cache.
-         * <p>
-         * Note: The only times we need to populate the {@link #pipelineState} are
-         * in response to a {@link #pipelineChange(UUID, UUID)} event or in response
-         * to message a {@link #pipelineElectedLeader()} event.
-         *
-         * @param downStreamId
-         *            The downstream service {@link UUID}.
-         */
-        private void cachePipelineState(final UUID downStreamId) {
-
-            if (downStreamId == null) {
-
-                pipelineStateRef.set(null);
-
-                return;
-
-            }
-
-            final S nextService = member.getService(downStreamId);
-
-            final PipelineState<S> pipelineState = new PipelineState<S>();
-
-            try {
-
-                pipelineState.addr = nextService.getWritePipelineAddr();
-
-            } catch (IOException e) {
-
-                throw new RuntimeException(e);
-
-            }
-
-            pipelineState.service = nextService;
-
-            pipelineStateRef.set(pipelineState);
-
-        }
-
-        /**
-         * Setup the send service.
-         */
-        private void setUpSendService() {
-            if (log.isInfoEnabled())
-                log.info("");
-            lock.lock();
-            try {
-                // Allocate the send service.
-                sendService = new HASendService();
-                /*
-                 * The service downstream from this service.
-                 *
-                 * Note: The downstream service in the pipeline is not available
-                 * when the first service adds itself to the pipeline. In those
-                 * cases the pipelineChange() event is used to update the
-                 * HASendService to send to the downstream service.
-                 *
-                 * Note: When we handle a pipelineLeaderElected() message the
-                 * downstream service MAY already be available, which is why we
-                 * handle downstreamId != null conditionally.
-                 */
-                final UUID downstreamId = member.getDownstreamServiceId();
-                if (downstreamId != null) {
-                    // The address of the next service in the pipeline.
-                    final InetSocketAddress addrNext = member.getService(
-                            downstreamId).getWritePipelineAddr();
-                    // Start the send service.
-                    sendService.start(addrNext);
-                }
-                // populate and/or clear the cache.
-                cachePipelineState(downstreamId);
-            } catch (Throwable t) {
-                try {
-                    tearDown();
-                } catch (Throwable t2) {
-                    log.error(t2, t2);
-                }
-                throw new RuntimeException(t);
-            } finally {
-                lock.unlock();
-            }
-        }
-
-        /**
-         * Setup the service to receive pipeline writes and to relay them (if there
-         * is a downstream service).
-         */
-        private void setUpReceiveService() {
-            lock.lock();
-            try {
-                // The downstream service UUID.
-                final UUID downstreamId = member.getDownstreamServiceId();
-                // Acquire buffer from the pool to receive data.
-                try {
-                    receiveBuffer = DirectBufferPool.INSTANCE.acquire();
-                } catch (InterruptedException e) {
-                    throw new RuntimeException(e);
-                }
-                // The address of this service.
-                final InetSocketAddress addrSelf = member.getService()
-                        .getWritePipelineAddr();
-                // Address of the downstream service (if any).
-                final InetSocketAddress addrNext = downstreamId == null ? null
-                        : member.getService(downstreamId).getWritePipelineAddr();
-                // Setup the receive service.
-                receiveService = new HAReceiveService<HAMessageWrapper>(addrSelf,
-                        addrNext, new IHAReceiveCallback<HAMessageWrapper>() {
-                            public void callback(final HAMessageWrapper msg,
-                                    final ByteBuffer data) throws Exception {
-                                // delegate handling of write cache blocks.
-                                handleReplicatedWrite(msg.req, msg.msg, data);
-                            }
-                        });
-                // Start the receive service - will not return until service is
-                // running
-                receiveService.start();
-            } catch (Throwable t) {
-                /*
-                 * Always tear down if there was a setup problem to avoid leaking
-                 * threads or a native ByteBuffer.
-                 */
-                try {
-                    tearDown();
-                } catch (Throwable t2) {
-                    log.error(t2, t2);
-                } finally {
-                    log.error(t, t);
-                }
-                throw new RuntimeException(t);
-            } finally {
-                lock.unlock();
-            }
-        }
-
-    };
-
-    /**
-     * Acquire {@link #lock} and {@link #dispatchEvents()}.
-     */
-    private void lock() {
-        boolean ok = false;
-        this.lock.lock();
-        try {
-            innerEventHandler.dispatchEvents();// have lock, dispatch events.
-            ok = true; // success.
-        } finally {
-            if (!ok) {
-                // release lock if there was a problem.
-                this.lock.unlock();
-            }
-        }
-    }
-
-    /**
-     * Acquire {@link #lock} and {@link #dispatchEvents()}.
-     */
-    private void lockInterruptibly() throws InterruptedException {
-        boolean ok = false;
-        lock.lockInterruptibly();
-        try {
-            innerEventHandler.dispatchEvents(); // have lock, dispatch events.
-            ok = true; // success.
-        } finally {
-            if (!ok) {
-                // release lock if there was a problem.
-                this.lock.unlock();
-            }
-        }
-    }
-
-    /**
-     * {@link #dispatchEvents()} and release {@link #lock}.
-     */
-    private void unlock() {
-        try {
-            innerEventHandler.dispatchEvents();
-        } finally {
-            this.lock.unlock();
-        }
-    }
-
-    public QuorumPipelineImpl(final QuorumMember<S> member) {
-
-        if (member == null)
-            throw new IllegalArgumentException();
-
-        this.member = member;
-
-        this.serviceId = member.getServiceId();
-
-    }
-
-    /**
-     * Extended to invoke {@link #tearDown()} in order to guarantee the eventual
-     * release of the {@link #receiveBuffer} and the shutdown of the
-     * {@link #sendService} or {@link #receiveService}.
-     */
-    @Override
-    protected void finalize() throws Throwable {
-
-        innerEventHandler.tearDown();
-
-        super.finalize();
-
-    }
-
-    /**
-     * Return the index at which the given serviceId appears in the array of
-     * serviceIds.
-     *
-     * @param serviceId
-     *            The {@link UUID} of some quorum member.
-     * @param a
-     *            An array of service {@link UUID}s.
-     *
-     * @return The index of the service in the array -or- <code>-1</code> if the
-     *         service does not appear in the array.
-     */
-    static private int getIndex(final UUID serviceId, final UUID[] a) {
-
-        if (serviceId == null)
-            throw new IllegalArgumentException();
-
-        for (int i = 0; i < a.length; i++) {
-
-            if (serviceId.equals(a[i])) {
-
-                return i;
-
-            }
-        }
-
-        return -1;
-
-    }
-
-    /**
-     * Return the NIO buffer used to receive payloads written on the HA write
-     * pipeline.
-     *
-     * @return The buffer -or- <code>null</code> if the pipeline has been torn
-     *         down or if this is the leader.
-     */
-    private ByteBuffer getReceiveBuffer() {
-
-        if (!lock.isHeldByCurrentThread()) {
-
-            // The caller MUST be holding the lock.
-            throw new IllegalMonitorStateException();
-
-        }
-
-        // trinary pattern is safe while thread has lock.
-        return receiveBuffer == null ? null : receiveBuffer.buffer();
-
-    }
-
-    /**
-     * Return the {@link HAReceiveService} used to receive payloads written on
-     * the HA write pipeline.
-     *
-     * @return The buffer -or- <code>null</code> if the pipeline has been torn
-     *         down or if this is the leader.
-     */
-    private HAReceiveService<HAMessageWrapper> getHAReceiveService() {
-
-        if (!lock.isHeldByCurrentThread()) {
-
-            // The caller MUST be holding the lock.
-            throw new IllegalMonitorStateException();
-
-        }
-
-        return receiveService;
-
-    }
-
-    /**
-     * Return the {@link HASendService} used to write payloads on the HA write
-     * pipeline.
-     *
-     * @return The {@link HASendService} -or- <code>null</code> if the pipeline
-     *         has been torn down.
-     */
-    private HASendService getHASendService() {
-
-        if (!lock.isHeldByCurrentThread()) {
-
-            // The caller MUST be holding the lock.
-            throw new IllegalMonitorStateException();
-
-        }
-
-        return sendService;
-
-    }
-
-    /*
-     * QuorumStateChangeListener
-     *
-     * Note: This interface is delegated using a queue. The queue allows
-     * the processing of the events to be deferred until the appropriate
-     * lock is held. This prevents contention for the lock and avoids
-     * lock ordering problems such as described at [1].
-     *
-     * @see InnerEventHandler
-     */
-
-    @Override
-    public void pipelineAdd() {
-
-        innerEventHandler
-                .queue(new QCE(QuorumStateChangeEventEnum.PIPELINE_ADD));
-
-    }
-
-    @Override
-    public void pipelineElectedLeader() {
-
-        innerEventHandler.queue(new QCE(
-                QuorumStateChangeEventEnum.PIPELINE_ELECTED_LEADER));
-
-    }
-
-    @Override
-    public void pipelineRemove() {
-
-        innerEventHandler.queue(new QCE(
-                QuorumStateChangeEventEnum.PIPELINE_REMOVE));
-
-    }
-
-    @Override
-    public void pipelineChange(final UUID oldDownStreamId,
-            final UUID newDownStreamId) {
-
-        innerEventHandler
-                .queue(new QCE(QuorumStateChangeEventEnum.PIPELINE_CHANGE,
-                        new UUID[] { oldDownStreamId, newDownStreamId },
-                        null/* lastCommitTimeConsensus */, null/* token */,
-                        null/* leaderId */));
-
-    }
-
-    @Override
-    public void pipelineUpstreamChange() {
-
-        innerEventHandler.queue(new QCE(
-                QuorumStateChangeEventEnum.PIPELINE_UPSTREAM_CHANGE));
-
-    }
-
-    @Override
-    public void memberAdd() {
-
-        innerEventHandler.queue(new QCE(QuorumStateChangeEventEnum.MEMBER_ADD));
-
-    }
-
-    @Override
-    public void memberRemove() {
-
-        innerEventHandler.queue(new QCE(
-                QuorumStateChangeEventEnum.MEMBER_REMOVE));
-
-    }
-
-    @Override
-    public void consensus(final long lastCommitTime) {
-
-        innerEventHandler.queue(new QCE(QuorumStateChangeEventEnum.CONSENSUS,
-                null/* downstreamIds */,
-                lastCommitTime/* lastCommitTimeConsensus */, null/* token */,
-                null/* leaderId */));
-
-    }
-
-    @Override
-    public void
lostConsensus() {
-
-        innerEventHandler.queue(new QCE(
-                QuorumStateChangeEventEnum.LOST_CONSENSUS));
-
-    }
-
-    @Override
-    public void serviceJoin() {
-
-        innerEventHandler
-                .queue(new QCE(QuorumStateChangeEventEnum.SERVICE_JOIN));
-
-    }
-
-    @Override
-    public void serviceLeave() {
-
-        innerEventHandler.queue(new QCE(
-                QuorumStateChangeEventEnum.SERVICE_LEAVE));
-
-    }
-
-    @Override
-    public void quorumMeet(final long token, final UUID leaderId) {
-
-        innerEventHandler.queue(new QCE(QuorumStateChangeEventEnum.QUORUM_MEET,
-                null/* downstreamIds */, null/* lastCommitTimeConsensus */,
-                token, leaderId));
-
-    }
-
-    @Override
-    public void quorumBreak() {
-
-        innerEventHandler
-                .queue(new QCE(QuorumStateChangeEventEnum.QUORUM_BREAK));
-
-    }
-
-    /*
-     * End of QuorumStateChangeListener.
-     */
-
-    /**
-     * Glue class wraps the {@link IHAWriteMessage} and the
-     * {@link IHALogRequest} message and exposes the requires {@link IHAMessage}
-     * interface to the {@link HAReceiveService}. This class is never persisted.
-     * It just let's us handshake with the {@link HAReceiveService} and get back
-     * out the original {@link IHAWriteMessage} as well as the optional
-     * {@link IHALogRequest} message.
-     *
-     * @author <a href="mailto:tho...@us...">Bryan
-     *         Thompson</a>
-     */
-    private static class HAMessageWrapper extends HAWriteMessageBase {
-
-        private static final long serialVersionUID = 1L;
-
-        final IHASyncRequest req;
-        final IHAWriteMessage msg;
-
-        public HAMessageWrapper(final IHASyncRequest req,
-                final IHAWriteMessage msg) {
-
-            // Use size and checksum from real IHAWriteMessage.
-            super(msg.getSize(),msg.getChk());
-
-            this.req = req; // MAY be null;
-            this.msg = msg;
-
-        }
-
-    }
-
-    /*
-     * This is the leader, so send() the buffer.
-     */
-    @Override
-    public Future<Void> replicate(final IHASyncRequest req,
-            final IHAWriteMessage msg, final ByteBuffer b) throws IOException {
-
-        final RunnableFuture<Void> ft;
-
-        lock();
-        try {
-
-            ft = new FutureTask<Void>(new RobustReplicateTask(req, msg, b));
-
-        } finally {
-
-            unlock();
-
-        }
-
-        // Submit Future for execution (outside of the lock).
-        member.getExecutor().execute(ft);
-
-        // Return Future. Caller must wait on the Future.
-        return ft;
-
-    }
-
-    /**
-     * Task robustly replicates an {@link IHAWriteMessage} and the associated
-     * payload.
-     */
-    private class RobustReplicateTask implements Callable<Void> {
-
-        /**
-         * An historical message is indicated when the {@link IHASyncRequest} is
-         * non-<code>null</code>.
-         */
-        private final IHASyncRequest req;
-
-        /**
-         * The {@link IHAWriteMessage}.
-         */
-        private final IHAWriteMessage msg;
-
-        /**
-         * The associated payload.
-         */
-        private final ByteBuffer b;
-
-        /**
-         * The token for the leader. The service that initiates the replication
-         * of a message MUST be the leader for this token.
-         * <p>
-         * The token is either taken from the {@link IHAWriteMessage} (if this
-         * is a live write) or from the current {@link Quorum#token()}.
-         * <p>
-         * Either way, we verify that this service is (and remains) the leader
-         * for that token throughout the {@link RobustReplicateTask}.
-         */
-        private final long quorumToken;
-
-        public RobustReplicateTask(final IHASyncRequest req,
-                final IHAWriteMessage msg, final ByteBuffer b) {
-
-            // Note: [req] MAY be null.
-
-            if (msg == null)
-                throw new IllegalArgumentException();
-
-            if (b == null)
-                throw new IllegalArgumentException();
-
-            this.req = req;
-
-            this.msg = msg;
-
-            this.b = b;
-
-            if (b.remaining() == 0) {
-
-                // Empty buffer.
-
-                throw new IllegalStateException("Empty buffer: req=" + req
-                        + ", msg=" + msg + ", buffer=" + b);
-
-            }
-
-            if (req == null) {
-
-                /*
-                 * Live message.
-                 *
-                 * Use the quorum token on the message.
It was put there by the
-                 * WriteCacheService. This allows us to ensure that the qourum
-                 * token remains valid for all messages replicated by the
-                 * leader.
-                 */
-
-                quorumToken = msg.getQuorumToken();
-
-                // Must be the leader for that token.
-                member.assertLeader(quorumToken);
-
-            } else {
-
-                /*
-                 * Historical message.
-                 */
-
-                // Use the current quorum token.
-                quorumToken = member.getQuorum().token();
-
-                // Must be the leader for that token.
-                member.assertLeader(quorumToken);
-
-            }
-
-        }
-
-        /**
-         * This collects the assertion(s) that we make for the service that is
-         * attempting to robustly replicate a write into a single method. This
-         * was done in order to concentrate any conditional logic and design
-         * rationale into a single method.
-         * <p>
-         * Note: IFF we allow non-leaders to replicate HALog messages then this
-         * assert MUST be changed to verify that the quorum token remains valid
-         * and that this service remains joined with the met quorum, i.e.,
-         *
-         * <pre>
-         * if (!quorum.isJoined(token))
-         *     throw new QuorumException();
-         * </pre>
-         */
-        private void assertQuorumState() throws QuorumException {
... [truncated message content] |
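
Illustrative sketch (not part of r7447): the QuorumStateChangeListener comment in the diff above says that listener events are queued so that they are only processed once the appropriate lock is held, and the lock()/unlock() helpers dispatch the queued events on acquire and before release. The following is a minimal, self-contained illustration of that pattern only. The class name DeferredEventDispatcher, the use of String events in place of QCE objects, and the snapshot() helper are all assumptions made for the example; the real InnerEventHandler is not shown in full in this message and may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration only: this class does not exist in bigdata.
final class DeferredEventDispatcher {

    private final ReentrantLock lock = new ReentrantLock();

    // Events queued by listener threads, which never take the lock.
    private final ConcurrentLinkedQueue<String> pending =
            new ConcurrentLinkedQueue<String>();

    // Events already handled; only touched while holding the lock.
    private final List<String> handled = new ArrayList<String>();

    /** Listener side: record the event without blocking on the lock. */
    void queue(final String event) {
        pending.add(event);
    }

    /** Drain the queue in arrival order. The caller MUST hold the lock. */
    private void dispatchEvents() {
        if (!lock.isHeldByCurrentThread())
            throw new IllegalMonitorStateException();
        String e;
        while ((e = pending.poll()) != null) {
            handled.add(e); // stand-in for the real per-event handler
        }
    }

    /** Acquire the lock, then dispatch; release if dispatch fails. */
    void lock() {
        boolean ok = false;
        lock.lock();
        try {
            dispatchEvents();
            ok = true;
        } finally {
            if (!ok)
                lock.unlock();
        }
    }

    /** Dispatch anything queued meanwhile, then release the lock. */
    void unlock() {
        try {
            dispatchEvents();
        } finally {
            lock.unlock();
        }
    }

    /** Copy of the handled events, taken under the lock. */
    List<String> snapshot() {
        lock();
        try {
            return new ArrayList<String>(handled);
        } finally {
            unlock();
        }
    }
}
```

In this sketch a caller brackets its critical section with lock() and unlock(), as replicate() does in the diff, so queued events are applied by whichever thread next holds the lock rather than by the thread that delivered the notification.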