From: <tho...@us...> - 2013-12-09 17:36:29
|
Revision: 7627 http://bigdata.svn.sourceforge.net/bigdata/?rev=7627&view=rev Author: thompsonbry Date: 2013-12-09 17:36:22 +0000 (Mon, 09 Dec 2013) Log Message: ----------- Sync to martyn on low-level socket behavior test suite. Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java Modified: branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java 2013-12-09 16:21:57 UTC (rev 7626) +++ branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java 2013-12-09 17:36:22 UTC (rev 7627) @@ -46,6 +46,12 @@ import com.bigdata.btree.BytesUtil; import com.bigdata.io.TestCase3; +/** + * Test suite for basic socket behaviors. + * + * @author <a href="mailto:mar...@us...">Martyn Cutcher</a> + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ public class TestSocketsDirect extends TestCase3 { public TestSocketsDirect() { @@ -56,6 +62,65 @@ } /** + * Simple test of connecting to a server socket and the failure to connect + * to a port not associated with a server socket. + * + * @throws IOException + */ + public void testDirectSockets_exceptionIfPortNotOpen() throws IOException { + + // Get two socket addressses. We will open a service on one and try to + // connect to the unused one on the other port. + final InetSocketAddress serverAddr1 = new InetSocketAddress(getPort(0)); + final InetSocketAddress serverAddr2 = new InetSocketAddress(getPort(0)); + + // First our ServerSocket + final ServerSocket ss1 = new ServerSocket(); + try { + + ss1.bind(serverAddr1); + + assertTrue(ss1.getChannel() == null); + + // Now the first Client SocketChannel + final SocketChannel cs1 = SocketChannel.open(); + try { + /* + * Note: true if connection made. false if connection in + * progress. 
+ */ + final boolean immediate1 = cs1.connect(serverAddr1); + if (!immediate1) { + // Did not connect immediately, so finish connect now. + if (!cs1.finishConnect()) { + fail("Did not connect."); + } + } + } finally { + cs1.close(); + } + + // Now the second Client SocketChannel + final SocketChannel cs2 = SocketChannel.open(); + try { + cs1.connect(serverAddr2); + fail("Expecting " + IOException.class); + } catch (IOException ex) { + if(log.isInfoEnabled()) + log.info("Ignoring expected exception: "+ex); + } finally { + cs2.close(); + } + + } finally { + + ss1.close(); + + } + + } + + /** * The use of threaded tasks in the send/receive service makes it difficult to * observe the socket state changes. * @@ -65,127 +130,279 @@ * ...with an accept followed by a read() of -1 on the returned Socket stream. * * @throws IOException + * @throws InterruptedException */ - public void testDirectSockets() throws IOException { + public void testDirectSockets() throws IOException, InterruptedException { - final InetSocketAddress serverAddr = new InetSocketAddress(getPort(0)); + // The payload size that we will use. + final int DATA_LEN = 200; - // First our ServerSocket - final ServerSocket ss = new ServerSocket(); - ss.bind(serverAddr); - - assertTrue(ss.getChannel() == null); - - // Now the first Client SocketChannel - final SocketChannel cs1 = SocketChannel.open(); - - final boolean immediate1 = cs1.connect(serverAddr); - assertTrue("Expected immediate local connection", immediate1); - final Random r = new Random(); - final byte[] data = new byte[200]; + final byte[] data = new byte[DATA_LEN]; r.nextBytes(data); + final byte[] dst = new byte[DATA_LEN]; - final ByteBuffer src = ByteBuffer.wrap(data); + // The server side receive buffer size (once we open the server socket). 
+ int receiveBufferSize = -1; - // Write some data - cs1.write(src); - - final byte[] dst = new byte[200]; + final InetSocketAddress serverAddr = new InetSocketAddress(getPort(0)); - // Accept the client connection (after connect and write) - final Socket readSckt1 = accept(ss); - - InputStream instr = readSckt1.getInputStream(); - - // and read the data - instr.read(dst); - - // confirming the read is correct - assertTrue(BytesUtil.bytesEqual(data, dst)); - - assertTrue(ss.getChannel() == null); + // First our ServerSocket + final ServerSocket ss = new ServerSocket(); + try { - // now write some more data into the channel and then close it - cs1.write(ByteBuffer.wrap(data)); - - // close the client socket - cs1.close(); - - assertTrue(readSckt1.isConnected()); - assertFalse(readSckt1.isClosed()); - - // Now try writing some more data - try { - cs1.write(ByteBuffer.wrap(data)); - fail("Expected closed channel exception"); - } catch (ClosedChannelException e) { - // expected - } - - // the old stream should be closed - try { - final int rdlen = instr.read(dst); // should be closed - assertTrue(rdlen == 200); - assertTrue(BytesUtil.bytesEqual(data, dst)); + assertTrue(ss.getChannel() == null); + + // bind the server socket to the port. + ss.bind(serverAddr); + + assertTrue(ss.getChannel() == null); - assertTrue(instr.read(dst) == -1); // read EOF - } catch (Exception e) { - fail("not expected"); - } - - // if so then should we explicitly close its socket? - readSckt1.close(); - assertTrue(readSckt1.isClosed()); - - assertFalse(ss.isClosed()); - assertTrue(ss.getChannel() == null); - - // Now open a new client Socket and connect to the server - - final SocketChannel cs2 = SocketChannel.open(); - final boolean immediate2 = cs2.connect(serverAddr); - - assertTrue("Expected immediate local connection", immediate2); - - // Now we should be able to accept the new connection - final Socket s2 = accept(ss); - - // ... 
write to the SocketChannel - final int wlen = cs2.write(ByteBuffer.wrap(data)); - - assertTrue(wlen == data.length); + // figure out the receive buffer size on the server socket. + receiveBufferSize = ss.getReceiveBufferSize(); - // failing to read from original stream - final int nrlen = instr.read(dst); - assertTrue(nrlen == -1); + if (log.isInfoEnabled()) + log.info("receiveBufferSize=" + receiveBufferSize + + ", payloadSize=" + DATA_LEN); + + if (receiveBufferSize < DATA_LEN) { - // but succeeding to read from the new Socket - final InputStream instr2 = s2.getInputStream(); - instr2.read(dst); - - assertTrue(BytesUtil.bytesEqual(data, dst)); - - // Can a downstream close be detected upstream? - instr2.close(); - - assertTrue(cs2.isOpen()); // Not after closing input stream - - s2.close(); - - assertTrue(cs2.isOpen()); // Nor after closing the socket - - // now write some more to the socket - final int wlen2 = cs2.write(ByteBuffer.wrap(data)); - assertTrue(wlen2 == data.length); - - // having closed the input, without a new connect request - // we should not be able to accept the new write - try { - final Socket s3 = accept(ss); - fail("Expected timeout failure"); - } catch (AssertionFailedError afe) { - // expected + fail("Service socket receive buffer is smaller than test payload size: receiveBufferSize=" + + receiveBufferSize + ", payloadSize=" + DATA_LEN); + + } + + /* + * InputStream for server side of socket connection - set below and + * then reused outside of the try/finally block. + */ + InputStream instr = null; + + // Now the first Client SocketChannel + final SocketChannel cs1 = SocketChannel.open(); + try { + + /* + * Note: true if connection made. false if connection in + * progress. + */ + final boolean immediate1 = cs1.connect(serverAddr); + if (!immediate1) { + if (!cs1.finishConnect()) { + fail("Did not connect?"); + } + } + + assertTrue(ss.getChannel() == null); + + /* + * We are connected. 
+ */ + + final ByteBuffer src = ByteBuffer.wrap(data); + + // Write some data on the client socket. + cs1.write(src); + + /* + * Accept client's connection on server (after connect and + * write). + */ + final Socket readSckt1 = accept(ss); + + // Stream to read the data from the socket on the server side. + instr = readSckt1.getInputStream(); + + // and read the data + instr.read(dst); + + // confirming the read is correct + assertTrue(BytesUtil.bytesEqual(data, dst)); + + assertTrue(ss.getChannel() == null); + + /* + * Attempting to read more returns ZERO because there is nothing + * in the buffer and the connection is still open on the client + * side. + * + * Note: instr.read(buf) will BLOCK until the data is available, + * the EOF is detected, or an exception is thrown. + */ + assertEquals(0,instr.available()); +// assertEquals(0, instr.read(dst)); + + /* + * Now write some more data into the channel and *then* close + * it. + */ + cs1.write(ByteBuffer.wrap(data)); + + // close the client side of the socket + cs1.close(); + + // The server side of client connection is still open. + assertTrue(readSckt1.isConnected()); + assertFalse(readSckt1.isClosed()); + + /* + * Now try writing some more data. This should be disallowed + * since we closed the client side of the socket. + */ + try { + cs1.write(ByteBuffer.wrap(data)); + fail("Expected closed channel exception"); + } catch (ClosedChannelException e) { + // expected + } + + /* + * Since we closed the client side of the socket, when we try to + * read more data on the server side of the connection. The data + * that we already buffered is still available on the server + * side of the socket. + */ + { + // the already buffered data should be available. + final int rdlen = instr.read(dst); + assertEquals(DATA_LEN, rdlen); + assertTrue(BytesUtil.bytesEqual(data, dst)); + } + + /* + * We have drained the buffered data. 
There is no more buffered + * data and client side is closed, so an attempt to read more + * data on the server side of the socket will return EOF (-1). + */ + assertEquals(-1, instr.read(dst)); // read EOF + + // if so then should we explicitly close its socket? + readSckt1.close(); + assertTrue(readSckt1.isClosed()); + + assertFalse(ss.isClosed()); + assertTrue(ss.getChannel() == null); + + } finally { + cs1.close(); + } + + /* + * Now open a new client Socket and connect to the server. + */ + final SocketChannel cs2 = SocketChannel.open(); + try { + + // connect to the server socket again. + final boolean immediate2 = cs2.connect(serverAddr); + if (!immediate2) { + if (!cs2.finishConnect()) { + fail("Did not connect?"); + } + } + + // Now server should accept the new client connection + final Socket s2 = accept(ss); + + // Client writes to the SocketChannel + final int wlen = cs2.write(ByteBuffer.wrap(data)); + assertEquals(DATA_LEN, wlen); // verify data written. + + // failing to read from original stream + final int nrlen = instr.read(dst); + assertEquals(-1, nrlen); + + // but succeeding to read from the new Socket + final InputStream instr2 = s2.getInputStream(); + instr2.read(dst); + assertTrue(BytesUtil.bytesEqual(data, dst)); + + /* + * Question: Can a downstream close be detected upstream? + * + * Answer: No. Closing the server socket does not tell the + * client that the socket was closed. + */ + { + // close server side input stream. + instr2.close(); + + // but the client still thinks it's connected. + assertTrue(cs2.isOpen()); + + // Does the client believe it is still open after a brief + // sleep? + Thread.sleep(1000); + assertTrue(cs2.isOpen()); // yes. + + // close server socket. + s2.close(); + + // client still thinks it is connected after closing server + // socket. + assertTrue(cs2.isOpen()); + + // Does the client believe it is still open after a brief + // sleep? + Thread.sleep(1000); + assertTrue(cs2.isOpen()); // yes. 
+ + } + + /* + * Now write some more to the socket. We have closed the + * accepted connection on the server socket. Our observations + * show that the 1st write succeeds. The second write then fails + * with 'IOException: "Broken pipe"' + * + * The server socket is large (256k). We are not filling it up, + * but the 2nd write always fails. Further, the client never + * believes that the connection is closed until the 2nd write, + */ + { + final int writeSize = 1; + int nwritesOk = 0; + long nbytesReceived = 0L; + while (true) { + try { + // write a payload. + final int wlen2 = cs2.write(ByteBuffer.wrap(data, + 0, writeSize)); + // if write succeeds, should have written all bytes. + assertEquals(writeSize, wlen2); + nwritesOk++; + nbytesReceived += wlen2; + // does the client think the connection is still open? + assertTrue(cs2.isOpen()); // yes. + Thread.sleep(1000); + assertTrue(cs2.isOpen()); // yes. + } catch (IOException ex) { + if (log.isInfoEnabled()) + log.info("Expected exception: nwritesOk=" + + nwritesOk + ", nbytesReceived=" + + nbytesReceived + ", ex=" + ex); + break; + } + } + } + + /* + * Having closed the input, without a new connect request we + * should not be able to accept the new write. + */ + try { + final Socket s3 = accept(ss); + fail("Expected timeout failure"); + } catch (AssertionFailedError afe) { + // expected + } + + } finally { + cs2.close(); + } + + } finally { + ss.close(); } } @@ -249,19 +466,22 @@ } - // wrap the ServerSocket accept with a timeout check - Socket accept(final ServerSocket ss) { + /** wrap the ServerSocket accept with a timeout check. 
*/ + private Socket accept(final ServerSocket ss) { + final AtomicReference<Socket> av = new AtomicReference<Socket>(); + assertNoTimeout(1, TimeUnit.SECONDS, new Callable<Void>() { @Override public Void call() throws Exception { - + av.set(ss.accept()); - + return null; - }}); - + } + }); + return av.get(); } @@ -283,7 +503,21 @@ } } - private void assertNoTimeout(long timeout, TimeUnit unit, Callable<Void> callable) { + /** + * Throws {@link AssertionFailedError} if the {@link Callable} does not + * succeed within the timeout. + * + * @param timeout + * @param unit + * @param callable + * + * @throws AssertionFailedError + * if the {@link Callable} does not succeed within the timeout. + * @throws AssertionFailedError + * if the {@link Callable} fails. + */ + private void assertNoTimeout(final long timeout, final TimeUnit unit, + final Callable<Void> callable) { final ExecutorService es = Executors.newSingleThreadExecutor(); try { final Future<Void> ret = es.submit(callable); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |