From: <tho...@us...> - 2013-12-16 16:19:37
|
Revision: 7659 http://bigdata.svn.sourceforge.net/bigdata/?rev=7659&view=rev Author: thompsonbry Date: 2013-12-16 16:19:27 +0000 (Mon, 16 Dec 2013) Log Message: ----------- Sync to martyn with more logging. See #724. Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-16 15:56:36 UTC (rev 7658) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-16 16:19:27 UTC (rev 7659) @@ -27,6 +27,7 @@ import java.io.IOException; import java.net.BindException; import java.net.InetSocketAddress; +import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousCloseException; import java.nio.channels.SelectionKey; @@ -686,9 +687,12 @@ @Override public String toString() { - + final Socket s = client.socket(); return super.toString() // + "{client.isOpen()=" + client.isOpen()// + + ",client.isConnected()=" + client.isConnected()// + + ",socket.isInputShutdown()=" + + (s == null ? "N/A" : s.isInputShutdown())// + ",clientSelector.isOpen=" + clientSelector.isOpen()// + "}"; Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-16 15:56:36 UTC (rev 7658) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-16 16:19:27 UTC (rev 7659) @@ -553,21 +553,58 @@ */ protected /*static*/ class IncSendTask implements Callable<Void> { - private final ByteBuffer data; - private final byte[] marker; + private final ByteBuffer data; + private final byte[] marker; - public IncSendTask(final ByteBuffer data, final byte[] marker) { + public IncSendTask(final ByteBuffer data, final byte[] marker) { - if (data == null) - throw new IllegalArgumentException(); + if (data == null) + throw new IllegalArgumentException(); - this.data = data; - this.marker = marker; - } + this.data = data; + this.marker = marker; + } - @Override - public Void call() throws Exception { + @Override + public Void call() throws Exception { + try { + + return doInnerCall(); + + } catch (Throwable t) { + + /* + * Log anything thrown out of this task. We check the Future of + * this task, but that does not tell us what exception is thrown + * in the Thread executing the task when the Future is cancelled + * and that thread is interrupted. In particular, we are looking + * for the InterruptedException, ClosedByInterruptException, + * etc. + */ + + final SocketChannel sc = socketChannel.get(); + + log.error("socketChannel=" + + sc + + (sc == null ? "" : ", sc.isOpen()" + sc.isOpen() + + ", sc.isConnected()" + sc.isConnected()) + + ", cause=" + t, t); + + if (t instanceof Exception) + throw (Exception) t; + + if (t instanceof RuntimeException) + throw (RuntimeException) t; + + throw new RuntimeException(t); + + } + + } + + private Void doInnerCall() throws Exception { + // defer until we actually run. final SocketChannel socketChannel = reopenChannel(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |