From: <tho...@us...> - 2013-12-16 14:54:49
|
Revision: 7655 http://bigdata.svn.sourceforge.net/bigdata/?rev=7655&view=rev Author: thompsonbry Date: 2013-12-16 14:54:41 +0000 (Mon, 16 Dec 2013) Log Message: ----------- Sync to martyn. - HASendService : log level changes only. - HAReceiveService: (a) lifted the heap byte[] for amortizing checksum data xfer costs out of the ReadTask to reduce heap churn; (b) logging changes, especially for DrainToMarker. @see #724 Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-16 14:08:57 UTC (rev 7654) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-16 14:54:41 UTC (rev 7655) @@ -224,6 +224,15 @@ */ private final AtomicReference<InetSocketAddress> addrNextRef; + /** + * Private buffer used to incrementally compute the checksum of the data as + * it is received. The purpose of this buffer is to take advantage of more + * efficient bulk copy operations from the NIO buffer into a local byte[] on + * the Java heap against which we then track the evolving checksum of the + * data. + */ + private final byte[] heapBuffer = new byte[512]; + /* * Note: toString() implementation is non-blocking. */ @@ -533,8 +542,8 @@ // Setup task to read buffer for that message. readFuture = waitFuture = new FutureTask<Void>( new ReadTask<M>(server, clientRef, msg, - localBuffer, sendService, addrNextRef, - callback)); + localBuffer, heapBuffer, sendService, + addrNextRef, callback)); // [waitFuture] is available for receiveData(). futureReady.signalAll(); @@ -568,7 +577,7 @@ try { readFuture.get(); } catch (Exception e) { - log.warn(e, e); + log.error(e, e); } lock.lockInterruptibly(); @@ -757,21 +766,10 @@ private final Adler32 chk = new Adler32(); + private final byte[] heapBuffer; + /** - * Private buffer used to incrementally compute the checksum of the data - * as it is received. The purpose of this buffer is to take advantage of - * more efficient bulk copy operations from the NIO buffer into a local - * byte[] on the Java heap against which we then track the evolving - * checksum of the data. * - * FIXME Why isn't this buffer scoped to the outer HAReceiveService? By - * being an inner class field, we allocate it once per payload - * received.... - */ - private final byte[] a = new byte[512]; - - /** - * * @param server * @param clientRef * The client socket, selector, etc. @@ -797,7 +795,8 @@ */ public ReadTask(final ServerSocketChannel server, final AtomicReference<Client> clientRef, final M message, - final ByteBuffer localBuffer, final HASendService downstream, + final ByteBuffer localBuffer, final byte[] heapBuffer, + final HASendService downstream, final AtomicReference<InetSocketAddress> addrNextRef, final IHAReceiveCallback<M> callback) { @@ -807,6 +806,8 @@ throw new IllegalArgumentException(); if (message == null) throw new IllegalArgumentException(); + if (heapBuffer == null) + throw new IllegalArgumentException(); if (localBuffer == null) throw new IllegalArgumentException(); if (downstream == null) @@ -816,6 +817,7 @@ this.clientRef = clientRef; this.message = message; this.localBuffer = localBuffer; + this.heapBuffer = heapBuffer; this.sendService = downstream; this.addrNextRef = addrNextRef; this.callback = callback; @@ -868,8 +870,10 @@ } /** - * Update the running checksum. - * + * Update the running checksum. This uses the {@link #heapBuffer} to + * amoritize the cost of the transfers for the incremental checksum + * maintenance. + * * @param rdlen * The #of bytes read in the last read from the socket into * the {@link #localBuffer}. @@ -885,21 +889,22 @@ // rewind to the first byte to be read. b.position(mark - rdlen); - for (int pos = mark - rdlen; pos < mark; pos += a.length) { + for (int pos = mark - rdlen; pos < mark; pos += heapBuffer.length) { // #of bytes to copy into the local byte[]. - final int len = Math.min(mark - pos, a.length); + final int len = Math.min(mark - pos, heapBuffer.length); // copy into Java heap byte[], advancing b.position(). - b.get(a, 0/* off */, len); + b.get(heapBuffer, 0/* off */, len); // update the running checksum. - chk.update(a, 0/* off */, len); + chk.update(heapBuffer, 0/* off */, len); } } + @Override public Void call() throws Exception { // awaitAccept(); @@ -1265,11 +1270,14 @@ final private ByteBuffer markerBB; final private Client client; + private boolean foundMarkerInInitialPosition = true; private int markerIndex = 0; - private int nmarkerreads = 0; + private int nreads = 0; private int nmarkerbytematches = 0; + private long bytesRead = 0L; DrainToMarkerUtil(final byte[] marker, final Client client) { + this.marker = marker; this.markerBuffer = marker == null ? null : new byte[marker.length]; this.markerBB = marker == null ? null : ByteBuffer @@ -1297,7 +1305,7 @@ if (log.isDebugEnabled()) log.debug("Looking for token, " + BytesUtil.toHexString(marker) - + ", reads: " + nmarkerreads); + + ", reads: " + nreads); while (markerIndex < marker.length) { @@ -1306,10 +1314,23 @@ markerBB.position(0); final int rdLen = client.client.read(markerBB); + if (rdLen == -1) { + throw new IOException("EOF: nreads=" + nreads + + ", bytesRead=" + bytesRead); + } + nreads++; + bytesRead += rdLen; for (int i = 0; i < rdLen; i++) { if (markerBuffer[i] != marker[markerIndex]) { - if (nmarkerreads < 2) - log.warn("TOKEN MISMATCH"); + if (foundMarkerInInitialPosition) { + /* + * The marker was not found in the initial position + * in the stream. We are going to drain data until + * we can match the marker. + */ + foundMarkerInInitialPosition = false; + log.error("Marker not found: skipping"); + } markerIndex = 0; if (markerBuffer[i] == marker[markerIndex]) { markerIndex++; @@ -1320,21 +1341,25 @@ } } - nmarkerreads++; - if (nmarkerreads % 10000 == 0) { + if (nreads % 10000 == 0) { if (log.isDebugEnabled()) - log.debug("...still looking, reads: " + nmarkerreads); + log.debug("...still looking: reads=" + nreads + + ", bytesRead=" + bytesRead); } } - if (markerIndex != marker.length) { // not sufficient data ready + if (markerIndex != marker.length) { + /* + * Partial marker has been read, but we do not have enough data + * for a full match yet. + */ if (log.isDebugEnabled()) log.debug("Not found token yet!"); return false; } else { if (log.isDebugEnabled()) - log.debug("Found token after " + nmarkerreads + log.debug("Found token after " + nreads + " token reads and " + nmarkerbytematches + " byte matches"); Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-16 14:08:57 UTC (rev 7654) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-16 14:54:41 UTC (rev 7655) @@ -247,8 +247,8 @@ * {@link HAReceiveService}. */ synchronized public void terminate() { - if (log.isDebugEnabled()) - log.debug(toString() + " : stopping."); + if (log.isInfoEnabled()) + log.info(toString() + " : stopping."); final ExecutorService tmp = executorRef.getAndSet(null); if (tmp == null) { // Not running. @@ -446,13 +446,13 @@ socketChannel.set(sc = openChannel(addrNext.get())); - if (log.isTraceEnabled()) - log.trace("Opened channel on try: " + tryno); + if (log.isInfoEnabled()) + log.info("Opened channel on try: " + tryno); } catch (IOException e) { if (log.isInfoEnabled()) - log.info("Failed to open channel on try: " + tryno); + log.info("Failed to open channel on try: " + tryno); if (tryno < retryMillis.length) { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-12-16 16:19:37
|
Revision: 7659 http://bigdata.svn.sourceforge.net/bigdata/?rev=7659&view=rev Author: thompsonbry Date: 2013-12-16 16:19:27 +0000 (Mon, 16 Dec 2013) Log Message: ----------- Sync to martyn with more logging. See #724. Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-16 15:56:36 UTC (rev 7658) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-16 16:19:27 UTC (rev 7659) @@ -27,6 +27,7 @@ import java.io.IOException; import java.net.BindException; import java.net.InetSocketAddress; +import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousCloseException; import java.nio.channels.SelectionKey; @@ -686,9 +687,12 @@ @Override public String toString() { - + final Socket s = client.socket(); return super.toString() // + "{client.isOpen()=" + client.isOpen()// + + ",client.isConnected()=" + client.isConnected()// + + ",socket.isInputShutdown()=" + + (s == null ? "N/A" : s.isInputShutdown())// + ",clientSelector.isOpen=" + clientSelector.isOpen()// + "}"; Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-16 15:56:36 UTC (rev 7658) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-16 16:19:27 UTC (rev 7659) @@ -553,21 +553,58 @@ */ protected /*static*/ class IncSendTask implements Callable<Void> { - private final ByteBuffer data; - private final byte[] marker; + private final ByteBuffer data; + private final byte[] marker; - public IncSendTask(final ByteBuffer data, final byte[] marker) { + public IncSendTask(final ByteBuffer data, final byte[] marker) { - if (data == null) - throw new IllegalArgumentException(); + if (data == null) + throw new IllegalArgumentException(); - this.data = data; - this.marker = marker; - } + this.data = data; + this.marker = marker; + } - @Override - public Void call() throws Exception { + @Override + public Void call() throws Exception { + try { + + return doInnerCall(); + + } catch (Throwable t) { + + /* + * Log anything thrown out of this task. We check the Future of + * this task, but that does not tell us what exception is thrown + * in the Thread executing the task when the Future is cancelled + * and that thread is interrupted. In particular, we are looking + * for the InterruptedException, ClosedByInterruptException, + * etc. + */ + + final SocketChannel sc = socketChannel.get(); + + log.error("socketChannel=" + + sc + + (sc == null ? "" : ", sc.isOpen()" + sc.isOpen() + + ", sc.isConnected()" + sc.isConnected()) + + ", cause=" + t, t); + + if (t instanceof Exception) + throw (Exception) t; + + if (t instanceof RuntimeException) + throw (RuntimeException) t; + + throw new RuntimeException(t); + + } + + } + + private Void doInnerCall() throws Exception { + // defer until we actually run. final SocketChannel socketChannel = reopenChannel(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-12-16 16:54:44
|
Revision: 7660 http://bigdata.svn.sourceforge.net/bigdata/?rev=7660&view=rev Author: thompsonbry Date: 2013-12-16 16:54:38 +0000 (Mon, 16 Dec 2013) Log Message: ----------- Pushed down a read() method that closes the upstream socket channel if there is an error reading from A. Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-16 16:19:27 UTC (rev 7659) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-16 16:54:38 UTC (rev 7660) @@ -698,7 +698,7 @@ } - public void close() throws IOException { + private void close() throws IOException { if (log.isInfoEnabled()) log.info("Closing client connection"); @@ -718,7 +718,32 @@ // } // } } + + } + /** + * Wraps {@link SocketChannel#read(ByteBuffer)} to test for an EOF and + * calls {@link #close()} if an EOF is reached. + * + * @param dst + * The destination buffer. + * + * @return The #of bytes read. + * + * @throws IOException + */ + private int read(final ByteBuffer dst) throws IOException { + + final int rdlen = client.read(dst); + + if (rdlen == -1) { + + close(); + + } + + return rdlen; + } /** @@ -726,7 +751,7 @@ * control back to the leader. The leader will then handle this in * {@link QuorumPipelineImpl}'s retrySend() method. */ - public void checkFirstCause() throws RuntimeException { + private void checkFirstCause() throws RuntimeException { final Throwable t = firstCause.getAndSet(null); @@ -1162,7 +1187,7 @@ } - final int rdlen = client.client.read(localBuffer); + final int rdlen = client.read(localBuffer); if (log.isTraceEnabled()) log.trace("Read " + rdlen + " bytes with " @@ -1381,7 +1406,7 @@ markerBB.limit(remtok); markerBB.position(0); - final int rdLen = client.client.read(markerBB); + final int rdLen = client.read(markerBB); if (rdLen == -1) { throw new IOException("EOF: nreads=" + nreads + ", bytesRead=" + bytesRead); Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-16 16:19:27 UTC (rev 7659) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-16 16:54:38 UTC (rev 7660) @@ -587,8 +587,8 @@ log.error("socketChannel=" + sc - + (sc == null ? "" : ", sc.isOpen()" + sc.isOpen() - + ", sc.isConnected()" + sc.isConnected()) + + (sc == null ? "" : ", sc.isOpen()=" + sc.isOpen() + + ", sc.isConnected()=" + sc.isConnected()) + ", cause=" + t, t); if (t instanceof Exception) This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |