From: <tho...@us...> - 2013-12-16 15:01:54
|
Revision: 7656 http://bigdata.svn.sourceforge.net/bigdata/?rev=7656&view=rev Author: thompsonbry Date: 2013-12-16 15:01:47 +0000 (Mon, 16 Dec 2013) Log Message: ----------- Sync to martyn. Added more logging to the HAReceiveService. Suspect java is closing the client channel when we cancel the readFuture. See #724. Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-16 14:54:41 UTC (rev 7655) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-16 15:01:47 UTC (rev 7656) @@ -683,9 +683,16 @@ } public void close() throws IOException { + + if (log.isInfoEnabled()) + log.info("Closing client connection"); + clientKey.cancel(); + try { + client.close(); + } finally { // try { clientSelector.close(); @@ -695,6 +702,7 @@ // } // } } + } /** @@ -1134,7 +1142,8 @@ } // while( rem > 0 && !EOS ) if (localBuffer.position() != message.getSize()) - throw new IOException("Receive length error: localBuffer.pos=" + throw new IOException("Receive length error: rem=" + rem + + ", EOS=" + EOS + ", localBuffer.pos=" + localBuffer.position() + ", message.size=" + message.getSize()); @@ -1651,7 +1660,8 @@ if (oldClient != null) { -// log.warn("Cleared Client reference."); + if (log.isInfoEnabled()) + log.info("Closing client connection"); try { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-12-16 15:22:36
|
Revision: 7657 http://bigdata.svn.sourceforge.net/bigdata/?rev=7657&view=rev Author: thompsonbry Date: 2013-12-16 15:22:25 +0000 (Mon, 16 Dec 2013) Log Message: ----------- Sync to Martyn with logging to let us observe the thrown cause in the ReadTask when the Future of that task is cancelled. See #724 Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-16 15:01:47 UTC (rev 7656) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-16 15:22:25 UTC (rev 7657) @@ -914,7 +914,38 @@ @Override public Void call() throws Exception { + + try { + + return doInnerCall(); + + } catch (Throwable t) { + + /* + * Log anything thrown out of this task. We check the Future of + * this task, but that does not tell us what exception is thrown + * in the Thread executing the task when the Future is cancelled + * and that thread is interrupted. In particular, we are looking + * for the InterruptedException, ClosedByInterruptException, + * etc. + */ + + log.error(t, t); + + if (t instanceof Exception) + throw (Exception) t; + + if (t instanceof RuntimeException) + throw (RuntimeException) t; + + throw new RuntimeException(t); + + } + + } + private Void doInnerCall() throws Exception { + // awaitAccept(); // // /* This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-12-16 15:56:45
|
Revision: 7658 http://bigdata.svn.sourceforge.net/bigdata/?rev=7658&view=rev Author: thompsonbry Date: 2013-12-16 15:56:36 +0000 (Mon, 16 Dec 2013) Log Message: ----------- Sync to martyn. more logging. Also, Client now blocks until connected in the ctor. See #724. Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-16 15:22:25 UTC (rev 7657) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-16 15:56:36 UTC (rev 7658) @@ -656,7 +656,9 @@ */ client = server.accept(); client.configureBlocking(false); - + if (!client.finishConnect()) + throw new IOException("Upstream client not connected"); + clientSelector = Selector.open(); // must register OP_READ selector on the new client @@ -682,6 +684,16 @@ } + @Override + public String toString() { + + return super.toString() // + + "{client.isOpen()=" + client.isOpen()// + + ",clientSelector.isOpen=" + clientSelector.isOpen()// + + "}"; + + } + public void close() throws IOException { if (log.isInfoEnabled()) @@ -929,9 +941,8 @@ * for the InterruptedException, ClosedByInterruptException, * etc. */ - - log.error(t, t); - + log.error("client=" + clientRef.get() + ", cause=" + t, t); + if (t instanceof Exception) throw (Exception) t; @@ -986,8 +997,11 @@ // // } - if (client == null || !client.client.isOpen()) { + if (client == null || !client.client.isOpen() + || !client.clientSelector.isOpen()) { + log.warn("Re-opening upstream client connection"); + final Client tmp = clientRef.getAndSet(null); if (tmp != null) { // Close existing connection if not open. @@ -1137,8 +1151,11 @@ iter.next(); iter.remove(); - if (!drainUtil.foundMarker()) { + if (!drainUtil.findMarker()) { + + // continue to drain until the marker. continue; + } final int rdlen = client.client.read(localBuffer); @@ -1341,8 +1358,15 @@ * could read large amounts of data only a few bytes at a time, however * this is not in reality a significant overhead. */ - boolean foundMarker() throws IOException { + boolean findMarker() throws IOException { + if (markerIndex == marker.length) { + + // Marker already found for this payload. + return true; + + } + if (log.isDebugEnabled()) log.debug("Looking for token, " + BytesUtil.toHexString(marker) + ", reads: " + nreads); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |