From: <tho...@us...> - 2010-08-18 11:40:17
Revision: 3446
http://bigdata.svn.sourceforge.net/bigdata/?rev=3446&view=rev
Author: thompsonbry
Date: 2010-08-18 11:40:10 +0000 (Wed, 18 Aug 2010)
Log Message:
-----------
Version of the distributed join task modified to use the JCiP Memoizer pattern.
Modified Paths:
--------------
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/pipeline/DistributedJoinTask.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/pipeline/UnsyncDistributedOutputBuffer.java
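
Editor's note: the "Memoizer pattern" named in the log message is the one described in Java Concurrency in Practice (JCiP): each computation is wrapped in a FutureTask keyed by its argument in a ConcurrentMap, so concurrent callers asking for the same key share a single computation while callers asking for different keys never block one another. The sketch below is only a minimal illustration of that general pattern; the actual com.bigdata.util.concurrent.Memoizer and Computable classes imported in the diff may differ in detail.

import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

/** A computation whose result may be cached by the Memoizer (illustrative sketch only). */
interface Computable<A, V> {
    V compute(A arg) throws InterruptedException;
}

/** Minimal JCiP-style Memoizer: at most one computation per distinct argument. */
class Memoizer<A, V> implements Computable<A, V> {

    // Completed and in-flight computations, keyed by argument.
    protected final ConcurrentMap<A, FutureTask<V>> cache =
            new ConcurrentHashMap<A, FutureTask<V>>();

    private final Computable<A, V> c;

    public Memoizer(final Computable<A, V> c) {
        this.c = c;
    }

    public V compute(final A arg) throws InterruptedException {
        while (true) {
            FutureTask<V> f = cache.get(arg);
            if (f == null) {
                final FutureTask<V> ft = new FutureTask<V>(new Callable<V>() {
                    public V call() throws InterruptedException {
                        return c.compute(arg);
                    }
                });
                f = cache.putIfAbsent(arg, ft);
                if (f == null) {
                    // This thread won the race for this argument: run the computation once.
                    f = ft;
                    ft.run();
                }
            }
            try {
                // Callers for the SAME argument block here; callers for different
                // arguments are never serialized against one another.
                return f.get();
            } catch (CancellationException e) {
                cache.remove(arg, f); // retry after a cancelled attempt
            } catch (ExecutionException e) {
                throw new RuntimeException(e.getCause());
            }
        }
    }
}

In the diff that follows, getSink(locator) delegates to memo.compute(new SinkRequest(this, locator)), which is why the synchronized getSink() method and the sinkCache map could be removed.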
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/pipeline/DistributedJoinTask.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/pipeline/DistributedJoinTask.java 2010-08-17 23:18:24 UTC (rev 3445)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/pipeline/DistributedJoinTask.java 2010-08-18 11:40:10 UTC (rev 3446)
@@ -2,19 +2,17 @@
import java.io.IOException;
import java.util.Iterator;
-import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.UUID;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
-import com.bigdata.concurrent.NamedLock;
import com.bigdata.mdi.PartitionLocator;
import com.bigdata.relation.IMutableRelation;
import com.bigdata.relation.accesspath.AbstractUnsynchronizedArrayBuffer;
@@ -36,8 +34,14 @@
import com.bigdata.service.IDataService;
import com.bigdata.service.Session;
import com.bigdata.striterator.IKeyOrder;
+import com.bigdata.util.concurrent.Computable;
+import com.bigdata.util.concurrent.Memoizer;
+import cutthecrap.utils.striterators.Filter;
+import cutthecrap.utils.striterators.Resolver;
+import cutthecrap.utils.striterators.Striterator;
+
/**
* Implementation used by scale-out deployments. There will be one instance
* of this task per index partition on which the rule will read. Those
@@ -61,7 +65,7 @@
/**
* The federation is used to obtain locator scans for the access paths.
*/
- final protected AbstractScaleOutFederation fed;
+ final protected AbstractScaleOutFederation<?> fed;
/**
* The {@link IJoinNexus} for the {@link IBigdataFederation}. This is
@@ -79,7 +83,7 @@
/**
* @see IRuleState#getKeyOrder()
*/
- final private IKeyOrder[] keyOrders;
+ final private IKeyOrder<?>[] keyOrders;
/**
* The name of the scale-out index associated with the next
@@ -115,17 +119,17 @@
*/
private final DataService dataService;
- /**
- * The {@link JoinTaskSink}s for the downstream
- * {@link DistributedJoinTask}s onto which the generated
- * {@link IBindingSet}s will be written. This is <code>null</code>
- * for the last join since we will write solutions onto the
- * {@link #getSolutionBuffer()} instead.
- *
- * @todo configure capacity based on expectations of index partition
- * fan-out for this join dimension
- */
- final private Map<PartitionLocator, JoinTaskSink> sinkCache;
+// /**
+// * The {@link JoinTaskSink}s for the downstream
+// * {@link DistributedJoinTask}s onto which the generated
+// * {@link IBindingSet}s will be written. This is <code>null</code>
+// * for the last join since we will write solutions onto the
+// * {@link #getSolutionBuffer()} instead.
+// *
+// * @todo configure capacity based on expectations of index partition
+// * fan-out for this join dimension
+// */
+// final private Map<PartitionLocator, JoinTaskSink> sinkCache;
public DistributedJoinTask(
// final String scaleOutIndexName,
@@ -134,7 +138,7 @@
final int[] order,//
final int orderIndex,//
final int partitionId,//
- final AbstractScaleOutFederation fed,//
+ final AbstractScaleOutFederation<?> fed,//
final IJoinMaster master,//
final UUID masterUUID,//
final IAsynchronousIterator<IBindingSet[]> src,//
@@ -158,7 +162,7 @@
throw new IllegalArgumentException();
// Note: This MUST be the index manager for the local data service.
- if(joinNexus instanceof IBigdataFederation)
+ if(joinNexus instanceof IBigdataFederation<?>)
throw new IllegalArgumentException();
this.fed = fed;
@@ -172,7 +176,8 @@
if (lastJoin) {
- sinkCache = null;
+// sinkCache = null;
+ memo = null;
nextScaleOutIndexName = null;
@@ -195,7 +200,7 @@
* rule.
*/
- final IMutableRelation relation = (IMutableRelation) tmp
+ final IMutableRelation<?> relation = (IMutableRelation<?>) tmp
.getHeadRelationView(rule.getHead());
switch (action) {
@@ -242,7 +247,7 @@
} else {
- final IPredicate nextPredicate = rule
+ final IPredicate<?> nextPredicate = rule
.getTail(order[orderIndex + 1]);
final String namespace = nextPredicate.getOnlyRelationName();
@@ -252,7 +257,8 @@
solutionBuffer = null;
- sinkCache = new LinkedHashMap<PartitionLocator, JoinTaskSink>();
+// sinkCache = new LinkedHashMap<PartitionLocator, JoinTaskSink>();
+ memo = new SinkMemoizer(getSink);
// System.err.println("orderIndex=" + orderIndex + ", resources="
// + Arrays.toString(getResource()) + ", nextPredicate="
@@ -340,10 +346,10 @@
sourcesExhausted = true;
- final IAsynchronousIterator[] a = sources
+ final IAsynchronousIterator<?>[] a = sources
.toArray(new IAsynchronousIterator[] {});
- for (IAsynchronousIterator source : a) {
+ for (IAsynchronousIterator<?> source : a) {
source.close();
@@ -420,7 +426,7 @@
* @return A chunk assembled from one or more chunks from one or more of
* the source {@link JoinTask}s.
*/
- protected IBindingSet[] nextChunk() throws InterruptedException {
+ protected IBindingSet[] nextChunk() throws InterruptedException {
if (sourcesExhausted) {
@@ -482,6 +488,7 @@
// clone to avoid concurrent modification of sources during
// traversal.
+ @SuppressWarnings("unchecked")
final IAsynchronousIterator<IBindingSet[]>[] sources = (IAsynchronousIterator<IBindingSet[]>[]) this.sources
.toArray(new IAsynchronousIterator[] {});
@@ -773,7 +780,7 @@
+ ", partitionId="
+ partitionId
+ (lastJoin ? ", lastJoin" : ", sinkCount="
- + sinkCache.size()));
+ + memo.size()));
/*
* For the last join dimension the JoinTask instead writes onto the
@@ -789,7 +796,8 @@
*/
if (lastJoin) {
- assert sinkCache == null;
+// assert sinkCache == null;
+ assert memo == null;
if (DEBUG)
log.debug("\nWill flush buffer containing "
@@ -832,8 +840,7 @@
final List<Callable<Void>> tasks = new LinkedList<Callable<Void>>();
- final Iterator<JoinTaskSink> itr = sinkCache.values()
- .iterator();
+ final Iterator<JoinTaskSink> itr = memo.getSinks();
while (itr.hasNext()) {
@@ -846,7 +853,7 @@
final List<Future<Void>> futures = fed.getExecutorService()
.invokeAll(tasks);
- for (Future f : futures) {
+ for (Future<?> f : futures) {
// make sure that all tasks were successful.
f.get();
@@ -858,8 +865,7 @@
// Await sinks.
{
- final Iterator<JoinTaskSink> itr = sinkCache.values()
- .iterator();
+ final Iterator<JoinTaskSink> itr = memo.getSinks();
while (itr.hasNext()) {
@@ -868,7 +874,7 @@
final JoinTaskSink sink = itr.next();
- final Future f = sink.getFuture();
+ final Future<?> f = sink.getFuture();
if (DEBUG)
log.debug("Waiting for Future: sink=" + sink);
@@ -888,7 +894,7 @@
+ ", partitionId="
+ partitionId
+ (lastJoin ? "lastJoin" : ", sinkCount="
- + sinkCache.size()));
+ + memo.size()));
}
@@ -948,9 +954,9 @@
if (DEBUG)
log.debug("orderIndex=" + orderIndex + ", partitionId="
- + partitionId + ", sinkCount=" + sinkCache.size());
+ + partitionId + ", sinkCount=" + memo.size());
- final Iterator<JoinTaskSink> itr = sinkCache.values().iterator();
+ final Iterator<JoinTaskSink> itr = memo.getSinks();
while (itr.hasNext()) {
@@ -968,7 +974,7 @@
if (DEBUG)
log.debug("Done: orderIndex=" + orderIndex + ", partitionId="
- + partitionId + ", sinkCount=" + sinkCache.size());
+ + partitionId + ", sinkCount=" + memo.size());
}
@@ -985,175 +991,320 @@
*
* @return The sink.
*
- * @throws ExecutionException
+ * @throws RuntimeException
* If the {@link JoinTaskFactoryTask} fails.
* @throws InterruptedException
* If the {@link JoinTaskFactoryTask} is interrupted.
+ */
+ protected JoinTaskSink getSink(final PartitionLocator locator)
+ throws InterruptedException, RuntimeException {
+
+ return memo.compute(new SinkRequest(this, locator));
+
+ }
+
+ /**
+ * Helper class models a request to obtain a sink for a given join task and
+ * locator.
+ * <p>
+ * Note: This class must implement equals() and hashCode() since it is used
+ * within the {@link Memoizer} pattern.
*
- * @todo Review this as a possible concurrency bottleneck. The operation
- * can have significant latency since RMI is required on a cache
- * miss to lookup or create the {@link JoinTask} on the target
- * dataService. Therefore we should probably allow concurrent
- * callers and establish a {@link NamedLock} that serializes
- * callers seeking the {@link JoinTaskSink} for the same index
- * partition identifier. Note that the limit on the #of possible
- * callers is the permitted parallelism for processing the source
- * {@link IBindingSet}s, e.g., the #of {@link ChunkTask}s that
- * can execute in parallel for a given {@link JoinTask}.
+ * @author <a href="mailto:tho...@us...">Bryan
+ * Thompson</a>
*/
- synchronized protected JoinTaskSink getSink(final PartitionLocator locator)
- throws InterruptedException, ExecutionException {
+ private static class SinkRequest {
- JoinTaskSink sink = sinkCache.get(locator);
+ final DistributedJoinTask joinTask;
- if (sink == null) {
+ final PartitionLocator locator;
-// /*
-// * Cache miss.
-// *
-// * First, obtain an exclusive resource lock on the sink task
-// * namespace and see if there is an instance of the required join
-// * task running somewhere. If there is, then request the proxy for
-// * its Future from the dataService on which it is executing.
-// *
-// * Otherwise, we are holding an exclusive lock on the sink task
-// * namespace. Select a dataService instance on which the desired
-// * index partition is replicated and then create a join task on that
-// * instance, register the join task under the lock, and return its
-// * Future.
-// *
-// * Finally, release the exclusive lock.
-// *
-// * Note: The JoinTask must acquire the same lock in order to
-// * conclude that it is done with its work and may exit. The lock
-// * therefore provides for an atomic decision vis-a-vis whether we
-// * need to create a new join task or use an existing one as well as
-// * whether an existing join task may exit.
-// *
-// * @todo since replication is not implemented we don't need to store
-// * anything under the namespace while we hold a lock. however, this
-// * shows a pattern where we would like to do that in the future. I
-// * believe that ZooKeeper would support this. If we do store
-// * something, then be sure that we also clean it up when we are done
-// * with the master instance.
-// */
+ /**
+ *
+ * @param joinTask
+ * The join task.
+ * @param locator
+ * The locator for the target shard.
+ */
+ public SinkRequest(final DistributedJoinTask joinTask, final PartitionLocator locator) {
+
+ this.joinTask = joinTask;
+
+ this.locator = locator;
+
+ }
+
+ /**
+ * Equals returns true iff joinTask == o.joinTask and locator.equals(o.locator).
+ */
+ public boolean equals(final Object o) {
+
+ if (!(o instanceof SinkRequest))
+ return false;
+
+ final SinkRequest r = (SinkRequest) o;
+
+ return joinTask == r.joinTask && locator.equals(r.locator);
+
+ }
+
+ /**
+ * The hashCode() is based directly on the hash code of the
+ * {@link PartitionLocator}. All requests against a given
+ * {@link Memoizer} will have the same {@link DistributedJoinTask} so
+ * that field can be factored out of the hash code.
+ */
+ public int hashCode() {
+
+ return locator.hashCode();
+
+ }
+
+ }
+
+ /**
+ * Helper establishes a {@link JoinTaskSink} on the target {@link IDataService}.
+ */
+ final private static Computable<SinkRequest, JoinTaskSink> getSink = new Computable<SinkRequest, JoinTaskSink>() {
+
+ public JoinTaskSink compute(final SinkRequest req)
+ throws InterruptedException {
+
+ try {
+ return req.joinTask._getSink(req.locator);
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ };
+
+ /**
+ * FIXME javadoc: A {@link Memoizer} subclass which exposes an additional
+ * method to remove a {@link FutureTask} from the internal cache (see the
+ * commented-out removeFromCache() below). This is intended as part of an
+ * explicit protocol to clear out cache entries once the sink reference has
+ * been set.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan
+ * Thompson</a>
+ */
+ private static class SinkMemoizer extends
+ Memoizer<SinkRequest/* request */, JoinTaskSink/* sink */> {
+
+ /**
+ * @param c
+ */
+ public SinkMemoizer(final Computable<SinkRequest, JoinTaskSink> c) {
+
+ super(c);
+
+ }
+
+ int size() {
+ return cache.size();
+ }
+
+ /**
+ * FIXME There are two distinct semantics available here. One is the set
+ * of current sinks (there is a join task fully up and running on a DS
+ * somewhere and we have a proxy for that DS). The other is the set of
+ * sinks which have been requested but may or may not have been fully
+ * realized yet. When we are breaking a join, we probably want to cancel
+ * all of the requests to obtain sinks in addition to canceling any
+ * running sinks. A similar problem may exist if we implement native
+ * SLICE since we could break the join while there are requests out to
+ * create sinks.
+ *
+ * One way to handle this is to pull the cancelSinks() method into this
+ * memoizer.
+ *
+ * However, if we broadcast the rule to the nodes and move away from
+ * this sinks model to using NIO buffers then we will just broadcast
+ * the close of each tail in turn or broadcast the break of the join.
+ */
+ @SuppressWarnings("unchecked")
+ Iterator<JoinTaskSink> getSinks() {
+ return new Striterator(cache.values().iterator()).addFilter(new Filter(){
+ private static final long serialVersionUID = 1L;
+ @Override
+ protected boolean isValid(final Object e) {
+ /*
+ * Filter out any tasks which are not done or which had an
+ * error.
+ */
+ final Future<JoinTaskSink> f = (Future<JoinTaskSink>)e;
+ if(!f.isDone()) {
+ return false;
+ }
+ try {f.get();}
+ catch(final ExecutionException ex) {
+ return false;
+ } catch (final InterruptedException ex) {
+ return false;
+ }
+ return true;
+ }
+ }).addFilter(new Resolver(){
+ private static final long serialVersionUID = 1L;
+ @Override
+ protected Object resolve(final Object arg0) {
+ /*
+ * We filtered out any tasks which were not done and any
+ * tasks which had errors. The future should be immediately
+ * available and Future.get() should not throw an error.
+ */
+ final Future<JoinTaskSink> f = (Future<JoinTaskSink>)arg0;
+ try {
+ return f.get();
+ } catch (final InterruptedException e) {
+ throw new RuntimeException(e);
+ } catch (final ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ }
+
+// /**
+// * Called by the thread which atomically sets the
+// * {@link AbstractNode#childRefs} element to the computed
+// * {@link AbstractNode}. At that point a reference exists to the child
+// * on the parent.
+// *
+// * @param req
+// * The request.
+// */
+// void removeFromCache(final SinkRequest req) {
//
-// final String namespace;
-// try {
-// namespace = masterProxy.getUUID() + "/" + orderIndex + "/"
-// + partitionId;
-// } catch (IOException ex) {
-// throw new RuntimeException(ex);
+// if (cache.remove(req) == null) {
+//
+// throw new AssertionError();
+//
// }
-//
-// final IResourceLock lock;
-// try {
-// lock = fed.getResourceLockService().acquireExclusiveLock(namespace);
-// } catch (IOException ex) {
-// throw new RuntimeException(ex);
-// }
//
-// try {
-
- /*
- * Allocate/discover JoinTask on the target data service and
- * obtain a sink reference for its future and buffers.
- *
- * Note: The JoinMasterTask uses very similar logic to setup the
- * first join dimension. Of course, it gets to assume that there
- * is no such JoinTask in existence at the time.
- */
+// }
- final int nextOrderIndex = orderIndex + 1;
+// /**
+// * Called from {@link AbstractBTree#close()}.
+// *
+// * @todo should we do this? There should not be any reads against the
+// * the B+Tree when it is close()d. Therefore I do not believe there
+// * is any reason to clear the FutureTask cache.
+// */
+// void clear() {
+//
+// cache.clear();
+//
+// }
+
+ };
- if (DEBUG)
- log.debug("Creating join task: nextOrderIndex="
- + nextOrderIndex + ", indexName="
- + nextScaleOutIndexName + ", partitionId="
- + locator.getPartitionId());
+ /**
+ * Used to materialize {@link JoinTaskSink}s without causing concurrent requests
+ * for different sinks to block.
+ */
+ final private SinkMemoizer memo;
- final UUID sinkUUID = locator.getDataServiceUUID();
+ /**
+ * Inner implementation invoked from the {@link Memoizer}.
+ *
+ * @param locator
+ * The shard locator.
+ *
+ * @return The sink which will write on the downstream {@link JoinTask}
+ * running on the node for that shard.
+ *
+ * @throws ExecutionException
+ * @throws InterruptedException
+ */
+ private JoinTaskSink _getSink(final PartitionLocator locator) throws InterruptedException, ExecutionException {
+
+ /*
+ * Allocate/discover JoinTask on the target data service and
+ * obtain a sink reference for its future and buffers.
+ *
+ * Note: The JoinMasterTask uses very similar logic to setup the
+ * first join dimension. Of course, it gets to assume that there
+ * is no such JoinTask in existence at the time.
+ */
- final IDataService dataService;
- if (sinkUUID.equals(fed.getServiceUUID())) {
+ final int nextOrderIndex = orderIndex + 1;
- /*
- * As an optimization, special case when the downstream
- * data service is _this_ data service.
- */
- dataService = (IDataService)fed.getService();
-
- } else {
-
- dataService = fed.getDataService(sinkUUID);
-
- }
+ if (DEBUG)
+ log.debug("Creating join task: nextOrderIndex="
+ + nextOrderIndex + ", indexName="
+ + nextScaleOutIndexName + ", partitionId="
+ + locator.getPartitionId());
- sink = new JoinTaskSink(fed, locator, this);
+ final UUID sinkUUID = locator.getDataServiceUUID();
- /*
- * Export async iterator proxy.
- *
- * Note: This proxy is used by the sink to draw chunks from the
- * source JoinTask(s).
- */
- final IAsynchronousIterator<IBindingSet[]> sourceItrProxy;
- if (fed.isDistributed()) {
+ final IDataService dataService;
+ if (sinkUUID.equals(fed.getServiceUUID())) {
- sourceItrProxy = ((AbstractDistributedFederation) fed)
- .getProxy(sink.blockingBuffer.iterator(), joinNexus
- .getBindingSetSerializer(), joinNexus
- .getChunkOfChunksCapacity());
+ /*
+ * As an optimization, special case when the downstream
+ * data service is _this_ data service.
+ */
+ dataService = (IDataService)fed.getService();
+
+ } else {
+
+ dataService = fed.getDataService(sinkUUID);
+
+ }
- } else {
+ final JoinTaskSink sink = new JoinTaskSink(fed, locator, this);
- sourceItrProxy = sink.blockingBuffer.iterator();
+ /*
+ * Export async iterator proxy.
+ *
+ * Note: This proxy is used by the sink to draw chunks from the
+ * source JoinTask(s).
+ */
+ final IAsynchronousIterator<IBindingSet[]> sourceItrProxy;
+ if (fed.isDistributed()) {
- }
+ sourceItrProxy = ((AbstractDistributedFederation<?>) fed)
+ .getProxy(sink.blockingBuffer.iterator(), joinNexus
+ .getBindingSetSerializer(), joinNexus
+ .getChunkOfChunksCapacity());
- // the future for the factory task (not the JoinTask).
- final Future factoryFuture;
- try {
+ } else {
- final JoinTaskFactoryTask factoryTask = new JoinTaskFactoryTask(
- nextScaleOutIndexName, rule, joinNexus
- .getJoinNexusFactory(), order, nextOrderIndex,
- locator.getPartitionId(), masterProxy, masterUUID,
- sourceItrProxy, keyOrders, requiredVars);
+ sourceItrProxy = sink.blockingBuffer.iterator();
- // submit the factory task, obtain its future.
- factoryFuture = dataService.submit(factoryTask);
+ }
- } catch (IOException ex) {
+ // the future for the factory task (not the JoinTask).
+ final Future<?> factoryFuture;
+ try {
- // RMI problem.
- throw new RuntimeException(ex);
+ final JoinTaskFactoryTask factoryTask = new JoinTaskFactoryTask(
+ nextScaleOutIndexName, rule, joinNexus
+ .getJoinNexusFactory(), order, nextOrderIndex,
+ locator.getPartitionId(), masterProxy, masterUUID,
+ sourceItrProxy, keyOrders, requiredVars);
- }
+ // submit the factory task, obtain its future.
+ factoryFuture = dataService.submit(factoryTask);
- /*
- * Obtain the future for the JoinTask from the factory task's
- * Future.
- */
+ } catch (IOException ex) {
- sink.setFuture((Future) factoryFuture.get());
+ // RMI problem.
+ throw new RuntimeException(ex);
- stats.fanOut++;
-
- sinkCache.put(locator, sink);
-
-// } finally {
-//
-// try {
-// lock.unlock();
-// } catch (IOException ex) {
-// throw new RuntimeException(ex);
-// }
-//
-// }
-
}
+ /*
+ * Obtain the future for the JoinTask from the factory task's
+ * Future.
+ */
+
+ sink.setFuture((Future<?>) factoryFuture.get());
+
+ stats.fanOut++;
+
return sink;
}
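
Editor's note on SinkMemoizer.getSinks() above: the Striterator/Filter/Resolver chain walks the memoizer's cache of futures, keeps only those which completed normally, and resolves each one to its JoinTaskSink. A rough plain-JDK equivalent, shown only as an illustration (the SinkIteratorSketch class, the realizedSinks() method name, and the explicit cache parameter are invented for this sketch and are not part of the commit), would be:

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

class SinkIteratorSketch {

    /** Visit only the sinks whose creation requests have completed normally. */
    static Iterator<JoinTaskSink> realizedSinks(
            final Map<SinkRequest, FutureTask<JoinTaskSink>> cache) {
        final List<JoinTaskSink> sinks = new LinkedList<JoinTaskSink>();
        for (final Future<JoinTaskSink> f : cache.values()) {
            if (!f.isDone())
                continue; // requested but not yet realized (or still running).
            try {
                sinks.add(f.get()); // completed normally: get() returns immediately.
            } catch (final ExecutionException ex) {
                // the request to create this sink failed; leave it out.
            } catch (final InterruptedException ex) {
                Thread.currentThread().interrupt(); // propagate and leave it out.
            }
        }
        return sinks.iterator();
    }
}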
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/pipeline/UnsyncDistributedOutputBuffer.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/pipeline/UnsyncDistributedOutputBuffer.java 2010-08-17 23:18:24 UTC (rev 3445)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/pipeline/UnsyncDistributedOutputBuffer.java 2010-08-18 11:40:10 UTC (rev 3446)
@@ -1,7 +1,6 @@
package com.bigdata.relation.rule.eval.pipeline;
import java.util.Iterator;
-import java.util.concurrent.ExecutionException;
import com.bigdata.mdi.PartitionLocator;
import com.bigdata.relation.accesspath.AbstractUnsynchronizedArrayBuffer;
@@ -31,7 +30,7 @@
/** The tailIndex of the next predicate to be evaluated. */
final int nextTailIndex;
- final IBigdataFederation fed;
+ final IBigdataFederation<?> fed;
/**
*
@@ -39,7 +38,7 @@
* @param joinTask
* @param capacity
*/
- public UnsyncDistributedOutputBuffer(final AbstractScaleOutFederation fed,
+ public UnsyncDistributedOutputBuffer(final AbstractScaleOutFederation<?> fed,
final DistributedJoinTask joinTask, final int capacity) {
super(capacity);
@@ -92,7 +91,7 @@
int bindingSetsOut = 0;
// the next predicate to be evaluated.
- final IPredicate nextPred = joinTask.rule.getTail(nextTailIndex);
+ final IPredicate<?> nextPred = joinTask.rule.getTail(nextTailIndex);
final IJoinNexus joinNexus = joinTask.joinNexus;
@@ -130,8 +129,6 @@
sink = joinTask.getSink(locator);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
- } catch (ExecutionException ex) {
- throw new RuntimeException(ex);
}
// add binding set to the sink.