From: <tho...@us...> - 2013-12-16 18:06:27
Revision: 7661
          http://bigdata.svn.sourceforge.net/bigdata/?rev=7661&view=rev
Author:   thompsonbry
Date:     2013-12-16 18:06:19 +0000 (Mon, 16 Dec 2013)

Log Message:
-----------
QuorumPipelineImpl - just javadoc.

HASendService - transparency for the HASendState if task fails. Also includes
hack for small send chunks - must be enabled in the code.

HASendState - added method to decode the marker.

HAReceiveService - added decode of the marker and more information when
closing the client connection.

See #724.

Modified Paths:
--------------
    branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java
    branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HASendState.java
    branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java
    branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java

Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java
===================================================================
--- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java	2013-12-16 16:54:38 UTC (rev 7660)
+++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java	2013-12-16 18:06:19 UTC (rev 7661)
@@ -1785,7 +1785,7 @@
 
             } finally {
                 // cancel the local Future.
-                futSnd.cancel(true/* mayInterruptIfRunning */);
+                futSnd.cancel(true/* mayInterruptIfRunning */); // FIXME Cancel hits wrong send() payload.
             }
         } catch (Throwable t) {
             launderPipelineException(true/* isLeader */, member, t);
@@ -1795,10 +1795,14 @@
     }
 
     /**
-     * Launder an exception thrown during pipeline replication.
+     * Launder an exception thrown during pipeline replication.
+     * 
      * @param isLeader
+     *            <code>true</code> iff this service is the quorum leader.
      * @param member
+     *            The {@link QuorumMember} for this service.
      * @param t
+     *            The throwable.
      */
     static private void launderPipelineException(final boolean isLeader,
             final QuorumMember<?> member, final Throwable t) {
@@ -2155,7 +2159,7 @@
 
             } finally {
                 // cancel the local Future.
-                futRec.cancel(true/* mayInterruptIfRunning */);
+                futRec.cancel(true/* mayInterruptIfRunning */); // FIXME Cancel hits wrong send() payload?
             }
         } catch (Throwable t) {
             launderPipelineException(false/* isLeader */, member, t);

Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HASendState.java
===================================================================
--- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HASendState.java	2013-12-16 16:54:38 UTC (rev 7660)
+++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/msg/HASendState.java	2013-12-16 18:06:19 UTC (rev 7661)
@@ -1,5 +1,6 @@
 package com.bigdata.ha.msg;
 
+import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.Externalizable;
 import java.io.IOException;
@@ -7,6 +8,7 @@
 import java.io.ObjectOutput;
 import java.util.UUID;
 
+import com.bigdata.io.DataInputBuffer;
 import com.bigdata.io.DataOutputBuffer;
 import com.bigdata.rawstore.Bytes;
 
@@ -162,6 +164,12 @@
     public void readExternal(final ObjectInput in) throws IOException,
             ClassNotFoundException {
 
+        readExternal2(in);
+
+    }
+
+    private void readExternal2(final DataInput in) throws IOException {
+
         final short version = in.readShort();
 
         if (version != VERSION0)
@@ -185,6 +193,38 @@
 
     }
 
+    /**
+     * Decode the value returned by {@link #getMarker()}. This has the magic
+     * followed by {@link #writeExternal2(DataOutput)}. It does not have the
+     * object serialization metadata.
+     * 
+     * @param a
+     *            The encoded marker.
+     * 
+     * @return The decoded marker -or- <code>null</code> iff the argument is
+     *         <code>null</code>.
+     */
+    static public IHASendState decode(final byte[] a) throws IOException {
+
+        if (a == null)
+            return null;
+
+        final HASendState tmp = new HASendState();
+
+        final DataInputBuffer dis = new DataInputBuffer(a);
+
+        final long magic = dis.readLong();
+
+        if (magic != MAGIC)
+            throw new IOException("Bad magic: expected=" + MAGIC + ", actual="
+                    + magic);
+
+        tmp.readExternal2(dis);
+
+        return tmp;
+
+    }
+
     @Override
     public void writeExternal(final ObjectOutput out) throws IOException {

Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java
===================================================================
--- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java	2013-12-16 16:54:38 UTC (rev 7660)
+++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java	2013-12-16 18:06:19 UTC (rev 7661)
@@ -55,6 +55,7 @@
 import com.bigdata.btree.BytesUtil;
 import com.bigdata.ha.QuorumPipelineImpl;
 import com.bigdata.ha.msg.HAMessageWrapper;
+import com.bigdata.ha.msg.HASendState;
 import com.bigdata.ha.msg.IHAMessage;
 import com.bigdata.ha.msg.IHASyncRequest;
 import com.bigdata.ha.msg.IHAWriteMessage;
@@ -665,7 +666,10 @@
             // must register OP_READ selector on the new client
             clientKey = client.register(clientSelector,
                     SelectionKey.OP_READ);
-
+
+            if (log.isInfoEnabled())
+                log.info("Accepted new connection");
+
 //            this.downstream = downstream;
 //
 //            // Prepare downstream (if any) for incremental transfers
@@ -687,7 +691,9 @@
         @Override
         public String toString() {
 
+            final Socket s = client.socket();
+
             return super.toString() //
                     + "{client.isOpen()=" + client.isOpen()//
                     + ",client.isConnected()=" + client.isConnected()//
@@ -701,7 +707,7 @@
         private void close() throws IOException {
 
             if (log.isInfoEnabled())
-                log.info("Closing client connection");
+                log.info("Closing client connection: " + this);
 
             clientKey.cancel();
 
@@ -970,7 +976,14 @@
                  * for the InterruptedException, ClosedByInterruptException,
                  * etc.
                  */
-                log.error("client=" + clientRef.get() + ", cause=" + t, t);
+                log.error(
+                        "client="
+                                + clientRef.get()
+                                + ", msg="
+                                + message
+                                + ", marker="
+                                + HASendState.decode(message.getHASendState()
+                                        .getMarker()) + ", cause=" + t, t);
 
                 if (t instanceof Exception)
                     throw (Exception) t;

Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java
===================================================================
--- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java	2013-12-16 16:54:38 UTC (rev 7660)
+++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java	2013-12-16 18:06:19 UTC (rev 7661)
@@ -39,6 +39,7 @@
 
 import org.apache.log4j.Logger;
 
+import com.bigdata.ha.msg.HASendState;
 import com.bigdata.util.InnerCause;
 import com.bigdata.util.concurrent.Haltable;
 
@@ -589,7 +590,8 @@
                             + sc
                             + (sc == null ? "" : ", sc.isOpen()=" + sc.isOpen()
                                     + ", sc.isConnected()=" + sc.isConnected())
-                            + ", cause=" + t, t);
+                            + ", marker=" + HASendState.decode(marker) + ", cause="
+                            + t, t);
 
                     if (t instanceof Exception)
                         throw (Exception) t;
@@ -668,8 +670,28 @@
              * buffer.
              */
-            final int nbytes = socketChannel.write(data);
+            final int nbytes;
+            if (false&&log.isDebugEnabled()) { // add debug latency
+                final int limit = data.limit();
+                if (data.position() < (limit - 50000)) {
+                    data.limit(data.position() + 50000);
+                }
+                nbytes = socketChannel.write(data);
+                data.limit(limit);
+                nwritten += nbytes;
+                log.debug("Written " + nwritten + " of total "
+                        + data.limit());
+
+                if (nwritten < limit) {
+                    Thread.sleep(1);
+                }
+            } else {
+
+                nbytes = socketChannel.write(data);
+                nwritten += nbytes;
+            }
+
             nwritten += nbytes;
 
             if (log.isTraceEnabled())
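The marker that HASendState.decode(byte[]) parses is a magic-prefixed, versioned record: the sender writes a well-known long followed by the externalized fields, and the receiver rejects the record if the magic does not match. The standalone Java sketch below illustrates that encode/decode pattern with plain java.io streams; the MAGIC and VERSION0 values and the payload fields (a message id and a sender UUID) are placeholders chosen for illustration, not the actual HASendState layout.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.UUID;

public class MarkerCodecSketch {

    // Hypothetical values; the real MAGIC/VERSION0 live in HASendState.
    private static final long MAGIC = 0x13579bdfL;
    private static final short VERSION0 = 0x0;

    static byte[] encode(final long messageId, final UUID senderId)
            throws IOException {
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        final DataOutputStream out = new DataOutputStream(baos);
        out.writeLong(MAGIC);     // magic prefix, checked before anything else
        out.writeShort(VERSION0); // record version
        out.writeLong(messageId); // illustrative payload fields
        out.writeLong(senderId.getMostSignificantBits());
        out.writeLong(senderId.getLeastSignificantBits());
        out.flush();
        return baos.toByteArray();
    }

    static String decode(final byte[] a) throws IOException {
        if (a == null)
            return null; // mirror decode(): null in, null out.
        final DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(a));
        final long magic = in.readLong();
        if (magic != MAGIC)
            throw new IOException("Bad magic: expected=" + MAGIC
                    + ", actual=" + magic);
        final short version = in.readShort();
        if (version != VERSION0)
            throw new IOException("Bad version: " + version);
        final long messageId = in.readLong();
        final UUID senderId = new UUID(in.readLong(), in.readLong());
        return "{messageId=" + messageId + ", senderId=" + senderId + "}";
    }

    public static void main(final String[] args) throws IOException {
        final byte[] marker = encode(42L, UUID.randomUUID());
        System.out.println(decode(marker));
    }
}

The real decode(byte[]) in the diff above follows the same shape: it returns null for a null argument, verifies MAGIC via a DataInputBuffer, and then delegates to readExternal2(DataInput).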
From: <tho...@us...> - 2013-12-18 15:21:45
Revision: 7674
          http://bigdata.svn.sourceforge.net/bigdata/?rev=7674&view=rev
Author:   thompsonbry
Date:     2013-12-18 15:21:37 +0000 (Wed, 18 Dec 2013)

Log Message:
-----------
Added logic in the resetPipeline implementation to await a pipeline change
(up to a timeout) such that the problem service is no longer a neighbor of a
given service.

See #724.

Modified Paths:
--------------
    branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineResetRequest.java
    branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/IHAPipelineResetRequest.java
    branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java

Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineResetRequest.java
===================================================================
--- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineResetRequest.java	2013-12-18 15:03:44 UTC (rev 7673)
+++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/HAPipelineResetRequest.java	2013-12-18 15:21:37 UTC (rev 7674)
@@ -23,6 +23,8 @@
  */
 package com.bigdata.ha;
 
+import java.util.UUID;
+
 public class HAPipelineResetRequest implements IHAPipelineResetRequest {
 
     /**
@@ -31,11 +33,16 @@
     private static final long serialVersionUID = 1L;
 
     private long token;
-
-    public HAPipelineResetRequest(final long token) {
+    private UUID problemServiceId;
+    private long timeoutNanos;
+
+    public HAPipelineResetRequest(final long token,
+            final UUID problemServiceId, final long timeoutNanos) {
         this.token = token;
+        this.problemServiceId = problemServiceId;
+        this.timeoutNanos = timeoutNanos;
     }
-
+
     @Override
     public long token() {
         return token;
@@ -43,7 +50,18 @@
 
     @Override
     public String toString() {
-        return super.toString() + "{token=" + token + "}";
+        return super.toString() + "{token=" + token + ", problemServiceId="
+                + problemServiceId + ", timeoutNanos=" + timeoutNanos + "}";
     }
+
+    @Override
+    public UUID getProblemServiceId() {
+        return problemServiceId;
+    }
+
+    @Override
+    public long getTimeoutNanos() {
+        return timeoutNanos;
+    }
 
 }

Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/IHAPipelineResetRequest.java
===================================================================
--- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/IHAPipelineResetRequest.java	2013-12-18 15:03:44 UTC (rev 7673)
+++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/IHAPipelineResetRequest.java	2013-12-18 15:21:37 UTC (rev 7674)
@@ -23,6 +23,8 @@
  */
 package com.bigdata.ha;
 
+import java.util.UUID;
+
 import com.bigdata.ha.msg.IHAMessage;
 
 /**
@@ -36,5 +38,21 @@
      * The quorum token in effect on the leader when this request was generated.
      */
     long token();
+
+    /**
+     * The {@link UUID} of the service that the leader has forced from the
+     * pipeline
+     * 
+     * @return The {@link UUID} of the problem service -or- <code>null</code> if
+     *         the leader did not identify a problem service.
+     */
+    UUID getProblemServiceId();
+    /**
+     * How long to await the state where the problem service is no longer part
+     * of the write pipeline for a service that is upstream or downstream of the
+     * problem service.
+     */
+    long getTimeoutNanos();
 
 }

Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java
===================================================================
--- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java	2013-12-18 15:03:44 UTC (rev 7673)
+++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java	2013-12-18 15:21:37 UTC (rev 7674)
@@ -46,7 +46,7 @@
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.log4j.Logger;
@@ -229,6 +229,12 @@
      */
    private final ReentrantLock lock = new ReentrantLock();
 
+    /**
+     * Condition signalled when the write replication pipeline has been changed
+     * (either the upstream and/or downstream service was changed).
+     */
+    private final Condition pipelineChanged = lock.newCondition();
+
    /** send service (iff this is the leader). */
    private HASendService sendService;
@@ -595,6 +601,10 @@
             super.pipelineChange(oldDownStreamId, newDownStreamId);
             lock.lock();
             try {
+                if (oldDownStreamId == newDownStreamId) {
+                    // Nothing to do (both null or same UUID reference).
+                    return;
+                }
                 if (oldDownStreamId != null && newDownStreamId != null
                         && oldDownStreamId.equals(newDownStreamId)) {
                     /*
@@ -604,8 +614,10 @@
                     return;
                 }
                 // The address of the next service in the pipeline.
-                final InetSocketAddress addrNext = newDownStreamId == null ? null
-                        : getAddrNext(newDownStreamId);
+//                final InetSocketAddress addrNext = getAddrNext(newDownStreamId);
+                final PipelineState<S> nextState = getAddrNext(newDownStreamId);
+                final InetSocketAddress addrNext = nextState == null ? null
+                        : nextState.addr;
                 if (log.isInfoEnabled())
                     log.info("oldDownStreamId=" + oldDownStreamId
                             + ",newDownStreamId=" + newDownStreamId
@@ -636,7 +648,10 @@
                     receiveService.changeDownStream(addrNext);
                 }
                 // populate and/or clear the cache.
-                cachePipelineState(newDownStreamId);
+                pipelineStateRef.set(nextState);
+//                cachePipelineState(newDownStreamId);
+                // Signal pipeline change.
+                pipelineChanged.signalAll();
                 if (log.isDebugEnabled())
                     log.debug("pipelineChange - done.");
             } finally {
@@ -657,6 +672,8 @@
                     if (log.isInfoEnabled())
                         log.info("receiveService=" + receiveService);
                     receiveService.changeUpStream();
+                    // Signal pipeline change.
+                    pipelineChanged.signalAll();
                 }
             } finally {
                 lock.unlock();
@@ -688,7 +705,7 @@
          * 
          * @return It's {@link InetSocketAddress}
          */
-        private InetSocketAddress getAddrNext(final UUID downStreamId) {
+        private PipelineState<S> getAddrNext(final UUID downStreamId) {
 
             if (downStreamId == null)
                 return null;
@@ -697,9 +714,10 @@
 
             try {
 
-                final InetSocketAddress addrNext = service.getWritePipelineAddr();
+                final InetSocketAddress addrNext = service
+                        .getWritePipelineAddr();
 
-                return addrNext;
+                return new PipelineState<S>(service, addrNext);
 
             } catch (IOException e) {
@@ -757,50 +775,52 @@
             }
             // clear cache.
             pipelineStateRef.set(null);
+            // Signal pipeline change.
+            pipelineChanged.signalAll();
         } finally {
             lock.unlock();
         }
     }
 
-    /**
-     * Populate or clear the {@link #pipelineState} cache.
-     * <p>
-     * Note: The only times we need to populate the {@link #pipelineState} are
-     * in response to a {@link #pipelineChange(UUID, UUID)} event or in response
-     * to message a {@link #pipelineElectedLeader()} event.
-     * 
-     * @param downStreamId
-     *            The downstream service {@link UUID}.
-     */
-    private void cachePipelineState(final UUID downStreamId) {
-
-        if (downStreamId == null) {
-
-            pipelineStateRef.set(null);
-
-            return;
-
-        }
-
-        final S nextService = member.getService(downStreamId);
-
-        final PipelineState<S> pipelineState = new PipelineState<S>();
-
-        try {
-
-            pipelineState.addr = nextService.getWritePipelineAddr();
-
-        } catch (IOException e) {
-
-            throw new RuntimeException(e);
-
-        }
-
-        pipelineState.service = nextService;
-
-        pipelineStateRef.set(pipelineState);
-
-    }
+//    /**
+//     * Populate or clear the {@link #pipelineState} cache.
+//     * <p>
+//     * Note: The only times we need to populate the {@link #pipelineState} are
+//     * in response to a {@link #pipelineChange(UUID, UUID)} event or in response
+//     * to message a {@link #pipelineElectedLeader()} event.
+//     * 
+//     * @param downStreamId
+//     *            The downstream service {@link UUID}.
+//     */
+//    private void cachePipelineState(final UUID downStreamId) {
+//
+//        if (downStreamId == null) {
+//
+//            pipelineStateRef.set(null);
+//
+//            return;
+//
+//        }
+//
+//        final S nextService = member.getService(downStreamId);
//
+//        final PipelineState<S> pipelineState = new PipelineState<S>();
+//
+//        try {
+//
+//            pipelineState.addr = nextService.getWritePipelineAddr();
+//
+//        } catch (IOException e) {
+//
+//            throw new RuntimeException(e);
+//
+//        }
+//
+//        pipelineState.service = nextService;
+//
+//        pipelineStateRef.set(pipelineState);
+//
+//    }
 
     /**
      * Setup the send service.
@@ -825,15 +845,20 @@
              * handle downstreamId != null conditionally.
              */
            final UUID downstreamId = member.getDownstreamServiceId();
-            if (downstreamId != null) {
-                // The address of the next service in the pipeline.
-                final InetSocketAddress addrNext = member.getService(
-                        downstreamId).getWritePipelineAddr();
+            // The address of the next service in the pipeline.
+            final PipelineState<S> nextState = getAddrNext(downstreamId);
+            if (nextState != null) {
+//                // The address of the next service in the pipeline.
+//                final InetSocketAddress addrNext = member.getService(
+//                        downstreamId).getWritePipelineAddr();
                 // Start the send service.
-                sendService.start(addrNext);
+                sendService.start(nextState.addr);
             }
             // populate and/or clear the cache.
-            cachePipelineState(downstreamId);
+            pipelineStateRef.set(nextState);
+//            cachePipelineState(downstreamId);
+            // Signal pipeline change.
+            pipelineChanged.signalAll();
         } catch (Throwable t) {
             try {
                 tearDown();
@@ -865,11 +890,16 @@
             final InetSocketAddress addrSelf = member.getService()
                     .getWritePipelineAddr();
             // Address of the downstream service (if any).
-            final InetSocketAddress addrNext = downstreamId == null ? null
-                    : member.getService(downstreamId).getWritePipelineAddr();
+//            final InetSocketAddress addrNext = downstreamId == null ? null
+//                    : member.getService(downstreamId).getWritePipelineAddr();
+//            final InetSocketAddress addrNext = getAddrNext(downstreamId);
+            final PipelineState<S> nextServiceState = getAddrNext(downstreamId);
+            final InetSocketAddress addrNext = nextServiceState == null ? null
+                    : nextServiceState.addr;
             // Setup the receive service.
-            receiveService = new HAReceiveService<HAMessageWrapper>(addrSelf,
-                    addrNext, new IHAReceiveCallback<HAMessageWrapper>() {
+            receiveService = new HAReceiveService<HAMessageWrapper>(
+                    addrSelf, addrNext,
+                    new IHAReceiveCallback<HAMessageWrapper>() {
                 @Override
                 public void callback(final HAMessageWrapper msg,
                         final ByteBuffer data) throws Exception {
@@ -892,6 +922,8 @@
             // Start the receive service - will not return until service is
             // running
             receiveService.start();
+            // Signal pipeline change.
+            pipelineChanged.signalAll();
         } catch (Throwable t) {
             /*
              * Always tear down if there was a setup problem to avoid leaking
@@ -1251,21 +1283,92 @@
 
         }
 
+        /**
+         * If there is an identified problem service and that service is either
+         * our upstream or downstream service, then we need to wait until we
+         * observe a pipeline change event such that it is no longer our
+         * upstream or downstream service. Otherwise we can go ahead and reset
+         * our pipeline.
+         * 
+         * TODO What if this service is the problem service?
+         */
+        private boolean isProblemServiceOurNeighbor() {
+
+            final UUID psid = req.getProblemServiceId();
+
+            if (psid == null) {
+
+                return false;
+
+            }
+
+            final UUID[] priorAndNext = member.getQuorum()
+                    .getPipelinePriorAndNext(member.getServiceId());
+
+            if (psid.equals(priorAndNext[0]))
+                return true;
+
+            if (psid.equals(priorAndNext[1]))
+                return true;
+
+            return false;
+
+        }
+
         private IHAPipelineResetResponse doRunWithLock() throws Exception {
 
             log.warn("Will reset pipeline: req=" + req);
 
-            // tear down send and/or receive services.
-            innerEventHandler.tearDown();
+            final long begin = System.nanoTime();
+            final long timeout = req.getTimeoutNanos();
+            long remaining = timeout;
 
-            // The current pipeline order.
-            final UUID[] pipelineOrder = member.getQuorum().getPipeline();
-            // The index of this service in the pipeline order.
-            final int index = getIndex(serviceId, pipelineOrder);
-            if (index == 0) {
-                innerEventHandler.setUpSendService();
-            } else if (index > 0) {
-                innerEventHandler.setUpReceiveService();
+            if (isProblemServiceOurNeighbor()) {
+
+                log.warn("Problem service is our neighbor.");
+
+                do {
+
+                    pipelineChanged.await(remaining, TimeUnit.NANOSECONDS);
+
+                    // remaining = timeout - elapsed
+                    remaining = timeout - (begin - System.nanoTime());
+
+                } while (isProblemServiceOurNeighbor() && remaining > 0);
+
+                if (isProblemServiceOurNeighbor()) {
+
+                    /*
+                     * Timeout elapsed.
+                     * 
+                     * Note: This could be a false timeout, e.g., the problem
+                     * service left and re-entered and is still our neighbor.
+                     * However, the leader will just log and ignore the problem.
+                     * If the pipeline is working again, then all is good. If
+                     * not, then it will force out the problem service and reset
+                     * the pipeline again.
+                     */
+                    throw new TimeoutException();
+
+                }
+
+            } else {
+
+                log.warn("Problem service is not our neighbor.");
+
+                // tear down send and/or receive services.
+                innerEventHandler.tearDown();
+
+                // The current pipeline order.
+                final UUID[] pipelineOrder = member.getQuorum().getPipeline();
+                // The index of this service in the pipeline order.
+                final int index = getIndex(serviceId, pipelineOrder);
+                if (index == 0) {
+                    innerEventHandler.setUpSendService();
+                } else if (index > 0) {
+                    innerEventHandler.setUpReceiveService();
+                }
+
             }
 
             return new HAPipelineResetResponse();
@@ -1835,7 +1938,12 @@
                     /*
                      * Await the Futures, but spend more time waiting on the
                      * local Future and only check the remote Future every
-                     * second. Timeouts are ignored during this loop.
+                     * second. Timeouts are ignored during this loop - they
+                     * are used to let us wait longer on the local Future
+                     * than on the remote Future. ExecutionExceptions are
+                     * also ignored. We want to continue this loop until
+                     * both Futures are done. Interrupts are not trapped, so
+                     * an interrupt will still exit the loop.
                      */
                     while (!futSnd.isDone() || !futRec.isDone()) {
                         /*
@@ -1846,16 +1954,24 @@
                         try {
                             futSnd.get(1L, TimeUnit.SECONDS);
                         } catch (TimeoutException ignore) {
+                        } catch (ExecutionException ignore) {
                         }
                         try {
                             futRec.get(10L, TimeUnit.MILLISECONDS);
                         } catch (TimeoutException ignore) {
+                        } catch (ExecutionException ignore) {
                         }
                     }
-                    // Note: Both futures are DONE at this point!
+                    /*
+                     * Note: Both futures are DONE at this point. However,
+                     * we want to check the remote Future for the downstream
+                     * service first in order to accurately report the
+                     * service that was the source of a pipeline replication
+                     * problem.
+                     */
+                    futRec.get();
                     futSnd.get();
-                    futRec.get();
 
                 } finally {
                     if (!futRec.isDone()) {
@@ -1989,8 +2105,8 @@
 
         } catch (Throwable e) {
 
-            // Log and continue.
-            log.error("Problem on reset pipeline", e);
+            // Log and continue. Details are logged by resetPipeline().
+            log.warn("Problem(s) on reset pipeline: " + e);
 
             if(InnerCause.isInnerCause(e, InterruptedException.class)) {
                 // Propagate interrupt.
@@ -2046,7 +2162,9 @@
 
         member.assertLeader(token);
 
-        final IHAPipelineResetRequest msg = new HAPipelineResetRequest(token);
+        // TODO Configure timeout on HAJournalServer.
+        final IHAPipelineResetRequest msg = new HAPipelineResetRequest(token,
+                problemServiceId, TimeUnit.MILLISECONDS.toNanos(5000));
 
         /*
          * To minimize latency, we first submit the futures for the other
@@ -2400,7 +2518,12 @@
                     /*
                      * Await the Futures, but spend more time waiting on the
                      * local Future and only check the remote Future every
-                     * second. Timeouts are ignored during this loop.
+                     * second. Timeouts are ignored during this loop - they
+                     * are used to let us wait longer on the local Future
+                     * than on the remote Future. ExecutionExceptions are
+                     * also ignored. We want to continue this loop until
+                     * both Futures are done. Interrupts are not trapped, so
+                     * an interrupt will still exit the loop.
                      */
                     while (!futRec.isDone() || !futRep.isDone()) {
                         /*
@@ -2414,17 +2537,25 @@
                         try {
                             futRec.get(1L, TimeUnit.SECONDS);
                         } catch (TimeoutException ignore) {
+                        } catch (ExecutionException ignore) {
                         }
                         try {
                             futRep.get(10L, TimeUnit.MILLISECONDS);
                         } catch (TimeoutException ignore) {
+                        } catch (ExecutionException ignore) {
                         }
                     }
-                    // Note: Both futures are DONE at this point!
+                    /*
+                     * Note: Both futures are DONE at this point. However,
+                     * we want to check the remote Future for the downstream
+                     * service first in order to accurately report the
+                     * service that was the source of a pipeline replication
+                     * problem.
+                     */
                     futRec.get();
                     futRep.get();
-
+
                 } finally {
                     if (!futRep.isDone()) {
                         // cancel remote Future unless done.
@@ -2522,10 +2653,17 @@
          */
         public S service;
 
+        /** Deserialization constructor. */
+        @SuppressWarnings("unused")
         public PipelineState() {
 
         }
 
+        public PipelineState(final S service, final InetSocketAddress addr) {
+            this.service = service;
+            this.addr = addr;
+        }
+
         @SuppressWarnings("unchecked")
         public void readExternal(final ObjectInput in) throws IOException,
                 ClassNotFoundException {
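The await-with-timeout added to doRunWithLock() above is a standard Condition idiom: hold the lock, re-check a predicate in a loop, and have every pipeline event handler signal the Condition after updating the pipeline state. The standalone Java sketch below shows the idiom in isolation; the names (onPipelineChange, awaitNotNeighbor, problemServiceIsNeighbor) are illustrative stand-ins for the pipelineChange()/pipelineUpstreamChange() handlers and the isProblemServiceOurNeighbor() predicate, not the actual QuorumPipelineImpl API.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class AwaitPipelineChangeSketch {

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition pipelineChanged = lock.newCondition();

    // Stand-in for isProblemServiceOurNeighbor(); guarded by the lock.
    private boolean problemServiceIsNeighbor = true;

    /** Invoked by the pipeline change event handlers. */
    public void onPipelineChange(final boolean stillNeighbor) {
        lock.lock();
        try {
            problemServiceIsNeighbor = stillNeighbor;
            pipelineChanged.signalAll(); // wake anyone awaiting the change
        } finally {
            lock.unlock();
        }
    }

    /** Wait until the problem service is no longer a neighbor, or time out. */
    public void awaitNotNeighbor(final long timeout, final TimeUnit unit)
            throws InterruptedException, TimeoutException {
        lock.lock();
        try {
            long remaining = unit.toNanos(timeout);
            while (problemServiceIsNeighbor) {
                if (remaining <= 0)
                    throw new TimeoutException();
                // awaitNanos() returns an estimate of the nanoseconds left.
                remaining = pipelineChanged.awaitNanos(remaining);
            }
        } finally {
            lock.unlock();
        }
    }
}

Feeding the value returned by Condition.awaitNanos() back in as the remaining budget keeps the total wait bounded across spurious wakeups and across signals that do not actually change the predicate.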