From: <tho...@us...> - 2013-04-10 17:35:28
|
Revision: 7040 http://bigdata.svn.sourceforge.net/bigdata/?rev=7040&view=rev Author: thompsonbry Date: 2013-04-10 17:35:17 +0000 (Wed, 10 Apr 2013) Log Message: ----------- We fixed the problem with the transition to RunMet causing the asynchronous close of the SocketChannel for the write pipeline by NOT propagating the interrupt to the SendIncTask's Future in HAJournal. This was documented for both sendHALog() and sendHAStore(). It now successfully transitions to RunMet during a long load before the commit point. 100% Green on the CI HA test suite. Modified Paths: -------------- branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java branches/READ_CACHE/bigdata/src/test/com/bigdata/ha/pipeline/TestAll.java branches/READ_CACHE/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive.java branches/READ_CACHE/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive3Nodes.java branches/READ_CACHE/bigdata/src/test/com/bigdata/io/TestCase3.java branches/READ_CACHE/bigdata/src/test/com/bigdata/io/writecache/TestWORMWriteCacheService.java branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java branches/READ_CACHE/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java branches/READ_CACHE/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHAJournalServerTestCase.java branches/READ_CACHE/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java branches/READ_CACHE/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3SnapshotPolicy.java branches/READ_CACHE/bigdata-jini/src/test/com/bigdata/journal/jini/ha/log4j-template-A.properties 
branches/READ_CACHE/bigdata-jini/src/test/com/bigdata/journal/jini/ha/log4j-template-B.properties branches/READ_CACHE/bigdata-jini/src/test/com/bigdata/journal/jini/ha/log4j-template-C.properties Added Paths: ----------- branches/READ_CACHE/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceiveInterrupts.java Modified: branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java =================================================================== --- branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-04-10 09:07:45 UTC (rev 7039) +++ branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-04-10 17:35:17 UTC (rev 7040) @@ -31,6 +31,7 @@ import java.nio.ByteBuffer; import java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; @@ -906,9 +907,9 @@ } catch (Throwable t) { - if (InnerCause.isInnerCause(t, InterruptedException.class)) { + if (InnerCause.isInnerCause(t, InterruptedException.class)||InnerCause.isInnerCause(t, CancellationException.class)) { - throw (InterruptedException) t; + throw new RuntimeException(t); } @@ -1020,9 +1021,9 @@ } catch (Throwable t) { - if (InnerCause.isInnerCause(t, InterruptedException.class)) { + if (InnerCause.isInnerCause(t, InterruptedException.class)||InnerCause.isInnerCause(t, CancellationException.class)) { - throw (InterruptedException) t; + throw new RuntimeException( t ); } Modified: branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java =================================================================== --- branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-04-10 09:07:45 UTC (rev 7039) +++ branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-04-10 17:35:17 UTC (rev 7040) @@ -28,6 
+28,7 @@ import java.net.BindException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousCloseException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; @@ -512,7 +513,7 @@ message = null; // [waitFuture] is available for receiveData(). - futureReady.signal(); + futureReady.signalAll(); } finally { @@ -828,70 +829,101 @@ } public Void call() throws Exception { - -// awaitAccept(); -// -// /* -// * Get the client connection and open the channel in a non-blocking -// * mode so we will read whatever is available and loop until all -// * data has been read. -// */ -// final SocketChannel client = server.accept(); -// client.configureBlocking(false); -// -// final Selector clientSelector = Selector.open(); + +// awaitAccept(); +// +// /* +// * Get the client connection and open the channel in a non-blocking +// * mode so we will read whatever is available and loop until all +// * data has been read. +// */ +// final SocketChannel client = server.accept(); +// client.configureBlocking(false); +// +// final Selector clientSelector = Selector.open(); // -// // must register OP_READ selector on the new client -// final SelectionKey clientKey = client.register(clientSelector, -// SelectionKey.OP_READ); +// // must register OP_READ selector on the new client +// final SelectionKey clientKey = client.register(clientSelector, +// SelectionKey.OP_READ); - Client client = clientRef.get(); + Client client = clientRef.get(); -// if (client != null) { +// if (client != null) { // -// /* -// * Note: We need to know when the client connection is no longer -// * valid. The code here does not appear to do the trick. -// * changeUpStream() is handling this instead. 
-// * -// * We need to decide whether the client is no longer valid -// * (either because the upstream HASendService has changed (our -// * predecessor in the pipeline might have died) or because it -// * has closed is socket connection to this HAReceiveService). -// * -// * Either way, we need to establish a client connection using -// * awaitAccept(). -// */ -// if (!client.client.isConnected()) { -// log.warn("Closing old client connection."); -// clientRef.set(client = null); -// } +// /* +// * Note: We need to know when the client connection is no longer +// * valid. The code here does not appear to do the trick. +// * changeUpStream() is handling this instead. +// * +// * We need to decide whether the client is no longer valid +// * (either because the upstream HASendService has changed (our +// * predecessor in the pipeline might have died) or because it +// * has closed is socket connection to this HAReceiveService). +// * +// * Either way, we need to establish a client connection using +// * awaitAccept(). +// */ +// if (!client.client.isConnected()) { +// log.warn("Closing old client connection."); +// clientRef.set(client = null); +// } // -// } - - if (client == null) { +// } + + if (client == null || !client.client.isOpen()) { - /* - * Accept and the initialize a connection from the upstream - * HASendService. - */ - - // Accept a client connection (blocks) - awaitAccept(); + final Client tmp = clientRef.getAndSet(null); + if (tmp != null) { + // Close existing connection if not open. + tmp.close(); + } - // New client connection. - client = new Client(server);//, sendService, addrNext); + /* + * Accept and the initialize a connection from the upstream + * HASendService. + */ + + // Accept a client connection (blocks) + awaitAccept(); - // save off reference. - clientRef.set(client); + // New client connection. + client = new Client(server);//, sendService, addrNext); + // save off reference. 
+ clientRef.set(client); + } +// boolean success = false; +// try { + doReceiveAndReplicate(client); +// success = true; + // success. + return null; +// } finally { +// try { +// if(success) { +// ack(client); +// } else { +// nack(client); +// } +// } catch (IOException ex) { +// // log and ignore. +// log.error(ex, ex); +// } +// } + + } // call. + + private void doReceiveAndReplicate(final Client client) + throws Exception { + /* * We should now have parameters ready in the WriteMessage and can * begin transferring data from the stream to the writeCache. */ final long begin = System.currentTimeMillis(); + long mark = begin; // #of bytes remaining (to be received). int rem = message.getSize(); @@ -909,15 +941,44 @@ if (nkeys == 0) { - // Nothing available. - final long elapsed = System.currentTimeMillis() - begin; + /* + * Nothing available. + */ + // time since last mark. + final long now = System.currentTimeMillis(); + final long elapsed = now - mark; + if (elapsed > 10000) { // Issue warning if we have been blocked for a while. log.warn("Blocked: awaiting " + rem + " out of " + message.getSize() + " bytes."); + mark = now;// reset mark. } + + if (!client.client.isOpen() + || !client.clientSelector.isOpen()) { + + /* + * The channel has been closed. The request must be + * failed. TODO Or set EOF:=true? + * + * Note: The [callback] is NOT notified. The service + * that issued the RMI request to this service to + * receive the payload over the HAReceivedService will + * see this exception thrown back across the RMI + * request. + * + * @see HAReceiveService.receiveData(). + */ + + throw new AsynchronousCloseException(); + + } + // no keys. nothing to read. 
+ continue; + } final Set<SelectionKey> keys = client.clientSelector @@ -938,7 +999,7 @@ + " bytes remaining."); if (rdlen > 0) { - reads++; + reads++; updateChk(rdlen); } @@ -1015,7 +1076,7 @@ + ", number of reads: " + reads + ", buffer: " + localBuffer); - if (message.getChk() != (int) chk.getValue()) { + if (message.getChk() != (int) chk.getValue()) { throw new ChecksumError("msg=" + message.toString() + ", actual=" + chk.getValue()); } @@ -1024,10 +1085,97 @@ callback.callback(message, localBuffer); } - // success. - return null; + } // call() - } // call() +// private void ack(final Client client) throws IOException { +// +// if (log.isTraceEnabled()) +// log.trace("Will ACK"); +// +// ack(client.client, HASendService.ACK); +// +// if (log.isTraceEnabled()) +// log.trace("Did ACK"); +// +// } +// +// private void nack(final Client client) throws IOException { +// +// if (log.isTraceEnabled()) +// log.trace("Will NACK"); +// +// ack(client.client, HASendService.NACK); +// +// if (log.isTraceEnabled()) +// log.trace("Did NACK"); +// +// } +// +// /** +// * ACK/NACK the payload. +// * +// * @param client +// * @throws IOException +// */ +// private void ack(final SocketChannel client, final byte ret) +// throws IOException { +// +// // FIXME optimize. +// final ByteBuffer b = ByteBuffer.wrap(new byte[] { ret /* ACK */}); +// +// // The #of bytes to transfer. +// final int remaining = b.remaining(); +// +//// if (log.isTraceEnabled()) +//// log.trace("Will send " + remaining + " bytes"); +// +//// try { +// +// int nwritten = 0; +// +// while (nwritten < remaining) { +// +// /* +// * Write the data. Depending on the channel, will either +// * block or write as many bytes as can be written +// * immediately (this latter is true for socket channels in a +// * non-blocking mode). IF it blocks, should block until +// * finished or until this thread is interrupted, e.g., by +// * shutting down the thread pool on which it is running. 
+// * +// * Note: If the SocketChannel is closed by an interrupt, +// * then the send request for the [data] payload will fail. +// * However, the SocketChannel will be automatically reopened +// * for the next request (unless the HASendService has been +// * terminated). +// */ +// +// final int nbytes = client.write(b); +// +// nwritten += nbytes; +// +//// if (log.isTraceEnabled()) +//// log.trace("Sent " + nbytes + " bytes with " + nwritten +//// + " of out " + remaining + " written so far"); +// +// } +// return; +// +//// while (client.isOpen()) { +//// +//// if (client.write(b) > 0) { +//// +//// // Sent (N)ACK byte. +//// return; +//// +//// } +//// +//// } +// +//// // channel is closed. +//// throw new AsynchronousCloseException(); +// +// } } // class ReadTask @@ -1069,7 +1217,7 @@ localBuffer = buffer;// DO NOT duplicate()! (side-effects required) localBuffer.limit(message.getSize()); localBuffer.position(0); - messageReady.signal(); + messageReady.signalAll(); if (log.isTraceEnabled()) log.trace("Will accept data for message: msg=" + msg); Modified: branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java =================================================================== --- branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-04-10 09:07:45 UTC (rev 7039) +++ branches/READ_CACHE/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-04-10 17:35:17 UTC (rev 7040) @@ -82,6 +82,9 @@ private static final Logger log = Logger.getLogger(HASendService.class); +// static final byte ACK = 1; +// static final byte NACK = 0; + /** * The Internet socket address of the receiving service. */ @@ -295,20 +298,43 @@ if (log.isTraceEnabled()) log.trace("Will send " + buffer.remaining() + " bytes"); - /* +// reopenChannel(); + + return tmp.submit(newIncSendTask(buffer.asReadOnlyBuffer())); + + } + + /** + * A series of timeouts used when we need to re-open the + * {@link SocketChannel}. 
+ */ + private final static long[] retryMillis = new long[] { 1, 5, 10, 50, 100 }; + + /** + * (Re-)open the {@link SocketChannel} if it is closed and this service is + * still running. + * + * @return The {@link SocketChannel}. + */ + private SocketChannel reopenChannel() { + + /* * Synchronize on the socketChannel object to serialize attempts to open * the SocketChannel. */ synchronized (socketChannel) { - SocketChannel sc = socketChannel.get(); + int tryno = 0; + + SocketChannel sc = null; - if (sc == null) { + while ((((sc = socketChannel.get()) == null) || !sc.isOpen()) + && isRunning()) { try { /* - * Open the SocketChannel. + * (Re-)open the SocketChannel. * * TODO we may have to retry or play with the timeout for * the socket connect request since the downstream node may @@ -333,24 +359,38 @@ * since B does not setup its receive service until after A * has seen the pipelineChange() event. */ - + socketChannel.set(sc = openChannel(addrNext.get())); - + } catch (IOException e) { + if (tryno < retryMillis.length) { + + try { + // sleep and retry. + Thread.sleep(retryMillis[tryno]); + tryno++; + continue; + } catch (InterruptedException e1) { + // rethrow original exception. + throw new RuntimeException(e); + } + + } + // do not wrap. throw new RuntimeException(e); - - } - } - - } + } // catch - return tmp.submit(newIncSendTask(buffer.asReadOnlyBuffer())); + } // while - } - + return socketChannel.get(); + + } // synchronized(socketChannel) + + } // reopenChannel() + /** * Factory for the {@link SendTask}. 
* @@ -362,7 +402,7 @@ */ protected Callable<Void> newIncSendTask(final ByteBuffer buffer) { - return new IncSendTask(socketChannel.get(), buffer); + return new IncSendTask(buffer); } @@ -376,7 +416,7 @@ * * @throws IOException */ - static protected SocketChannel openChannel(final InetSocketAddress addr) + static private SocketChannel openChannel(final InetSocketAddress addr) throws IOException { final SocketChannel socketChannel = SocketChannel.open(); @@ -410,20 +450,20 @@ * {@link ByteBuffer} to the receiving service on a specified * {@link InetSocketAddress}. */ - protected static class IncSendTask implements Callable<Void> { + protected /*static*/ class IncSendTask implements Callable<Void> { - private final SocketChannel socketChannel; +// private final SocketChannel socketChannel; private final ByteBuffer data; - public IncSendTask(final SocketChannel socketChannel, final ByteBuffer data) { + public IncSendTask(/*final SocketChannel socketChannel, */final ByteBuffer data) { - if (socketChannel == null) - throw new IllegalArgumentException(); +// if (socketChannel == null) +// throw new IllegalArgumentException(); if (data == null) throw new IllegalArgumentException(); - this.socketChannel = socketChannel; +// this.socketChannel = socketChannel; this.data = data; @@ -431,9 +471,21 @@ public Void call() throws Exception { + // defer until we actually run. + final SocketChannel socketChannel = reopenChannel(); + + if (!isRunning()) + throw new RuntimeException("Not Running."); + + if (socketChannel == null) + throw new AssertionError(); + // The #of bytes to transfer. final int remaining = data.remaining(); + if (log.isTraceEnabled()) + log.trace("Will send " + remaining + " bytes"); + try { int nwritten = 0; @@ -447,6 +499,12 @@ * non-blocking mode). IF it blocks, should block until * finished or until this thread is interrupted, e.g., by * shutting down the thread pool on which it is running. 
+ * + * Note: If the SocketChannel is closed by an interrupt, + * then the send request for the [data] payload will fail. + * However, the SocketChannel will be automatically reopened + * for the next request (unless the HASendService has been + * terminated). */ final int nbytes = socketChannel.write(data); @@ -458,6 +516,14 @@ + " of out " + remaining + " written so far"); } + + /* + * The ACK by the receiver divides the HASend requests into + * distinct operations. Without this handshaking, the next + * available payload would be on the way as soon as the last + * byte of the current payload was written. + */ +// awaitAck(socketChannel); } finally { @@ -478,6 +544,145 @@ } +// /** +// * +// * @param socketChannel +// * @throws IOException +// */ +// private void awaitAck(final SocketChannel socketChannel) +// throws IOException { +// +// log.debug("Awaiting (N)ACK"); +// +// // FIXME Optimize. +// final ByteBuffer b = ByteBuffer.wrap(new byte[] { -1 }); +// +// while (socketChannel.isOpen()) { +// +// final int nread = socketChannel.read(b); +// +// if (nread == 1) { +// +// final byte ret = b.array()[0]; +// +// if (ret == ACK) { +// +// // Received ACK. +// log.debug("ACK"); +// return; +// +// } +// +// log.error("NACK"); +// return; +// +// } +// +// throw new IOException("Expecting ACK, not " + nread + " bytes"); +// +// } +// +// // channel is closed. +// throw new AsynchronousCloseException(); +// +//// /* +//// * We should now have parameters ready in the WriteMessage and can +//// * begin transferring data from the stream to the writeCache. +//// */ +//// final long begin = System.currentTimeMillis(); +//// long mark = begin; +//// +//// // #of bytes remaining (to be received). +//// int rem = b.remaining(); +//// +//// // End of stream flag. +//// boolean EOS = false; +//// +//// // for debug retain number of low level reads +//// int reads = 0; +//// +//// while (rem > 0 && !EOS) { +//// +//// // block up to the timeout. 
+//// final int nkeys = client.clientSelector.select(10000/* ms */); +//// +//// if (nkeys == 0) { +//// +//// /* +//// * Nothing available. +//// */ +//// +//// // time since last mark. +//// final long now = System.currentTimeMillis(); +//// final long elapsed = now - mark; +//// +//// if (elapsed > 10000) { +//// // Issue warning if we have been blocked for a while. +//// log.warn("Blocked: awaiting " + rem + " out of " +//// + message.getSize() + " bytes."); +//// mark = now;// reset mark. +//// } +//// +//// if (!client.client.isOpen() +//// || !client.clientSelector.isOpen()) { +//// +//// /* +//// * The channel has been closed. The request must be +//// * failed. TODO Or set EOF:=true? +//// * +//// * Note: The [callback] is NOT notified. The service +//// * that issued the RMI request to this service to +//// * receive the payload over the HAReceivedService will +//// * see this exception thrown back across the RMI +//// * request. +//// * +//// * @see HAReceiveService.receiveData(). +//// */ +//// +//// throw new AsynchronousCloseException(); +//// +//// } +//// +//// // no keys. nothing to read. +//// continue; +//// +//// } +//// +//// final Set<SelectionKey> keys = client.clientSelector +//// .selectedKeys(); +//// +//// final Iterator<SelectionKey> iter = keys.iterator(); +//// +//// while (iter.hasNext()) { +//// +//// iter.next(); +//// iter.remove(); +//// +//// final int rdlen = client.client.read(b); +//// +//// if (log.isTraceEnabled()) +//// log.trace("Read " + rdlen + " bytes of " +//// + (rdlen > 0 ? rem - rdlen : rem) +//// + " bytes remaining."); +//// +//// if (rdlen > 0) { +//// reads++; +//// } +//// +//// if (rdlen == -1) { +//// // The stream is closed? 
+//// EOS = true; +//// break; +//// } +//// +//// rem -= rdlen; +//// +//// } +//// +//// } // while( rem > 0 ) +// +// } + } } Modified: branches/READ_CACHE/bigdata/src/test/com/bigdata/ha/pipeline/TestAll.java =================================================================== --- branches/READ_CACHE/bigdata/src/test/com/bigdata/ha/pipeline/TestAll.java 2013-04-10 09:07:45 UTC (rev 7039) +++ branches/READ_CACHE/bigdata/src/test/com/bigdata/ha/pipeline/TestAll.java 2013-04-10 17:35:17 UTC (rev 7040) @@ -64,6 +64,9 @@ // Test of HASendService and HAReceiveService (2 nodes). suite.addTestSuite(TestHASendAndReceive.class); + // Test of HASendService and HAReceiveService w/ interrupts (2 nodes). + suite.addTestSuite(TestHASendAndReceiveInterrupts.class); + // Test of HASendService and HAReceiveService (3 nodes). suite.addTestSuite(TestHASendAndReceive3Nodes.class); Modified: branches/READ_CACHE/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive.java =================================================================== --- branches/READ_CACHE/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive.java 2013-04-10 09:07:45 UTC (rev 7039) +++ branches/READ_CACHE/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive.java 2013-04-10 17:35:17 UTC (rev 7040) @@ -24,10 +24,7 @@ package com.bigdata.ha.pipeline; -import java.io.IOException; -import java.net.BindException; import java.net.InetSocketAddress; -import java.net.ServerSocket; import java.nio.ByteBuffer; import java.util.Random; import java.util.concurrent.ExecutionException; @@ -37,8 +34,6 @@ import com.bigdata.ha.msg.HAWriteMessageBase; import com.bigdata.ha.msg.IHAWriteMessageBase; -import com.bigdata.ha.pipeline.HAReceiveService; -import com.bigdata.ha.pipeline.HASendService; import com.bigdata.io.DirectBufferPool; import com.bigdata.io.IBufferAccess; import com.bigdata.io.TestCase3; @@ -52,64 +47,6 @@ */ public class TestHASendAndReceive extends TestCase3 { - /** - * A random number 
generated - the seed is NOT fixed. - */ - private final Random r = new Random(); - -// /** -// * Returns random data that will fit in N bytes. N is chosen randomly in -// * 1:256. -// * -// * @return A new {@link ByteBuffer} wrapping a new <code>byte[]</code> of -// * random length and having random contents. -// */ -// public ByteBuffer getRandomData() { -// -// final int nbytes = r.nextInt(256) + 1; -// -// return getRandomData(nbytes); -// -// } - - /** - * Returns random data that will fit in <i>nbytes</i>. - * - * @return A new {@link ByteBuffer} wrapping a new <code>byte[]</code> - * having random contents. - */ - private ByteBuffer getRandomData(final int nbytes) { - - final byte[] bytes = new byte[nbytes]; - - r.nextBytes(bytes); - - return ByteBuffer.wrap(bytes); - - } - - /** - * Returns random data that will fit in <i>nbytes</i>. - * - * @return A new {@link ByteBuffer} wrapping a new <code>byte[]</code> - * having random contents. - */ - private ByteBuffer getRandomData(final ByteBuffer b, final int nbytes) { - - final byte[] a = new byte[nbytes]; - - r.nextBytes(a); - - b.limit(nbytes); - b.position(0); - b.put(a); - - b.flip(); - - return b; - - } - public TestHASendAndReceive() { } @@ -124,8 +61,11 @@ private HAReceiveService<IHAWriteMessageBase> receiveService; private ChecksumUtility chk; + @Override protected void setUp() throws Exception { + super.setUp(); + r = new Random(); chk = new ChecksumUtility(); /* @@ -149,8 +89,11 @@ } + @Override protected void tearDown() throws Exception { + super.tearDown(); + if (receiveService != null) { receiveService.terminate(); receiveService = null; @@ -167,35 +110,6 @@ } /** - * Return an open port on current machine. Try the suggested port first. 
If - * suggestedPort is zero, just select a random port - */ - private static int getPort(int suggestedPort) throws IOException { - - ServerSocket openSocket; - try { - openSocket = new ServerSocket(suggestedPort); - } catch (BindException ex) { - // the port is busy, so look for a random open port - openSocket = new ServerSocket(0); - } - - final int port = openSocket.getLocalPort(); - - openSocket.close(); - - if (suggestedPort != 0 && port != suggestedPort) { - - log.warn("suggestedPort is busy: suggestedPort=" + suggestedPort - + ", using port=" + port + " instead"); - - } - - return port; - - } - - /** * Should we expect concurrency of the Socket send and RMI? It seems that we * should be able to handle it whatever the logical argument. The only * constraint should be on the processing of each pair of socket/RMI @@ -217,16 +131,6 @@ final ByteBuffer rcv = ByteBuffer.allocate(2000); final Future<Void> futRec = receiveService.receiveData(msg1, rcv); final Future<Void> futSnd = sendService.send(tst1); -// while (!futSnd.isDone() && !futRec.isDone()) { -// try { -// futSnd.get(10L, TimeUnit.MILLISECONDS); -// } catch (TimeoutException ignore) { -// } -// try { -// futRec.get(10L, TimeUnit.MILLISECONDS); -// } catch (TimeoutException ignore) { -// } -// } futSnd.get(timeout,TimeUnit.MILLISECONDS); futRec.get(timeout,TimeUnit.MILLISECONDS); assertEquals(tst1, rcv); @@ -238,16 +142,6 @@ final ByteBuffer rcv2 = ByteBuffer.allocate(2000); final Future<Void> futSnd = sendService.send(tst2); final Future<Void> futRec = receiveService.receiveData(msg2, rcv2); -// while (!futSnd.isDone() && !futRec.isDone()) { -// try { -// futSnd.get(10L, TimeUnit.MILLISECONDS); -// } catch (TimeoutException ignore) { -// } -// try { -// futRec.get(10L, TimeUnit.MILLISECONDS); -// } catch (TimeoutException ignore) { -// } -// } futSnd.get(timeout,TimeUnit.MILLISECONDS); futRec.get(timeout,TimeUnit.MILLISECONDS); assertEquals(tst2, rcv2); @@ -275,16 +169,6 @@ // FutureTask return ensures 
remote ready for Socket data final Future<Void> futRec = receiveService.receiveData(msg, rcv); final Future<Void> futSnd = sendService.send(tst); -// while (!futSnd.isDone() && !futRec.isDone()) { -// try { -// futSnd.get(10L, TimeUnit.MILLISECONDS); -// } catch (TimeoutException ignored) { -// } -// try { -// futRec.get(10L, TimeUnit.MILLISECONDS); -// } catch (TimeoutException ignored) { -// } -// } futSnd.get(timeout,TimeUnit.MILLISECONDS); futRec.get(timeout,TimeUnit.MILLISECONDS); assertEquals(tst, rcv); // make sure buffer has been transmitted @@ -316,16 +200,6 @@ // FutureTask return ensures remote ready for Socket data final Future<Void> futRec = receiveService.receiveData(msg, rcv); final Future<Void> futSnd = sendService.send(tst); -// while (!futSnd.isDone() && !futRec.isDone()) { -// try { -// futSnd.get(10L, TimeUnit.MILLISECONDS); -// } catch (TimeoutException ignored) { -// } -// try { -// futRec.get(10L, TimeUnit.MILLISECONDS); -// } catch (TimeoutException ignored) { -// } -// } futSnd.get(timeout,TimeUnit.MILLISECONDS); futRec.get(timeout,TimeUnit.MILLISECONDS); // make sure buffer has been transmitted Modified: branches/READ_CACHE/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive3Nodes.java =================================================================== --- branches/READ_CACHE/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive3Nodes.java 2013-04-10 09:07:45 UTC (rev 7039) +++ branches/READ_CACHE/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceive3Nodes.java 2013-04-10 17:35:17 UTC (rev 7040) @@ -25,11 +25,8 @@ package com.bigdata.ha.pipeline; import java.io.IOException; -import java.net.BindException; import java.net.InetSocketAddress; -import java.net.ServerSocket; import java.nio.ByteBuffer; -import java.util.Random; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -37,8 +34,6 @@ import com.bigdata.ha.msg.HAWriteMessageBase; import 
com.bigdata.ha.msg.IHAWriteMessageBase; -import com.bigdata.ha.pipeline.HAReceiveService; -import com.bigdata.ha.pipeline.HASendService; import com.bigdata.io.DirectBufferPool; import com.bigdata.io.IBufferAccess; import com.bigdata.io.TestCase3; @@ -54,66 +49,8 @@ */ public class TestHASendAndReceive3Nodes extends TestCase3 { - private ChecksumUtility chk = new ChecksumUtility(); + private ChecksumUtility chk; - /** - * A random number generated - the seed is NOT fixed. - */ - private Random r = new Random(); - -// /** -// * Returns random data that will fit in N bytes. N is chosen randomly in -// * 1:256. -// * -// * @return A new {@link ByteBuffer} wrapping a new <code>byte[]</code> of -// * random length and having random contents. -// */ -// public ByteBuffer getRandomData() { -// -// final int nbytes = r.nextInt(256) + 1; -// -// return getRandomData(nbytes); -// -// } - - /** - * Returns random data that will fit in <i>nbytes</i>. - * - * @return A new {@link ByteBuffer} wrapping a new <code>byte[]</code> - * having random contents. - */ - private ByteBuffer getRandomData(final int nbytes) { - - final byte[] bytes = new byte[nbytes]; - - r.nextBytes(bytes); - - return ByteBuffer.wrap(bytes); - - } - - /** - * Returns random data that will fit in <i>nbytes</i>. - * - * @return A new {@link ByteBuffer} wrapping a new <code>byte[]</code> - * having random contents. - */ - private ByteBuffer getRandomData(final ByteBuffer b, final int nbytes) { - - final byte[] a = new byte[nbytes]; - - r.nextBytes(a); - - b.limit(nbytes); - b.position(0); - b.put(a); - - b.flip(); - - return b; - - } - public TestHASendAndReceive3Nodes() { } @@ -141,6 +78,10 @@ @Override protected void setUp() throws Exception { + super.setUp(); + + chk = new ChecksumUtility(); + /* * Setup C at the end of the pipeline. 
*/ @@ -201,6 +142,8 @@ @Override protected void tearDown() throws Exception { + super.tearDown(); + if (receiveServiceB != null) { receiveServiceB.terminate(); receiveServiceB = null; @@ -219,38 +162,8 @@ chk = null; - r = null; - } - /** - * Return an open port on current machine. Try the suggested port first. If - * suggestedPort is zero, just select a random port - */ - private static int getPort(int suggestedPort) throws IOException { - - ServerSocket openSocket; - try { - openSocket = new ServerSocket(suggestedPort); - } catch (BindException ex) { - // the port is busy, so look for a random open port - openSocket = new ServerSocket(0); - } - - final int port = openSocket.getLocalPort(); - - openSocket.close(); - - if (suggestedPort != 0 && port != suggestedPort) { - - log.warn("suggestedPort is busy: suggestedPort=" + suggestedPort + ", using port=" + port + " instead"); - - } - - return port; - - } - public void testSimpleExchange() throws InterruptedException, ExecutionException, TimeoutException { @@ -263,20 +176,6 @@ final Future<Void> futRec1 = receiveServiceB.receiveData(msg1, rcv1); final Future<Void> futRec2 = receiveServiceC.receiveData(msg1, rcv2); final Future<Void> futSnd = sendServiceA.send(tst1); -// while (!futSnd.isDone() && !futRec2.isDone()) { -// try { -// futSnd.get(10L, TimeUnit.MILLISECONDS); -// } catch (TimeoutException ignore) { -// } -// try { -// futRec1.get(10L, TimeUnit.MILLISECONDS); -// } catch (TimeoutException ignore) { -// } -// try { -// futRec2.get(10L, TimeUnit.MILLISECONDS); -// } catch (TimeoutException ignore) { -// } -// } futSnd.get(timeout,TimeUnit.MILLISECONDS); futRec1.get(timeout,TimeUnit.MILLISECONDS); futRec2.get(timeout,TimeUnit.MILLISECONDS); @@ -300,7 +199,9 @@ futSnd.get(10L, TimeUnit.MILLISECONDS); } catch (TimeoutException ignore) { } catch (ExecutionException e) { - assertTrue(e.getCause().getMessage().equals("Checksum Error")); + if (!InnerCause.isInnerCause(e, ChecksumError.class)) { + fail("Expecting " 
+ ChecksumError.class + ", not " + e, e); + } } try { futRec2.get(10L, TimeUnit.MILLISECONDS); Added: branches/READ_CACHE/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceiveInterrupts.java =================================================================== --- branches/READ_CACHE/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceiveInterrupts.java (rev 0) +++ branches/READ_CACHE/bigdata/src/test/com/bigdata/ha/pipeline/TestHASendAndReceiveInterrupts.java 2013-04-10 17:35:17 UTC (rev 7040) @@ -0,0 +1,270 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +package com.bigdata.ha.pipeline; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousCloseException; +import java.nio.channels.SocketChannel; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; + +import com.bigdata.ha.msg.HAWriteMessageBase; +import com.bigdata.ha.msg.IHAWriteMessage; +import com.bigdata.ha.msg.IHAWriteMessageBase; +import com.bigdata.ha.pipeline.HAReceiveService.IHAReceiveCallback; +import com.bigdata.io.DirectBufferPool; +import com.bigdata.io.TestCase3; +import com.bigdata.util.ChecksumUtility; + +/** + * Test the raw socket protocol implemented by {@link HASendService} and + * {@link HAReceiveService}. This test suite looks at the interrupt handling + * behavior. Interrupts during an IO cause the {@link SocketChannel} to be + * closed. If the service is still open, then we need to transparently re-open + * the socket channel. + * <p> + * Interrupts wind up closing the {@link HASendService} side of the channel when + * a remote service makes a request to the quorum leader to send some data along + * the write pipeline, and then cancels that operation. For example, when a + * service needs to resynchronize with the quorum it will request either HALogs + * or a snapshot from the leader. Those data are sent over the write pipeline. + * There may be concurrent requests and data transfers in progress. Those + * requests will be serialized (one raw buffer at a time). 
The + * {@link IHAWriteMessage}s for different requests (and their payloads) can be + * interleaved, but each {@link IHAWriteMessage} and payload is transmitted + * atomically (the packets sent along the socket pertain to only a single + * payload / {@link IHAWriteMessage}). When the service has caught up, if it + * enters an error state, or if it shuts down, then the service will cancel the + * remote future on the leader for the request (e.g., a "sendHALog()" request). + * This can cause an interrupt of a blocked {@link SocketChannel} IO and that + * will cause the {@link SocketChannel} to be closed asynchronously. + * <p> + * Both the {@link HASendService} and the {@link HAReceiveService} need to + * notice the asynchronous {@link SocketChannel} close and cure it (unless they + * are terminated or terminating). The {@link HAReceiveService} needs to + * propagate the {@link AsynchronousCloseException} through the + * {@link IHAReceiveCallback} and drop the payload. The {@link HASendService} + * needs to drop the payload and re-open the {@link SocketChannel}. The next + * {@link IHAWriteMessage} and payload should be traversed normally once the + * {@link SocketChannel} has been re-opened. 
+ * + * @author martyn Cutcher + */ +public class TestHASendAndReceiveInterrupts extends TestCase3 { + + public TestHASendAndReceiveInterrupts() { + + } + + public TestHASendAndReceiveInterrupts(String name) { + + super(name); + + } + + private class MyCallback implements IHAReceiveCallback<IHAWriteMessageBase> { + + @Override + public void callback(IHAWriteMessageBase msg, ByteBuffer data) + throws Exception { + // TODO Auto-generated method stub + + } + + } + + private HASendService sendService; + private HAReceiveService<IHAWriteMessageBase> receiveService; + private ChecksumUtility chk; + private IHAReceiveCallback<IHAWriteMessageBase> callback; + + @Override + protected void setUp() throws Exception { + + super.setUp(); + + chk = new ChecksumUtility(); + + /* + * Note: ZERO (0) indicates that a random free port will be selected. If + * you use a fixed port then there is a danger that the port will not be + * able to be reopened immediately after it has been closed, in which + * case you will see something like: "bind address already in use". 
+ */ + final int port = getPort(0);// 3000 + + if (log.isInfoEnabled()) + log.info("Using port=" + port); + + final InetSocketAddress addr = new InetSocketAddress(port); + + callback = new MyCallback(); + + receiveService = new HAReceiveService<IHAWriteMessageBase>(addr, + null/* nextAddr */, callback); + receiveService.start(); + + sendService = new HASendService(); + sendService.start(addr); + + } + + @Override + protected void tearDown() throws Exception { + + super.tearDown(); + + if (receiveService != null) { + receiveService.terminate(); + receiveService = null; + } + + if (sendService != null) { +// sendService.closeIncSend(); + sendService.terminate(); + sendService = null; + } + + chk = null; + + } + + public void testRecoverFromInterrupts() throws InterruptedException, + ExecutionException, TimeoutException { + + // replace receiveService with one with callback + final InetSocketAddress addrSelf = receiveService.getAddrSelf(); + final InetSocketAddress addrNext = null; + + receiveService.terminate(); + + // Callback will check fail value to determine if task should be + // interrupted + final AtomicReference<Future<Void>> receiveFail = new AtomicReference<Future<Void>>( + null); + + receiveService = new HAReceiveService<IHAWriteMessageBase>(addrSelf, + addrNext, new IHAReceiveCallback<IHAWriteMessageBase>() { + public void callback(final IHAWriteMessageBase msg, + final ByteBuffer data) throws Exception { + final Future<Void> task = receiveFail.get(); + if (task != null) { + task.cancel(true); + } + } + }); + + receiveService.start(); + + /* + * Note: Must be larger than the socket buffer / packet size or the 2nd + * message will get buffered before we can interrupt it. + * + * FIXME The problem here is that we are queuing the 2nd message + * immediately on the HASendService's Executor. The actual IncSendTask + * does not wait once it has finished deliverying data to the socket + * channel. 
It immediately exits, and the data transfer for the next + * queued request begins. In order to avoid messing up the payload for + * the next request, we might need to wait on the Future of the remote + * ReceiveService.receiveData() task before terminating the IncSendTask + * (or before scheduling the next one). Otherwise we may not be in a + * position where we can figure out whether or not to restart a given + * payload. If the Send Future (of the IncSendTask) was cancelled, then + * we want to drop the payload associated with that specific Future. + */ + final int msgSize = 256 + r.nextInt(1024); + final int receiveBufSize = msgSize + r.nextInt(128); + + final long timeout = 5000;// ms + { + // 1st xfer. + final ByteBuffer tst1 = getRandomData(msgSize); + final IHAWriteMessageBase msg1 = new HAWriteMessageBase(msgSize, + chk.checksum(tst1)); + final ByteBuffer rcv1 = ByteBuffer.allocate(receiveBufSize); + final Future<Void> futSnd1 = sendService.send(tst1); + + // 2nd xfer. + final ByteBuffer tst2 = getRandomData(msgSize); + final IHAWriteMessageBase msg2 = new HAWriteMessageBase(msgSize, + chk.checksum(tst2)); + final ByteBuffer rcv2 = ByteBuffer.allocate(receiveBufSize); + final Future<Void> futSnd2 = sendService.send(tst2); + + // We will interrupt the 2nd send. + receiveFail.set(futSnd2); + + // 3rd xfer. + final ByteBuffer tst3 = getRandomData(msgSize); + final IHAWriteMessageBase msg3 = new HAWriteMessageBase(msgSize, + chk.checksum(tst3)); + final ByteBuffer rcv3 = ByteBuffer.allocate(receiveBufSize); + final Future<Void> futSnd3 = sendService.send(tst3); + + final Future<Void> futRec1 = receiveService.receiveData(msg1, rcv1); + + futSnd2.get(); // should throw exception UNLESS IO done before + final Future<Void> futRec2 = receiveService.receiveData(msg2, rcv2); + + final Future<Void> futRec3 = receiveService.receiveData(msg3, rcv2); + + // first send. should be good. 
+ futSnd1.get(timeout,TimeUnit.MILLISECONDS); + futRec1.get(timeout,TimeUnit.MILLISECONDS); + assertEquals(tst1, rcv1); + + // seecond 2nd. should be cancelled. + futSnd2.get(timeout,TimeUnit.MILLISECONDS); + futRec2.get(timeout,TimeUnit.MILLISECONDS); + assertEquals(tst1, rcv2); + + // 3rd send. should be good. TODO Longer sequences w/ variable message sizes to hit various timings. + futSnd3.get(timeout,TimeUnit.MILLISECONDS); + futRec3.get(timeout,TimeUnit.MILLISECONDS); + assertEquals(tst3, rcv3); + + } + + } + + public void testStressRecoverFromInterrupts() throws Exception { + tearDown(); + for (int i = 0; i < 100; i++) { + setUp(); + try { + testRecoverFromInterrupts(); + } catch (Throwable t) { + fail("Fail on pass " + i + " : " + t, t); + } finally { + tearDown(); + } + } + } + +} Modified: branches/READ_CACHE/bigdata/src/test/com/bigdata/io/TestCase3.java =================================================================== --- branches/READ_CACHE/bigdata/src/test/com/bigdata/io/TestCase3.java 2013-04-10 09:07:45 UTC (rev 7039) +++ branches/READ_CACHE/bigdata/src/test/com/bigdata/io/TestCase3.java 2013-04-10 17:35:17 UTC (rev 7040) @@ -27,7 +27,11 @@ package com.bigdata.io; +import java.io.IOException; +import java.net.BindException; +import java.net.ServerSocket; import java.nio.ByteBuffer; +import java.util.Random; import java.util.concurrent.TimeUnit; import junit.framework.AssertionFailedError; @@ -46,7 +50,13 @@ */ public class TestCase3 extends TestCase2 { + /** + * A random number generated - the seed is NOT fixed. + */ + protected Random r; + + /** * */ public TestCase3() { @@ -61,10 +71,19 @@ } + @Override + protected void setUp() throws Exception { + + r = new Random(); + + } + protected void tearDown() throws Exception { super.tearDown(); + r = null; + TestHelper.checkJournalsClosed(this); } @@ -253,4 +272,70 @@ } + /** + * Returns random data that will fit in <i>nbytes</i>. 
+ * + * @return A new {@link ByteBuffer} wrapping a new <code>byte[]</code> + * having random contents. + */ + protected ByteBuffer getRandomData(final int nbytes) { + + final byte[] bytes = new byte[nbytes]; + + r.nextBytes(bytes); + + return ByteBuffer.wrap(bytes); + + } + + /** + * Returns random data that will fit in <i>nbytes</i>. + * + * @return A new {@link ByteBuffer} wrapping a new <code>byte[]</code> + * having random contents. + */ + protected ByteBuffer getRandomData(final ByteBuffer b, final int nbytes) { + + final byte[] a = new byte[nbytes]; + + r.nextBytes(a); + + b.limit(nbytes); + b.position(0); + b.put(a); + + b.flip(); + + return b; + + } + + /** + * Return an open port on current machine. Try the suggested port first. If + * suggestedPort is zero, just select a random port + */ + protected static int getPort(int suggestedPort) throws IOException { + + ServerSocket openSocket; + try { + openSocket = new ServerSocket(suggestedPort); + } catch (BindException ex) { + // the port is busy, so look for a random open port + openSocket = new ServerSocket(0); + } + + final int port = openSocket.getLocalPort(); + + openSocket.close(); + + if (suggestedPort != 0 && port != suggestedPort) { + + log.warn("suggestedPort is busy: suggestedPort=" + suggestedPort + ", using port=" + port + " instead"); + + } + + return port; + + } + } Modified: branches/READ_CACHE/bigdata/src/test/com/bigdata/io/writecache/TestWORMWriteCacheService.java =================================================================== --- branches/READ_CACHE/bigdata/src/test/com/bigdata/io/writecache/TestWORMWriteCacheService.java 2013-04-10 09:07:45 UTC (rev 7039) +++ branches/READ_CACHE/bigdata/src/test/com/bigdata/io/writecache/TestWORMWriteCacheService.java 2013-04-10 17:35:17 UTC (rev 7040) @@ -29,9 +29,7 @@ import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; -import java.net.BindException; import java.net.InetSocketAddress; -import java.net.ServerSocket; 
import java.nio.ByteBuffer; import java.nio.channels.Channel; import java.nio.channels.FileChannel; @@ -2728,33 +2726,33 @@ * HA pipeline tests. */ - /** - * Return an open port on current machine. Try the suggested port first. If - * suggestedPort is zero, just select a random port - */ - private static int getPort(final int suggestedPort) throws IOException { +// /** +// * Return an open port on current machine. Try the suggested port first. If +// * suggestedPort is zero, just select a random port +// */ +// private static int getPort(final int suggestedPort) throws IOException { +// +// ServerSocket openSocket; +// try { +// openSocket = new ServerSocket(suggestedPort); +// } catch (BindException ex) { +// // the port is busy, so look for a random open port +// openSocket = new ServerSocket(0); +// } +// +// final int port = openSocket.getLocalPort(); +// +// openSocket.close(); +// +// if (suggestedPort != 0 && port != suggestedPort) { +// +// log.warn("suggestedPort is busy: suggestedPort=" + suggestedPort +// + ", using port=" + port + " instead"); +// +// } +// +// return port; +// +// } - ServerSocket openSocket; - try { - openSocket = new ServerSocket(suggestedPort); - } catch (BindException ex) { - // the port is busy, so look for a random open port - openSocket = new ServerSocket(0); - } - - final int port = openSocket.getLocalPort(); - - openSocket.close(); - - if (suggestedPort != 0 && port != suggestedPort) { - - log.warn("suggestedPort is busy: suggestedPort=" + suggestedPort - + ", using port=" + port + " instead"); - - } - - return port; - - } - } Modified: branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java =================================================================== --- branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java 2013-04-10 09:07:45 UTC (rev 7039) +++ branches/READ_CACHE/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal.java 2013-04-10 17:35:17 UTC (rev 
7040) @@ -771,6 +771,7 @@ final IBufferAccess buf = DirectBufferPool.INSTANCE.acquire(); long nsent = 0; + boolean success = false; try { while (r.hasMoreBuffers()) { @@ -792,23 +793,48 @@ try { // wait for message to make it through the pipeline. ft.get(); + nsent++; } finally { - ft.cancel(true/* mayInterruptIfRunning */); + /* + * DO NOT PROPAGATE THE INTERRUPT TO THE PIPELINE. + * + * Note: Either the Future isDone... [truncated message content] |