From: <tho...@us...> - 2013-12-16 18:06:27
|
Revision: 7661 http://bigdata.svn.sourceforge.net/bigdata/?rev=7661&view=rev Author: thompsonbry Date: 2013-12-16 18:06:19 +0000 (Mon, 16 Dec 2013) Log Message: ----------- QuorumPipelineImpl - just javadoc. HASendService - transparency for the HASendState if task fails. Also includes hack for small send chunks - must be enabled in the code. HASendState - added method to decode the marker. HAReceiveService - added decode of the marker and more information when closing the client connection. See #724. Modified Paths: -------------- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HASendState.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-16 16:54:38 UTC (rev 7660) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-16 18:06:19 UTC (rev 7661) @@ -1785,7 +1785,7 @@ } finally { // cancel the local Future. - futSnd.cancel(true/* mayInterruptIfRunning */); + futSnd.cancel(true/* mayInterruptIfRunning */); // FIXME Cancel hits wrong send() payload. } } catch (Throwable t) { launderPipelineException(true/* isLeader */, member, t); @@ -1795,10 +1795,14 @@ } /** - * Launder an exception thrown during pipeline replication. + * Launder an exception thrown during pipeline replication. + * * @param isLeader + * <code>true</code> iff this service is the quorum leader. * @param member + * The {@link QuorumMember} for this service. * @param t + * The throwable. */ static private void launderPipelineException(final boolean isLeader, final QuorumMember<?> member, final Throwable t) { @@ -2155,7 +2159,7 @@ } finally { // cancel the local Future. - futRec.cancel(true/* mayInterruptIfRunning */); + futRec.cancel(true/* mayInterruptIfRunning */); // FIXME Cancel hits wrong send() payload? } } catch (Throwable t) { launderPipelineException(false/* isLeader */, member, t); Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HASendState.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HASendState.java 2013-12-16 16:54:38 UTC (rev 7660) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HASendState.java 2013-12-16 18:06:19 UTC (rev 7661) @@ -1,5 +1,6 @@ package com.bigdata.ha.msg; +import java.io.DataInput; import java.io.DataOutput; import java.io.Externalizable; import java.io.IOException; @@ -7,6 +8,7 @@ import java.io.ObjectOutput; import java.util.UUID; +import com.bigdata.io.DataInputBuffer; import com.bigdata.io.DataOutputBuffer; import com.bigdata.rawstore.Bytes; @@ -162,6 +164,12 @@ public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { + readExternal2(in); + + } + + private void readExternal2(final DataInput in) throws IOException { + final short version = in.readShort(); if (version != VERSION0) @@ -185,6 +193,38 @@ } + /** + * Decode the value returned by {@link #getMarker()}. This has the magic + * followed by {@link #writeExternal2(DataOutput)}. It does not have the + * object serialization metadata. + * + * @param a + * The encoded marker. + * + * @return The decoded marker -or- <code>null</code> iff the argument is + * <code>null</code>. + */ + static public IHASendState decode(final byte[] a) throws IOException { + + if (a == null) + return null; + + final HASendState tmp = new HASendState(); + + final DataInputBuffer dis = new DataInputBuffer(a); + + final long magic = dis.readLong(); + + if (magic != MAGIC) + throw new IOException("Bad magic: expected=" + MAGIC + ", actual=" + + magic); + + tmp.readExternal2(dis); + + return tmp; + + } + @Override public void writeExternal(final ObjectOutput out) throws IOException { Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-16 16:54:38 UTC (rev 7660) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-16 18:06:19 UTC (rev 7661) @@ -55,6 +55,7 @@ import com.bigdata.btree.BytesUtil; import com.bigdata.ha.QuorumPipelineImpl; import com.bigdata.ha.msg.HAMessageWrapper; +import com.bigdata.ha.msg.HASendState; import com.bigdata.ha.msg.IHAMessage; import com.bigdata.ha.msg.IHASyncRequest; import com.bigdata.ha.msg.IHAWriteMessage; @@ -665,7 +666,10 @@ // must register OP_READ selector on the new client clientKey = client.register(clientSelector, SelectionKey.OP_READ); - + + if (log.isInfoEnabled()) + log.info("Accepted new connection"); + // this.downstream = downstream; // // // Prepare downstream (if any) for incremental transfers @@ -687,7 +691,9 @@ @Override public String toString() { + final Socket s = client.socket(); + return super.toString() // + "{client.isOpen()=" + client.isOpen()// + ",client.isConnected()=" + client.isConnected()// @@ -701,7 +707,7 @@ private void close() throws IOException { if (log.isInfoEnabled()) - log.info("Closing client connection"); + log.info("Closing client connection: " + this); clientKey.cancel(); @@ -970,7 +976,14 @@ * for the InterruptedException, ClosedByInterruptException, * etc. */ - log.error("client=" + clientRef.get() + ", cause=" + t, t); + log.error( + "client=" + + clientRef.get() + + ", msg=" + + message + + ", marker=" + + HASendState.decode(message.getHASendState() + .getMarker()) + ", cause=" + t, t); if (t instanceof Exception) throw (Exception) t; Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-16 16:54:38 UTC (rev 7660) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-16 18:06:19 UTC (rev 7661) @@ -39,6 +39,7 @@ import org.apache.log4j.Logger; +import com.bigdata.ha.msg.HASendState; import com.bigdata.util.InnerCause; import com.bigdata.util.concurrent.Haltable; @@ -589,7 +590,8 @@ + sc + (sc == null ? "" : ", sc.isOpen()=" + sc.isOpen() + ", sc.isConnected()=" + sc.isConnected()) - + ", cause=" + t, t); + + ", marker=" + HASendState.decode(marker) + ", cause=" + + t, t); if (t instanceof Exception) throw (Exception) t; @@ -668,8 +670,28 @@ * buffer. */ - final int nbytes = socketChannel.write(data); + final int nbytes; + if (false&&log.isDebugEnabled()) { // add debug latency + final int limit = data.limit(); + if (data.position() < (limit - 50000)) { + data.limit(data.position() + 50000); + } + nbytes = socketChannel.write(data); + data.limit(limit); + nwritten += nbytes; + log.debug("Written " + nwritten + " of total " + + data.limit()); + + if (nwritten < limit) { + Thread.sleep(1); + } + } else { + + nbytes = socketChannel.write(data); + nwritten += nbytes; + } + nwritten += nbytes; if (log.isTraceEnabled()) This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |