From: <tho...@us...> - 2010-09-16 19:43:17
|
Revision: 3573 http://bigdata.svn.sourceforge.net/bigdata/?rev=3573&view=rev Author: thompsonbry Date: 2010-09-16 19:43:08 +0000 (Thu, 16 Sep 2010) Log Message: ----------- Tracked down some problems with distributed query evaluation and added more test suites. Broke out the "map binding sets over shards" capability into its own package, fixed a bug where it was failing on predicates which were only partly bound, updated the unit tests, refactored the implementation to include an interface which may be used to realize a variety of different algorithms for efficiently mapping binding sets across shards, detailed several such implementations, and provided two such implementations - one for fully bound predicates and another which is a general purpose technique and is what we had been using historically. Several of the described algorithms can be significantly more efficient for various conditions. I have filed an issue to implement and test these various alternative algorithms. See https://sourceforge.net/apps/trac/bigdata/ticket/162. Modified the PipelineOp#newBuffer() method to accept the BOpStats from the caller and to wrap the buffer such that it automatically tracks the #of written units and chunks. This was necessary for some operators where we otherwise did not have the necessary scope to properly track those statistics. I plan to do a similar thing with the source. Fixed some problems with SliceOp and how binding sets are routed to the query controller. Still working through the distributed query evaluation test suite. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IPipelineOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/ConditionalRoutingOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyBindingSetOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/DistinctBindingSetOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/AbstractNode.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/AbstractUnsynchronizedArrayBuffer.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/UnsyncLocalOutputBuffer.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/pipeline/UnsyncDistributedOutputBuffer.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/ndx/ISplitter.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/concurrent/Haltable.java branches/QUADS_QUERY_BRANCH/bigdata/src/resources/logging/log4j.properties branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bset/TestConditionalRoutingOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bset/TestCopyBindingSets.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestDistinctBindingSets.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestSliceOp.java branches/QUADS_QUERY_BRANCH/bigdata-jini/src/test/com/bigdata/bop/fed/jini/TestJiniFederatedQueryEngine.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/shards/ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/shards/Algorithm_AsGivenPredicate.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/shards/Algorithm_FullyBoundPredicate.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/shards/Algorithm_GroupByLocatorScan.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/shards/Algorithm_LowShardCount.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/shards/Algorithm_NestedLocatorScan.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/shards/Bundle.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/shards/IShardMapper.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/shards/MapBindingSetsOverShardsBuffer.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/shards/Splitter.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestBOpStats.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/nodes/ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/nodes/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/nodes/TestMapBindingSetsOverNodes.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/shards/ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/shards/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/shards/TestMapBindingSetsOverShards.java Removed Paths: ------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/MapBindingSetsOverShardsBuffer.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestMapBindingSetsOverNodes.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestMapBindingSetsOverShards.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java 2010-09-16 19:40:48 UTC (rev 3572) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java 2010-09-16 19:43:08 UTC (rev 3573) @@ -137,7 +137,7 @@ * * @return The value of the annotation. * - * @throws IllegalArgumentException + * @throws IllegalStateException * if the named annotation is not bound. * * @todo Note: This variant without generics is required for some java @@ -153,6 +153,14 @@ BOp clone(); /** + * Return the {@link Annotations#BOP_ID}. + * + * @throws IllegalStateException + * if that annotation is not bound. + */ + int getId(); + + /** * Return the evaluation context for the operator. The default is * {@link BOpEvaluationContext#ANY}. Operators which must be mapped against * shards, mapped against nodes, or evaluated on the query controller must Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2010-09-16 19:40:48 UTC (rev 3572) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2010-09-16 19:43:08 UTC (rev 3573) @@ -323,12 +323,18 @@ final Object tmp = annotations.get(name); if (tmp == null) - throw new IllegalArgumentException("Required property: " + name); + throw new IllegalStateException("Required property: " + name); return tmp; } + public int getId() { + + return (Integer) getRequiredProperty(Annotations.BOP_ID); + + } + public String toString() { final StringBuilder sb = new StringBuilder(); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-09-16 19:40:48 UTC (rev 3572) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-09-16 19:43:08 UTC (rev 3573) @@ -236,6 +236,9 @@ * @todo modify to accept {@link IChunkMessage} or an interface available * from getChunk() on {@link IChunkMessage} which provides us with * flexible mechanisms for accessing the chunk data. + * <p> + * When doing that, modify to automatically track the {@link BOpStats} + * as the <i>source</i> is consumed. */ // * @throws IllegalArgumentException // * if the <i>indexManager</i> is <code>null</code> Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IPipelineOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IPipelineOp.java 2010-09-16 19:40:48 UTC (rev 3572) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IPipelineOp.java 2010-09-16 19:43:08 UTC (rev 3573) @@ -59,9 +59,13 @@ * operators which write on the database) then the operator MAY return an * immutable empty buffer. * + * @param stats + * The statistics on this object will automatically be updated as + * elements and chunks are output onto the returned buffer. + * * @return The buffer. */ - IBlockingBuffer<E[]> newBuffer(); + IBlockingBuffer<E[]> newBuffer(BOpStats stats); /** * Return a {@link FutureTask} which computes the operator against the Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2010-09-16 19:40:48 UTC (rev 3572) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2010-09-16 19:43:08 UTC (rev 3573) @@ -209,11 +209,68 @@ } - public IBlockingBuffer<E[]> newBuffer() { + public IBlockingBuffer<E[]> newBuffer(final BOpStats stats) { - return new BlockingBuffer<E[]>(getChunkOfChunksCapacity(), - getChunkCapacity(), getChunkTimeout(), chunkTimeoutUnit); + if (stats == null) + throw new IllegalArgumentException(); + + return new BlockingBufferWithStats<E[]>(getChunkOfChunksCapacity(), + getChunkCapacity(), getChunkTimeout(), chunkTimeoutUnit, stats); } + private static class BlockingBufferWithStats<E> extends BlockingBuffer<E> { + + private final BOpStats stats; + + /** + * @param chunkOfChunksCapacity + * @param chunkCapacity + * @param chunkTimeout + * @param chunktimeoutunit + * @param stats + */ + public BlockingBufferWithStats(int chunkOfChunksCapacity, + int chunkCapacity, long chunkTimeout, + TimeUnit chunktimeoutunit, final BOpStats stats) { + + this.stats = stats; + + } + + /** + * Overridden to track {@link BOpStats#unitsOut} and + * {@link BOpStats#chunksOut}. + * <p> + * Note: {@link BOpStats#chunksOut} will report the #of chunks added to + * this buffer. However, the buffer MAY combine chunks either on add() + * or when drained by the iterator so the actual #of chunks read back + * from the iterator MAY differ. + * <p> + * {@inheritDoc} + */ + @Override + public boolean add(final E e, final long timeout, final TimeUnit unit) + throws InterruptedException { + + final boolean ret = super.add(e, timeout, unit); + + if (e.getClass().getComponentType() != null) { + + stats.unitsOut.add(((Object[]) e).length); + + } else { + + stats.unitsOut.increment(); + + } + + stats.chunksOut.increment(); + + return ret; + + } + + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/ConditionalRoutingOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/ConditionalRoutingOp.java 2010-09-16 19:40:48 UTC (rev 3572) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/ConditionalRoutingOp.java 2010-09-16 19:43:08 UTC (rev 3573) @@ -183,8 +183,8 @@ sink.add(def); else sink.add(Arrays.copyOf(def, ndef)); - stats.chunksOut.increment(); - stats.unitsOut.add(ndef); +// stats.chunksOut.increment(); +// stats.unitsOut.add(ndef); } if (nalt > 0) { @@ -192,8 +192,8 @@ sink2.add(alt); else sink2.add(Arrays.copyOf(alt, nalt)); - stats.chunksOut.increment(); - stats.unitsOut.add(nalt); +// stats.chunksOut.increment(); +// stats.unitsOut.add(nalt); } } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyBindingSetOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyBindingSetOp.java 2010-09-16 19:40:48 UTC (rev 3572) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyBindingSetOp.java 2010-09-16 19:43:08 UTC (rev 3573) @@ -36,6 +36,7 @@ import com.bigdata.bop.BindingSetPipelineOp; import com.bigdata.bop.IBindingSet; import com.bigdata.bop.engine.BOpStats; +import com.bigdata.bop.engine.IChunkAccessor; import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.IBlockingBuffer; @@ -82,40 +83,39 @@ } /** - * Copy the source to the sink. + * Copy the source to the sink. + * + * @todo Optimize this. When using an {@link IChunkAccessor} we should be + * able to directly output the same chunk. */ static private class CopyTask implements Callable<Void> { - private final BOpStats stats; + private final BOpContext<IBindingSet> context; - private final IAsynchronousIterator<IBindingSet[]> source; - - private final IBlockingBuffer<IBindingSet[]> sink; - CopyTask(final BOpContext<IBindingSet> context) { - stats = context.getStats(); + this.context = context; - this.source = context.getSource(); - - this.sink = context.getSink(); - } public Void call() throws Exception { + final IAsynchronousIterator<IBindingSet[]> source = context.getSource(); + final IBlockingBuffer<IBindingSet[]> sink = context.getSink(); try { + final BOpStats stats = context.getStats(); while (source.hasNext()) { final IBindingSet[] chunk = source.next(); stats.chunksIn.increment(); stats.unitsIn.add(chunk.length); sink.add(chunk); - stats.chunksOut.increment(); - stats.unitsOut.add(chunk.length); +// stats.chunksOut.increment(); +// stats.unitsOut.add(chunk.length); } sink.flush(); return null; } finally { sink.close(); + source.close(); } } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java 2010-09-16 19:40:48 UTC (rev 3572) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java 2010-09-16 19:43:08 UTC (rev 3573) @@ -106,7 +106,7 @@ chunksIn.add(o.chunksIn.get()); unitsIn.add(o.unitsIn.get()); unitsOut.add(o.unitsOut.get()); - chunksOut.add(o.chunksIn.get()); + chunksOut.add(o.chunksOut.get()); } public String toString() { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java 2010-09-16 19:40:48 UTC (rev 3572) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java 2010-09-16 19:43:08 UTC (rev 3573) @@ -34,12 +34,15 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.log4j.Logger; import com.bigdata.bop.BOp; +import com.bigdata.util.InnerCause; /** * The run state for a {@link RunningQuery}. This class is NOT thread-safe. @@ -83,6 +86,24 @@ private final UUID queryId; /** + * The query deadline. + * + * @see BOp.Annotations#TIMEOUT + * @see RunningQuery#getDeadline() + */ + private final long deadline; + + /** + * Set to <code>true</code> iff the query evaluation is complete due to + * normal termination. + * <p> + * Note: This is package private to expose it to {@link RunningQuery}. + * + * @see #haltOp(HaltOpMessage) + */ + /*private*/ final AtomicBoolean allDone = new AtomicBoolean(false); + + /** * The #of run state transitions which have occurred for this query. */ private long nsteps = 0; @@ -131,6 +152,8 @@ this.queryId = query.getQueryId(); + this.deadline = query.getDeadline(); + // this.nops = query.bopIndex.size(); } @@ -193,8 +216,11 @@ /** * @return <code>true</code> if this is the first time we will evaluate the * op. + * + * @throws TimeoutException + * if the deadline for the query has passed. */ - public boolean startOp(final StartOpMessage msg) { + public boolean startOp(final StartOpMessage msg) throws TimeoutException { nsteps++; @@ -257,35 +283,40 @@ // + ",fanIn=" + msg.nchunks); if (TableLog.tableLog.isInfoEnabled()) { - TableLog.tableLog -.info(getTableRow("startOp", msg.serviceId, + TableLog.tableLog.info(getTableRow("startOp", msg.serviceId, msg.bopId, msg.partitionId, msg.nchunks/* fanIn */, null/* cause */, null/* stats */)); } // check deadline. - final long deadline = query.getDeadline(); - if (deadline < System.currentTimeMillis()) { if (log.isTraceEnabled()) log.trace("expired: queryId=" + queryId + ", deadline=" + deadline); - query.future.halt(new TimeoutException()); + throw new TimeoutException(); - query.cancel(true/* mayInterruptIfRunning */); - } return firstTime; } /** - * Update termination criteria counters. @return <code>true</code> if the - * operator life cycle is over. + * Update termination criteria counters. If the query evaluation is over due + * to normal termination then {@link #allDone} is set to <code>true</code> + * as a side effect. + * + * @return <code>true</code> if the operator life cycle is over. + * + * @throws TimeoutException + * if the deadline has expired. + * @throws ExecutionException + * if the {@link HaltOpMessage#cause} was non-<code>null</code>, + * if which case it wraps {@link HaltOpMessage#cause}. */ - public boolean haltOp(final HaltOpMessage msg) { + public boolean haltOp(final HaltOpMessage msg) throws TimeoutException, + ExecutionException { nsteps++; @@ -354,9 +385,6 @@ } - // Figure out if this operator is done. - final boolean isDone = isOperatorDone(msg.bopId); - // System.err.println("haltOp : nstep=" + nsteps + ", bopId=" + msg.bopId // + ",totalRunningTaskCount=" + totalRunningTaskCount // + ",totalAvailableTaskCount=" + totalAvailableChunkCount @@ -378,41 +406,53 @@ /* * Test termination criteria */ - final long deadline = query.getDeadline(); + + // true if this operator is done. + final boolean isOpDone = isOperatorDone(msg.bopId); + // true if the entire query is done. + final boolean isAllDone = totalRunningTaskCount == 0 + && totalAvailableChunkCount == 0; + if (msg.cause != null) { - // operator failed on this chunk. - log.error("Error: Canceling query: queryId=" + queryId + ",bopId=" - + msg.bopId + ",partitionId=" + msg.partitionId, msg.cause); +// /* +// * @todo probably just wrap and throw rather than logging since this +// * class does not have enough insight into non-error exceptions +// * while Haltable does. +// */ +// if (!InnerCause.isInnerCause(msg.cause, InterruptedException.class) +// && !InnerCause.isInnerCause(msg.cause, +// TimeoutException.class)) { +// +// // operator failed on this chunk. +// log.error("Error: Canceling query: queryId=" + queryId +// + ",bopId=" + msg.bopId + ",partitionId=" +// + msg.partitionId, msg.cause); +// } - query.future.halt(msg.cause); + throw new ExecutionException(msg.cause); - query.cancel(true/* mayInterruptIfRunning */); + } else if (isAllDone) { - } else if (totalRunningTaskCount == 0 && totalAvailableChunkCount == 0) { - // success (all done). if (log.isTraceEnabled()) log.trace("success: queryId=" + queryId); - query.future.halt(query.getStats()); - - query.cancel(true/* mayInterruptIfRunning */); - + this.allDone.set(true); + } else if (deadline < System.currentTimeMillis()) { if (log.isTraceEnabled()) log.trace("expired: queryId=" + queryId + ", deadline=" + deadline); - query.future.halt(new TimeoutException()); + throw new TimeoutException(); - query.cancel(true/* mayInterruptIfRunning */); - } - return isDone; + return isOpDone; + } /** Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-09-16 19:40:48 UTC (rev 3572) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-09-16 19:43:08 UTC (rev 3573) @@ -38,6 +38,7 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; @@ -63,7 +64,7 @@ /** * Metadata about running queries. */ -public class RunningQuery implements Future<Map<Integer,BOpStats>>, IRunningQuery { +public class RunningQuery implements Future<Void>, IRunningQuery { private final static transient Logger log = Logger .getLogger(RunningQuery.class); @@ -75,20 +76,6 @@ .getLogger(ChunkTask.class); /** - * The run state of the query and the result of the computation iff it - * completes execution normally (without being interrupted, cancelled, etc). - * <p> - * Note: Package private in order to expose this field to {@link RunState}. - */ - final /*private*/ Haltable<Map<Integer,BOpStats>> future = new Haltable<Map<Integer,BOpStats>>(); - - /** - * The runtime statistics for each {@link BOp} in the query and - * <code>null</code> unless this is the query controller. - */ - final private ConcurrentHashMap<Integer/* bopId */, BOpStats> statsMap; - - /** * The class executing the query on this node. */ final private QueryEngine queryEngine; @@ -123,20 +110,15 @@ final private BindingSetPipelineOp query; /** - * The buffer used for the overall output of the query pipeline. - * <p> - * Note: In scale out, this only exists on the query controller. In order to - * ensure that the results are transferred to the query controller, the - * top-level operator in the query plan must specify - * {@link BOpEvaluationContext#CONTROLLER}. For example, {@link SliceOp} - * uses this {@link BOpEvaluationContext}. + * An index from the {@link BOp.Annotations#BOP_ID} to the {@link BOp}. */ - final private IBlockingBuffer<IBindingSet[]> queryBuffer; + protected final Map<Integer, BOp> bopIndex; /** - * An index from the {@link BOp.Annotations#BOP_ID} to the {@link BOp}. + * The run state of the query and the result of the computation iff it + * completes execution normally (without being interrupted, cancelled, etc). */ - protected final Map<Integer, BOp> bopIndex; + final private Haltable<Void> future = new Haltable<Void>(); /** * A collection of {@link Future}s for currently executing operators for @@ -145,6 +127,23 @@ private final ConcurrentHashMap<BSBundle, Future<?>> operatorFutures = new ConcurrentHashMap<BSBundle, Future<?>>(); /** + * The runtime statistics for each {@link BOp} in the query and + * <code>null</code> unless this is the query controller. + */ + final private ConcurrentHashMap<Integer/* bopId */, BOpStats> statsMap; + + /** + * The buffer used for the overall output of the query pipeline. + * <p> + * Note: In scale out, this only exists on the query controller. In order to + * ensure that the results are transferred to the query controller, the + * top-level operator in the query plan must specify + * {@link BOpEvaluationContext#CONTROLLER}. For example, {@link SliceOp} + * uses this {@link BOpEvaluationContext}. + */ + final private IBlockingBuffer<IBindingSet[]> queryBuffer; + + /** * A lock guarding {@link RunState#totalRunningTaskCount}, * {@link RunState#totalAvailableChunkCount}, * {@link RunState#availableChunkCountMap}. This is <code>null</code> unless @@ -159,6 +158,11 @@ * query controller. */ final private RunState runState; + + /** + * Flag used to prevent retriggering of {@link #lifeCycleTearDownQuery()}. + */ + final AtomicBoolean didQueryTearDown = new AtomicBoolean(false); /** * The chunks available for immediate processing (they must have been @@ -193,13 +197,18 @@ // set the deadline. if (!this.deadline .compareAndSet(Long.MAX_VALUE/* expect */, deadline/* update */)) { + // the deadline is already set. throw new IllegalStateException(); + } if (deadline < System.currentTimeMillis()) { + // deadline has already expired. + future.halt(new TimeoutException()); cancel(true/* mayInterruptIfRunning */); + } } @@ -252,7 +261,9 @@ * Return the operator tree for this query. */ public BindingSetPipelineOp getQuery() { + return query; + } /** @@ -276,11 +287,23 @@ } /** - * + * @param queryEngine + * The {@link QueryEngine} on which the query is running. In + * scale-out, a query is typically instantiated on many + * {@link QueryEngine}s. * @param queryId - * @param begin + * The identifier for that query. + * @param controller + * <code>true</code> iff the {@link QueryEngine} is the query + * controller for this query (the {@link QueryEngine} which will + * coordinate the query evaluation). * @param clientProxy + * The query controller. In standalone, this is the same as the + * <i>queryEngine</i>. In scale-out, this is a proxy for the + * query controller whenever the query is instantiated on a node + * other than the query controller itself. * @param query + * The query. * * @throws IllegalArgumentException * if any argument is <code>null</code>. @@ -318,20 +341,42 @@ this.query = query; - this.bopIndex = BOpUtility.getIndex(query); + bopIndex = BOpUtility.getIndex(query); - this.statsMap = controller ? new ConcurrentHashMap<Integer, BOpStats>() + statsMap = controller ? new ConcurrentHashMap<Integer, BOpStats>() : null; runStateLock = controller ? new ReentrantLock() : null; runState = controller ? new RunState(this) : null; - // Note: only exists on the query controller. - this.queryBuffer = controller ? newQueryBuffer() : null; - -// System.err -// .println("new RunningQuery:: queryId=" + queryId + if (controller) { + + final BOpStats queryStats = query.newStats(); + + statsMap.put((Integer) query + .getRequiredProperty(BOp.Annotations.BOP_ID), queryStats); + + if (!query.isMutation()) { + + queryBuffer = query.newBuffer(queryStats); + + } else { + + // Note: Not used for mutation queries. + queryBuffer = null; + + } + + } else { + + // Note: only exists on the query controller. + queryBuffer = null; + + } + + // System.err + // .println("new RunningQuery:: queryId=" + queryId // + ", isController=" + controller + ", queryController=" // + clientProxy + ", queryEngine=" // + queryEngine.getServiceUUID()); @@ -339,22 +384,6 @@ } /** - * Return the buffer on which the solutions will be written (if any). This - * is based on the top-level operator in the query plan. - * - * @return The buffer for the solutions -or- <code>null</code> if the - * top-level operator in the query plan is a mutation operator. - */ - protected IBlockingBuffer<IBindingSet[]> newQueryBuffer() { - - if (query.isMutation()) - return null; - - return ((BindingSetPipelineOp) query).newBuffer(); - - } - - /** * Take a chunk generated by some pass over an operator and make it * available to the target operator. How this is done depends on whether the * query is running against a standalone database or the scale-out database. @@ -372,10 +401,10 @@ * @param sink * The intermediate results to be passed to that target operator. * - * @return The #of chunks made available for consumption by the sink. This - * will always be ONE (1) for scale-up. For scale-out, there will be - * one chunk per index partition over which the intermediate results - * were mapped. + * @return The #of {@link IChunkMessage} sent. This will always be ONE (1) + * for scale-up. For scale-out, there will be at least one + * {@link IChunkMessage} per index partition over which the + * intermediate results were mapped. */ protected <E> int handleOutputChunk(final int sinkId, final IBlockingBuffer<IBindingSet[]> sink) { @@ -478,6 +507,11 @@ if (runState.startOp(msg)) lifeCycleSetUpOperator(msg.bopId); + } catch(TimeoutException ex) { + + future.halt(ex); + cancel(true/* mayInterruptIfRunning */); + } finally { runStateLock.unlock(); @@ -508,6 +542,8 @@ if (tmp != null) tmp.add(msg.taskStats); + Throwable cause = null; + boolean allDone = false; runStateLock.lock(); try { @@ -520,14 +556,53 @@ */ lifeCycleTearDownOperator(msg.bopId); + + if(runState.allDone.get()) { + + allDone = true; + + } } - + + } catch(Throwable ex) { + + cause = ex; + } finally { runStateLock.unlock(); } + + /* + * Handle query termination once we have released the runStateLock. + * + * Note: In scale-out, query termination can involve RMI to the nodes on + * which query operators are known to be running and to nodes on which + * resources were allocated which were scoped to the query or an + * operator's evaluation. Those RMI messages should not go out while we + * are holding the runStateLock since that could cause deadlock with + * call backs on haltOp() from the query peers for that query. + */ + + if (cause != null) { + + /* + * Timeout, interrupted, operator error, or internal error in + * RunState. + */ + + future.halt(cause); + cancel(true/* mayInterruptIfRunning */); + + } else if (allDone) { + + // Normal termination. + future.halt((Void) null); + cancel(true/* mayInterruptIfRunning */); + + } } @@ -753,13 +828,15 @@ + bop); } - sink = (p == null ? queryBuffer : op.newBuffer()); + final BOpStats stats = op.newStats(); + + sink = (p == null ? queryBuffer : op.newBuffer(stats)); - altSink = altSinkId == null ? null : op.newBuffer(); + altSink = altSinkId == null ? null : op.newBuffer(stats); // context : @todo pass in IChunkMessage or IChunkAccessor context = new BOpContext<IBindingSet>(RunningQuery.this, - partitionId, op.newStats(), msg.getChunkAccessor() + partitionId, stats, msg.getChunkAccessor() .iterator(), sink, altSink); // FutureTask for operator execution (not running yet). @@ -903,11 +980,7 @@ * <p> * Since this involves RMI to the nodes, we should not issue those RMIs * while holding the {@link #runStateLock} (and this could even deadlock - * with callback from those nodes). Perhaps - * {@link RunState#haltOp(HaltOpMessage)} should throw back the - * {@link HaltOpMessage} or a {@link TimeoutException} if the deadline has - * expired and then let {@link RunningQuery#haltOp(HaltOpMessage)} handle - * the termination of the query, which it can do without holding the lock. + * with call back from those nodes). * <p> * When the controller sends a node a terminate signal for an operator, it * should not bother to RMI back to the controller (unless this is done for @@ -931,22 +1004,24 @@ // close the output sink. queryBuffer.close(); } - // life cycle hook for the end of the query. - lifeCycleTearDownQuery(); + if(didQueryTearDown.compareAndSet(false/*expect*/, true/*update*/)) { + // life cycle hook for the end of the query. + lifeCycleTearDownQuery(); + } // remove from the collection of running queries. queryEngine.runningQueries.remove(queryId, this); // true iff we cancelled something. return cancelled; } - final public Map<Integer, BOpStats> get() throws InterruptedException, + final public Void get() throws InterruptedException, ExecutionException { return future.get(); } - final public Map<Integer, BOpStats> get(long arg0, TimeUnit arg1) + final public Void get(long arg0, TimeUnit arg1) throws InterruptedException, ExecutionException, TimeoutException { return future.get(arg0, arg1); @@ -977,4 +1052,18 @@ } + public String toString() { + final StringBuilder sb = new StringBuilder(getClass().getName()); + sb.append("{queryId=" + queryId); + sb.append(",deadline=" + deadline.get()); + sb.append(",isDone=" + isDone()); + sb.append(",isCancelled=" + isCancelled()); + sb.append(",runState=" + runState); + sb.append(",controller=" + controller); + sb.append(",clientProxy=" + clientProxy); + sb.append(",query=" + query); + sb.append("}"); + return sb.toString(); + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java 2010-09-16 19:40:48 UTC (rev 3572) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java 2010-09-16 19:43:08 UTC (rev 3573) @@ -47,6 +47,7 @@ import com.bigdata.bop.engine.IQueryPeer; import com.bigdata.bop.engine.LocalChunkMessage; import com.bigdata.bop.engine.RunningQuery; +import com.bigdata.bop.fed.shards.MapBindingSetsOverShardsBuffer; import com.bigdata.io.DirectBufferPool; import com.bigdata.io.DirectBufferPoolAllocator.IAllocationContext; import com.bigdata.journal.TemporaryStoreFactory; @@ -55,6 +56,7 @@ import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.IBlockingBuffer; import com.bigdata.relation.accesspath.IBuffer; +import com.bigdata.relation.rule.eval.pipeline.DistributedJoinTask; import com.bigdata.resources.ResourceManager; import com.bigdata.service.IBigdataFederation; import com.bigdata.service.ResourceService; @@ -363,6 +365,14 @@ * {@link ByteBuffer} and notifying the receiving service that there are * intermediate results which it can pull when it is ready to process them. * This pattern allows the receiver to impose flow control on the producer. + * + * @todo Figure out how (or if) we will combine binding set streams emerging + * from concurrent tasks executing on a given node destined for the + * same shard/node. (There is code in the {@link DistributedJoinTask} + * which does this for the same shard, but it does it on the receiver + * side.) Pay attention to the #of threads running in the join, the + * potential concurrency of threads targeting the same (bopId,shardId) + * and how to best combine their data together. */ @Override protected <E> int handleOutputChunk(final int sinkId, @@ -405,20 +415,33 @@ * * @todo Set the capacity of the the "map" buffer to the size of the * data contained in the sink (in fact, we should just process the - * sink data in place). + * sink data in place using an expanded IChunkAccessor interface). + * + * @todo high volume operators will need different capacity + * parameters. + * + * FIXME the chunkSize will limit us to RMI w/ the payload inline + * when it is the same as the threshold for NIO chuck transfers. + * This needs to be adaptive and responsive to the actual data scale + * of the operator's outputs */ @SuppressWarnings("unchecked") final IPredicate<E> pred = ((IShardwisePipelineOp) bop).getPredicate(); final IKeyOrder<E> keyOrder = pred.getKeyOrder(); final long timestamp = pred.getTimestamp(); final int capacity = 1000;// @todo - final int capacity2 = 1000;// @todo + final int chunkOfChunksCapacity = 10;// @todo small queue + final int chunkSize = 100;// @todo modest chunks. final MapBindingSetsOverShardsBuffer<IBindingSet, E> mapper = new MapBindingSetsOverShardsBuffer<IBindingSet, E>( getFederation(), pred, keyOrder, timestamp, capacity) { @Override - IBuffer<IBindingSet[]> newBuffer(final PartitionLocator locator) { - // @todo chunkCapacity and chunkOfChunksCapacity plus timeout stuff. - return new BlockingBuffer<IBindingSet[]>(capacity2); + protected IBuffer<IBindingSet[]> newBuffer(final PartitionLocator locator) { + return new BlockingBuffer<IBindingSet[]>( + chunkOfChunksCapacity,// + chunkSize,// + BlockingBuffer.DEFAULT_CONSUMER_CHUNK_TIMEOUT,// + BlockingBuffer.DEFAULT_CONSUMER_CHUNK_TIMEOUT_UNIT// + ); } }; /* @@ -454,17 +477,11 @@ * * @todo This stage should probably be integrated with the stage * which maps the binding sets over the shards (immediately above) - * to minimize copying or visiting in the data. - * - * FIXME Review the definition of an "output chunk" from the - * perspective of the atomic query termination decision. I think - * that it probably corresponds to a "message" sent to a node. For - * each message sent, we must later observe the evaluate of the - * operator on that node+shard. If the receiver is permitted to - * combine messages, then it must tell us how many messages were - * consumed. + * to minimize copying or visiting in the data. This could be done + * by hooking the method which outputs a chunk to instead directly + * send the IChunkMessage. */ - int nchunksout = 0; + int messageSendCount = 0; for (Map.Entry<PartitionLocator, IBuffer<IBindingSet[]>> e : mapper .getSinks().entrySet()) { @@ -484,11 +501,11 @@ sendChunkMessage(locator.getDataServiceUUID(), sinkId, locator .getPartitionId(), allocationContext, shardSink); - nchunksout++; + messageSendCount++; } - return nchunksout; + return messageSendCount; } case CONTROLLER: { Deleted: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/MapBindingSetsOverShardsBuffer.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/MapBindingSetsOverShardsBuffer.java 2010-09-16 19:40:48 UTC (rev 3572) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/MapBindingSetsOverShardsBuffer.java 2010-09-16 19:43:08 UTC (rev 3573) @@ -1,499 +0,0 @@ -package com.bigdata.bop.fed; - -import java.util.Arrays; -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.Map; - -import org.apache.log4j.Logger; - -import com.bigdata.bop.BOp; -import com.bigdata.bop.IBindingSet; -import com.bigdata.bop.IPredicate; -import com.bigdata.bop.engine.QueryEngine; -import com.bigdata.btree.BytesUtil; -import com.bigdata.btree.IIndex; -import com.bigdata.btree.keys.IKeyBuilder; -import com.bigdata.journal.NoSuchIndexException; -import com.bigdata.journal.TimestampUtility; -import com.bigdata.mdi.IMetadataIndex; -import com.bigdata.mdi.PartitionLocator; -import com.bigdata.relation.IRelation; -import com.bigdata.relation.accesspath.AbstractUnsynchronizedArrayBuffer; -import com.bigdata.relation.accesspath.IBuffer; -import com.bigdata.relation.rule.eval.pipeline.DistributedJoinTask; -import com.bigdata.service.IBigdataFederation; -import com.bigdata.service.Split; -import com.bigdata.service.ndx.AbstractSplitter; -import com.bigdata.striterator.IKeyOrder; - -/** - * Unsynchronized (non-thread safe) buffer maps the {@link IBindingSet}s across - * the index partition(s) associated with an {@link IPredicate} and - * {@link IKeyOrder}. For each source chunk, "as bound" versions of the target - * {@link IPredicate} are constructed and the {@link IBindingSet}s in the chunk - * are reordered based on {@link IKeyOrder#getFromKey(IKeyBuilder, IPredicate)} - * for each asBound predicate. The {@link PartitionLocator}s are discovered for - * each fromKey using an ordered locator scan and the binding sets are output - * onto a shard or node specific {@link IBuffer} created by a concrete subclass. - * The subclass is responsible for getting the binding sets from this node onto - * the node associated with each output buffer. - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id: UnsyncDistributedOutputBuffer.java 3448 2010-08-18 20:55:58Z - * thompsonbry $ - * @param <E> - * The generic type of the elements in the buffer. - * @param <F> - * The generic type of the elements in the relation associated with - * the {@link IPredicate}. - * - * @todo This could be refactored such that it no longer implemented - * {@link IBuffer} but instead was a {@link BOp} with binding sets - * streaming in from its source. However, unlike a normal {@link BOp} it - * would have a compound sink and it would have to be tightly integrated - * with the {@link QueryEngine} to be used. - * - * @todo Figure out how we will combine binding set streams emerging from - * concurrent tasks executing on a given node destined for the same - * shard/node. (There is code in the {@link DistributedJoinTask} which - * does this for the same shard, but it does it on the receiver side.) Pay - * attention to the #of threads running in the join, the potential - * concurrency of threads targeting the same (bopId,shardId) and how to - * best combine their data together. - * - * @todo Optimize locator lookup by caching in {@link AbstractSplitter} and look - * at the code path for obtaining {@link PartitionLocator}s from the MDI. - * <p> - * For reads, we are permitted to cache the locators just as much as we - * like (but indirection would be introduced by a shared disk - * architecture). - * <p> - * For writes (or in a shard disk architecture) it is possible that the - * target shard will have moved by the time the receiver has notice of the - * intent to write on that shard or once the receiver has accepted the - * binding sets for that shard. The logic which moves the binding sets - * around will have to handle such 'stale locator' exceptions - * automatically. - * - * @todo This is not tracking the #of output chunks or the fanOut (#of - * shards/nodes which will receive binding sets). Given that the query - * engine will be managing the buffers on which the data are written, it - * might also update the appropriate statistics. - */ -public abstract class MapBindingSetsOverShardsBuffer<E extends IBindingSet, F> - extends AbstractUnsynchronizedArrayBuffer<E> { - - private static transient final Logger log = Logger.getLogger(MapBindingSetsOverShardsBuffer.class); - - /** - * The predicate from which we generate the asBound binding sets. This - * predicate and the {@link IKeyOrder} together determine the required - * access path. - */ - private final IPredicate<F> pred; - - /** - * Identifies the index for the access path required by the {@link #pred - * predicate}. - */ - private final IKeyOrder<F> keyOrder; - - /** - * The timestamp associated with the operation on the target access path. If - * the binding sets will be used to read on the shards of the target access - * path, then this is the read timestamp. If they will be used to write on - * the target access path, then this is the write timestamp. - */ - private final long timestamp; - - /** - * The {@link IKeyBuilder} for the index associated with the access path - * required by the predicate. - */ - private final IKeyBuilder keyBuilder; - - /** - * Used to efficient assign binding sets to index partitions. - */ - private final Splitter splitter; - -// /** -// */ -// private final BOpStats stats; - - /** - * @param fed - * The federation. - * @param pred - * The predicate associated with the target operator. The - * predicate identifies which variables and/or constants form the - * key for the access path and hence selects the shards on which - * the target operator must read or write. For example, when the - * target operator is a JOIN, this is the {@link IPredicate} - * associated with the right hand operator of the join. - * @param keyOrder - * Identifies the access path for the target predicate. - * @param timestamp - * The timestamp associated with the operation on the target - * access path. If the binding sets will be used to read on the - * shards of the target access path, then this is the read - * timestamp. If they will be used to write on the target access - * path, then this is the write timestamp. - * @param capacity - * The capacity of this buffer. - */ - public MapBindingSetsOverShardsBuffer( - final IBigdataFederation<?> fed,// - final IPredicate<F> pred, // - final IKeyOrder<F> keyOrder,// - final long timestamp,// - final int capacity) { - - super(capacity); - - if (fed == null) - throw new IllegalArgumentException(); - - if (pred == null) - throw new IllegalArgumentException(); - - if (keyOrder == null) - throw new IllegalArgumentException(); - -// this.context = context; - - this.pred = pred; - - this.keyOrder = keyOrder; - - this.timestamp = timestamp; - - /* - * Note: we can use the read view of the relation to get the IKeyBuilder - * even if we will be writing on the relation since the IKeyBuilder - * semantics can not be readily changed once an index has been created. - */ - { - - @SuppressWarnings("unchecked") - final IRelation<F> relation = (IRelation<F>) fed - .getResourceLocator().locate(pred.getOnlyRelationName(), - timestamp); - - final IIndex index = relation.getIndex(keyOrder); - - this.keyBuilder = index.getIndexMetadata().getKeyBuilder(); - - } - - /* - * Resolve a scale-out view of the metadata index for the target - * predicate. - */ - { - - final String namespace = pred.getOnlyRelationName(); - - final IMetadataIndex mdi = fed.getMetadataIndex(namespace + "." - + keyOrder.getIndexName(), timestamp); - - if (mdi == null) { - - throw new NoSuchIndexException("name=" + namespace - + ", timestamp=" + TimestampUtility.toString(timestamp)); - - } - - this.splitter = new Splitter(mdi); - - } - -// this.stats = context.getStats(); - - } - - /** - * Helper class efficiently splits an array of sorted keys into groups - * associated with a specific index partition. - * - * @author <a href="mailto:tho...@us...">Bryan - * Thompson</a> - */ - static private class Splitter extends AbstractSplitter { - - private final IMetadataIndex mdi; - - public Splitter(final IMetadataIndex mdi) { - - if (mdi == null) - throw new IllegalArgumentException(); - - this.mdi = mdi; - - } - - @Override - protected IMetadataIndex getMetadataIndex(long ts) { - - return mdi; - - } - - } - - /** - * Helper class used to place the binding sets into order based on the - * {@link #fromKey} associated with the {@link #asBound} predicate. - * - * @author <a href="mailto:tho...@us...">Bryan - * Thompson</a> - */ - private class Bundle implements Comparable<Bundle> { - - /** The binding set. */ - final IBindingSet bindingSet; - - /** The asBound predicate. */ - final IPredicate<F> asBound; - - /** The fromKey generated from that asBound predicate. */ - final byte[] fromKey; - - public Bundle(final IBindingSet bindingSet) { - - this.bindingSet = bindingSet; - - this.asBound = pred.asBound(bindingSet); - - this.fromKey = keyOrder.getFromKey(keyBuilder, asBound); - - } - - /** - * Imposes an unsigned byte[] order on the {@link #fromKey}. - */ - public int compareTo(final Bundle o) { - - return BytesUtil.compareBytes(this.fromKey, o.fromKey); - - } - - /** - * Implemented to shut up findbugs, but not used. - */ - @SuppressWarnings("unchecked") - public boolean equals(final Object o) { - - if (this == o) - return true; - - if (!(o instanceof MapBindingSetsOverShardsBuffer.Bundle)) - return false; - - final MapBindingSetsOverShardsBuffer.Bundle t = (MapBindingSetsOverShardsBuffer.Bundle) o; - - if (compareTo(t) != 0) - return false; - - if (!bindingSet.equals(t.bindingSet)) - return false; - - if (!asBound.equals(t.asBound)) - return false; - - return true; - - } - - /** - * Implemented to shut up find bugs. - */ - public int hashCode() { - - if (hash == 0) { - - hash = Arrays.hashCode(fromKey); - - } - - return hash; - - } - private int hash = 0; - - } - - /** - * Maps the chunk of {@link IBindingSet}s across the index partition(s) for - * the sink join dimension. - * - * @param a - * A chunk of {@link IBindingSet}s. - */ - protected void handleChunk(final E[] chunk) { - - @SuppressWarnings("unchecked") - final Bundle[] bundles = new MapBindingSetsOverShardsBuffer.Bundle[chunk.length]; - - /* - * Create the asBound version of the predicate and the associated - * fromKey for each bindingSet in the chunk. - */ - for (int i = 0; i < chunk.length; i++) { - - bundles[i] = new Bundle(chunk[i]); - - } - - /* - * Sort the binding sets in the chunk by the fromKey associated with - * each asBound predicate. - */ - Arrays.sort(bundles); - - /* - * Construct a byte[][] out of the sorted fromKeys and then generate - * slices (Splits) which group the binding sets based on the target - * shard. - */ - final LinkedList<Split> splits; - { - - final byte[][] keys = new byte[bundles.length][]; - - for (int i = 0; i < bundles.length; i++) { - - keys[i] = bundles[i].fromKey; - - } - - splits = splitter.splitKeys(timestamp, 0/* fromIndex */, - bundles.length/* toIndex */, keys); - - } - - if (log.isTraceEnabled()) - log.trace("nsplits=" + splits.size() + ", pred=" + pred); - - /* - * For each split, write the binding sets in that split onto the - * corresponding buffer. - */ - for (Split split : splits) { - - // Note: pmd is a PartitionLocator, so this cast is valid. - final IBuffer<IBindingSet[]> sink = getBuffer((PartitionLocator) split.pmd); - - final IBindingSet[] slice = new IBindingSet[split.ntuples]; - - for (int j = 0, i = split.fromIndex; i < split.toIndex; i++, j++) { - - final IBindingSet bset = bundles[i].bindingSet; - - slice[j] = bset; - - if (log.isTraceEnabled()) - log - .trace("Mapping: keyOrder=" + keyOrder + ",bset=" - + bset + " onto partitionId=" - + split.pmd.getPartitionId()); - - } - -// for (int i = split.fromIndex; i < split.toIndex; i++) { -// -// final Bundle bundle = bundles[i]; -// -// sink.add(bundle.bindingSet); -// -//// stats.unitsOut.increment(); -// -// } - - sink.add(slice); - - } - - } - - /** - * Extended to flush each buffer which targets a specific index partition as - * well. - * <p> - * {@inheritDoc} - */ - @Override - public long flush() { - - final long n = super.flush(); - - for (IBuffer<IBindingSet[]> sink : sinks.values()) { - - if (!sink.isEmpty()) - sink.flush(); - - } - - return n; - - } - - /** - * The allocated sinks. - * <p> - * Note: Since the collection is not thread-safe, synchronization is - * required when adding to the collection and when visiting the elements of - * the collection. However, the {@link MapBindingSetsOverShardsBuffer} is not - * thread-safe either so this should be Ok. - */ - private final LinkedHashMap<PartitionLocator, IBuffer<IBindingSet[]>/* sink */> sinks = new LinkedHashMap<PartitionLocator, IBuffer<IBindingSet[]>>(); - - /** - * An immutable view of the si... [truncated message content] |