From: <tho...@us...> - 2013-12-12 23:38:58
|
Revision: 7636 http://bigdata.svn.sourceforge.net/bigdata/?rev=7636&view=rev Author: thompsonbry Date: 2013-12-12 23:38:47 +0000 (Thu, 12 Dec 2013) Log Message: ----------- Sync to Martyn and CI on #724 (write replication pipeline resynchronization). We have incorporated logic to drain to the marker in the replication protocol. The marker concept has been refactored. There is now an IHASendState that captures original and potentially routing information for the payload. This has been raised into the HAPipelineGlue interface. We still need to bring in the typed exception handling for forceRemoveService() invocations and examine QuorumPipelineImpl for possible lock contention issues around this resync protocol. Removed dead test suite from build.xml Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineGlue.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipeline.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessage.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessageBase.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive.java branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive3Nodes.java branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java branches/MGC_1_3_0/bigdata/src/test/com/bigdata/io/writecache/TestWORMWriteCacheService.java branches/MGC_1_3_0/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java 
branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JustKills.java branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/quorum/zk/MockQuorumMember.java branches/MGC_1_3_0/build.xml Added Paths: ----------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HAMessageWrapper.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HASendState.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHAMessageWrapper.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHASendState.java branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/AbstractHASendAndReceiveTestCase.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineGlue.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineGlue.java 2013-12-11 16:31:29 UTC (rev 7635) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineGlue.java 2013-12-12 23:38:47 UTC (rev 7636) @@ -37,6 +37,7 @@ import com.bigdata.ha.msg.IHALogRootBlocksRequest; import com.bigdata.ha.msg.IHALogRootBlocksResponse; import com.bigdata.ha.msg.IHARebuildRequest; +import com.bigdata.ha.msg.IHASendState; import com.bigdata.ha.msg.IHASendStoreResponse; import com.bigdata.ha.msg.IHASyncRequest; import com.bigdata.ha.msg.IHAWriteMessage; @@ -128,14 +129,17 @@ * A request for an HALog (optional). This is only non-null when * historical {@link WriteCache} blocks are being replayed down * the write pipeline in order to synchronize a service. + * @param snd + * Metadata about the state of the sender and potentially the + * routing of the payload along the write replication pipeline. * @param msg * The metadata. * * @return The {@link Future} which will become available once the buffer * transfer is complete. 
*/ - Future<Void> receiveAndReplicate(IHASyncRequest req, IHAWriteMessage msg) - throws IOException; + Future<Void> receiveAndReplicate(IHASyncRequest req, IHASendState snd, + IHAWriteMessage msg) throws IOException; /** * Request metadata about the current write set from the quorum leader. Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipeline.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipeline.java 2013-12-11 16:31:29 UTC (rev 7635) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipeline.java 2013-12-12 23:38:47 UTC (rev 7636) @@ -33,6 +33,7 @@ import java.util.concurrent.Future; import com.bigdata.ha.halog.HALogWriter; +import com.bigdata.ha.msg.IHASendState; import com.bigdata.ha.msg.IHASyncRequest; import com.bigdata.ha.msg.IHAWriteMessage; import com.bigdata.io.writecache.WriteCache; @@ -85,11 +86,14 @@ * A synchronization request (optional). This is only non-null * when historical {@link WriteCache} blocks are being replayed * down the write pipeline in order to synchronize a service. + * @param snd + * Metadata about the state of the sender and potentially the + * routing of the payload along the write replication pipeline. * @param msg * The RMI metadata about the payload. 
*/ - Future<Void> receiveAndReplicate(IHASyncRequest req, IHAWriteMessage msg) - throws IOException; + Future<Void> receiveAndReplicate(IHASyncRequest req, IHASendState snd, + IHAWriteMessage msg) throws IOException; /* * Note: Method removed since it does not appear necessary to let this Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-11 16:31:29 UTC (rev 7635) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-12 23:38:47 UTC (rev 7636) @@ -41,15 +41,17 @@ import java.util.concurrent.RunnableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.log4j.Logger; -import com.bigdata.ha.msg.HAWriteMessageBase; -import com.bigdata.ha.msg.IHALogRequest; +import com.bigdata.ha.msg.HAMessageWrapper; +import com.bigdata.ha.msg.HASendState; import com.bigdata.ha.msg.IHAMessage; +import com.bigdata.ha.msg.IHASendState; import com.bigdata.ha.msg.IHASyncRequest; import com.bigdata.ha.msg.IHAWriteMessage; import com.bigdata.ha.pipeline.HAReceiveService; @@ -190,7 +192,7 @@ * href="https://sourceforge.net/apps/trac/bigdata/ticket/724">HA wire * pulling and sudden kill testing</a>. */ - private final int RETRY_SLEEP = 100; //200; // 50; // milliseconds. + private final int RETRY_SLEEP = 30; //200; // 50; // milliseconds. /** * Once this timeout is elapsed, retrySend() will fail. @@ -207,12 +209,12 @@ /** * The {@link QuorumMember}. */ - protected final QuorumMember<S> member; + private final QuorumMember<S> member; /** * The service {@link UUID} for the {@link QuorumMember}. 
*/ - protected final UUID serviceId; + private final UUID serviceId; /** * Lock managing the various mutable aspects of the pipeline state. @@ -244,6 +246,11 @@ private final InnerEventHandler innerEventHandler = new InnerEventHandler(); /** + * One up message identifier. + */ + private final AtomicLong messageId = new AtomicLong(0L); + + /** * Core implementation of the handler for the various events. Always run * while holding the {@link #lock}. * @@ -851,15 +858,19 @@ public void callback(final HAMessageWrapper msg, final ByteBuffer data) throws Exception { // delegate handling of write cache blocks. - handleReplicatedWrite(msg.req, msg.msg, data); + handleReplicatedWrite(msg.getHASyncRequest(), + (IHAWriteMessage) msg + .getHAWriteMessage(), data); } @Override public void incReceive(final HAMessageWrapper msg, final int nreads, final int rdlen, final int rem) throws Exception { // delegate handling of incremental receive notify. - QuorumPipelineImpl.this.incReceive(msg.req, - msg.msg, nreads, rdlen, rem); + QuorumPipelineImpl.this.incReceive(// + msg.getHASyncRequest(), + (IHAWriteMessage) msg.getHAWriteMessage(), // + nreads, rdlen, rem); } }); // Start the receive service - will not return until service is @@ -1170,36 +1181,17 @@ /* * End of QuorumStateChangeListener. */ - - /** - * Glue class wraps the {@link IHAWriteMessage} and the - * {@link IHALogRequest} message and exposes the requires {@link IHAMessage} - * interface to the {@link HAReceiveService}. This class is never persisted. - * It just let's us handshake with the {@link HAReceiveService} and get back - * out the original {@link IHAWriteMessage} as well as the optional - * {@link IHALogRequest} message. 
- * - * @author <a href="mailto:tho...@us...">Bryan - * Thompson</a> - */ - private static class HAMessageWrapper extends HAWriteMessageBase { - private static final long serialVersionUID = 1L; + private IHASendState newSendState() { - final IHASyncRequest req; - final IHAWriteMessage msg; - - public HAMessageWrapper(final IHASyncRequest req, - final IHAWriteMessage msg) { + final Quorum<?, ?> quorum = member.getQuorum(); - // Use size and checksum from real IHAWriteMessage. - super(msg.getSize(),msg.getChk()); - - this.req = req; // MAY be null; - this.msg = msg; - - } + final IHASendState snd = new HASendState(messageId.incrementAndGet(), + serviceId/* originalSenderId */, serviceId/* senderId */, + quorum.token(), quorum.replicationFactor()); + return snd; + } /* @@ -1214,7 +1206,8 @@ lock(); try { - ft = new FutureTask<Void>(new RobustReplicateTask(req, msg, b)); + ft = new FutureTask<Void>(new RobustReplicateTask(req, + newSendState(), msg, b)); } finally { @@ -1243,6 +1236,11 @@ private final IHASyncRequest req; /** + * Metadata about the state of the sender for this message. + */ + private final IHASendState snd; + + /** * The {@link IHAWriteMessage}. */ private final IHAWriteMessage msg; @@ -1265,10 +1263,14 @@ private final long quorumToken; public RobustReplicateTask(final IHASyncRequest req, - final IHAWriteMessage msg, final ByteBuffer b) { + final IHASendState snd, final IHAWriteMessage msg, + final ByteBuffer b) { // Note: [req] MAY be null. 
+ if (snd == null) + throw new IllegalArgumentException(); + if (msg == null) throw new IllegalArgumentException(); @@ -1277,6 +1279,8 @@ this.req = req; + this.snd = snd; + this.msg = msg; this.b = b; @@ -1467,6 +1471,7 @@ * at com.bigdata.ha.pipeline.HAReceiveService.run(HAReceiveService.java:431) * </pre> */ + @Override public Void call() throws Exception { final long beginNanos = System.nanoTime(); @@ -1549,7 +1554,7 @@ final ByteBuffer b = this.b.duplicate(); - new SendBufferTask<S>(member, quorumToken, req, msg, b, + new SendBufferTask<S>(member, quorumToken, req, snd, msg, b, downstream, sendService, sendLock).call(); return; @@ -1674,6 +1679,7 @@ private final QuorumMember<S> member; private final long token; // member MUST remain leader for token. private final IHASyncRequest req; + private final IHASendState snd; private final IHAWriteMessage msg; private final ByteBuffer b; private final PipelineState<S> downstream; @@ -1681,13 +1687,15 @@ private final Lock sendLock; public SendBufferTask(final QuorumMember<S> member, final long token, - final IHASyncRequest req, final IHAWriteMessage msg, - final ByteBuffer b, final PipelineState<S> downstream, + final IHASyncRequest req, final IHASendState snd, + final IHAWriteMessage msg, final ByteBuffer b, + final PipelineState<S> downstream, final HASendService sendService, final Lock sendLock) { this.member = member; this.token = token; this.req = req; // Note: MAY be null. + this.snd = snd; this.msg = msg; this.b = b; this.downstream = downstream; @@ -1696,6 +1704,7 @@ } + @Override public Void call() throws Exception { /* @@ -1723,13 +1732,13 @@ ExecutionException, IOException { // Get Future for send() outcome on local service. - final Future<Void> futSnd = sendService.send(b); + final Future<Void> futSnd = sendService.send(b, snd.getMarker()); try { // Get Future for receive outcome on the remote service (RMI). 
final Future<Void> futRec = downstream.service - .receiveAndReplicate(req, msg); + .receiveAndReplicate(req, snd, msg); try { @@ -1780,7 +1789,8 @@ @Override public Future<Void> receiveAndReplicate(final IHASyncRequest req, - final IHAWriteMessage msg) throws IOException { + final IHASendState snd, final IHAWriteMessage msg) + throws IOException { /* * FIXME We should probably pass the quorum token through from the @@ -1837,7 +1847,7 @@ */ ft = new FutureTask<Void>(new ReceiveTask<S>(member, token, - req, msg, b, receiveService)); + req, snd, msg, b, receiveService)); // try { // @@ -1862,7 +1872,8 @@ */ ft = new FutureTask<Void>(new ReceiveAndReplicateTask<S>( - member, token, req, msg, b, downstream, receiveService)); + member, token, req, snd, msg, b, downstream, + receiveService)); } @@ -1891,6 +1902,7 @@ private final QuorumMember<S> member; private final long token; private final IHASyncRequest req; + private final IHASendState snd; private final IHAWriteMessage msg; private final ByteBuffer b; private final HAReceiveService<HAMessageWrapper> receiveService; @@ -1898,6 +1910,7 @@ public ReceiveTask(final QuorumMember<S> member, final long token, final IHASyncRequest req, + final IHASendState snd, final IHAWriteMessage msg, final ByteBuffer b, final HAReceiveService<HAMessageWrapper> receiveService ) { @@ -1905,16 +1918,18 @@ this.member = member; this.token = token; this.req = req; // Note: MAY be null. + this.snd = snd; this.msg = msg; this.b = b; this.receiveService = receiveService; } + @Override public Void call() throws Exception { - // wrap the messages together. + // wrap the messages together. final HAMessageWrapper wrappedMsg = new HAMessageWrapper( - req, msg); + req, snd, msg); // Get Future for send() outcome on local service. 
final Future<Void> futSnd = receiveService.receiveData(wrappedMsg, @@ -1957,6 +1972,7 @@ private final QuorumMember<S> member; private final long token; private final IHASyncRequest req; + private final IHASendState snd; private final IHAWriteMessage msg; private final ByteBuffer b; private final PipelineState<S> downstream; @@ -1964,6 +1980,7 @@ public ReceiveAndReplicateTask(final QuorumMember<S> member, final long token, final IHASyncRequest req, + final IHASendState snd, final IHAWriteMessage msg, final ByteBuffer b, final PipelineState<S> downstream, final HAReceiveService<HAMessageWrapper> receiveService) { @@ -1971,17 +1988,19 @@ this.member = member; this.token = token; this.req = req; // Note: MAY be null. + this.snd = snd; this.msg = msg; this.b = b; this.downstream = downstream; this.receiveService = receiveService; } + @Override public Void call() throws Exception { // wrap the messages together. final HAMessageWrapper wrappedMsg = new HAMessageWrapper( - req, msg); + req, snd, msg); // Get Future for send() outcome on local service. final Future<Void> futSnd = receiveService.receiveData(wrappedMsg, @@ -1992,7 +2011,7 @@ // Get future for receive outcome on the remote // service. 
final Future<Void> futRec = downstream.service - .receiveAndReplicate(req, msg); + .receiveAndReplicate(req, snd, msg); try { Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java 2013-12-11 16:31:29 UTC (rev 7635) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java 2013-12-12 23:38:47 UTC (rev 7636) @@ -36,6 +36,7 @@ import org.apache.log4j.Logger; +import com.bigdata.ha.msg.IHASendState; import com.bigdata.ha.msg.IHASyncRequest; import com.bigdata.ha.msg.IHAWriteMessage; import com.bigdata.journal.AbstractJournal; @@ -233,10 +234,11 @@ @Override public Future<Void> receiveAndReplicate(final IHASyncRequest req, - final IHAWriteMessage msg) throws IOException { + final IHASendState snd, final IHAWriteMessage msg) + throws IOException { + + return pipelineImpl.receiveAndReplicate(req, snd, msg); - return pipelineImpl.receiveAndReplicate(req, msg); - } @Override Added: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HAMessageWrapper.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HAMessageWrapper.java (rev 0) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HAMessageWrapper.java 2013-12-12 23:38:47 UTC (rev 7636) @@ -0,0 +1,84 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +package com.bigdata.ha.msg; + +import com.bigdata.ha.pipeline.HAReceiveService; + +/** + * Glue class wraps the {@link IHAWriteMessage} and the {@link IHALogRequest} + * message and exposes the requires {@link IHAMessage} interface to the + * {@link HAReceiveService}. This class is never persisted. It just let's us + * handshake with the {@link HAReceiveService} and get back out the original + * {@link IHAWriteMessage} as well as the optional {@link IHALogRequest} + * message. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public class HAMessageWrapper extends HAWriteMessageBase implements + IHAMessageWrapper { + + private static final long serialVersionUID = 1L; + + private final IHASyncRequest req; + private final IHASendState snd; + private final IHAWriteMessageBase msg; + + public HAMessageWrapper(final IHASyncRequest req, final IHASendState snd, + final IHAWriteMessageBase msg) { + + // Use size and checksum from real IHAWriteMessage. + super(msg.getSize(), msg.getChk()); + + this.req = req; // MAY be null; + this.snd = snd; + this.msg = msg; + + } + + @Override + public IHASyncRequest getHASyncRequest() { + return req; + } + + @Override + public IHASendState getHASendState() { + return snd; + } + + @Override + public IHAWriteMessageBase getHAWriteMessage() { + return msg; + } + + /** + * Return the {@link IHASendState#getMarker()} iff there is an associated + * {@link IHASendState} and otherwise <code>null</code>. + */ + public byte[] getMarker() { + + return snd == null ? 
null : snd.getMarker(); + + } + +} Added: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HASendState.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HASendState.java (rev 0) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HASendState.java 2013-12-12 23:38:47 UTC (rev 7636) @@ -0,0 +1,245 @@ +package com.bigdata.ha.msg; + +import java.io.DataOutput; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.UUID; + +import com.bigdata.io.DataOutputBuffer; +import com.bigdata.rawstore.Bytes; + +public class HASendState implements IHASendState, Externalizable { + +// private static final Logger log = Logger.getLogger(HASendState.class); + + private static final long serialVersionUID = 1; + + private long messageId; + private UUID originalSenderId; + private UUID senderId; + private long token; + private int replicationFactor; + + /** + * De-serialization constructor. 
+ */ + public HASendState() { + + } + + public HASendState(final long messageId, final UUID originalSenderId, + final UUID senderId, final long token, final int replicationFactor) { + + if (originalSenderId == null) + throw new IllegalArgumentException(); + + if (senderId == null) + throw new IllegalArgumentException(); + + if (replicationFactor <= 0) + throw new IllegalArgumentException(); + + this.messageId = messageId; + this.originalSenderId = originalSenderId; + this.senderId = senderId; + this.token = token; + this.replicationFactor = replicationFactor; + + } + + @Override + public long getMessageId() { + + return messageId; + + } + + @Override + public UUID getOriginalSenderId() { + + return originalSenderId; + } + + @Override + public UUID getSenderId() { + + return senderId; + } + + @Override + public long getQuorumToken() { + + return token; + + } + + @Override + public int getReplicationFactor() { + + return replicationFactor; + + } + + @Override + public byte[] getMarker() { + + final byte[] a = new byte[MAGIC_SIZE + currentVersionLen]; + + final DataOutputBuffer dob = new DataOutputBuffer(0/* len */, a); + + try { + + dob.writeLong(MAGIC); + + writeExternal2(dob); + + } catch (IOException e) { + + throw new RuntimeException(e); + + } + + return a; + + } + + @Override + public String toString() { + + return super.toString() + "{messageId=" + messageId + + ",originalSenderId=" + originalSenderId + ",senderId=" + + senderId + ",token=" + token + ", replicationFactor=" + + replicationFactor + "}"; + + } + + @Override + public boolean equals(final Object obj) { + + if (this == obj) + return true; + + if (!(obj instanceof IHASendState)) + return false; + + final IHASendState t = (IHASendState) obj; + + return messageId == t.getMessageId() + && originalSenderId.equals(t.getOriginalSenderId()) + && senderId.equals(t.getSenderId()) && token == t.getQuorumToken() + && replicationFactor == t.getReplicationFactor(); + + } + + @Override + public int hashCode() { + 
+ // based on the messageId and the hashCode of the senderId + return ((int) (messageId ^ (messageId >>> 32))) + senderId.hashCode(); + } + + /** + * Magic data only included in the marker. + */ + private static final long MAGIC = 0x13759f98e8363caeL; + private static final int MAGIC_SIZE = Bytes.SIZEOF_LONG; + + private static final transient short VERSION0 = 0x0; + private static final transient int VERSION0_LEN = // + Bytes.SIZEOF_LONG + // messageId + Bytes.SIZEOF_UUID + // originalSenderId + Bytes.SIZEOF_UUID + // senderId + Bytes.SIZEOF_LONG + // token + Bytes.SIZEOF_INT // replicationFactor + ; + + private static final transient short currentVersion = VERSION0; + private static final transient int currentVersionLen = VERSION0_LEN; + + @Override + public void readExternal(final ObjectInput in) throws IOException, + ClassNotFoundException { + + final short version = in.readShort(); + + if (version != VERSION0) + throw new RuntimeException("Bad version for serialization"); + + messageId = in.readLong(); + + originalSenderId = new UUID(// + in.readLong(), /* MSB */ + in.readLong() /* LSB */ + ); + + senderId = new UUID(// + in.readLong(), /* MSB */ + in.readLong() /* LSB */ + ); + + token = in.readLong(); + + replicationFactor = in.readInt(); + + } + + @Override + public void writeExternal(final ObjectOutput out) throws IOException { + + writeExternal2(out); + + } + + private void writeExternal2(final DataOutput out) throws IOException { + + out.writeShort(currentVersion); + + out.writeLong(messageId); + + out.writeLong(originalSenderId.getMostSignificantBits()); + out.writeLong(originalSenderId.getLeastSignificantBits()); + + out.writeLong(senderId.getMostSignificantBits()); + out.writeLong(senderId.getLeastSignificantBits()); + + out.writeLong(token); + + out.writeInt(replicationFactor); + + } + + // static final private int MARKER_SIZE = 8; + // + // /** + // * Unique marker generation with JVM wide random number generator. 
+ // * + // * @return A "pretty unique" marker. + // */ + // private byte[] genMarker() { + // + // final byte[] token = new byte[MARKER_SIZE]; + // + // while (!unique1(token)) { + // r.nextBytes(token); + // } + // + // return token; + // } + // + // /** + // * Checks that the first byte is not repeated in the remaining bytes, this + // * simplifies search for the token in the input stream. + // */ + // static private boolean unique1(final byte[] bytes) { + // final byte b = bytes[0]; + // for (int t = 1; t < bytes.length; t++) { + // if (bytes[t] == b) + // return false; + // } + // + // return true; + // } + +} Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessage.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessage.java 2013-12-11 16:31:29 UTC (rev 7635) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessage.java 2013-12-12 23:38:47 UTC (rev 7636) @@ -159,6 +159,7 @@ return compressorKey; } + @Override public String toString() { return getClass().getName() // @@ -347,7 +348,9 @@ * @return */ public static boolean isDataCompressed() { - return compressData; + + return compressData; + } @Override @@ -375,6 +378,7 @@ } + @Override public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { @@ -414,6 +418,7 @@ firstOffset = in.readLong(); } + @Override public void writeExternal(final ObjectOutput out) throws IOException { super.writeExternal(out); if (currentVersion >= VERSION1 && uuid != null) { @@ -469,6 +474,7 @@ // return compressor.compress(buffer); // } + @Override public ByteBuffer expand(final ByteBuffer buffer) { final String compressorKey = getCompressorKey(); Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessageBase.java =================================================================== --- 
branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessageBase.java 2013-12-11 16:31:29 UTC (rev 7635) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HAWriteMessageBase.java 2013-12-12 23:38:47 UTC (rev 7636) @@ -63,15 +63,15 @@ * The Alder32 checksum of the bytes to be transfered. */ public HAWriteMessageBase(final int sze, final int chk) { - + if (sze <= 0) throw new IllegalArgumentException(); this.sze = sze; this.chk = chk; - - } + + } /** * Deserialization constructor. @@ -97,7 +97,8 @@ return chk; } - + + @Override public String toString() { return super.toString() + "{size=" + sze + ",chksum=" + chk + "}"; @@ -131,6 +132,7 @@ private static final transient short currentVersion = VERSION0; + @Override public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { @@ -145,6 +147,7 @@ } + @Override public void writeExternal(final ObjectOutput out) throws IOException { out.writeShort(currentVersion); @@ -152,7 +155,7 @@ out.writeInt(sze); out.writeInt(chk); - + } } Added: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHAMessageWrapper.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHAMessageWrapper.java (rev 0) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHAMessageWrapper.java 2013-12-12 23:38:47 UTC (rev 7636) @@ -0,0 +1,59 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +package com.bigdata.ha.msg; + +import com.bigdata.ha.pipeline.HAReceiveService; + +/** + * Glue interface wraps the {@link IHALogRequest}, {@link IHASendState}, and + * {@link IHAWriteMessage} interfaces exposes the requires {@link IHAMessage} + * interface to the {@link HAReceiveService}. This class is never persisted (it + * does NOT get written into the HALog files). It just let's us handshake with + * the {@link HAReceiveService} and get back out the original + * {@link IHAWriteMessage} as well as the optional {@link IHALogRequest} + * message. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public interface IHAMessageWrapper { + + /** + * Return the optional {@link IHASyncRequest}. When available, this provides + * information about the service request that resulted in the transmission + * of the payload along the pipeline. + */ + IHASyncRequest getHASyncRequest(); + + /** + * Return information about the state of the sending service. + */ + IHASendState getHASendState(); + + /** + * Return the message that describes the payload that will be replicated + * along the pipeline. + */ + IHAWriteMessageBase getHAWriteMessage(); + +} Added: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHASendState.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHASendState.java (rev 0) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/IHASendState.java 2013-12-12 23:38:47 UTC (rev 7636) @@ -0,0 +1,71 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... 
+ +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ +package com.bigdata.ha.msg; + +import java.io.Serializable; +import java.util.UUID; + +/** + * Interface for the state of the sender of an {@link IHAMessage}. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public interface IHASendState extends Serializable { + + /** + * A unique (one-up) message sequence identifier for the messages from the + * sender. This identifier may be used to verify that the bytes available + * from the replication stream are associated with the designed payload. + */ + long getMessageId(); + + /** + * The {@link UUID} of the originating service. This may be used to verify + * that a message was sourced the expected quorum leader. + */ + UUID getOriginalSenderId(); + + /** + * The {@link UUID} of the sending service. This may be used to verify that + * a message was sourced the expected upstream service. + */ + UUID getSenderId(); + + /** + * The current quorum token on the sender. + */ + long getQuorumToken(); + + /** + * The current replication factor on the sender. + */ + int getReplicationFactor(); + + /** + * A byte[] marker that must prefix the message payload, needed to skip + * stale data from failed read tasks. 
+ */ + byte[] getMarker(); + +} Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-11 16:31:29 UTC (rev 7635) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-12 23:38:47 UTC (rev 7636) @@ -51,7 +51,9 @@ import org.apache.log4j.Logger; +import com.bigdata.btree.BytesUtil; import com.bigdata.ha.QuorumPipelineImpl; +import com.bigdata.ha.msg.HAMessageWrapper; import com.bigdata.ha.msg.IHAMessage; import com.bigdata.ha.msg.IHASyncRequest; import com.bigdata.ha.msg.IHAWriteMessage; @@ -73,7 +75,7 @@ * @author Martyn Cutcher * @author <a href="mailto:tho...@us...">Bryan Thompson</a> */ -public class HAReceiveService<M extends IHAWriteMessageBase> extends Thread { +public class HAReceiveService<M extends HAMessageWrapper> extends Thread { private static final Logger log = Logger .getLogger(HAReceiveService.class); @@ -225,6 +227,7 @@ /* * Note: toString() implementation is non-blocking. */ + @Override public String toString() { return super.toString() + "{addrSelf=" + addrSelf + ", addrNext=" @@ -725,7 +728,7 @@ * <p> * report the #of payloads. */ - static private class ReadTask<M extends IHAWriteMessageBase> implements + static private class ReadTask<M extends HAMessageWrapper> implements Callable<Void> { private final ServerSocketChannel server; @@ -983,18 +986,25 @@ private void doReceiveAndReplicate(final Client client) throws Exception { - /** - * The first cause if downstream replication fails. We make a note - * of this first cause, continue to drain the payload, and then - * rethrow the first cause once the payload has been fully drained. - * This is necessary to ensure that the socket channel does not have - * partial data remaining from an undrained payload. 
- * - * @see <a - * href="https://sourceforge.net/apps/trac/bigdata/ticket/724" - * > HA wire pulling and sure kill testing </a> - */ - Throwable downstreamFirstCause = null; +// /** +// * The first cause if downstream replication fails. We make a note +// * of this first cause, continue to drain the payload, and then +// * rethrow the first cause once the payload has been fully drained. +// * This is necessary to ensure that the socket channel does not have +// * partial data remaining from an undrained payload. +// * +// * @see <a +// * href="https://sourceforge.net/apps/trac/bigdata/ticket/724" +// * > HA wire pulling and sure kill testing </a> +// * +// * Note: It appears that attempting to drain the +// * payload is risky since there are a variety of ways in which +// * the process might be terminated. It seems to be safer to +// * drain the socket channel until we reach a marker that gives +// * us confidence that we are at the payload for the message +// * that is being processed. +// */ +// Throwable downstreamFirstCause = null; /* * We should now have parameters ready in the WriteMessage and can @@ -1012,6 +1022,9 @@ // for debug retain number of low level reads int reads = 0; + final DrainToMarkerUtil drainUtil = message.getHASendState() != null ? new DrainToMarkerUtil( + message.getHASendState().getMarker(), client) : null; + while (rem > 0 && !EOS) { // block up to the timeout. @@ -1070,9 +1083,16 @@ while (iter.hasNext()) { + // Check for termination. 
+ client.checkFirstCause(); + iter.next(); iter.remove(); + if (!drainUtil.foundMarker()) { + continue; + } + final int rdlen = client.client.read(localBuffer); if (log.isTraceEnabled()) @@ -1098,17 +1118,17 @@ callback.incReceive(message, reads, rdlen, rem); } - if (downstreamFirstCause == null) { - try { +// if (downstreamFirstCause == null) { +// try { forwardReceivedBytes(client, rdlen); - } catch (ExecutionException ex) { - log.error( - "Downstream replication failure" - + ": will drain payload and then rethrow exception: rootCause=" - + ex, ex); - downstreamFirstCause = ex; - } - } +// } catch (ExecutionException ex) { +// log.error( +// "Downstream replication failure" +// + ": will drain payload and then rethrow exception: rootCause=" +// + ex, ex); +// downstreamFirstCause = ex; +// } +// } } // while(itr.hasNext()) @@ -1134,28 +1154,31 @@ + ", actual=" + (int) chk.getValue()); } - if (downstreamFirstCause != null) { - - /** - * Replication to the downstream service failed. The payload has - * been fully drained. This ensures that we do not leave part of - * the payload in the upstream socket channel. We now wrap and - * rethrow the root cause of the downstream failure. The leader - * will handle this by forcing the remove of the downstream - * service and then re-replicating the payload. - * - * @see <a - * href="https://sourceforge.net/apps/trac/bigdata/ticket/724" - * > HA wire pulling and sure kill testing </a> - */ +// if (downstreamFirstCause != null) { +// +// /** +// * Replication to the downstream service failed. The payload has +// * been fully drained. This ensures that we do not leave part of +// * the payload in the upstream socket channel. We now wrap and +// * rethrow the root cause of the downstream failure. The leader +// * will handle this by forcing the remove of the downstream +// * service and then re-replicating the payload. 
+// * +// * @see <a +// * href="https://sourceforge.net/apps/trac/bigdata/ticket/724" +// * > HA wire pulling and sure kill testing </a> +// */ +// +// throw new RuntimeException( +// "Downstream replication failure: msg=" + message +// + ", ex=" + downstreamFirstCause, +// downstreamFirstCause); +// +// } + + // Check for termination. + client.checkFirstCause(); - throw new RuntimeException( - "Downstream replication failure: msg=" + message - + ", ex=" + downstreamFirstCause, - downstreamFirstCause); - - } - if (callback != null) { /* @@ -1173,7 +1196,7 @@ } // call() /** - * Now forward the most recent transfer bytes downstream. + * Forward the most recent transfer bytes downstream. * <p> * * Note: [addrNext] is final. If the downstream address is changed, then @@ -1229,107 +1252,123 @@ } // Check for termination. client.checkFirstCause(); - // Send and await Future. - sendService.send(out).get(); + /* + * Send and await Future. If this is the first chunk of a + * payload and a marker exists, then send the marker as + * well. + */ + sendService + .send(out, + out.position() == 0 + && message.getHASendState() != null ? message + .getHASendState().getMarker() + : null).get(); } break; // break out of the inner while loop. } // while(true) } - -// private void ack(final Client client) throws IOException { -// -// if (log.isTraceEnabled()) -// log.trace("Will ACK"); -// -// ack(client.client, HASendService.ACK); -// -// if (log.isTraceEnabled()) -// log.trace("Did ACK"); -// -// } -// -// private void nack(final Client client) throws IOException { -// -// if (log.isTraceEnabled()) -// log.trace("Will NACK"); -// -// ack(client.client, HASendService.NACK); -// -// if (log.isTraceEnabled()) -// log.trace("Did NACK"); -// -// } -// -// /** -// * ACK/NACK the payload. -// * -// * @param client -// * @throws IOException -// */ -// private void ack(final SocketChannel client, final byte ret) -// throws IOException { -// -// // FIXME optimize. 
-// final ByteBuffer b = ByteBuffer.wrap(new byte[] { ret /* ACK */}); -// -// // The #of bytes to transfer. -// final int remaining = b.remaining(); -// -//// if (log.isTraceEnabled()) -//// log.trace("Will send " + remaining + " bytes"); -// -//// try { -// -// int nwritten = 0; -// -// while (nwritten < remaining) { -// -// /* -// * Write the data. Depending on the channel, will either -// * block or write as many bytes as can be written -// * immediately (this latter is true for socket channels in a -// * non-blocking mode). IF it blocks, should block until -// * finished or until this thread is interrupted, e.g., by -// * shutting down the thread pool on which it is running. -// * -// * Note: If the SocketChannel is closed by an interrupt, -// * then the send request for the [data] payload will fail. -// * However, the SocketChannel will be automatically reopened -// * for the next request (unless the HASendService has been -// * terminated). -// */ -// -// final int nbytes = client.write(b); -// -// nwritten += nbytes; -// -//// if (log.isTraceEnabled()) -//// log.trace("Sent " + nbytes + " bytes with " + nwritten -//// + " of out " + remaining + " written so far"); -// -// } -// return; -// -//// while (client.isOpen()) { -//// -//// if (client.write(b) > 0) { -//// -//// // Sent (N)ACK byte. -//// return; -//// -//// } -//// -//// } -// -//// // channel is closed. -//// throw new AsynchronousCloseException(); -// -// } - + } // class ReadTask /** + * Helper class to drain bytes from the upstream socket until we encounter a + * marker in the stream that immediately precedes the desired payload. 
+ * + * @author <a href="mailto:mar...@us...">Martyn + * Cutcher</a> + * + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/724" > HA + * wire pulling and sure kill testing </a> + */ + static private class DrainToMarkerUtil { + + final private byte[] marker; + final private byte[] markerBuffer; + final private ByteBuffer markerBB; + final private Client client; + + private int markerIndex = 0; + private int nmarkerreads = 0; + private int nmarkerbytematches = 0; + + DrainToMarkerUtil(final byte[] marker, final Client client) { + this.marker = marker; + this.markerBuffer = marker == null ? null : new byte[marker.length]; + this.markerBB = marker == null ? null : ByteBuffer + .wrap(markerBuffer); + this.client = client; + + if (log.isDebugEnabled()) + log.debug("Receive token: " + BytesUtil.toHexString(marker)); + + } + + /** + * Note that the logic for finding the token bytes depends on the first + * byte in the token being unique! + * <p> + * We have to be a bit clever to be sure we do not read beyond the token + * and therefore complicate the reading into the localBuffer. + * <p> + * This is optimized for the normal case where the marker is read + * directly from the next bytes in the stream. In the worst case scenario this + * could read large amounts of data only a few bytes at a time; however, + * in practice this is not a significant overhead. 
+ */ + boolean foundMarker() throws IOException { + + if (log.isDebugEnabled()) + log.debug("Looking for token, " + BytesUtil.toHexString(marker) + + ", reads: " + nmarkerreads); + + while (markerIndex < marker.length) { + + final int remtok = marker.length - markerIndex; + markerBB.limit(remtok); + markerBB.position(0); + + final int rdLen = client.client.read(markerBB); + for (int i = 0; i < rdLen; i++) { + if (markerBuffer[i] != marker[markerIndex]) { + if (nmarkerreads < 2) + log.warn("TOKEN MISMATCH"); + markerIndex = 0; + if (markerBuffer[i] == marker[markerIndex]) { + markerIndex++; + } + } else { + markerIndex++; + nmarkerbytematches++; + } + } + + nmarkerreads++; + if (nmarkerreads % 10000 == 0) { + if (log.isDebugEnabled()) + log.debug("...still looking, reads: " + nmarkerreads); + } + + } + + if (markerIndex != marker.length) { // not sufficient data ready + if (log.isDebugEnabled()) + log.debug("Not found token yet!"); + return false; + } else { + if (log.isDebugEnabled()) + log.debug("Found token after " + nmarkerreads + + " token reads and " + nmarkerbytematches + + " byte matches"); + + return true; + } + + } + + } + + /** * Receive data into the caller's buffer as described by the caller's * message. * @@ -1408,21 +1447,6 @@ public interface IHAReceiveCallback<M extends IHAWriteMessageBase> { /** - * Hook invoked once a buffer has been received. - * - * @param msg - * The message. - * @param data - * The buffer containing the data. The position() will be - * ZERO (0). The limit() will be the #of bytes available. The - * implementation MAY have side effects on the buffer state - * (position, limit, etc). - * - * @throws Exception - */ - void callback(M msg, ByteBuffer data) throws Exception; - - /** * Notify that some payload bytes have been incrementally received for * an {@link IHAMessage}. This is invoked each time some data has been * read from the upstream socket. 
@@ -1441,6 +1465,22 @@ * @throws Exception */ void incReceive(M msg, int nreads, int rdlen, int rem) throws Exception; + + /** + * Hook invoked once a buffer has been received. + * + * @param msg + * The message. + * @param data + * The buffer containing the data. The position() will be + * ZERO (0). The limit() will be the #of bytes available. The + * implementation MAY have side effects on the buffer state + * (position, limit, etc). + * + * @throws Exception + */ + void callback(M msg, ByteBuffer data) throws Exception; + } /** Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-11 16:31:29 UTC (rev 7635) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-12 23:38:47 UTC (rev 7636) @@ -287,6 +287,10 @@ * * @param buffer * The buffer. + * @param marker + * A marker that will be used to prefix the payload for the + * message in the write replication socket stream. The marker is + * used to ensure synchronization when reading on the stream. * * @return The {@link Future} which can be used to await the outcome of this * operation. @@ -301,8 +305,8 @@ * @todo throws IOException if the {@link SocketChannel} was not open and * could not be opened. */ - public Future<Void> send(final ByteBuffer buffer) { - + public Future<Void> send(final ByteBuffer buffer, final byte[] marker) { + if (buffer == null) throw new IllegalArgumentException(); @@ -320,10 +324,9 @@ // reopenChannel(); - return tmp.submit(newIncSendTask(buffer.asReadOnlyBuffer())); + return tmp.submit(newIncSendTask(buffer.asReadOnlyBuffer(), marker)); } - /** * A series of timeouts used when we need to re-open the * {@link SocketChannel}. @@ -422,13 +425,17 @@ * * @param buffer * The buffer whose data are to be sent. 
+ * @param marker + * A marker that will be used to prefix the payload for the + * message in the write replication socket stream. The marker is + * used to ensure synchronization when reading on the stream. * * @return The task which will send the data to the configured * {@link InetSocketAddress}. */ - protected Callable<Void> newIncSendTask(final ByteBuffer buffer) { + protected Callable<Void> newIncSendTask(final ByteBuffer buffer, final byte[] marker) { - return new IncSendTask(buffer); + return new IncSendTask(buffer, marker); } @@ -485,25 +492,21 @@ */ protected /*static*/ class IncSendTask implements Callable<Void> { -// private final SocketChannel socketChannel; - private final ByteBuffer data; + private final ByteBuffer data; + private final byte[] marker; - public IncSendTask(/*final SocketChannel socketChannel, */final ByteBuffer data) { + public IncSendTask(final ByteBuffer data, final byte[] marker) { -// if (socketChannel == null) -// throw new IllegalArgumentException(); + if (data == null) + throw new IllegalArgumentException(); - if (data == null) - throw new IllegalArgumentException(); + this.data = data; + this.marker = marker; + } -// this.socketChannel = socketChannel; - - this.data = data; + @Override + public Void call() throws Exception { - } - - public Void call() throws Exception { - // defer until we actually run. final SocketChannel socketChannel = reopenChannel(); @@ -521,10 +524,22 @@ try { - int nwritten = 0; + int nmarker = 0; // #of marker bytes written. + int nwritten = 0; // #of payload bytes written. + + final ByteBuffer markerBB = marker != null ? ByteBuffer + .wrap(marker) : null; while (nwritten < remaining) { + + if (marker != null && nmarker < marker.length) { + + nmarker += socketChannel.write(markerBB); + continue; + + } + /* * Write the data. 
Depending on the channel, will either * block or write as many bytes as can be written Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-12-11 16:31:29 UTC (rev 7635) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-12-12 23:38:47 UTC (rev 7636) @@ -132,6 +132,7 @@ import com.bigdata.ha.msg.IHARemoteRebuildRequest; import com.bigdata.ha.msg.IHARootBlockRequest; import com.bigdata.ha.msg.IHARootBlockResponse; +import com.bigdata.ha.msg.IHASendState; import com.bigdata.ha.msg.IHASendStoreResponse; import com.bigdata.ha.msg.IHASnapshotDigestRequest; import ... [truncated message content] |