From: <mar...@us...> - 2013-12-09 16:22:04
|
Revision: 7626 http://bigdata.svn.sourceforge.net/bigdata/?rev=7626&view=rev Author: martyncutcher Date: 2013-12-09 16:21:57 +0000 (Mon, 09 Dec 2013) Log Message: ----------- Socket tests to experiment/test socket connection errors Added Paths: ----------- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java Added: branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java (rev 0) +++ branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java 2013-12-09 16:21:57 UTC (rev 7626) @@ -0,0 +1,300 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ +package com.bigdata.ha.pipeline; + +import java.io.IOException; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; + +import junit.framework.AssertionFailedError; + +import com.bigdata.btree.BytesUtil; +import com.bigdata.io.TestCase3; + +public class TestSocketsDirect extends TestCase3 { + + public TestSocketsDirect() { + } + + public TestSocketsDirect(String name) { + super(name); + } + + /** + * The use of threaded tasks in the send/receive service makes it difficult to + * observer the socket state changes. + * + * So let's begin by writing some tests over the raw sockets. + * + * Note that connecting and then immediately closing the socket is perfectly okay. + * ...with an accept followed by a read() of -1 on the returned Socket stream. + * + * @throws IOException + */ + public void testDirectSockets() throws IOException { + + final InetSocketAddress serverAddr = new InetSocketAddress(getPort(0)); + + // First our ServerSocket + final ServerSocket ss = new ServerSocket(); + ss.bind(serverAddr); + + assertTrue(ss.getChannel() == null); + + // Now the first Client SocketChannel + final SocketChannel cs1 = SocketChannel.open(); + + final boolean immediate1 = cs1.connect(serverAddr); + assertTrue("Expected immediate local connection", immediate1); + + final Random r = new Random(); + final byte[] data = new byte[200]; + r.nextBytes(data); + + final ByteBuffer src = ByteBuffer.wrap(data); + + // Write some data + cs1.write(src); + + final byte[] dst = new byte[200]; + + // Accept the client connection (after connect and write) + final Socket readSckt1 = accept(ss); + + InputStream instr = readSckt1.getInputStream(); + + // and read the data + instr.read(dst); + + // confirming the read is correct + assertTrue(BytesUtil.bytesEqual(data, dst)); + + assertTrue(ss.getChannel() == null); + + // now write some more data into the channel and then close it + cs1.write(ByteBuffer.wrap(data)); + + // close the client socket + cs1.close(); + + assertTrue(readSckt1.isConnected()); + assertFalse(readSckt1.isClosed()); + + // Now try writing some more data + try { + cs1.write(ByteBuffer.wrap(data)); + fail("Expected closed channel exception"); + } catch (ClosedChannelException e) { + // expected + } + + // the old stream should be closed + try { + final int rdlen = instr.read(dst); // should be closed + assertTrue(rdlen == 200); + assertTrue(BytesUtil.bytesEqual(data, dst)); + + assertTrue(instr.read(dst) == -1); // read EOF + } catch (Exception e) { + fail("not expected"); + } + + // if so then should we explicitly close its socket? + readSckt1.close(); + assertTrue(readSckt1.isClosed()); + + assertFalse(ss.isClosed()); + assertTrue(ss.getChannel() == null); + + // Now open a new client Socket and connect to the server + + final SocketChannel cs2 = SocketChannel.open(); + final boolean immediate2 = cs2.connect(serverAddr); + + assertTrue("Expected immediate local connection", immediate2); + + // Now we should be able to accept the new connection + final Socket s2 = accept(ss); + + // ... write to the SocketChannel + final int wlen = cs2.write(ByteBuffer.wrap(data)); + + assertTrue(wlen == data.length); + + // failing to read from original stream + final int nrlen = instr.read(dst); + assertTrue(nrlen == -1); + + // but succeeding to read from the new Socket + final InputStream instr2 = s2.getInputStream(); + instr2.read(dst); + + assertTrue(BytesUtil.bytesEqual(data, dst)); + + // Can a downstream close be detected upstream? + instr2.close(); + + assertTrue(cs2.isOpen()); // Not after closing input stream + + s2.close(); + + assertTrue(cs2.isOpen()); // Nor after closing the socket + + // now write some more to the socket + final int wlen2 = cs2.write(ByteBuffer.wrap(data)); + assertTrue(wlen2 == data.length); + + // having closed the input, without a new connect request + // we should not be able to accept the new write + try { + final Socket s3 = accept(ss); + fail("Expected timeout failure"); + } catch (AssertionFailedError afe) { + // expected + } + + } + + /** + * Confirms that multiple clients can communicate with same Server + * + * @throws IOException + */ + public void testMultipleClients() throws IOException { + final InetSocketAddress serverAddr = new InetSocketAddress(getPort(0)); + + final ServerSocket ss = new ServerSocket(); + ss.bind(serverAddr); + + assertTrue(ss.getChannel() == null); + + final int nclients = 10; + + final ArrayList<SocketChannel> clients = new ArrayList<SocketChannel>(); + final ArrayList<Socket> sockets = new ArrayList<Socket>(); + + final Random r = new Random(); + final byte[] data = new byte[200]; + r.nextBytes(data); + assertNoTimeout(10, TimeUnit.SECONDS, new Callable<Void>() { + + @Override + public Void call() throws Exception { + for (int c = 0; c < nclients; c++) { + final SocketChannel cs = SocketChannel.open(); + cs.connect(serverAddr); + + clients.add(cs); + sockets.add(ss.accept()); + + // write to each SocketChannel (after connect/accept) + cs.write(ByteBuffer.wrap(data)); + } + return null; + } + + }); + + // Now read from all Sockets accepted on the server + final byte[] dst = new byte[200]; + for (Socket s : sockets) { + assertFalse(s.isClosed()); + + final InputStream instr = s.getInputStream(); + + assertFalse(-1 == instr.read(dst)); // doesn't return -1 + + assertTrue(BytesUtil.bytesEqual(data, dst)); + + // Close each Socket to ensure it is different + s.close(); + + assertTrue(s.isClosed()); + } + + } + + // wrap the ServerSocket accept with a timeout check + Socket accept(final ServerSocket ss) { + final AtomicReference<Socket> av = new AtomicReference<Socket>(); + assertNoTimeout(1, TimeUnit.SECONDS, new Callable<Void>() { + + @Override + public Void call() throws Exception { + + av.set(ss.accept()); + + return null; + }}); + + return av.get(); + } + + private void assertTimeout(long timeout, TimeUnit unit, Callable<Void> callable) { + final ExecutorService es = Executors.newSingleThreadExecutor(); + final Future<Void> ret = es.submit(callable); + try { + ret.get(timeout, unit); + fail("Expected timeout"); + } catch (TimeoutException e) { + // that is expected + return; + } catch (Exception e) { + fail("Expected timeout"); + } finally { + log.warn("Cancelling task - should interrupt accept()"); + ret.cancel(true); + es.shutdown(); + } + } + + private void assertNoTimeout(long timeout, TimeUnit unit, Callable<Void> callable) { + final ExecutorService es = Executors.newSingleThreadExecutor(); + try { + final Future<Void> ret = es.submit(callable); + ret.get(timeout, unit); + } catch (TimeoutException e) { + fail("Unexpected timeout"); + } catch (Exception e) { + fail("Unexpected Exception", e); + } finally { + es.shutdown(); + } + } + +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-12-09 17:36:29
|
Revision: 7627 http://bigdata.svn.sourceforge.net/bigdata/?rev=7627&view=rev Author: thompsonbry Date: 2013-12-09 17:36:22 +0000 (Mon, 09 Dec 2013) Log Message: ----------- Sync to martyn on low-level socket behavior test suite. Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java Modified: branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java 2013-12-09 16:21:57 UTC (rev 7626) +++ branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java 2013-12-09 17:36:22 UTC (rev 7627) @@ -46,6 +46,12 @@ import com.bigdata.btree.BytesUtil; import com.bigdata.io.TestCase3; +/** + * Test suite for basic socket behaviors. + * + * @author <a href="mailto:mar...@us...">Martyn Cutcher</a> + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ public class TestSocketsDirect extends TestCase3 { public TestSocketsDirect() { @@ -56,6 +62,65 @@ } /** + * Simple test of connecting to a server socket and the failure to connect + * to a port not associated with a server socket. + * + * @throws IOException + */ + public void testDirectSockets_exceptionIfPortNotOpen() throws IOException { + + // Get two socket addressses. We will open a service on one and try to + // connect to the unused one on the other port. + final InetSocketAddress serverAddr1 = new InetSocketAddress(getPort(0)); + final InetSocketAddress serverAddr2 = new InetSocketAddress(getPort(0)); + + // First our ServerSocket + final ServerSocket ss1 = new ServerSocket(); + try { + + ss1.bind(serverAddr1); + + assertTrue(ss1.getChannel() == null); + + // Now the first Client SocketChannel + final SocketChannel cs1 = SocketChannel.open(); + try { + /* + * Note: true if connection made. false if connection in + * progress. + */ + final boolean immediate1 = cs1.connect(serverAddr1); + if (!immediate1) { + // Did not connect immediately, so finish connect now. + if (!cs1.finishConnect()) { + fail("Did not connect."); + } + } + } finally { + cs1.close(); + } + + // Now the first Client SocketChannel + final SocketChannel cs2 = SocketChannel.open(); + try { + cs1.connect(serverAddr2); + fail("Expecting " + IOException.class); + } catch (IOException ex) { + if(log.isInfoEnabled()) + log.info("Ignoring expected exception: "+ex); + } finally { + cs2.close(); + } + + } finally { + + ss1.close(); + + } + + } + + /** * The use of threaded tasks in the send/receive service makes it difficult to * observer the socket state changes. * @@ -65,127 +130,279 @@ * ...with an accept followed by a read() of -1 on the returned Socket stream. * * @throws IOException + * @throws InterruptedException */ - public void testDirectSockets() throws IOException { + public void testDirectSockets() throws IOException, InterruptedException { - final InetSocketAddress serverAddr = new InetSocketAddress(getPort(0)); + // The payload size that we will use. + final int DATA_LEN = 200; - // First our ServerSocket - final ServerSocket ss = new ServerSocket(); - ss.bind(serverAddr); - - assertTrue(ss.getChannel() == null); - - // Now the first Client SocketChannel - final SocketChannel cs1 = SocketChannel.open(); - - final boolean immediate1 = cs1.connect(serverAddr); - assertTrue("Expected immediate local connection", immediate1); - final Random r = new Random(); - final byte[] data = new byte[200]; + final byte[] data = new byte[DATA_LEN]; r.nextBytes(data); + final byte[] dst = new byte[DATA_LEN]; - final ByteBuffer src = ByteBuffer.wrap(data); + // The server side receive buffer size (once we open the server socket). + int receiveBufferSize = -1; - // Write some data - cs1.write(src); - - final byte[] dst = new byte[200]; + final InetSocketAddress serverAddr = new InetSocketAddress(getPort(0)); - // Accept the client connection (after connect and write) - final Socket readSckt1 = accept(ss); - - InputStream instr = readSckt1.getInputStream(); - - // and read the data - instr.read(dst); - - // confirming the read is correct - assertTrue(BytesUtil.bytesEqual(data, dst)); - - assertTrue(ss.getChannel() == null); + // First our ServerSocket + final ServerSocket ss = new ServerSocket(); + try { - // now write some more data into the channel and then close it - cs1.write(ByteBuffer.wrap(data)); - - // close the client socket - cs1.close(); - - assertTrue(readSckt1.isConnected()); - assertFalse(readSckt1.isClosed()); - - // Now try writing some more data - try { - cs1.write(ByteBuffer.wrap(data)); - fail("Expected closed channel exception"); - } catch (ClosedChannelException e) { - // expected - } - - // the old stream should be closed - try { - final int rdlen = instr.read(dst); // should be closed - assertTrue(rdlen == 200); - assertTrue(BytesUtil.bytesEqual(data, dst)); + assertTrue(ss.getChannel() == null); + + // bind the server socket to the port. + ss.bind(serverAddr); + + assertTrue(ss.getChannel() == null); - assertTrue(instr.read(dst) == -1); // read EOF - } catch (Exception e) { - fail("not expected"); - } - - // if so then should we explicitly close its socket? - readSckt1.close(); - assertTrue(readSckt1.isClosed()); - - assertFalse(ss.isClosed()); - assertTrue(ss.getChannel() == null); - - // Now open a new client Socket and connect to the server - - final SocketChannel cs2 = SocketChannel.open(); - final boolean immediate2 = cs2.connect(serverAddr); - - assertTrue("Expected immediate local connection", immediate2); - - // Now we should be able to accept the new connection - final Socket s2 = accept(ss); - - // ... write to the SocketChannel - final int wlen = cs2.write(ByteBuffer.wrap(data)); - - assertTrue(wlen == data.length); + // figure out the receive buffer size on the server socket. + receiveBufferSize = ss.getReceiveBufferSize(); - // failing to read from original stream - final int nrlen = instr.read(dst); - assertTrue(nrlen == -1); + if (log.isInfoEnabled()) + log.info("receiveBufferSize=" + receiveBufferSize + + ", payloadSize=" + DATA_LEN); + + if (receiveBufferSize < DATA_LEN) { - // but succeeding to read from the new Socket - final InputStream instr2 = s2.getInputStream(); - instr2.read(dst); - - assertTrue(BytesUtil.bytesEqual(data, dst)); - - // Can a downstream close be detected upstream? - instr2.close(); - - assertTrue(cs2.isOpen()); // Not after closing input stream - - s2.close(); - - assertTrue(cs2.isOpen()); // Nor after closing the socket - - // now write some more to the socket - final int wlen2 = cs2.write(ByteBuffer.wrap(data)); - assertTrue(wlen2 == data.length); - - // having closed the input, without a new connect request - // we should not be able to accept the new write - try { - final Socket s3 = accept(ss); - fail("Expected timeout failure"); - } catch (AssertionFailedError afe) { - // expected + fail("Service socket receive buffer is smaller than test payload size: receiveBufferSize=" + + receiveBufferSize + ", payloadSize=" + DATA_LEN); + + } + + /* + * InputStream for server side of socket connection - set below and + * then reused outside of the try/finally block. + */ + InputStream instr = null; + + // Now the first Client SocketChannel + final SocketChannel cs1 = SocketChannel.open(); + try { + + /* + * Note: true if connection made. false if connection in + * progress. + */ + final boolean immediate1 = cs1.connect(serverAddr); + if (!immediate1) { + if (!cs1.finishConnect()) { + fail("Did not connect?"); + } + } + + assertTrue(ss.getChannel() == null); + + /* + * We are connected. + */ + + final ByteBuffer src = ByteBuffer.wrap(data); + + // Write some data on the client socket. + cs1.write(src); + + /* + * Accept client's connection on server (after connect and + * write). + */ + final Socket readSckt1 = accept(ss); + + // Stream to read the data from the socket on the server side. + instr = readSckt1.getInputStream(); + + // and read the data + instr.read(dst); + + // confirming the read is correct + assertTrue(BytesUtil.bytesEqual(data, dst)); + + assertTrue(ss.getChannel() == null); + + /* + * Attempting to read more returns ZERO because there is nothing + * in the buffer and the connection is still open on the client + * side. + * + * Note: instr.read(buf) will BLOCK until the data is available, + * the EOF is detected, or an exception is thrown. + */ + assertEquals(0,instr.available()); +// assertEquals(0, instr.read(dst)); + + /* + * Now write some more data into the channel and *then* close + * it. + */ + cs1.write(ByteBuffer.wrap(data)); + + // close the client side of the socket + cs1.close(); + + // The server side of client connection is still open. + assertTrue(readSckt1.isConnected()); + assertFalse(readSckt1.isClosed()); + + /* + * Now try writing some more data. This should be disallowed + * since we closed the client side of the socket. + */ + try { + cs1.write(ByteBuffer.wrap(data)); + fail("Expected closed channel exception"); + } catch (ClosedChannelException e) { + // expected + } + + /* + * Since we closed the client side of the socket, when we try to + * read more data on the server side of the connection. The data + * that we already buffered is still available on the server + * side of the socket. + */ + { + // the already buffered data should be available. + final int rdlen = instr.read(dst); + assertEquals(DATA_LEN, rdlen); + assertTrue(BytesUtil.bytesEqual(data, dst)); + } + + /* + * We have drained the buffered data. There is no more buffered + * data and client side is closed, so an attempt to read more + * data on the server side of the socket will return EOF (-1). + */ + assertEquals(-1, instr.read(dst)); // read EOF + + // if so then should we explicitly close its socket? + readSckt1.close(); + assertTrue(readSckt1.isClosed()); + + assertFalse(ss.isClosed()); + assertTrue(ss.getChannel() == null); + + } finally { + cs1.close(); + } + + /* + * Now open a new client Socket and connect to the server. + */ + final SocketChannel cs2 = SocketChannel.open(); + try { + + // connect to the server socket again. + final boolean immediate2 = cs2.connect(serverAddr); + if (!immediate2) { + if (!cs2.finishConnect()) { + fail("Did not connect?"); + } + } + + // Now server should accept the new client connection + final Socket s2 = accept(ss); + + // Client writes to the SocketChannel + final int wlen = cs2.write(ByteBuffer.wrap(data)); + assertEquals(DATA_LEN, wlen); // verify data written. + + // failing to read from original stream + final int nrlen = instr.read(dst); + assertEquals(-1, nrlen); + + // but succeeding to read from the new Socket + final InputStream instr2 = s2.getInputStream(); + instr2.read(dst); + assertTrue(BytesUtil.bytesEqual(data, dst)); + + /* + * Question: Can a downstream close be detected upstream? + * + * Answer: No. Closing the server socket does not tell the + * client that the socket was closed. + */ + { + // close server side input stream. + instr2.close(); + + // but the client still thinks its connected. + assertTrue(cs2.isOpen()); + + // Does the client believe it is still open after a brief + // sleep? + Thread.sleep(1000); + assertTrue(cs2.isOpen()); // yes. + + // close server stocket. + s2.close(); + + // client still thinks it is connected after closing server + // socket. + assertTrue(cs2.isOpen()); + + // Does the client believe it is still open after a brief + // sleep? + Thread.sleep(1000); + assertTrue(cs2.isOpen()); // yes. + + } + + /* + * Now write some more to the socket. We have closed the + * accepted connection on the server socket. Our observations + * show that the 1st write succeeds. The second write then fails + * with 'IOException: "Broken pipe"' + * + * The server socket is large (256k). We are not filling it up, + * but the 2nd write always fails. Further, the client never + * believes that the connection is closed until the 2nd write, + */ + { + final int writeSize = 1; + int nwritesOk = 0; + long nbytesReceived = 0L; + while (true) { + try { + // write a payload. + final int wlen2 = cs2.write(ByteBuffer.wrap(data, + 0, writeSize)); + // if write succeeds, should have written all bytes. + assertEquals(writeSize, wlen2); + nwritesOk++; + nbytesReceived += wlen2; + // does the client think the connection is still open? + assertTrue(cs2.isOpen()); // yes. + Thread.sleep(1000); + assertTrue(cs2.isOpen()); // yes. + } catch (IOException ex) { + if (log.isInfoEnabled()) + log.info("Expected exception: nwritesOk=" + + nwritesOk + ", nbytesReceived=" + + nbytesReceived + ", ex=" + ex); + break; + } + } + } + + /* + * Having closed the input, without a new connect request we + * should not be able to accept the new write. + */ + try { + final Socket s3 = accept(ss); + fail("Expected timeout failure"); + } catch (AssertionFailedError afe) { + // expected + } + + } finally { + cs2.close(); + } + + } finally { + ss.close(); } } @@ -249,19 +466,22 @@ } - // wrap the ServerSocket accept with a timeout check - Socket accept(final ServerSocket ss) { + /** wrap the ServerSocket accept with a timeout check. */ + private Socket accept(final ServerSocket ss) { + final AtomicReference<Socket> av = new AtomicReference<Socket>(); + assertNoTimeout(1, TimeUnit.SECONDS, new Callable<Void>() { @Override public Void call() throws Exception { - + av.set(ss.accept()); - + return null; - }}); - + } + }); + return av.get(); } @@ -283,7 +503,21 @@ } } - private void assertNoTimeout(long timeout, TimeUnit unit, Callable<Void> callable) { + /** + * Throws {@link AssertionFailedError} if the {@link Callable} does not + * succeed within the timeout. + * + * @param timeout + * @param unit + * @param callable + * + * @throws AssertionFailedError + * if the {@link Callable} does not succeed within the timeout. + * @throws AssertionFailedError + * if the {@link Callable} fails. + */ + private void assertNoTimeout(final long timeout, final TimeUnit unit, + final Callable<Void> callable) { final ExecutorService es = Executors.newSingleThreadExecutor(); try { final Future<Void> ret = es.submit(callable); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-12-09 17:47:49
|
Revision: 7628 http://bigdata.svn.sourceforge.net/bigdata/?rev=7628&view=rev Author: thompsonbry Date: 2013-12-09 17:47:42 +0000 (Mon, 09 Dec 2013) Log Message: ----------- sync to martyn Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java Modified: branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java 2013-12-09 17:36:22 UTC (rev 7627) +++ branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java 2013-12-09 17:47:42 UTC (rev 7628) @@ -78,10 +78,22 @@ final ServerSocket ss1 = new ServerSocket(); try { + // bind the ServerSocket to the specified port. ss1.bind(serverAddr1); assertTrue(ss1.getChannel() == null); + /* + * Without a new connect request we should not be able to accept() a + * new connection. + */ + try { + accept(ss1); + fail("Expected timeout failure"); + } catch (AssertionFailedError afe) { + // expected + } + // Now the first Client SocketChannel final SocketChannel cs1 = SocketChannel.open(); try { @@ -112,6 +124,17 @@ cs2.close(); } + /* + * Without a new connect request we should not be able to accept() a + * new connection. + */ + try { + accept(ss1); + fail("Expected timeout failure"); + } catch (AssertionFailedError afe) { + // expected + } + } finally { ss1.close(); @@ -388,7 +411,8 @@ /* * Having closed the input, without a new connect request we - * should not be able to accept the new write. + * should not be able to accept the new write since the data + * were written on a different client connection. */ try { final Socket s3 = accept(ss); @@ -413,55 +437,105 @@ * @throws IOException */ public void testMultipleClients() throws IOException { - final InetSocketAddress serverAddr = new InetSocketAddress(getPort(0)); - final ServerSocket ss = new ServerSocket(); - ss.bind(serverAddr); - - assertTrue(ss.getChannel() == null); - + // The payload size that we will use. + final int DATA_LEN = 200; + final Random r = new Random(); + final byte[] data = new byte[DATA_LEN]; + r.nextBytes(data); + final int nclients = 10; - + final ArrayList<SocketChannel> clients = new ArrayList<SocketChannel>(); final ArrayList<Socket> sockets = new ArrayList<Socket>(); - - final Random r = new Random(); - final byte[] data = new byte[200]; - r.nextBytes(data); - assertNoTimeout(10, TimeUnit.SECONDS, new Callable<Void>() { - @Override - public Void call() throws Exception { - for (int c = 0; c < nclients; c++) { - final SocketChannel cs = SocketChannel.open(); - cs.connect(serverAddr); - - clients.add(cs); - sockets.add(ss.accept()); - - // write to each SocketChannel (after connect/accept) - cs.write(ByteBuffer.wrap(data)); - } - return null; - } - - }); - - // Now read from all Sockets accepted on the server - final byte[] dst = new byte[200]; - for (Socket s : sockets) { - assertFalse(s.isClosed()); - - final InputStream instr = s.getInputStream(); + final InetSocketAddress serverAddr = new InetSocketAddress(getPort(0)); + + final ServerSocket ss = new ServerSocket(); + try { + + // bind the ServerSocket to the specified port. + ss.bind(serverAddr); - assertFalse(-1 == instr.read(dst)); // doesn't return -1 + assertTrue(ss.getChannel() == null); + + final int receiveBufferSize = ss.getReceiveBufferSize(); + + // Make sure that we have enough room to receive all client writes + // before draining any of them. + assertTrue(DATA_LEN * nclients <= receiveBufferSize); - assertTrue(BytesUtil.bytesEqual(data, dst)); + assertNoTimeout(10, TimeUnit.SECONDS, new Callable<Void>() { + + @Override + public Void call() throws Exception { + + for (int c = 0; c < nclients; c++) { + + // client connects to server. + final SocketChannel cs = SocketChannel.open(); + cs.connect(serverAddr); + clients.add(cs); + + // accept connection on server. + sockets.add(ss.accept()); + + // write to each SocketChannel (after connect/accept) + cs.write(ByteBuffer.wrap(data)); + } + + return null; + + } + + }); + + /* + * Now read from all Sockets accepted on the server. + * + * Note: This is a simple loop, not a parallel read. The same buffer + * is reused on each iteration. + */ + { + + final byte[] dst = new byte[DATA_LEN]; + + for (Socket s : sockets) { + + assertFalse(s.isClosed()); + + final InputStream instr = s.getInputStream(); + + assertFalse(-1 == instr.read(dst)); // doesn't return -1 + + assertTrue(BytesUtil.bytesEqual(data, dst)); + + // Close each Socket to ensure it is different + s.close(); + + assertTrue(s.isClosed()); + + } - // Close each Socket to ensure it is different - s.close(); + } + + } finally { + + // ensure client side connections are closed. + for (SocketChannel ch : clients) { + if (ch != null) + ch.close(); + } - assertTrue(s.isClosed()); + // ensure server side connections are closed. + for (Socket s : sockets) { + if (s != null) + s.close(); + } + + // close the server socket. + ss.close(); + } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-12-13 02:47:19
|
Revision: 7638 http://bigdata.svn.sourceforge.net/bigdata/?rev=7638&view=rev Author: thompsonbry Date: 2013-12-13 02:47:11 +0000 (Fri, 13 Dec 2013) Log Message: ----------- removing file with java 7 dependencies that should not have been in the last commit. Removed Paths: ------------- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java Deleted: branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java 2013-12-12 23:40:15 UTC (rev 7637) +++ branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java 2013-12-13 02:47:11 UTC (rev 7638) @@ -1,818 +0,0 @@ -/** - -Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - */ -package com.bigdata.ha.pipeline; - -import java.io.IOException; -import java.io.InputStream; -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.net.Socket; -import java.net.SocketOption; -import java.nio.ByteBuffer; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.util.ArrayList; -import java.util.Random; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; - -import junit.framework.AssertionFailedError; - -import com.bigdata.btree.BytesUtil; -import com.bigdata.io.TestCase3; - -/** - * Test suite for basic socket behaviors. - * <p> - * Note: Tests in this suite should use direct byte buffers (non-heap NIO) - * buffers in order accurately model the conditions that bigdata uses for write - * replication. If you use heap byte[]s, then they are copied into an NIO direct - * buffer before they are transmitted over a socket. By using NIO direct - * buffers, we stay within the zero-copy pattern for sockets. - * <p> - * Note: Tests in this suite need to use {@link ServerSocketChannel#open()} to - * get access to the stream oriented listening interface for the server side of - * the socket. This is what is used by the {@link HAReceiveService}. It also - * sets up the {@link ServerSocketChannel} in a non-blocking mode and then uses - * the selectors to listen for available data. See {@link HAReceiveService}. - * - * @author <a href="mailto:mar...@us...">Martyn - * Cutcher</a> - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - */ -public class TestSocketsDirect extends TestCase3 { - - public TestSocketsDirect() { - } - - public TestSocketsDirect(String name) { - super(name); - } - - /** - * Writes out the available options for the client and server socket. - * - * @throws IOException - */ - public void testDirectSockets_options() throws IOException { - - // Get a socket addresss for an unused port. - final InetSocketAddress serverAddr = new InetSocketAddress(getPort(0)); - - // First our ServerSocketChannel - final ServerSocketChannel ssc = ServerSocketChannel.open(); - try { - - // bind the ServerSocket to the specified port. - ssc.bind(serverAddr); - - // Now the first Client SocketChannel - final SocketChannel cs = SocketChannel.open(); - try { - /* - * Note: true if connection made. false if connection in - * progress. - */ - final boolean immediate = cs.connect(serverAddr); - if (!immediate) { - // Did not connect immediately, so finish connect now. - if (!cs.finishConnect()) { - fail("Did not connect."); - } - } - - /* - * Write out the client socket options. - */ - log.info("Client:: isOpen=" + cs.isOpen()); - log.info("Client:: isBlocking=" + cs.isBlocking()); - log.info("Client:: isRegistered=" + cs.isRegistered()); - log.info("Client:: isConnected=" + cs.isConnected()); - log.info("Client:: isConnectionPending=" - + cs.isConnectionPending()); - for (SocketOption<?> opt : cs.supportedOptions()) { - log.info("Client:: " + opt + " := " + cs.getOption(opt)); - } - - /* - * Note: We need to use ServerSocketChannel.open() to get access - * to the stream oriented listening interface for the server - * side of the socket. - */ - log.info("Server:: isOpen=" + ssc.isOpen()); - log.info("Server:: isBlocking=" + ssc.isBlocking()); - log.info("Server:: isRegistered=" + ssc.isRegistered()); - for (SocketOption<?> opt : ssc.supportedOptions()) { - log.info("Server:: " + opt + " := " + cs.getOption(opt)); - } - - } finally { - cs.close(); - } - - } finally { - - ssc.close(); - - } - - } - - /** - * Simple test of connecting to a server socket and the failure to connect - * to a port not associated with a server socket. - * - * @throws IOException - */ - public void testDirectSockets_exceptionIfPortNotOpen() throws IOException { - - // Get two socket addressses. We will open a service on one and try to - // connect to the unused one on the other port. - final InetSocketAddress serverAddr1 = new InetSocketAddress(getPort(0)); - final InetSocketAddress serverAddr2 = new InetSocketAddress(getPort(0)); - - // First our ServerSocket - final ServerSocket ss1 = new ServerSocket(); - try { - - // bind the ServerSocket to the specified port. - ss1.bind(serverAddr1); - - assertTrue(ss1.getChannel() == null); - - /* - * Without a new connect request we should not be able to accept() a - * new connection. - */ - try { - accept(ss1); - fail("Expected timeout failure"); - } catch (AssertionFailedError afe) { - // expected - } - - // Now the first Client SocketChannel - final SocketChannel cs1 = SocketChannel.open(); - try { - /* - * Note: true if connection made. false if connection in - * progress. - */ - final boolean immediate1 = cs1.connect(serverAddr1); - if (!immediate1) { - // Did not connect immediately, so finish connect now. - if (!cs1.finishConnect()) { - fail("Did not connect."); - } - } - } finally { - cs1.close(); - } - - // Now the first Client SocketChannel - final SocketChannel cs2 = SocketChannel.open(); - try { - cs1.connect(serverAddr2); - fail("Expecting " + IOException.class); - } catch (IOException ex) { - if(log.isInfoEnabled()) - log.info("Ignoring expected exception: "+ex); - } finally { - cs2.close(); - } - - /* - * Without a new connect request we should not be able to accept() a - * new connection. - */ - try { - accept(ss1); - fail("Expected timeout failure"); - } catch (AssertionFailedError afe) { - // expected - } - - } finally { - - ss1.close(); - - } - - } - - /** - * Test of a large write on a socket to understand what happens when the - * write is greater than the combined size of the client send buffer and the - * server receive buffer and the server side of the socket is either not - * accepted or already shutdown. - * - * @throws IOException - * @throws InterruptedException - */ - public void testDirectSockets_largeWrite_NotAccepted() throws IOException, - InterruptedException { - - final Random r = new Random(); - - // Get a socket addresss for an unused port. - final InetSocketAddress serverAddr = new InetSocketAddress(getPort(0)); - - // First our ServerSocket - final ServerSocket ss = new ServerSocket(); - try { - - // Size of the server socket receive buffer. - final int receiveBufferSize = ss.getReceiveBufferSize(); - - // Allocate buffer twice as large as the receive buffer. - final byte[] largeBuffer = new byte[receiveBufferSize * 10]; - - if (log.isInfoEnabled()) { - log.info("receiveBufferSize=" + receiveBufferSize - + ", largeBufferSize=" + largeBuffer.length); - } - - // fill buffer with random data. - r.nextBytes(largeBuffer); - - // bind the ServerSocket to the specified port. - ss.bind(serverAddr); - - // Now the first Client SocketChannel - final SocketChannel cs = SocketChannel.open(); - try { - /* - * Note: true if connection made. false if connection in - * progress. - */ - final boolean immediate = cs.connect(serverAddr); - if (!immediate) { - // Did not connect immediately, so finish connect now. - if (!cs.finishConnect()) { - fail("Did not connect."); - } - } - - /* - * Attempt to write data. The server socket is not yet accepted. - * This should hit a timeout. - */ - assertTimeout(10L, TimeUnit.SECONDS, new WriteBufferTask(cs, - ByteBuffer.wrap(largeBuffer))); - - accept(ss); - - } finally { - cs.close(); - } - - } finally { - - ss.close(); - - } - - } - - /** - * The use of threaded tasks in the send/receive service makes it difficult to - * observer the socket state changes. - * - * So let's begin by writing some tests over the raw sockets. - * - * Note that connecting and then immediately closing the socket is perfectly okay. - * ...with an accept followed by a read() of -1 on the returned Socket stream. - * - * @throws IOException - * @throws InterruptedException - */ - public void testDirectSockets() throws IOException, InterruptedException { - - // The payload size that we will use. - final int DATA_LEN = 200; - - final Random r = new Random(); - final byte[] data = new byte[DATA_LEN]; - r.nextBytes(data); - final byte[] dst = new byte[DATA_LEN]; - - // The server side receive buffer size (once we open the server socket). - int receiveBufferSize = -1; - - final InetSocketAddress serverAddr = new InetSocketAddress(getPort(0)); - - // First our ServerSocket - final ServerSocket ss = new ServerSocket(); - try { - - assertTrue(ss.getChannel() == null); - - // bind the server socket to the port. - ss.bind(serverAddr); - - assertTrue(ss.getChannel() == null); - - // figure out the receive buffer size on the server socket. - receiveBufferSize = ss.getReceiveBufferSize(); - - if (log.isInfoEnabled()) - log.info("receiveBufferSize=" + receiveBufferSize - + ", payloadSize=" + DATA_LEN); - - if (receiveBufferSize < DATA_LEN) { - - fail("Service socket receive buffer is smaller than test payload size: receiveBufferSize=" - + receiveBufferSize + ", payloadSize=" + DATA_LEN); - - } - - { - /* - * InputStream for server side of socket connection - set below and - * then reused outside of the try/finally block. - */ - InputStream instr = null; - - // Now the first Client SocketChannel - final SocketChannel cs1 = SocketChannel.open(); - try { - - /* - * Note: true if connection made. false if connection in - * progress. - */ - final boolean immediate1 = cs1.connect(serverAddr); - if (!immediate1) { - if (!cs1.finishConnect()) { - fail("Did not connect?"); - } - } - - assertTrue(ss.getChannel() == null); - - /* - * We are connected. - */ - - final ByteBuffer src = ByteBuffer.wrap(data); - - // Write some data on the client socket. - cs1.write(src); - - /* - * Accept client's connection on server (after connect and - * write). - */ - final Socket readSckt1 = accept(ss); - - // Stream to read the data from the socket on the server - // side. - instr = readSckt1.getInputStream(); - - // and read the data - instr.read(dst); - - // confirming the read is correct - assertTrue(BytesUtil.bytesEqual(data, dst)); - - assertTrue(ss.getChannel() == null); - - /* - * Attempting to read more returns ZERO because there is - * nothing in the buffer and the connection is still open on - * the client side. - * - * Note: instr.read(buf) will BLOCK until the data is - * available, the EOF is detected, or an exception is - * thrown. - */ - assertEquals(0, instr.available()); - // assertEquals(0, instr.read(dst)); - - /* - * Now write some more data into the channel and *then* - * close it. - */ - cs1.write(ByteBuffer.wrap(data)); - - // close the client side of the socket - cs1.close(); - - // The server side of client connection is still open. - assertTrue(readSckt1.isConnected()); - assertFalse(readSckt1.isClosed()); - - /* - * Now try writing some more data. This should be disallowed - * since we closed the client side of the socket. - */ - try { - cs1.write(ByteBuffer.wrap(data)); - fail("Expected closed channel exception"); - } catch (ClosedChannelException e) { - // expected - } - - /* - * Since we closed the client side of the socket, when we - * try to read more data on the server side of the - * connection. The data that we already buffered is still - * available on the server side of the socket. - */ - { - // the already buffered data should be available. - final int rdlen = instr.read(dst); - assertEquals(DATA_LEN, rdlen); - assertTrue(BytesUtil.bytesEqual(data, dst)); - } - - /* - * We have drained the buffered data. There is no more - * buffered data and client side is closed, so an attempt to - * read more data on the server side of the socket will - * return EOF (-1). - */ - assertEquals(-1, instr.read(dst)); // read EOF - - // if so then should we explicitly close its socket? - readSckt1.close(); - assertTrue(readSckt1.isClosed()); - - /* - * Still reports EOF after the accepted server socket is - * closed. - */ - assertEquals(-1, instr.read(dst)); - - assertFalse(ss.isClosed()); - assertTrue(ss.getChannel() == null); - - } finally { - cs1.close(); - } - - // failing to read from original stream - final int nrlen = instr.read(dst); - assertEquals(-1, nrlen); - } - - /* - * Now open a new client Socket and connect to the server. - */ - final SocketChannel cs2 = SocketChannel.open(); - try { - - // connect to the server socket again. - final boolean immediate2 = cs2.connect(serverAddr); - if (!immediate2) { - if (!cs2.finishConnect()) { - fail("Did not connect?"); - } - } - - // Now server should accept the new client connection - final Socket s2 = accept(ss); - - // Client writes to the SocketChannel - final int wlen = cs2.write(ByteBuffer.wrap(data)); - assertEquals(DATA_LEN, wlen); // verify data written. - - // but succeeding to read from the new Socket - final InputStream instr2 = s2.getInputStream(); - instr2.read(dst); - assertTrue(BytesUtil.bytesEqual(data, dst)); - - /* - * Question: Can a downstream close be detected upstream? - * - * Answer: No. Closing the server socket does not tell the - * client that the socket was closed. - */ - { - // close server side input stream. - instr2.close(); - - // but the client still thinks its connected. - assertTrue(cs2.isOpen()); - - // Does the client believe it is still open after a brief - // sleep? - Thread.sleep(1000); - assertTrue(cs2.isOpen()); // yes. - - // close server stocket. - s2.close(); - - // client still thinks it is connected after closing server - // socket. - assertTrue(cs2.isOpen()); - - // Does the client believe it is still open after a brief - // sleep? - Thread.sleep(1000); - assertTrue(cs2.isOpen()); // yes. - - } - - /* - * Now write some more to the socket. We have closed the - * accepted connection on the server socket. Our observations - * show that the 1st write succeeds. The second write then fails - * with 'IOException: "Broken pipe"' - * - * The server socket is large (256k). We are not filling it up, - * but the 2nd write always fails. Further, the client never - * believes that the connection is closed until the 2nd write, - */ - { - final int writeSize = 1; - int nwritesOk = 0; - long nbytesReceived = 0L; - while (true) { - try { - // write a payload. - final int wlen2 = cs2.write(ByteBuffer.wrap(data, - 0, writeSize)); - // if write succeeds, should have written all bytes. - assertEquals(writeSize, wlen2); - nwritesOk++; - nbytesReceived += wlen2; - // does the client think the connection is still open? - assertTrue(cs2.isOpen()); // yes. - Thread.sleep(1000); - assertTrue(cs2.isOpen()); // yes. - } catch (IOException ex) { - if (log.isInfoEnabled()) - log.info("Expected exception: nwritesOk=" - + nwritesOk + ", nbytesReceived=" - + nbytesReceived + ", ex=" + ex); - break; - } - } - } - - /* - * Having closed the input, without a new connect request we - * should not be able to accept the new write since the data - * were written on a different client connection. - */ - try { - final Socket s3 = accept(ss); - fail("Expected timeout failure"); - } catch (AssertionFailedError afe) { - // expected - } - - } finally { - cs2.close(); - } - - } finally { - ss.close(); - } - - } - - /** - * Confirms that multiple clients can communicate with same Server - * - * @throws IOException - */ - public void testMultipleClients() throws IOException { - - // The payload size that we will use. - final int DATA_LEN = 200; - final Random r = new Random(); - final byte[] data = new byte[DATA_LEN]; - r.nextBytes(data); - - final int nclients = 10; - - final ArrayList<SocketChannel> clients = new ArrayList<SocketChannel>(); - final ArrayList<Socket> sockets = new ArrayList<Socket>(); - - final InetSocketAddress serverAddr = new InetSocketAddress(getPort(0)); - - final ServerSocket ss = new ServerSocket(); - try { - - // bind the ServerSocket to the specified port. - ss.bind(serverAddr); - - assertTrue(ss.getChannel() == null); - - final int receiveBufferSize = ss.getReceiveBufferSize(); - - // Make sure that we have enough room to receive all client writes - // before draining any of them. - assertTrue(DATA_LEN * nclients <= receiveBufferSize); - - assertNoTimeout(10, TimeUnit.SECONDS, new Callable<Void>() { - - @Override - public Void call() throws Exception { - - for (int c = 0; c < nclients; c++) { - - // client connects to server. - final SocketChannel cs = SocketChannel.open(); - cs.connect(serverAddr); - clients.add(cs); - - // accept connection on server. - sockets.add(ss.accept()); - - // write to each SocketChannel (after connect/accept) - cs.write(ByteBuffer.wrap(data)); - } - - return null; - - } - - }); - - /* - * Now read from all Sockets accepted on the server. - * - * Note: This is a simple loop, not a parallel read. The same buffer - * is reused on each iteration. - */ - { - - final byte[] dst = new byte[DATA_LEN]; - - for (Socket s : sockets) { - - assertFalse(s.isClosed()); - - final InputStream instr = s.getInputStream(); - - assertFalse(-1 == instr.read(dst)); // doesn't return -1 - - assertTrue(BytesUtil.bytesEqual(data, dst)); - - // Close each Socket to ensure it is different - s.close(); - - assertTrue(s.isClosed()); - - } - - } - - } finally { - - // ensure client side connections are closed. - for (SocketChannel ch : clients) { - if (ch != null) - ch.close(); - } - - // ensure server side connections are closed. - for (Socket s : sockets) { - if (s != null) - s.close(); - } - - // close the server socket. - ss.close(); - - } - - } - - /** wrap the ServerSocket accept with a timeout check. */ - private Socket accept(final ServerSocket ss) { - - final AtomicReference<Socket> av = new AtomicReference<Socket>(); - - assertNoTimeout(1, TimeUnit.SECONDS, new Callable<Void>() { - - @Override - public Void call() throws Exception { - - av.set(ss.accept()); - - return null; - } - }); - - return av.get(); - } - - /** - * Fail the test if the {@link Callable} completes before the specified - * timeout. - * - * @param timeout - * @param unit - * @param callable - */ - private void assertTimeout(final long timeout, final TimeUnit unit, - final Callable<Void> callable) { - final ExecutorService es = Executors.newSingleThreadExecutor(); - final Future<Void> ret = es.submit(callable); - final long begin = System.currentTimeMillis(); - try { - // await Future with timeout. - ret.get(timeout, unit); - final long elapsed = System.currentTimeMillis() - begin; - fail("Expected timeout: elapsed=" + elapsed + "ms, timeout=" - + timeout + " " + unit); - } catch (TimeoutException e) { - // that is expected - final long elapsed = System.currentTimeMillis() - begin; - if (log.isInfoEnabled()) - log.info("timeout after " + elapsed + "ms"); - return; - } catch (Exception e) { - final long elapsed = System.currentTimeMillis() - begin; - fail("Expected timeout: elapsed=" + elapsed + ", timeout=" - + timeout + " " + unit, e); - } finally { - log.warn("Cancelling task - should interrupt accept()"); - ret.cancel(true/* mayInterruptIfRunning */); - es.shutdown(); - } - } - - /** - * Throws {@link AssertionFailedError} if the {@link Callable} does not - * succeed within the timeout. - * - * @param timeout - * @param unit - * @param callable - * - * @throws AssertionFailedError - * if the {@link Callable} does not succeed within the timeout. - * @throws AssertionFailedError - * if the {@link Callable} fails. - */ - private void assertNoTimeout(final long timeout, final TimeUnit unit, - final Callable<Void> callable) { - final ExecutorService es = Executors.newSingleThreadExecutor(); - try { - final Future<Void> ret = es.submit(callable); - ret.get(timeout, unit); - } catch (TimeoutException e) { - fail("Unexpected timeout"); - } catch (Exception e) { - fail("Unexpected Exception", e); - } finally { - es.shutdown(); - } - } - - /** - * Task writes the data on the client {@link SocketChannel}. - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - */ - private static class WriteBufferTask implements Callable<Void> { - - final private ByteBuffer buf; - final private SocketChannel cs; - - public WriteBufferTask(final SocketChannel cs, final ByteBuffer buf) { - this.cs = cs; - this.buf = buf; - } - - @Override - public Void call() throws Exception { - cs.write(buf); - return null; - } - - } - -} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-12-19 21:11:26
|
Revision: 7684 http://bigdata.svn.sourceforge.net/bigdata/?rev=7684&view=rev Author: thompsonbry Date: 2013-12-19 21:11:18 +0000 (Thu, 19 Dec 2013) Log Message: ----------- Adding my version of TestSocketsDirect prior to merging in changes from the main development branch. See #779 Added Paths: ----------- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java Added: branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java (rev 0) +++ branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java 2013-12-19 21:11:18 UTC (rev 7684) @@ -0,0 +1,818 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ +package com.bigdata.ha.pipeline; + +import java.io.IOException; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketOption; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; + +import junit.framework.AssertionFailedError; + +import com.bigdata.btree.BytesUtil; +import com.bigdata.io.TestCase3; + +/** + * Test suite for basic socket behaviors. + * <p> + * Note: Tests in this suite should use direct byte buffers (non-heap NIO) + * buffers in order accurately model the conditions that bigdata uses for write + * replication. If you use heap byte[]s, then they are copied into an NIO direct + * buffer before they are transmitted over a socket. By using NIO direct + * buffers, we stay within the zero-copy pattern for sockets. + * <p> + * Note: Tests in this suite need to use {@link ServerSocketChannel#open()} to + * get access to the stream oriented listening interface for the server side of + * the socket. This is what is used by the {@link HAReceiveService}. It also + * sets up the {@link ServerSocketChannel} in a non-blocking mode and then uses + * the selectors to listen for available data. See {@link HAReceiveService}. + * + * @author <a href="mailto:mar...@us...">Martyn + * Cutcher</a> + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public class TestSocketsDirect extends TestCase3 { + + public TestSocketsDirect() { + } + + public TestSocketsDirect(String name) { + super(name); + } + + /** + * Writes out the available options for the client and server socket. + * + * @throws IOException + */ + public void testDirectSockets_options() throws IOException { + + // Get a socket addresss for an unused port. + final InetSocketAddress serverAddr = new InetSocketAddress(getPort(0)); + + // First our ServerSocketChannel + final ServerSocketChannel ssc = ServerSocketChannel.open(); + try { + + // bind the ServerSocket to the specified port. + ssc.bind(serverAddr); + + // Now the first Client SocketChannel + final SocketChannel cs = SocketChannel.open(); + try { + /* + * Note: true if connection made. false if connection in + * progress. + */ + final boolean immediate = cs.connect(serverAddr); + if (!immediate) { + // Did not connect immediately, so finish connect now. + if (!cs.finishConnect()) { + fail("Did not connect."); + } + } + + /* + * Write out the client socket options. + */ + log.info("Client:: isOpen=" + cs.isOpen()); + log.info("Client:: isBlocking=" + cs.isBlocking()); + log.info("Client:: isRegistered=" + cs.isRegistered()); + log.info("Client:: isConnected=" + cs.isConnected()); + log.info("Client:: isConnectionPending=" + + cs.isConnectionPending()); + for (SocketOption<?> opt : cs.supportedOptions()) { + log.info("Client:: " + opt + " := " + cs.getOption(opt)); + } + + /* + * Note: We need to use ServerSocketChannel.open() to get access + * to the stream oriented listening interface for the server + * side of the socket. + */ + log.info("Server:: isOpen=" + ssc.isOpen()); + log.info("Server:: isBlocking=" + ssc.isBlocking()); + log.info("Server:: isRegistered=" + ssc.isRegistered()); + for (SocketOption<?> opt : ssc.supportedOptions()) { + log.info("Server:: " + opt + " := " + cs.getOption(opt)); + } + + } finally { + cs.close(); + } + + } finally { + + ssc.close(); + + } + + } + + /** + * Simple test of connecting to a server socket and the failure to connect + * to a port not associated with a server socket. + * + * @throws IOException + */ + public void testDirectSockets_exceptionIfPortNotOpen() throws IOException { + + // Get two socket addressses. We will open a service on one and try to + // connect to the unused one on the other port. + final InetSocketAddress serverAddr1 = new InetSocketAddress(getPort(0)); + final InetSocketAddress serverAddr2 = new InetSocketAddress(getPort(0)); + + // First our ServerSocket + final ServerSocket ss1 = new ServerSocket(); + try { + + // bind the ServerSocket to the specified port. + ss1.bind(serverAddr1); + + assertTrue(ss1.getChannel() == null); + + /* + * Without a new connect request we should not be able to accept() a + * new connection. + */ + try { + accept(ss1); + fail("Expected timeout failure"); + } catch (AssertionFailedError afe) { + // expected + } + + // Now the first Client SocketChannel + final SocketChannel cs1 = SocketChannel.open(); + try { + /* + * Note: true if connection made. false if connection in + * progress. + */ + final boolean immediate1 = cs1.connect(serverAddr1); + if (!immediate1) { + // Did not connect immediately, so finish connect now. + if (!cs1.finishConnect()) { + fail("Did not connect."); + } + } + } finally { + cs1.close(); + } + + // Now the first Client SocketChannel + final SocketChannel cs2 = SocketChannel.open(); + try { + cs1.connect(serverAddr2); + fail("Expecting " + IOException.class); + } catch (IOException ex) { + if(log.isInfoEnabled()) + log.info("Ignoring expected exception: "+ex); + } finally { + cs2.close(); + } + + /* + * Without a new connect request we should not be able to accept() a + * new connection. + */ + try { + accept(ss1); + fail("Expected timeout failure"); + } catch (AssertionFailedError afe) { + // expected + } + + } finally { + + ss1.close(); + + } + + } + + /** + * Test of a large write on a socket to understand what happens when the + * write is greater than the combined size of the client send buffer and the + * server receive buffer and the server side of the socket is either not + * accepted or already shutdown. + * + * @throws IOException + * @throws InterruptedException + */ + public void testDirectSockets_largeWrite_NotAccepted() throws IOException, + InterruptedException { + + final Random r = new Random(); + + // Get a socket addresss for an unused port. + final InetSocketAddress serverAddr = new InetSocketAddress(getPort(0)); + + // First our ServerSocket + final ServerSocket ss = new ServerSocket(); + try { + + // Size of the server socket receive buffer. + final int receiveBufferSize = ss.getReceiveBufferSize(); + + // Allocate buffer twice as large as the receive buffer. + final byte[] largeBuffer = new byte[receiveBufferSize * 10]; + + if (log.isInfoEnabled()) { + log.info("receiveBufferSize=" + receiveBufferSize + + ", largeBufferSize=" + largeBuffer.length); + } + + // fill buffer with random data. + r.nextBytes(largeBuffer); + + // bind the ServerSocket to the specified port. + ss.bind(serverAddr); + + // Now the first Client SocketChannel + final SocketChannel cs = SocketChannel.open(); + try { + /* + * Note: true if connection made. false if connection in + * progress. + */ + final boolean immediate = cs.connect(serverAddr); + if (!immediate) { + // Did not connect immediately, so finish connect now. + if (!cs.finishConnect()) { + fail("Did not connect."); + } + } + + /* + * Attempt to write data. The server socket is not yet accepted. + * This should hit a timeout. + */ + assertTimeout(10L, TimeUnit.SECONDS, new WriteBufferTask(cs, + ByteBuffer.wrap(largeBuffer))); + + accept(ss); + + } finally { + cs.close(); + } + + } finally { + + ss.close(); + + } + + } + + /** + * The use of threaded tasks in the send/receive service makes it difficult to + * observer the socket state changes. + * + * So let's begin by writing some tests over the raw sockets. + * + * Note that connecting and then immediately closing the socket is perfectly okay. + * ...with an accept followed by a read() of -1 on the returned Socket stream. + * + * @throws IOException + * @throws InterruptedException + */ + public void testDirectSockets() throws IOException, InterruptedException { + + // The payload size that we will use. + final int DATA_LEN = 200; + + final Random r = new Random(); + final byte[] data = new byte[DATA_LEN]; + r.nextBytes(data); + final byte[] dst = new byte[DATA_LEN]; + + // The server side receive buffer size (once we open the server socket). + int receiveBufferSize = -1; + + final InetSocketAddress serverAddr = new InetSocketAddress(getPort(0)); + + // First our ServerSocket + final ServerSocket ss = new ServerSocket(); + try { + + assertTrue(ss.getChannel() == null); + + // bind the server socket to the port. + ss.bind(serverAddr); + + assertTrue(ss.getChannel() == null); + + // figure out the receive buffer size on the server socket. + receiveBufferSize = ss.getReceiveBufferSize(); + + if (log.isInfoEnabled()) + log.info("receiveBufferSize=" + receiveBufferSize + + ", payloadSize=" + DATA_LEN); + + if (receiveBufferSize < DATA_LEN) { + + fail("Service socket receive buffer is smaller than test payload size: receiveBufferSize=" + + receiveBufferSize + ", payloadSize=" + DATA_LEN); + + } + + { + /* + * InputStream for server side of socket connection - set below and + * then reused outside of the try/finally block. + */ + InputStream instr = null; + + // Now the first Client SocketChannel + final SocketChannel cs1 = SocketChannel.open(); + try { + + /* + * Note: true if connection made. false if connection in + * progress. + */ + final boolean immediate1 = cs1.connect(serverAddr); + if (!immediate1) { + if (!cs1.finishConnect()) { + fail("Did not connect?"); + } + } + + assertTrue(ss.getChannel() == null); + + /* + * We are connected. + */ + + final ByteBuffer src = ByteBuffer.wrap(data); + + // Write some data on the client socket. + cs1.write(src); + + /* + * Accept client's connection on server (after connect and + * write). + */ + final Socket readSckt1 = accept(ss); + + // Stream to read the data from the socket on the server + // side. + instr = readSckt1.getInputStream(); + + // and read the data + instr.read(dst); + + // confirming the read is correct + assertTrue(BytesUtil.bytesEqual(data, dst)); + + assertTrue(ss.getChannel() == null); + + /* + * Attempting to read more returns ZERO because there is + * nothing in the buffer and the connection is still open on + * the client side. + * + * Note: instr.read(buf) will BLOCK until the data is + * available, the EOF is detected, or an exception is + * thrown. + */ + assertEquals(0, instr.available()); + // assertEquals(0, instr.read(dst)); + + /* + * Now write some more data into the channel and *then* + * close it. + */ + cs1.write(ByteBuffer.wrap(data)); + + // close the client side of the socket + cs1.close(); + + // The server side of client connection is still open. + assertTrue(readSckt1.isConnected()); + assertFalse(readSckt1.isClosed()); + + /* + * Now try writing some more data. This should be disallowed + * since we closed the client side of the socket. + */ + try { + cs1.write(ByteBuffer.wrap(data)); + fail("Expected closed channel exception"); + } catch (ClosedChannelException e) { + // expected + } + + /* + * Since we closed the client side of the socket, when we + * try to read more data on the server side of the + * connection. The data that we already buffered is still + * available on the server side of the socket. + */ + { + // the already buffered data should be available. + final int rdlen = instr.read(dst); + assertEquals(DATA_LEN, rdlen); + assertTrue(BytesUtil.bytesEqual(data, dst)); + } + + /* + * We have drained the buffered data. There is no more + * buffered data and client side is closed, so an attempt to + * read more data on the server side of the socket will + * return EOF (-1). + */ + assertEquals(-1, instr.read(dst)); // read EOF + + // if so then should we explicitly close its socket? + readSckt1.close(); + assertTrue(readSckt1.isClosed()); + + /* + * Still reports EOF after the accepted server socket is + * closed. + */ + assertEquals(-1, instr.read(dst)); + + assertFalse(ss.isClosed()); + assertTrue(ss.getChannel() == null); + + } finally { + cs1.close(); + } + + // failing to read from original stream + final int nrlen = instr.read(dst); + assertEquals(-1, nrlen); + } + + /* + * Now open a new client Socket and connect to the server. + */ + final SocketChannel cs2 = SocketChannel.open(); + try { + + // connect to the server socket again. + final boolean immediate2 = cs2.connect(serverAddr); + if (!immediate2) { + if (!cs2.finishConnect()) { + fail("Did not connect?"); + } + } + + // Now server should accept the new client connection + final Socket s2 = accept(ss); + + // Client writes to the SocketChannel + final int wlen = cs2.write(ByteBuffer.wrap(data)); + assertEquals(DATA_LEN, wlen); // verify data written. + + // but succeeding to read from the new Socket + final InputStream instr2 = s2.getInputStream(); + instr2.read(dst); + assertTrue(BytesUtil.bytesEqual(data, dst)); + + /* + * Question: Can a downstream close be detected upstream? + * + * Answer: No. Closing the server socket does not tell the + * client that the socket was closed. + */ + { + // close server side input stream. + instr2.close(); + + // but the client still thinks its connected. + assertTrue(cs2.isOpen()); + + // Does the client believe it is still open after a brief + // sleep? + Thread.sleep(1000); + assertTrue(cs2.isOpen()); // yes. + + // close server stocket. + s2.close(); + + // client still thinks it is connected after closing server + // socket. + assertTrue(cs2.isOpen()); + + // Does the client believe it is still open after a brief + // sleep? + Thread.sleep(1000); + assertTrue(cs2.isOpen()); // yes. + + } + + /* + * Now write some more to the socket. We have closed the + * accepted connection on the server socket. Our observations + * show that the 1st write succeeds. The second write then fails + * with 'IOException: "Broken pipe"' + * + * The server socket is large (256k). We are not filling it up, + * but the 2nd write always fails. Further, the client never + * believes that the connection is closed until the 2nd write, + */ + { + final int writeSize = 1; + int nwritesOk = 0; + long nbytesReceived = 0L; + while (true) { + try { + // write a payload. + final int wlen2 = cs2.write(ByteBuffer.wrap(data, + 0, writeSize)); + // if write succeeds, should have written all bytes. + assertEquals(writeSize, wlen2); + nwritesOk++; + nbytesReceived += wlen2; + // does the client think the connection is still open? + assertTrue(cs2.isOpen()); // yes. + Thread.sleep(1000); + assertTrue(cs2.isOpen()); // yes. + } catch (IOException ex) { + if (log.isInfoEnabled()) + log.info("Expected exception: nwritesOk=" + + nwritesOk + ", nbytesReceived=" + + nbytesReceived + ", ex=" + ex); + break; + } + } + } + + /* + * Having closed the input, without a new connect request we + * should not be able to accept the new write since the data + * were written on a different client connection. + */ + try { + final Socket s3 = accept(ss); + fail("Expected timeout failure"); + } catch (AssertionFailedError afe) { + // expected + } + + } finally { + cs2.close(); + } + + } finally { + ss.close(); + } + + } + + /** + * Confirms that multiple clients can communicate with same Server + * + * @throws IOException + */ + public void testMultipleClients() throws IOException { + + // The payload size that we will use. + final int DATA_LEN = 200; + final Random r = new Random(); + final byte[] data = new byte[DATA_LEN]; + r.nextBytes(data); + + final int nclients = 10; + + final ArrayList<SocketChannel> clients = new ArrayList<SocketChannel>(); + final ArrayList<Socket> sockets = new ArrayList<Socket>(); + + final InetSocketAddress serverAddr = new InetSocketAddress(getPort(0)); + + final ServerSocket ss = new ServerSocket(); + try { + + // bind the ServerSocket to the specified port. + ss.bind(serverAddr); + + assertTrue(ss.getChannel() == null); + + final int receiveBufferSize = ss.getReceiveBufferSize(); + + // Make sure that we have enough room to receive all client writes + // before draining any of them. + assertTrue(DATA_LEN * nclients <= receiveBufferSize); + + assertNoTimeout(10, TimeUnit.SECONDS, new Callable<Void>() { + + @Override + public Void call() throws Exception { + + for (int c = 0; c < nclients; c++) { + + // client connects to server. + final SocketChannel cs = SocketChannel.open(); + cs.connect(serverAddr); + clients.add(cs); + + // accept connection on server. + sockets.add(ss.accept()); + + // write to each SocketChannel (after connect/accept) + cs.write(ByteBuffer.wrap(data)); + } + + return null; + + } + + }); + + /* + * Now read from all Sockets accepted on the server. + * + * Note: This is a simple loop, not a parallel read. The same buffer + * is reused on each iteration. + */ + { + + final byte[] dst = new byte[DATA_LEN]; + + for (Socket s : sockets) { + + assertFalse(s.isClosed()); + + final InputStream instr = s.getInputStream(); + + assertFalse(-1 == instr.read(dst)); // doesn't return -1 + + assertTrue(BytesUtil.bytesEqual(data, dst)); + + // Close each Socket to ensure it is different + s.close(); + + assertTrue(s.isClosed()); + + } + + } + + } finally { + + // ensure client side connections are closed. + for (SocketChannel ch : clients) { + if (ch != null) + ch.close(); + } + + // ensure server side connections are closed. + for (Socket s : sockets) { + if (s != null) + s.close(); + } + + // close the server socket. + ss.close(); + + } + + } + + /** wrap the ServerSocket accept with a timeout check. */ + private Socket accept(final ServerSocket ss) { + + final AtomicReference<Socket> av = new AtomicReference<Socket>(); + + assertNoTimeout(1, TimeUnit.SECONDS, new Callable<Void>() { + + @Override + public Void call() throws Exception { + + av.set(ss.accept()); + + return null; + } + }); + + return av.get(); + } + + /** + * Fail the test if the {@link Callable} completes before the specified + * timeout. + * + * @param timeout + * @param unit + * @param callable + */ + private void assertTimeout(final long timeout, final TimeUnit unit, + final Callable<Void> callable) { + final ExecutorService es = Executors.newSingleThreadExecutor(); + final Future<Void> ret = es.submit(callable); + final long begin = System.currentTimeMillis(); + try { + // await Future with timeout. + ret.get(timeout, unit); + final long elapsed = System.currentTimeMillis() - begin; + fail("Expected timeout: elapsed=" + elapsed + "ms, timeout=" + + timeout + " " + unit); + } catch (TimeoutException e) { + // that is expected + final long elapsed = System.currentTimeMillis() - begin; + if (log.isInfoEnabled()) + log.info("timeout after " + elapsed + "ms"); + return; + } catch (Exception e) { + final long elapsed = System.currentTimeMillis() - begin; + fail("Expected timeout: elapsed=" + elapsed + ", timeout=" + + timeout + " " + unit, e); + } finally { + log.warn("Cancelling task - should interrupt accept()"); + ret.cancel(true/* mayInterruptIfRunning */); + es.shutdown(); + } + } + + /** + * Throws {@link AssertionFailedError} if the {@link Callable} does not + * succeed within the timeout. + * + * @param timeout + * @param unit + * @param callable + * + * @throws AssertionFailedError + * if the {@link Callable} does not succeed within the timeout. + * @throws AssertionFailedError + * if the {@link Callable} fails. + */ + private void assertNoTimeout(final long timeout, final TimeUnit unit, + final Callable<Void> callable) { + final ExecutorService es = Executors.newSingleThreadExecutor(); + try { + final Future<Void> ret = es.submit(callable); + ret.get(timeout, unit); + } catch (TimeoutException e) { + fail("Unexpected timeout"); + } catch (Exception e) { + fail("Unexpected Exception", e); + } finally { + es.shutdown(); + } + } + + /** + * Task writes the data on the client {@link SocketChannel}. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ + private static class WriteBufferTask implements Callable<Void> { + + final private ByteBuffer buf; + final private SocketChannel cs; + + public WriteBufferTask(final SocketChannel cs, final ByteBuffer buf) { + this.cs = cs; + this.buf = buf; + } + + @Override + public Void call() throws Exception { + cs.write(buf); + return null; + } + + } + +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-12-19 22:11:18
|
Revision: 7687 http://bigdata.svn.sourceforge.net/bigdata/?rev=7687&view=rev Author: thompsonbry Date: 2013-12-19 22:11:12 +0000 (Thu, 19 Dec 2013) Log Message: ----------- removed java 7 specific code in test suite./ Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java Modified: branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java 2013-12-19 22:09:49 UTC (rev 7686) +++ branches/MGC_1_3_0/bigdata/src/test/com/bigdata/ha/pipeline/TestSocketsDirect.java 2013-12-19 22:11:12 UTC (rev 7687) @@ -28,7 +28,7 @@ import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; -import java.net.SocketOption; +//import java.net.SocketOption; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.ServerSocketChannel; @@ -76,74 +76,75 @@ super(name); } - /** - * Writes out the available options for the client and server socket. - * - * @throws IOException - */ - public void testDirectSockets_options() throws IOException { - - // Get a socket addresss for an unused port. - final InetSocketAddress serverAddr = new InetSocketAddress(getPort(0)); - - // First our ServerSocketChannel - final ServerSocketChannel ssc = ServerSocketChannel.open(); - try { - - // bind the ServerSocket to the specified port. - ssc.bind(serverAddr); - - // Now the first Client SocketChannel - final SocketChannel cs = SocketChannel.open(); - try { - /* - * Note: true if connection made. false if connection in - * progress. - */ - final boolean immediate = cs.connect(serverAddr); - if (!immediate) { - // Did not connect immediately, so finish connect now. - if (!cs.finishConnect()) { - fail("Did not connect."); - } - } - - /* - * Write out the client socket options. - */ - log.info("Client:: isOpen=" + cs.isOpen()); - log.info("Client:: isBlocking=" + cs.isBlocking()); - log.info("Client:: isRegistered=" + cs.isRegistered()); - log.info("Client:: isConnected=" + cs.isConnected()); - log.info("Client:: isConnectionPending=" - + cs.isConnectionPending()); - for (SocketOption<?> opt : cs.supportedOptions()) { - log.info("Client:: " + opt + " := " + cs.getOption(opt)); - } - - /* - * Note: We need to use ServerSocketChannel.open() to get access - * to the stream oriented listening interface for the server - * side of the socket. - */ - log.info("Server:: isOpen=" + ssc.isOpen()); - log.info("Server:: isBlocking=" + ssc.isBlocking()); - log.info("Server:: isRegistered=" + ssc.isRegistered()); - for (SocketOption<?> opt : ssc.supportedOptions()) { - log.info("Server:: " + opt + " := " + cs.getOption(opt)); - } - - } finally { - cs.close(); - } - - } finally { - - ssc.close(); - - } - - } + // FIXME RESTORE WHEN WE MOVE TO JAVA 7. +// /** +// * Writes out the available options for the client and server socket. +// * +// * @throws IOException +// */ +// public void testDirectSockets_options() throws IOException { +// +// // Get a socket addresss for an unused port. +// final InetSocketAddress serverAddr = new InetSocketAddress(getPort(0)); +// +// // First our ServerSocketChannel +// final ServerSocketChannel ssc = ServerSocketChannel.open(); +// try { +// +// // bind the ServerSocket to the specified port. +// ssc.bind(serverAddr); +// +// // Now the first Client SocketChannel +// final SocketChannel cs = SocketChannel.open(); +// try { +// /* +// * Note: true if connection made. false if connection in +// * progress. +// */ +// final boolean immediate = cs.connect(serverAddr); +// if (!immediate) { +// // Did not connect immediately, so finish connect now. +// if (!cs.finishConnect()) { +// fail("Did not connect."); +// } +// } +// +// /* +// * Write out the client socket options. +// */ +// log.info("Client:: isOpen=" + cs.isOpen()); +// log.info("Client:: isBlocking=" + cs.isBlocking()); +// log.info("Client:: isRegistered=" + cs.isRegistered()); +// log.info("Client:: isConnected=" + cs.isConnected()); +// log.info("Client:: isConnectionPending=" +// + cs.isConnectionPending()); +// for (SocketOption<?> opt : cs.supportedOptions()) { +// log.info("Client:: " + opt + " := " + cs.getOption(opt)); +// } +// +// /* +// * Note: We need to use ServerSocketChannel.open() to get access +// * to the stream oriented listening interface for the server +// * side of the socket. +// */ +// log.info("Server:: isOpen=" + ssc.isOpen()); +// log.info("Server:: isBlocking=" + ssc.isBlocking()); +// log.info("Server:: isRegistered=" + ssc.isRegistered()); +// for (SocketOption<?> opt : ssc.supportedOptions()) { +// log.info("Server:: " + opt + " := " + cs.getOption(opt)); +// } +// +// } finally { +// cs.close(); +// } +// +// } finally { +// +// ssc.close(); +// +// } +// +// } /** * Simple test of connecting to a server socket and the failure to connect This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |