From: <tho...@us...> - 2013-12-16 15:56:45
|
Revision: 7658 http://bigdata.svn.sourceforge.net/bigdata/?rev=7658&view=rev Author: thompsonbry Date: 2013-12-16 15:56:36 +0000 (Mon, 16 Dec 2013) Log Message: ----------- Sync to martyn. more logging. Also, Client now blocks until connected in the ctor. See #724. Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-16 15:22:25 UTC (rev 7657) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-16 15:56:36 UTC (rev 7658) @@ -656,7 +656,9 @@ */ client = server.accept(); client.configureBlocking(false); - + if (!client.finishConnect()) + throw new IOException("Upstream client not connected"); + clientSelector = Selector.open(); // must register OP_READ selector on the new client @@ -682,6 +684,16 @@ } + @Override + public String toString() { + + return super.toString() // + + "{client.isOpen()=" + client.isOpen()// + + ",clientSelector.isOpen=" + clientSelector.isOpen()// + + "}"; + + } + public void close() throws IOException { if (log.isInfoEnabled()) @@ -929,9 +941,8 @@ * for the InterruptedException, ClosedByInterruptException, * etc. */ - - log.error(t, t); - + log.error("client=" + clientRef.get() + ", cause=" + t, t); + if (t instanceof Exception) throw (Exception) t; @@ -986,8 +997,11 @@ // // } - if (client == null || !client.client.isOpen()) { + if (client == null || !client.client.isOpen() + || !client.clientSelector.isOpen()) { + log.warn("Re-opening upstream client connection"); + final Client tmp = clientRef.getAndSet(null); if (tmp != null) { // Close existing connection if not open. @@ -1137,8 +1151,11 @@ iter.next(); iter.remove(); - if (!drainUtil.foundMarker()) { + if (!drainUtil.findMarker()) { + + // continue to drain until the marker. continue; + } final int rdlen = client.client.read(localBuffer); @@ -1341,8 +1358,15 @@ * could read large amounts of data only a few bytes at a time, however * this is not in reality a significant overhead. */ - boolean foundMarker() throws IOException { + boolean findMarker() throws IOException { + if (markerIndex == marker.length) { + + // Marker already found for this payload. + return true; + + } + if (log.isDebugEnabled()) log.debug("Looking for token, " + BytesUtil.toHexString(marker) + ", reads: " + nreads); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |