From: <tho...@us...> - 2013-12-16 16:54:44
|
Revision: 7660 http://bigdata.svn.sourceforge.net/bigdata/?rev=7660&view=rev Author: thompsonbry Date: 2013-12-16 16:54:38 +0000 (Mon, 16 Dec 2013) Log Message: ----------- Pushed down a read() method that closes the upstream socket channel if there is an error reading from A. Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-16 16:19:27 UTC (rev 7659) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-16 16:54:38 UTC (rev 7660) @@ -698,7 +698,7 @@ } - public void close() throws IOException { + private void close() throws IOException { if (log.isInfoEnabled()) log.info("Closing client connection"); @@ -718,7 +718,32 @@ // } // } } + + } + /** + * Wraps {@link SocketChannel#read(ByteBuffer)} to test for an EOF and + * calls {@link #close()} if an EOF is reached. + * + * @param dst + * The destination buffer. + * + * @return The #of bytes read. + * + * @throws IOException + */ + private int read(final ByteBuffer dst) throws IOException { + + final int rdlen = client.read(dst); + + if (rdlen == -1) { + + close(); + + } + + return rdlen; + } /** @@ -726,7 +751,7 @@ * control back to the leader. The leader will then handle this in * {@link QuorumPipelineImpl}'s retrySend() method. */ - public void checkFirstCause() throws RuntimeException { + private void checkFirstCause() throws RuntimeException { final Throwable t = firstCause.getAndSet(null); @@ -1162,7 +1187,7 @@ } - final int rdlen = client.client.read(localBuffer); + final int rdlen = client.read(localBuffer); if (log.isTraceEnabled()) log.trace("Read " + rdlen + " bytes with " @@ -1381,7 +1406,7 @@ markerBB.limit(remtok); markerBB.position(0); - final int rdLen = client.client.read(markerBB); + final int rdLen = client.read(markerBB); if (rdLen == -1) { throw new IOException("EOF: nreads=" + nreads + ", bytesRead=" + bytesRead); Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-16 16:19:27 UTC (rev 7659) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-16 16:54:38 UTC (rev 7660) @@ -587,8 +587,8 @@ log.error("socketChannel=" + sc - + (sc == null ? "" : ", sc.isOpen()" + sc.isOpen() - + ", sc.isConnected()" + sc.isConnected()) + + (sc == null ? "" : ", sc.isOpen()=" + sc.isOpen() + + ", sc.isConnected()=" + sc.isConnected()) + ", cause=" + t, t); if (t instanceof Exception) This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |