From: <tho...@us...> - 2010-08-18 11:40:17
Revision: 3446 http://bigdata.svn.sourceforge.net/bigdata/?rev=3446&view=rev Author: thompsonbry Date: 2010-08-18 11:40:10 +0000 (Wed, 18 Aug 2010) Log Message: ----------- Version of the distributed join task modified to use the JCIC Memoizer pattern. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/pipeline/DistributedJoinTask.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/pipeline/UnsyncDistributedOutputBuffer.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/pipeline/DistributedJoinTask.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/pipeline/DistributedJoinTask.java 2010-08-17 23:18:24 UTC (rev 3445) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/pipeline/DistributedJoinTask.java 2010-08-18 11:40:10 UTC (rev 3446) @@ -2,19 +2,17 @@ import java.io.IOException; import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.UUID; import java.util.Vector; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; -import com.bigdata.concurrent.NamedLock; import com.bigdata.mdi.PartitionLocator; import com.bigdata.relation.IMutableRelation; import com.bigdata.relation.accesspath.AbstractUnsynchronizedArrayBuffer; @@ -36,8 +34,14 @@ import com.bigdata.service.IDataService; import com.bigdata.service.Session; import com.bigdata.striterator.IKeyOrder; +import com.bigdata.util.concurrent.Computable; +import com.bigdata.util.concurrent.Memoizer; +import cutthecrap.utils.striterators.Filter; +import cutthecrap.utils.striterators.Resolver; +import cutthecrap.utils.striterators.Striterator; + /** * Implementation used by scale-out deployments. There will be one instance * of this task per index partition on which the rule will read. Those @@ -61,7 +65,7 @@ /** * The federation is used to obtain locator scans for the access paths. */ - final protected AbstractScaleOutFederation fed; + final protected AbstractScaleOutFederation<?> fed; /** * The {@link IJoinNexus} for the {@link IBigdataFederation}. This is @@ -79,7 +83,7 @@ /** * @see IRuleState#getKeyOrder() */ - final private IKeyOrder[] keyOrders; + final private IKeyOrder<?>[] keyOrders; /** * The name of the scale-out index associated with the next @@ -115,17 +119,17 @@ */ private final DataService dataService; - /** - * The {@link JoinTaskSink}s for the downstream - * {@link DistributedJoinTask}s onto which the generated - * {@link IBindingSet}s will be written. This is <code>null</code> - * for the last join since we will write solutions onto the - * {@link #getSolutionBuffer()} instead. - * - * @todo configure capacity based on expectations of index partition - * fan-out for this join dimension - */ - final private Map<PartitionLocator, JoinTaskSink> sinkCache; +// /** +// * The {@link JoinTaskSink}s for the downstream +// * {@link DistributedJoinTask}s onto which the generated +// * {@link IBindingSet}s will be written. This is <code>null</code> +// * for the last join since we will write solutions onto the +// * {@link #getSolutionBuffer()} instead. 
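The "JCIC Memoizer" in the log message is presumably the Memoizer pattern from Java Concurrency in Practice (JCiP), which the newly imported com.bigdata.util.concurrent.Computable and Memoizer types appear to follow. For reference, a minimal sketch of that pattern is below; it is not the bigdata implementation, and the protected cache field is an assumption based on the cache.size()/cache.values() calls made by the SinkMemoizer subclass later in this diff.

    import java.util.concurrent.Callable;
    import java.util.concurrent.CancellationException;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.FutureTask;

    interface Computable<A, V> {
        V compute(A arg) throws InterruptedException;
    }

    class Memoizer<A, V> implements Computable<A, V> {

        // Visible to subclasses so that a subclass such as SinkMemoizer can
        // report size() and iterate the cached futures (an assumption here).
        protected final ConcurrentMap<A, Future<V>> cache =
                new ConcurrentHashMap<A, Future<V>>();

        private final Computable<A, V> c;

        Memoizer(final Computable<A, V> c) {
            this.c = c;
        }

        public V compute(final A arg) throws InterruptedException {
            while (true) {
                Future<V> f = cache.get(arg);
                if (f == null) {
                    final FutureTask<V> ft = new FutureTask<V>(new Callable<V>() {
                        public V call() throws InterruptedException {
                            return c.compute(arg);
                        }
                    });
                    f = cache.putIfAbsent(arg, ft); // only one thread wins
                    if (f == null) {
                        f = ft;
                        ft.run(); // compute in the winning caller's thread
                    }
                }
                try {
                    // Other callers for the same key block here until ready.
                    return f.get();
                } catch (CancellationException e) {
                    cache.remove(arg, f); // cancelled: allow a retry
                } catch (ExecutionException e) {
                    throw launderThrowable(e.getCause());
                }
            }
        }

        private static RuntimeException launderThrowable(final Throwable t) {
            if (t instanceof RuntimeException)
                return (RuntimeException) t;
            if (t instanceof Error)
                throw (Error) t;
            return new IllegalStateException("Not unchecked", t);
        }
    }

The point of the pattern is that at most one FutureTask is ever installed per key, so an expensive computation (here: creating a downstream JoinTask via RMI) runs at most once per key, while concurrent callers asking for other keys are not blocked at all.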
+// * +// * @todo configure capacity based on expectations of index partition +// * fan-out for this join dimension +// */ +// final private Map<PartitionLocator, JoinTaskSink> sinkCache; public DistributedJoinTask( // final String scaleOutIndexName, @@ -134,7 +138,7 @@ final int[] order,// final int orderIndex,// final int partitionId,// - final AbstractScaleOutFederation fed,// + final AbstractScaleOutFederation<?> fed,// final IJoinMaster master,// final UUID masterUUID,// final IAsynchronousIterator<IBindingSet[]> src,// @@ -158,7 +162,7 @@ throw new IllegalArgumentException(); // Note: This MUST be the index manager for the local data service. - if(joinNexus instanceof IBigdataFederation) + if(joinNexus instanceof IBigdataFederation<?>) throw new IllegalArgumentException(); this.fed = fed; @@ -172,7 +176,8 @@ if (lastJoin) { - sinkCache = null; +// sinkCache = null; + memo = null; nextScaleOutIndexName = null; @@ -195,7 +200,7 @@ * rule. */ - final IMutableRelation relation = (IMutableRelation) tmp + final IMutableRelation<?> relation = (IMutableRelation<?>) tmp .getHeadRelationView(rule.getHead()); switch (action) { @@ -242,7 +247,7 @@ } else { - final IPredicate nextPredicate = rule + final IPredicate<?> nextPredicate = rule .getTail(order[orderIndex + 1]); final String namespace = nextPredicate.getOnlyRelationName(); @@ -252,7 +257,8 @@ solutionBuffer = null; - sinkCache = new LinkedHashMap<PartitionLocator, JoinTaskSink>(); +// sinkCache = new LinkedHashMap<PartitionLocator, JoinTaskSink>(); + memo = new SinkMemoizer(getSink); // System.err.println("orderIndex=" + orderIndex + ", resources=" // + Arrays.toString(getResource()) + ", nextPredicate=" @@ -340,10 +346,10 @@ sourcesExhausted = true; - final IAsynchronousIterator[] a = sources + final IAsynchronousIterator<?>[] a = sources .toArray(new IAsynchronousIterator[] {}); - for (IAsynchronousIterator source : a) { + for (IAsynchronousIterator<?> source : a) { source.close(); @@ -420,7 +426,7 @@ * @return A chunk assembled from one or more chunks from one or more of * the source {@link JoinTask}s. */ - protected IBindingSet[] nextChunk() throws InterruptedException { + protected IBindingSet[] nextChunk() throws InterruptedException { if (sourcesExhausted) { @@ -482,6 +488,7 @@ // clone to avoid concurrent modification of sources during // traversal. + @SuppressWarnings("unchecked") final IAsynchronousIterator<IBindingSet[]>[] sources = (IAsynchronousIterator<IBindingSet[]>[]) this.sources .toArray(new IAsynchronousIterator[] {}); @@ -773,7 +780,7 @@ + ", partitionId=" + partitionId + (lastJoin ? ", lastJoin" : ", sinkCount=" - + sinkCache.size())); + + memo.size())); /* * For the last join dimension the JoinTask instead writes onto the @@ -789,7 +796,8 @@ */ if (lastJoin) { - assert sinkCache == null; +// assert sinkCache == null; + assert memo == null; if (DEBUG) log.debug("\nWill flush buffer containing " @@ -832,8 +840,7 @@ final List<Callable<Void>> tasks = new LinkedList<Callable<Void>>(); - final Iterator<JoinTaskSink> itr = sinkCache.values() - .iterator(); + final Iterator<JoinTaskSink> itr = memo.getSinks(); while (itr.hasNext()) { @@ -846,7 +853,7 @@ final List<Future<Void>> futures = fed.getExecutorService() .invokeAll(tasks); - for (Future f : futures) { + for (Future<?> f : futures) { // make sure that all tasks were successful. f.get(); @@ -858,8 +865,7 @@ // Await sinks. 
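Functionally, the switch from sinkCache to memo replaces a coarse per-JoinTask monitor with per-locator memoization. A condensed before/after sketch follows; the "before" is a shortened form of the synchronized getSink() that this revision removes further down, and createSinkViaRMI is a hypothetical stand-in for the factory logic that now lives in _getSink():

    // Before: one lock for the whole JoinTask, held across a remote call, so
    // ChunkTasks needing sinks for *different* shards still serialized here.
    synchronized protected JoinTaskSink getSink(final PartitionLocator locator)
            throws InterruptedException, ExecutionException {
        JoinTaskSink sink = sinkCache.get(locator);
        if (sink == null) {
            sink = createSinkViaRMI(locator); // high-latency work under the lock
            sinkCache.put(locator, sink);
        }
        return sink;
    }

    // After: callers for different locators proceed concurrently; callers for
    // the same locator share a single FutureTask inside the Memoizer.
    protected JoinTaskSink getSink(final PartitionLocator locator)
            throws InterruptedException, RuntimeException {
        return memo.compute(new SinkRequest(this, locator));
    }

The worst-case number of concurrent callers is still bounded by the permitted ChunkTask parallelism for the JoinTask, as the removed @todo notes, but only callers asking for the same index partition ever wait on one another now.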
{ - final Iterator<JoinTaskSink> itr = sinkCache.values() - .iterator(); + final Iterator<JoinTaskSink> itr = memo.getSinks(); while (itr.hasNext()) { @@ -868,7 +874,7 @@ final JoinTaskSink sink = itr.next(); - final Future f = sink.getFuture(); + final Future<?> f = sink.getFuture(); if (DEBUG) log.debug("Waiting for Future: sink=" + sink); @@ -888,7 +894,7 @@ + ", partitionId=" + partitionId + (lastJoin ? "lastJoin" : ", sinkCount=" - + sinkCache.size())); + + memo.size())); } @@ -948,9 +954,9 @@ if (DEBUG) log.debug("orderIndex=" + orderIndex + ", partitionId=" - + partitionId + ", sinkCount=" + sinkCache.size()); + + partitionId + ", sinkCount=" + memo.size()); - final Iterator<JoinTaskSink> itr = sinkCache.values().iterator(); + final Iterator<JoinTaskSink> itr = memo.getSinks(); while (itr.hasNext()) { @@ -968,7 +974,7 @@ if (DEBUG) log.debug("Done: orderIndex=" + orderIndex + ", partitionId=" - + partitionId + ", sinkCount=" + sinkCache.size()); + + partitionId + ", sinkCount=" + memo.size()); } @@ -985,175 +991,320 @@ * * @return The sink. * - * @throws ExecutionException + * @throws RuntimeException * If the {@link JoinTaskFactoryTask} fails. * @throws InterruptedException * If the {@link JoinTaskFactoryTask} is interrupted. + */ + protected JoinTaskSink getSink(final PartitionLocator locator) + throws InterruptedException, RuntimeException { + + return memo.compute(new SinkRequest(this, locator)); + + } + + /** + * Helper class models a request to obtain a sink for a given join task and + * locator. + * <p> + * Note: This class must implement equals() and hashCode() since it is used + * within the {@link Memoizer} pattern. * - * @todo Review this as a possible concurrency bottleneck. The operation - * can have significant latency since RMI is required on a cache - * miss to lookup or create the {@link JoinTask} on the target - * dataService. Therefore we should probably allow concurrent - * callers and establish a {@link NamedLock} that serializes - * callers seeking the {@link JoinTaskSink} for the same index - * partition identifier. Note that the limit on the #of possible - * callers is the permitted parallelism for processing the source - * {@link IBindingSet}s, e.g., the #of {@link ChunkTask}s that - * can execute in parallel for a given {@link JoinTask}. + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> */ - synchronized protected JoinTaskSink getSink(final PartitionLocator locator) - throws InterruptedException, ExecutionException { + private static class SinkRequest { - JoinTaskSink sink = sinkCache.get(locator); + final DistributedJoinTask joinTask; - if (sink == null) { + final PartitionLocator locator; -// /* -// * Cache miss. -// * -// * First, obtain an exclusive resource lock on the sink task -// * namespace and see if there is an instance of the required join -// * task running somewhere. If there is, then request the proxy for -// * its Future from the dataService on which it is executing. -// * -// * Otherwise, we are holding an exclusive lock on the sink task -// * namespace. Select a dataService instance on which the desired -// * index partition is replicated and then create a join task on that -// * instance, register the join task under the lock, and return its -// * Future. -// * -// * Finally, release the exclusive lock. -// * -// * Note: The JoinTask must acquire the same lock in order to -// * conclude that it is done with its work and may exit. 
The lock -// * therefore provides for an atomic decision vis-a-vis whether we -// * need to create a new join task or use an existing one as well as -// * whether an existing join task may exit. -// * -// * @todo since replication is not implemented we don't need to store -// * anything under the namespace while we hold a lock. however, this -// * shows a pattern where we would like to do that in the future. I -// * believe that ZooKeeper would support this. If we do store -// * something, then be sure that we also clean it up when we are done -// * with the master instance. -// */ + /** + * + * @param joinTask + * The join task. + * @param locator + * The locator for the target shard. + */ + public SinkRequest(final DistributedJoinTask joinTask, final PartitionLocator locator) { + + this.joinTask = joinTask; + + this.locator = locator; + + } + + /** + * Equals returns true iff parent == o.parent and index == o.index. + */ + public boolean equals(final Object o) { + + if (!(o instanceof SinkRequest)) + return false; + + final SinkRequest r = (SinkRequest) o; + + return joinTask == r.joinTask && locator.equals(locator); + + } + + /** + * The hashCode() is based directly on the hash code of the + * {@link PartitionLocator}. All requests against a given + * {@link Memoizer} will have the same {@link DistributedJoinTask} so + * that field can be factored out of the hash code. + */ + public int hashCode() { + + return locator.hashCode(); + + } + + } + + /** + * Helper establishes a {@link JoinTaskSink} on the target {@link IDataService}. + */ + final private static Computable<SinkRequest, JoinTaskSink> getSink = new Computable<SinkRequest, JoinTaskSink>() { + + public JoinTaskSink compute(final SinkRequest req) + throws InterruptedException { + + try { + return req.joinTask._getSink(req.locator); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + + } + + }; + + /** + * FIXME javadoc : A {@link Memoizer} subclass which exposes an additional method to remove + * a {@link FutureTask} from the internal cache. This is used as part of an + * explicit protocol to clear out cache + * entries once the sink reference has been set on + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + */ + private static class SinkMemoizer extends + Memoizer<SinkRequest/* request */, JoinTaskSink/* sink */> { + + /** + * @param c + */ + public SinkMemoizer(final Computable<SinkRequest, JoinTaskSink> c) { + + super(c); + + } + + int size() { + return cache.size(); + } + + /** + * FIMXE There are two distinct semantics available here. One is the set + * of current sinks (there is a join task fully up and running on a DS + * somewhere and we have a proxy for that DS). The other is the set of + * sinks which have been requested but may or may not have been fully + * realized yet. When we are breaking a join, we probably want to cancel + * all of the requests to obtain sinks in addition to canceling any + * running sinks. A similar problem may exist if we implement native + * SLICE since we could break the join while there are requests out to + * create sinks. + * + * One way to handle this is to pull the cancelSinks() method into this + * memoizer. + * + * However, if we broad cast the rule to the nodes and move away from + * this sinks model to using NIO buffers then we will just broadcast + * the close of each tail in turn or broadcast the break of the join. 
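One detail worth flagging in the SinkRequest.equals() shown earlier: it evaluates locator.equals(locator), i.e. it compares the locator with itself, which is always true, so equality degenerates to the joinTask identity test. Because hashCode() is derived from the locator this will usually go unnoticed, but on a hash collision between two different locators the Memoizer could hand back the wrong cached sink. A corrected sketch, assuming the intent was to compare against the other request's locator:

    public boolean equals(final Object o) {

        if (this == o)
            return true;

        if (!(o instanceof SinkRequest))
            return false;

        final SinkRequest r = (SinkRequest) o;

        // Same join task (reference identity) and same target shard (value equality).
        return joinTask == r.joinTask && locator.equals(r.locator);

    }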
+ */ + @SuppressWarnings("unchecked") + Iterator<JoinTaskSink> getSinks() { + return new Striterator(cache.values().iterator()).addFilter(new Filter(){ + private static final long serialVersionUID = 1L; + @Override + protected boolean isValid(final Object e) { + /* + * Filter out any tasks which are not done or which had an + * error. + */ + final Future<JoinTaskSink> f = (Future<JoinTaskSink>)e; + if(!f.isDone()) { + return false; + } + try {f.get();} + catch(final ExecutionException ex) { + return false; + } catch (final InterruptedException ex) { + return false; + } + return true; + } + }).addFilter(new Resolver(){ + private static final long serialVersionUID = 1L; + @Override + protected Object resolve(final Object arg0) { + /* + * We filtered out any tasks which were not done and any + * tasks which had errors. The future should be immediately + * available and Future.get() should not throw an error. + */ + final Future<JoinTaskSink> f = (Future<JoinTaskSink>)arg0; + try { + return f.get(); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } catch (final ExecutionException e) { + throw new RuntimeException(e); + } + } + }); + } + +// /** +// * Called by the thread which atomically sets the +// * {@link AbstractNode#childRefs} element to the computed +// * {@link AbstractNode}. At that point a reference exists to the child +// * on the parent. +// * +// * @param req +// * The request. +// */ +// void removeFromCache(final SinkRequest req) { // -// final String namespace; -// try { -// namespace = masterProxy.getUUID() + "/" + orderIndex + "/" -// + partitionId; -// } catch (IOException ex) { -// throw new RuntimeException(ex); +// if (cache.remove(req) == null) { +// +// throw new AssertionError(); +// // } -// -// final IResourceLock lock; -// try { -// lock = fed.getResourceLockService().acquireExclusiveLock(namespace); -// } catch (IOException ex) { -// throw new RuntimeException(ex); -// } // -// try { - - /* - * Allocate/discover JoinTask on the target data service and - * obtain a sink reference for its future and buffers. - * - * Note: The JoinMasterTask uses very similar logic to setup the - * first join dimension. Of course, it gets to assume that there - * is no such JoinTask in existence at the time. - */ +// } - final int nextOrderIndex = orderIndex + 1; +// /** +// * Called from {@link AbstractBTree#close()}. +// * +// * @todo should we do this? There should not be any reads against the +// * the B+Tree when it is close()d. Therefore I do not believe there +// * is any reason to clear the FutureTask cache. +// */ +// void clear() { +// +// cache.clear(); +// +// } + + }; - if (DEBUG) - log.debug("Creating join task: nextOrderIndex=" - + nextOrderIndex + ", indexName=" - + nextScaleOutIndexName + ", partitionId=" - + locator.getPartitionId()); + /** + * Used to materialize {@link JoinTaskSink}s without causing concurrent requests + * for different sinks to block. + */ + final private SinkMemoizer memo; - final UUID sinkUUID = locator.getDataServiceUUID(); + /** + * Inner implementation invoked from the {@link Memoizer}. + * + * @param locator + * The shard locator. + * + * @return The sink which will write on the downstream {@link JoinTask} + * running on the node for that shard. 
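For readers not familiar with the cutthecrap Striterator API, the Filter/Resolver chain in getSinks() above is roughly equivalent to the eager loop below. It assumes the Memoizer exposes its cache as a ConcurrentMap whose values are Future<JoinTaskSink> instances, as in the sketch near the top of this message; note that the Striterator version is lazy while this one snapshots the realized sinks at call time:

    Iterator<JoinTaskSink> getSinksSimple() {

        final List<JoinTaskSink> sinks = new LinkedList<JoinTaskSink>();

        for (Future<JoinTaskSink> f : cache.values()) {

            if (!f.isDone()) {
                // Request still outstanding: not a realized sink yet.
                continue;
            }

            try {
                sinks.add(f.get()); // will not block: the future is done
            } catch (CancellationException e) {
                // Request was cancelled: nothing to return for this entry.
            } catch (ExecutionException e) {
                // The JoinTaskFactoryTask failed: there is no sink.
            } catch (InterruptedException e) {
                // Not expected for a completed future; restore the flag.
                Thread.currentThread().interrupt();
            }

        }

        return sinks.iterator();

    }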
+ * + * @throws ExecutionException + * @throws InterruptedException + */ + private JoinTaskSink _getSink(final PartitionLocator locator) throws InterruptedException, ExecutionException { + + /* + * Allocate/discover JoinTask on the target data service and + * obtain a sink reference for its future and buffers. + * + * Note: The JoinMasterTask uses very similar logic to setup the + * first join dimension. Of course, it gets to assume that there + * is no such JoinTask in existence at the time. + */ - final IDataService dataService; - if (sinkUUID.equals(fed.getServiceUUID())) { + final int nextOrderIndex = orderIndex + 1; - /* - * As an optimization, special case when the downstream - * data service is _this_ data service. - */ - dataService = (IDataService)fed.getService(); - - } else { - - dataService = fed.getDataService(sinkUUID); - - } + if (DEBUG) + log.debug("Creating join task: nextOrderIndex=" + + nextOrderIndex + ", indexName=" + + nextScaleOutIndexName + ", partitionId=" + + locator.getPartitionId()); - sink = new JoinTaskSink(fed, locator, this); + final UUID sinkUUID = locator.getDataServiceUUID(); - /* - * Export async iterator proxy. - * - * Note: This proxy is used by the sink to draw chunks from the - * source JoinTask(s). - */ - final IAsynchronousIterator<IBindingSet[]> sourceItrProxy; - if (fed.isDistributed()) { + final IDataService dataService; + if (sinkUUID.equals(fed.getServiceUUID())) { - sourceItrProxy = ((AbstractDistributedFederation) fed) - .getProxy(sink.blockingBuffer.iterator(), joinNexus - .getBindingSetSerializer(), joinNexus - .getChunkOfChunksCapacity()); + /* + * As an optimization, special case when the downstream + * data service is _this_ data service. + */ + dataService = (IDataService)fed.getService(); + + } else { + + dataService = fed.getDataService(sinkUUID); + + } - } else { + final JoinTaskSink sink = new JoinTaskSink(fed, locator, this); - sourceItrProxy = sink.blockingBuffer.iterator(); + /* + * Export async iterator proxy. + * + * Note: This proxy is used by the sink to draw chunks from the + * source JoinTask(s). + */ + final IAsynchronousIterator<IBindingSet[]> sourceItrProxy; + if (fed.isDistributed()) { - } + sourceItrProxy = ((AbstractDistributedFederation<?>) fed) + .getProxy(sink.blockingBuffer.iterator(), joinNexus + .getBindingSetSerializer(), joinNexus + .getChunkOfChunksCapacity()); - // the future for the factory task (not the JoinTask). - final Future factoryFuture; - try { + } else { - final JoinTaskFactoryTask factoryTask = new JoinTaskFactoryTask( - nextScaleOutIndexName, rule, joinNexus - .getJoinNexusFactory(), order, nextOrderIndex, - locator.getPartitionId(), masterProxy, masterUUID, - sourceItrProxy, keyOrders, requiredVars); + sourceItrProxy = sink.blockingBuffer.iterator(); - // submit the factory task, obtain its future. - factoryFuture = dataService.submit(factoryTask); + } - } catch (IOException ex) { + // the future for the factory task (not the JoinTask). + final Future<?> factoryFuture; + try { - // RMI problem. - throw new RuntimeException(ex); + final JoinTaskFactoryTask factoryTask = new JoinTaskFactoryTask( + nextScaleOutIndexName, rule, joinNexus + .getJoinNexusFactory(), order, nextOrderIndex, + locator.getPartitionId(), masterProxy, masterUUID, + sourceItrProxy, keyOrders, requiredVars); - } + // submit the factory task, obtain its future. + factoryFuture = dataService.submit(factoryTask); - /* - * Obtain the future for the JoinTask from the factory task's - * Future. 
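The payoff of hiding the RMI setup above behind the Memoizer is that the JoinTaskFactoryTask is submitted at most once per locator, no matter how many ChunkTasks ask for the same sink at the same time. A small demonstration of that property (hypothetical demo class, using the Computable/Memoizer sketch from earlier in this message; the string result stands in for a JoinTaskSink):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.atomic.AtomicInteger;

    public class MemoizerFanOutDemo {

        public static void main(final String[] args) throws Exception {

            final AtomicInteger factoryCalls = new AtomicInteger();

            // Stand-in for _getSink(): expensive, should run once per key.
            final Computable<Integer, String> factory = new Computable<Integer, String>() {
                public String compute(final Integer partitionId) throws InterruptedException {
                    factoryCalls.incrementAndGet(); // models the factory RMI
                    Thread.sleep(50);               // models RMI latency
                    return "sink#" + partitionId;
                }
            };

            final Memoizer<Integer, String> memo = new Memoizer<Integer, String>(factory);

            final ExecutorService exec = Executors.newFixedThreadPool(8);
            final List<Future<String>> results = new ArrayList<Future<String>>();

            for (int i = 0; i < 8; i++) {
                results.add(exec.submit(new Callable<String>() {
                    public String call() throws Exception {
                        return memo.compute(42); // eight callers, same "shard"
                    }
                }));
            }

            for (Future<String> f : results) {
                f.get(); // each caller sees the same memoized result
            }

            exec.shutdown();

            System.out.println("factory calls = " + factoryCalls.get()); // prints 1
        }

    }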
- */ + } catch (IOException ex) { - sink.setFuture((Future) factoryFuture.get()); + // RMI problem. + throw new RuntimeException(ex); - stats.fanOut++; - - sinkCache.put(locator, sink); - -// } finally { -// -// try { -// lock.unlock(); -// } catch (IOException ex) { -// throw new RuntimeException(ex); -// } -// -// } - } + /* + * Obtain the future for the JoinTask from the factory task's + * Future. + */ + + sink.setFuture((Future<?>) factoryFuture.get()); + + stats.fanOut++; + return sink; } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/pipeline/UnsyncDistributedOutputBuffer.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/pipeline/UnsyncDistributedOutputBuffer.java 2010-08-17 23:18:24 UTC (rev 3445) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/pipeline/UnsyncDistributedOutputBuffer.java 2010-08-18 11:40:10 UTC (rev 3446) @@ -1,7 +1,6 @@ package com.bigdata.relation.rule.eval.pipeline; import java.util.Iterator; -import java.util.concurrent.ExecutionException; import com.bigdata.mdi.PartitionLocator; import com.bigdata.relation.accesspath.AbstractUnsynchronizedArrayBuffer; @@ -31,7 +30,7 @@ /** The tailIndex of the next predicate to be evaluated. */ final int nextTailIndex; - final IBigdataFederation fed; + final IBigdataFederation<?> fed; /** * @@ -39,7 +38,7 @@ * @param joinTask * @param capacity */ - public UnsyncDistributedOutputBuffer(final AbstractScaleOutFederation fed, + public UnsyncDistributedOutputBuffer(final AbstractScaleOutFederation<?> fed, final DistributedJoinTask joinTask, final int capacity) { super(capacity); @@ -92,7 +91,7 @@ int bindingSetsOut = 0; // the next predicate to be evaluated. - final IPredicate nextPred = joinTask.rule.getTail(nextTailIndex); + final IPredicate<?> nextPred = joinTask.rule.getTail(nextTailIndex); final IJoinNexus joinNexus = joinTask.joinNexus; @@ -130,8 +129,6 @@ sink = joinTask.getSink(locator); } catch (InterruptedException ex) { throw new RuntimeException(ex); - } catch (ExecutionException ex) { - throw new RuntimeException(ex); } // add binding set to the sink. This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
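The UnsyncDistributedOutputBuffer change above is the caller-side consequence of the new getSink() contract: only InterruptedException is checked now, while a failed JoinTaskFactoryTask surfaces as a RuntimeException wrapping the original ExecutionException (see the Computable shown earlier). If a caller ever needs the root cause, it is still reachable; a hypothetical fragment in the style of the buffer's existing handler:

    final JoinTaskSink sink;
    try {
        sink = joinTask.getSink(locator);
    } catch (InterruptedException ex) {
        throw new RuntimeException(ex); // as in the buffer's existing handler
    } catch (RuntimeException ex) {
        if (ex.getCause() instanceof ExecutionException) {
            // The remote JoinTaskFactoryTask failed; log/report before rethrowing.
        }
        throw ex;
    }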