From: <tho...@us...> - 2011-01-05 13:49:23
|
Revision: 4053 http://bigdata.svn.sourceforge.net/bigdata/?rev=4053&view=rev Author: thompsonbry Date: 2011-01-05 13:49:15 +0000 (Wed, 05 Jan 2011) Log Message: ----------- Mainly an (inadvertent) whitespace change. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2011-01-05 13:45:21 UTC (rev 4052) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2011-01-05 13:49:15 UTC (rev 4053) @@ -20,7 +20,7 @@ You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -*/ + */ /* * Created on Aug 18, 2010 */ @@ -104,65 +104,66 @@ * suite. */ public class PipelineJoin<E> extends PipelineOp implements - IShardwisePipelineOp<E> { + IShardwisePipelineOp<E> { - static private final transient Logger log = Logger.getLogger(PipelineJoin.class); + static private final transient Logger log = Logger + .getLogger(PipelineJoin.class); - /** + /** * */ - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 1L; - public interface Annotations extends PipelineOp.Annotations { + public interface Annotations extends PipelineOp.Annotations { /** * The {@link IPredicate} which is used to generate the * {@link IAccessPath}s during the join. */ String PREDICATE = PipelineJoin.class.getName() + ".predicate"; - - /** - * An optional {@link IVariable}[] identifying the variables to be - * retained in the {@link IBindingSet}s written out by the operator. - * All variables are retained unless this annotation is specified. - */ - String SELECT = PipelineJoin.class.getName() + ".select"; - - /** - * An optional {@link IConstraint}[] which places restrictions on the - * legal patterns in the variable bindings. - */ - String CONSTRAINTS = PipelineJoin.class.getName() + ".constraints"; - /** - * Marks the join as "optional" in the SPARQL sense. Binding sets which - * fail the join will be routed to the alternative sink as specified by - * either {@link PipelineOp.Annotations#ALT_SINK_REF} or - * {@link PipelineOp.Annotations#ALT_SINK_GROUP}. - * - * @see #DEFAULT_OPTIONAL - */ - String OPTIONAL = PipelineJoin.class.getName() + ".optional"; + /** + * An optional {@link IVariable}[] identifying the variables to be + * retained in the {@link IBindingSet}s written out by the operator. All + * variables are retained unless this annotation is specified. + */ + String SELECT = PipelineJoin.class.getName() + ".select"; - boolean DEFAULT_OPTIONAL = false; + /** + * An optional {@link IConstraint}[] which places restrictions on the + * legal patterns in the variable bindings. + */ + String CONSTRAINTS = PipelineJoin.class.getName() + ".constraints"; - /** - * The maximum parallelism with which the pipeline will consume the - * source {@link IBindingSet}[] chunk. - * <p> - * Note: When ZERO (0), everything will run in the caller's - * {@link Thread}, but there will still be one thread per pipeline join - * task which is executing concurrently against different source chunks. - * When GT ZERO (0), tasks will run on an {@link ExecutorService} with - * the specified maximum parallelism. - * - * @see #DEFAULT_MAX_PARALLEL - */ - String MAX_PARALLEL = PipelineJoin.class.getName() + ".maxParallel"; + /** + * Marks the join as "optional" in the SPARQL sense. Binding sets which + * fail the join will be routed to the alternative sink as specified by + * either {@link PipelineOp.Annotations#ALT_SINK_REF} or + * {@link PipelineOp.Annotations#ALT_SINK_GROUP}. + * + * @see #DEFAULT_OPTIONAL + */ + String OPTIONAL = PipelineJoin.class.getName() + ".optional"; - int DEFAULT_MAX_PARALLEL = 0; + boolean DEFAULT_OPTIONAL = false; /** + * The maximum parallelism with which the pipeline will consume the + * source {@link IBindingSet}[] chunk. + * <p> + * Note: When ZERO (0), everything will run in the caller's + * {@link Thread}, but there will still be one thread per pipeline join + * task which is executing concurrently against different source chunks. + * When GT ZERO (0), tasks will run on an {@link ExecutorService} with + * the specified maximum parallelism. + * + * @see #DEFAULT_MAX_PARALLEL + */ + String MAX_PARALLEL = PipelineJoin.class.getName() + ".maxParallel"; + + int DEFAULT_MAX_PARALLEL = 0; + + /** * When <code>true</code>, binding sets observed in the same chunk which * have the binding pattern on the variables for the access path will be * coalesced into a single access path (default @@ -172,7 +173,7 @@ * does NOT reduce the #of solutions generated. * <p> * This option can cause some error in the join hit ratio when it is - * estimated from a cutoff join. + * estimated from a cutoff join. * * @see PipelineJoinStats#getJoinHitRatio() * @@ -194,16 +195,16 @@ String LIMIT = PipelineJoin.class.getName() + ".limit"; long DEFAULT_LIMIT = Long.MAX_VALUE; - - } - /** - * Extended statistics for the join operator. - */ - public static class PipelineJoinStats extends BOpStats { + } - private static final long serialVersionUID = 1L; - + /** + * Extended statistics for the join operator. + */ + public static class PipelineJoinStats extends BOpStats { + + private static final long serialVersionUID = 1L; + /** * The #of duplicate access paths which were detected and filtered out. */ @@ -294,492 +295,505 @@ return 0; return ((double) out) / in; } - - /** - * The #of chunks read from an {@link IAccessPath}. - */ - public final CAT accessPathChunksIn = new CAT(); - - /** - * The #of elements read from an {@link IAccessPath}. - */ - public final CAT accessPathUnitsIn = new CAT(); -// /** -// * The maximum observed fan in for this join dimension (maximum #of -// * sources observed writing on any join task for this join dimension). -// * Since join tasks may be closed and new join tasks re-opened for the -// * same query, join dimension and index partition, and since each join -// * task for the same join dimension could, in principle, have a -// * different fan in based on the actual binding sets propagated this is -// * not necessarily the "actual" fan in for the join dimension. You would -// * have to track the #of distinct partitionId values to track that. -// */ -// public int fanIn; -// -// /** -// * The maximum observed fan out for this join dimension (maximum #of -// * sinks on which any join task is writing for this join dimension). -// * Since join tasks may be closed and new join tasks re-opened for the -// * same query, join dimension and index partition, and since each join -// * task for the same join dimension could, in principle, have a -// * different fan out based on the actual binding sets propagated this is -// * not necessarily the "actual" fan out for the join dimension. -// */ -// public int fanOut; + /** + * The #of chunks read from an {@link IAccessPath}. + */ + public final CAT accessPathChunksIn = new CAT(); - public void add(final BOpStats o) { + /** + * The #of elements read from an {@link IAccessPath}. + */ + public final CAT accessPathUnitsIn = new CAT(); - super.add(o); - - if (o instanceof PipelineJoinStats) { + // /** + // * The maximum observed fan in for this join dimension (maximum #of + // * sources observed writing on any join task for this join dimension). + // * Since join tasks may be closed and new join tasks re-opened for the + // * same query, join dimension and index partition, and since each join + // * task for the same join dimension could, in principle, have a + // * different fan in based on the actual binding sets propagated this + // is + // * not necessarily the "actual" fan in for the join dimension. You + // would + // * have to track the #of distinct partitionId values to track that. + // */ + // public int fanIn; + // + // /** + // * The maximum observed fan out for this join dimension (maximum #of + // * sinks on which any join task is writing for this join dimension). + // * Since join tasks may be closed and new join tasks re-opened for the + // * same query, join dimension and index partition, and since each join + // * task for the same join dimension could, in principle, have a + // * different fan out based on the actual binding sets propagated this + // is + // * not necessarily the "actual" fan out for the join dimension. + // */ + // public int fanOut; - final PipelineJoinStats t = (PipelineJoinStats) o; + public void add(final BOpStats o) { - accessPathDups.add(t.accessPathDups.get()); + super.add(o); - accessPathCount.add(t.accessPathCount.get()); + if (o instanceof PipelineJoinStats) { - accessPathRangeCount.add(t.accessPathRangeCount.get()); + final PipelineJoinStats t = (PipelineJoinStats) o; - accessPathChunksIn.add(t.accessPathChunksIn.get()); + accessPathDups.add(t.accessPathDups.get()); - accessPathUnitsIn.add(t.accessPathUnitsIn.get()); + accessPathCount.add(t.accessPathCount.get()); - inputSolutions.add(t.inputSolutions.get()); + accessPathRangeCount.add(t.accessPathRangeCount.get()); - outputSolutions.add(t.outputSolutions.get()); + accessPathChunksIn.add(t.accessPathChunksIn.get()); -// if (t.fanIn > this.fanIn) { -// // maximum reported fanIn for this join dimension. -// this.fanIn = t.fanIn; -// } -// if (t.fanOut > this.fanOut) { -// // maximum reported fanOut for this join dimension. -// this.fanOut += t.fanOut; -// } + accessPathUnitsIn.add(t.accessPathUnitsIn.get()); - } - - } - - @Override - protected void toString(final StringBuilder sb) { - sb.append(",accessPathDups=" + accessPathDups.get()); - sb.append(",accessPathCount=" + accessPathCount.get()); - sb.append(",accessPathRangeCount=" + accessPathRangeCount.get()); - sb.append(",accessPathChunksIn=" + accessPathChunksIn.get()); - sb.append(",accessPathUnitsIn=" + accessPathUnitsIn.get()); - sb.append(",inputSolutions=" + inputSolutions.get()); - sb.append(",outputSolutions=" + outputSolutions.get()); - sb.append(",joinHitRatio=" + getJoinHitRatio()); - } - - } + inputSolutions.add(t.inputSolutions.get()); - /** - * Deep copy constructor. - * - * @param op - */ - public PipelineJoin(final PipelineJoin<E> op) { - super(op); - } - - /** - * Shallow copy vararg constructor. - * - * @param args - * @param annotations - */ - public PipelineJoin(final BOp[] args, NV... annotations) { + outputSolutions.add(t.outputSolutions.get()); - this(args, NV.asMap(annotations)); - - } + // if (t.fanIn > this.fanIn) { + // // maximum reported fanIn for this join dimension. + // this.fanIn = t.fanIn; + // } + // if (t.fanOut > this.fanOut) { + // // maximum reported fanOut for this join dimension. + // this.fanOut += t.fanOut; + // } - /** - * Shallow copy constructor. - * - * @param args - * @param annotations - */ - public PipelineJoin(final BOp[] args, final Map<String, Object> annotations) { + } - super(args, annotations); + } -// if (arity() != 1) -// throw new IllegalArgumentException(); + @Override + protected void toString(final StringBuilder sb) { + sb.append(",accessPathDups=" + accessPathDups.get()); + sb.append(",accessPathCount=" + accessPathCount.get()); + sb.append(",accessPathRangeCount=" + accessPathRangeCount.get()); + sb.append(",accessPathChunksIn=" + accessPathChunksIn.get()); + sb.append(",accessPathUnitsIn=" + accessPathUnitsIn.get()); + sb.append(",inputSolutions=" + inputSolutions.get()); + sb.append(",outputSolutions=" + outputSolutions.get()); + sb.append(",joinHitRatio=" + getJoinHitRatio()); + } -// if (left() == null) -// throw new IllegalArgumentException(); + } - } - -// /** -// * The sole operand, which is the previous join in the pipeline join path. -// */ -// public PipelineOp left() { -// -// return (PipelineOp) get(0); -// -// } + /** + * Deep copy constructor. + * + * @param op + */ + public PipelineJoin(final PipelineJoin<E> op) { + super(op); + } - /** - * {@inheritDoc} - * - * @see Annotations#PREDICATE - */ - @SuppressWarnings("unchecked") + /** + * Shallow copy vararg constructor. + * + * @param args + * @param annotations + */ + public PipelineJoin(final BOp[] args, NV... annotations) { + + this(args, NV.asMap(annotations)); + + } + + /** + * Shallow copy constructor. + * + * @param args + * @param annotations + */ + public PipelineJoin(final BOp[] args, final Map<String, Object> annotations) { + + super(args, annotations); + + // if (arity() != 1) + // throw new IllegalArgumentException(); + + // if (left() == null) + // throw new IllegalArgumentException(); + + } + + // /** + // * The sole operand, which is the previous join in the pipeline join path. + // */ + // public PipelineOp left() { + // + // return (PipelineOp) get(0); + // + // } + + /** + * {@inheritDoc} + * + * @see Annotations#PREDICATE + */ + @SuppressWarnings("unchecked") public IPredicate<E> getPredicate() { - + return (IPredicate<E>) getRequiredProperty(Annotations.PREDICATE); - - } - /** - * @see Annotations#CONSTRAINTS - */ - public IConstraint[] constraints() { + } - return getProperty(Annotations.CONSTRAINTS, null/* defaultValue */); + /** + * @see Annotations#CONSTRAINTS + */ + public IConstraint[] constraints() { - } + return getProperty(Annotations.CONSTRAINTS, null/* defaultValue */); - /** - * @see Annotations#OPTIONAL - */ - public boolean isOptional() { + } - return getProperty(Annotations.OPTIONAL, Annotations.DEFAULT_OPTIONAL); + /** + * @see Annotations#OPTIONAL + */ + public boolean isOptional() { - } + return getProperty(Annotations.OPTIONAL, Annotations.DEFAULT_OPTIONAL); - /** - * @see Annotations#MAX_PARALLEL - */ - public int getMaxParallel() { + } - return getProperty(Annotations.MAX_PARALLEL, Annotations.DEFAULT_MAX_PARALLEL); + /** + * @see Annotations#MAX_PARALLEL + */ + public int getMaxParallel() { - } + // return 5; + return getProperty(Annotations.MAX_PARALLEL, + Annotations.DEFAULT_MAX_PARALLEL); - /** - * @see Annotations#SELECT - */ - public IVariable<?>[] variablesToKeep() { + } - return getProperty(Annotations.SELECT, null/* defaultValue */); + /** + * @see Annotations#SELECT + */ + public IVariable<?>[] variablesToKeep() { - } + return getProperty(Annotations.SELECT, null/* defaultValue */); - @Override - public PipelineJoinStats newStats() { + } - return new PipelineJoinStats(); - - } - - public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { + @Override + public PipelineJoinStats newStats() { - return new FutureTask<Void>(new JoinTask<E>(this, context)); - - } + return new PipelineJoinStats(); - /** - * Pipeline join impl. - */ - private static class JoinTask<E> extends Haltable<Void> implements Callable<Void> { + } - /** - * The join that is being executed. - */ - final private PipelineJoin<?> joinOp; + public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { - /** - * The constraint (if any) specified for the join operator. - */ - final private IConstraint[] constraints; + return new FutureTask<Void>(new JoinTask<E>(this, context)); - /** - * The maximum parallelism with which the {@link JoinTask} will - * consume the source {@link IBindingSet}s. - * - * @see Annotations#MAX_PARALLEL - */ - final private int maxParallel; + } - /** - * The service used for executing subtasks (optional). - * - * @see #maxParallel - */ - final private Executor service; + /** + * Pipeline join impl. + */ + private static class JoinTask<E> extends Haltable<Void> implements + Callable<Void> { - /** - * True iff the {@link #predicate} operand is an optional pattern (aka if - * this is a SPARQL style left join). - */ - final private boolean optional; + /** + * The join that is being executed. + */ + final private PipelineJoin<?> joinOp; - /** - * The variables to be retained by the join operator. Variables not - * appearing in this list will be stripped before writing out the - * binding set onto the output sink(s). - */ - final private IVariable<?>[] variablesToKeep; + /** + * The constraint (if any) specified for the join operator. + */ + final private IConstraint[] constraints; - /** - * The source for the elements to be joined. - */ - final private IPredicate<E> predicate; + /** + * The maximum parallelism with which the {@link JoinTask} will consume + * the source {@link IBindingSet}s. + * + * @see Annotations#MAX_PARALLEL + */ + final private int maxParallel; - /** - * The relation associated with the {@link #predicate} operand. - */ - final private IRelation<E> relation; - - /** - * The partition identifier -or- <code>-1</code> if we are not reading - * on an index partition. - */ - final private int partitionId; - - /** - * The evaluation context. - */ - final private BOpContext<IBindingSet> context; + /** + * The service used for executing subtasks (optional). + * + * @see #maxParallel + */ + final private Executor service; - /** - * The statistics for this {@link JoinTask}. - */ - final private PipelineJoinStats stats; + /** + * True iff the {@link #predicate} operand is an optional pattern (aka + * if this is a SPARQL style left join). + */ + final private boolean optional; /** + * The variables to be retained by the join operator. Variables not + * appearing in this list will be stripped before writing out the + * binding set onto the output sink(s). + */ + final private IVariable<?>[] variablesToKeep; + + /** + * The source for the elements to be joined. + */ + final private IPredicate<E> predicate; + + /** + * The relation associated with the {@link #predicate} operand. + */ + final private IRelation<E> relation; + + /** + * The partition identifier -or- <code>-1</code> if we are not reading + * on an index partition. + */ + final private int partitionId; + + /** + * The evaluation context. + */ + final private BOpContext<IBindingSet> context; + +// /** +// * When <code>true</code>, the {@link #stats} will be tracked. This is +// * <code>false</code> unless logging is requested for {@link QueryLog} +// * or stats are explicitly request (e.g., to support cutoff joins). +// */ +// final private boolean trackStats; + + /** + * The statistics for this {@link JoinTask}. + */ + final private PipelineJoinStats stats; + + /** * An optional limit on the #of solutions to be produced. The limit is * ignored if it is {@link Long#MAX_VALUE}. */ - final private long limit; - - /** - * When <code>true</code> an attempt will be made to coalesce as-bound - * predicates which result in the same access path. - * - * @see Annotations#COALESCE_DUPLICATE_ACCESS_PATHS - */ - final boolean coalesceAccessPaths; - - /** - * Used to enforce the {@link Annotations#LIMIT} iff one is specified. - */ - final private AtomicLong exactOutputCount = new AtomicLong(); - - /** - * The source from which we read the binding set chunks. - * <p> - * Note: In keeping with the top-down evaluation of the operator tree - * the source should not be set until we begin to execute the - * {@link #left} operand and that should not happen until we are in - * {@link #call()} in order to ensure that the producer will be - * terminated if there is a problem setting up this join. Given that, it - * might need to be an atomic reference or volatile or the like. - */ - final private IAsynchronousIterator<IBindingSet[]> source; + final private long limit; - /** - * Where the join results are written. - * <p> - * Chunks of bindingSets are written pre-Thread unsynchronized buffers - * by {@link ChunkTask}. Those unsynchronized buffers overflow onto the - * per-JoinTask {@link #sink}, which is a thread-safe - * {@link IBlockingBuffer}. The downstream pipeline operator drains that - * {@link IBlockingBuffer} using its iterator(). When the {@link #sink} - * is closed and everything in it has been drained, then the downstream - * operator will conclude that no more bindingSets are available and it - * will terminate. - */ - final private IBlockingBuffer<IBindingSet[]> sink; + /** + * When <code>true</code> an attempt will be made to coalesce as-bound + * predicates which result in the same access path. + * + * @see Annotations#COALESCE_DUPLICATE_ACCESS_PATHS + */ + final boolean coalesceAccessPaths; - /** - * The alternative sink to use when the join is {@link #optional} AND - * {@link BOpContext#getSink2()} returns a distinct buffer for the - * alternative sink. The binding sets from the source are copied onto the - * alternative sink for an optional join if the join fails. Normally the - * {@link BOpContext#getSink()} can be used for both the joins which - * succeed and those which fail. The alternative sink is only necessary - * when the failed join needs to jump out of a join group rather than - * routing directly to the ancestor in the operator tree. - */ - final private IBlockingBuffer<IBindingSet[]> sink2; - - /** - * The thread-local buffer factory for the default sink. - */ - final private TLBFactory threadLocalBufferFactory; - - /** - * The thread-local buffer factory for the optional sink (iff the - * optional sink is defined). - */ - final private TLBFactory threadLocalBufferFactory2; + /** + * Used to enforce the {@link Annotations#LIMIT} iff one is specified. + */ + final private AtomicLong exactOutputCount = new AtomicLong(); - /** - * Instances of this class MUST be created in the appropriate execution - * context of the target {@link DataService} so that the federation and - * the joinNexus references are both correct and so that it has access - * to the local index object for the specified index partition. - * - * @param joinOp - * @param context - */ - public JoinTask(// - final PipelineJoin<E> joinOp,// - final BOpContext<IBindingSet> context - ) { + /** + * The source from which we read the binding set chunks. + * <p> + * Note: In keeping with the top-down evaluation of the operator tree + * the source should not be set until we begin to execute the + * {@link #left} operand and that should not happen until we are in + * {@link #call()} in order to ensure that the producer will be + * terminated if there is a problem setting up this join. Given that, it + * might need to be an atomic reference or volatile or the like. + */ + final private IAsynchronousIterator<IBindingSet[]> source; - if (joinOp == null) - throw new IllegalArgumentException(); - if (context == null) - throw new IllegalArgumentException(); + /** + * Where the join results are written. + * <p> + * Chunks of bindingSets are written pre-Thread unsynchronized buffers + * by {@link ChunkTask}. Those unsynchronized buffers overflow onto the + * per-JoinTask {@link #sink}, which is a thread-safe + * {@link IBlockingBuffer}. The downstream pipeline operator drains that + * {@link IBlockingBuffer} using its iterator(). When the {@link #sink} + * is closed and everything in it has been drained, then the downstream + * operator will conclude that no more bindingSets are available and it + * will terminate. + */ + final private IBlockingBuffer<IBindingSet[]> sink; - this.joinOp = joinOp; - this.predicate = joinOp.getPredicate(); - this.constraints = joinOp.constraints(); - this.maxParallel = joinOp.getMaxParallel(); - if (maxParallel < 0) - throw new IllegalArgumentException(Annotations.MAX_PARALLEL - + "=" + maxParallel); - if (maxParallel > 0) { - // shared service. - service = new LatchedExecutor(context.getIndexManager() - .getExecutorService(), maxParallel); - } else { - // run in the caller's thread. - service = null; - } - this.optional = joinOp.isOptional(); - this.variablesToKeep = joinOp.variablesToKeep(); - this.context = context; - this.relation = context.getRelation(predicate); - this.source = context.getSource(); - this.sink = context.getSink(); - this.sink2 = context.getSink2(); - this.partitionId = context.getPartitionId(); - this.stats = (PipelineJoinStats) context.getStats(); - this.limit = joinOp.getProperty(Annotations.LIMIT,Annotations.DEFAULT_LIMIT); + /** + * The alternative sink to use when the join is {@link #optional} AND + * {@link BOpContext#getSink2()} returns a distinct buffer for the + * alternative sink. The binding sets from the source are copied onto + * the alternative sink for an optional join if the join fails. Normally + * the {@link BOpContext#getSink()} can be used for both the joins which + * succeed and those which fail. The alternative sink is only necessary + * when the failed join needs to jump out of a join group rather than + * routing directly to the ancestor in the operator tree. + */ + final private IBlockingBuffer<IBindingSet[]> sink2; + + /** + * The thread-local buffer factory for the default sink. + */ + final private TLBFactory threadLocalBufferFactory; + + /** + * The thread-local buffer factory for the optional sink (iff the + * optional sink is defined). + */ + final private TLBFactory threadLocalBufferFactory2; + + /** + * Instances of this class MUST be created in the appropriate execution + * context of the target {@link DataService} so that the federation and + * the joinNexus references are both correct and so that it has access + * to the local index object for the specified index partition. + * + * @param joinOp + * @param context + */ + public JoinTask(// + final PipelineJoin<E> joinOp,// + final BOpContext<IBindingSet> context) { + + if (joinOp == null) + throw new IllegalArgumentException(); + if (context == null) + throw new IllegalArgumentException(); + + this.joinOp = joinOp; + this.predicate = joinOp.getPredicate(); + this.constraints = joinOp.constraints(); + this.maxParallel = joinOp.getMaxParallel(); + if (maxParallel < 0) + throw new IllegalArgumentException(Annotations.MAX_PARALLEL + + "=" + maxParallel); + if (maxParallel > 0) { + // shared service. + service = new LatchedExecutor(context.getIndexManager() + .getExecutorService(), maxParallel); + } else { + // run in the caller's thread. + service = null; + } + this.optional = joinOp.isOptional(); + this.variablesToKeep = joinOp.variablesToKeep(); + this.context = context; + this.relation = context.getRelation(predicate); + this.source = context.getSource(); + this.sink = context.getSink(); + this.sink2 = context.getSink2(); + this.partitionId = context.getPartitionId(); + this.stats = (PipelineJoinStats) context.getStats(); + this.limit = joinOp.getProperty(Annotations.LIMIT, + Annotations.DEFAULT_LIMIT); this.coalesceAccessPaths = joinOp.getProperty( Annotations.COALESCE_DUPLICATE_ACCESS_PATHS, Annotations.DEFAULT_COALESCE_DUPLICATE_ACCESS_PATHS); - - this.threadLocalBufferFactory = new TLBFactory(sink); - - this.threadLocalBufferFactory2 = sink2 == null ? null - : new TLBFactory(sink2); - if (log.isDebugEnabled()) - log.debug("joinOp=" + joinOp); + this.threadLocalBufferFactory = new TLBFactory(sink); - } + this.threadLocalBufferFactory2 = sink2 == null ? null + : new TLBFactory(sink2); - public String toString() { + if (log.isDebugEnabled()) + log.debug("joinOp=" + joinOp); - return getClass().getName() + "{ joinOp=" + joinOp + "}"; + } - } + public String toString() { - /** - * Runs the {@link JoinTask}. - * - * @return <code>null</code>. - */ - public Void call() throws Exception { + return getClass().getName() + "{ joinOp=" + joinOp + "}"; -// final long begin = System.currentTimeMillis(); - - if (log.isDebugEnabled()) - log.debug("joinOp=" + joinOp); + } - try { + /** + * Runs the {@link JoinTask}. + * + * @return <code>null</code>. + */ + public Void call() throws Exception { - /* - * Consume bindingSet chunks from the source JoinTask(s). - */ - consumeSource(); + // final long begin = System.currentTimeMillis(); - /* - * Flush and close the thread-local output buffers. - */ - threadLocalBufferFactory.flush(); - if (threadLocalBufferFactory2 != null) - threadLocalBufferFactory2.flush(); + if (log.isDebugEnabled()) + log.debug("joinOp=" + joinOp); - // flush the sync buffer - flushAndCloseBuffersAndAwaitSinks(); + try { - if (log.isDebugEnabled()) - log.debug("JoinTask done: joinOp=" + joinOp); + /* + * Consume bindingSet chunks from the source JoinTask(s). + */ + consumeSource(); - halted(); + /* + * Flush and close the thread-local output buffers. + */ + threadLocalBufferFactory.flush(); + if (threadLocalBufferFactory2 != null) + threadLocalBufferFactory2.flush(); - return null; + // flush the sync buffer + flushAndCloseBuffersAndAwaitSinks(); - } catch (Throwable t) { + if (log.isDebugEnabled()) + log.debug("JoinTask done: joinOp=" + joinOp); - /* - * This is used for processing errors and also if this task is - * interrupted (because the sink has been closed). - */ + halted(); - halt(t); + return null; - // reset the unsync buffers. - try { - // resetUnsyncBuffers(); - threadLocalBufferFactory.reset(); - if (threadLocalBufferFactory2 != null) - threadLocalBufferFactory2.reset(); - } catch (Throwable t2) { - log.error(t2.getLocalizedMessage(), t2); - } + } catch (Throwable t) { - // reset the sync buffer and cancel the sink JoinTasks. - try { - cancelSinks(); - } catch (Throwable t2) { - log.error(t2.getLocalizedMessage(), t2); - } + /* + * This is used for processing errors and also if this task is + * interrupted (because the sink has been closed). + */ - /* - * Close source iterators, which will cause any source JoinTasks - * that are still executing to throw a CancellationException - * when the Future associated with the source iterator is - * cancelled. - */ - try { - closeSources(); - } catch (Throwable t2) { - log.error(t2.getLocalizedMessage(), t2); - } + halt(t); - throw new RuntimeException(t); + // reset the unsync buffers. + try { + // resetUnsyncBuffers(); + threadLocalBufferFactory.reset(); + if (threadLocalBufferFactory2 != null) + threadLocalBufferFactory2.reset(); + } catch (Throwable t2) { + log.error(t2.getLocalizedMessage(), t2); + } -// } finally { -// -// stats.elapsed.add(System.currentTimeMillis() - begin); - - } + // reset the sync buffer and cancel the sink JoinTasks. + try { + cancelSinks(); + } catch (Throwable t2) { + log.error(t2.getLocalizedMessage(), t2); + } - } + /* + * Close source iterators, which will cause any source JoinTasks + * that are still executing to throw a CancellationException + * when the Future associated with the source iterator is + * cancelled. + */ + try { + closeSources(); + } catch (Throwable t2) { + log.error(t2.getLocalizedMessage(), t2); + } - /** - * Consume {@link IBindingSet} chunks from the {@link #source}. - * - * @throws Exception - */ - protected void consumeSource() throws Exception { + throw new RuntimeException(t); - IBindingSet[] chunk; + // } finally { + // + // stats.elapsed.add(System.currentTimeMillis() - begin); + } + + } + + /** + * Consume {@link IBindingSet} chunks from the {@link #source}. + * + * @throws Exception + */ + protected void consumeSource() throws Exception { + + IBindingSet[] chunk; + while (!isDone() && (chunk = nextChunk()) != null) { if (chunk.length == 0) { @@ -789,240 +803,240 @@ */ continue; } - - /* - * Consume the chunk until done using either the caller's thread - * or the executor service as appropriate to run subtasks. - */ - if (chunk.length <= 1) { - - /* - * Run on the caller's thread anyway since there is just one - * binding set to be consumed. - */ - - new BindingSetConsumerTask(null/* service */, chunk).call(); - - } else { - - /* - * Run subtasks on either the caller's thread or the shared - * executed service depending on the configured value of - * [maxParallel]. - */ - - new BindingSetConsumerTask(service, chunk).call(); - - } - - } - } + /* + * Consume the chunk until done using either the caller's thread + * or the executor service as appropriate to run subtasks. + */ + if (chunk.length <= 1) { - /** - * Closes the {@link #source} specified to the ctor. - */ - protected void closeSources() { + /* + * Run on the caller's thread anyway since there is just one + * binding set to be consumed. + */ - if (log.isInfoEnabled()) - log.info(toString()); + new BindingSetConsumerTask(null/* service */, chunk) + .call(); - source.close(); + } else { - } + /* + * Run subtasks on either the caller's thread or the shared + * executed service depending on the configured value of + * [maxParallel]. + */ - /** - * Flush and close all output buffers and await sink {@link JoinTask} - * (s). - * <p> - * Note: You MUST close the {@link BlockingBuffer} from which each sink - * reads <em>before</em> invoking this method in order for those sinks - * to terminate. Otherwise the source {@link IAsynchronousIterator}(s) - * on which the sink is reading will remain open and the sink will never - * decide that it has exhausted its source(s). - * - * @throws InterruptedException - * @throws ExecutionException - */ - protected void flushAndCloseBuffersAndAwaitSinks() - throws InterruptedException, ExecutionException { + new BindingSetConsumerTask(service, chunk).call(); - if (log.isDebugEnabled()) - log.debug("joinOp=" + joinOp); + } - /* - * Close the thread-safe output buffer. For any JOIN except the - * last, this buffer will be the source for one or more sink - * JoinTasks for the next join dimension. Once this buffer is - * closed, the asynchronous iterator draining the buffer will - * eventually report that there is nothing left for it to process. - * - * Note: This is a BlockingBuffer. BlockingBuffer#flush() is a NOP. - */ + } - sink.flush(); - sink.close(); - - if(sink2!=null) { - sink2.flush(); - sink2.close(); - } - - } + } - /** - * Cancel sink {@link JoinTask}(s). - */ - protected void cancelSinks() { + /** + * Closes the {@link #source} specified to the ctor. + */ + protected void closeSources() { - if (log.isDebugEnabled()) - log.debug("joinOp=" + joinOp); + if (log.isInfoEnabled()) + log.info(toString()); - sink.reset(); + source.close(); - if (sink.getFuture() != null) { + } - sink.getFuture().cancel(true/* mayInterruptIfRunning */); + /** + * Flush and close all output buffers and await sink {@link JoinTask} + * (s). + * <p> + * Note: You MUST close the {@link BlockingBuffer} from which each sink + * reads <em>before</em> invoking this method in order for those sinks + * to terminate. Otherwise the source {@link IAsynchronousIterator}(s) + * on which the sink is reading will remain open and the sink will never + * decide that it has exhausted its source(s). + * + * @throws InterruptedException + * @throws ExecutionException + */ + protected void flushAndCloseBuffersAndAwaitSinks() + throws InterruptedException, ExecutionException { - } + if (log.isDebugEnabled()) + log.debug("joinOp=" + joinOp); - if (sink2 != null) { - - sink2.reset(); + /* + * Close the thread-safe output buffer. For any JOIN except the + * last, this buffer will be the source for one or more sink + * JoinTasks for the next join dimension. Once this buffer is + * closed, the asynchronous iterator draining the buffer will + * eventually report that there is nothing left for it to process. + * + * Note: This is a BlockingBuffer. BlockingBuffer#flush() is a NOP. + */ - if (sink2.getFuture() != null) { + sink.flush(); + sink.close(); - sink2.getFuture().cancel(true/* mayInterruptIfRunning */); + if (sink2 != null) { + sink2.flush(); + sink2.close(); + } - } - - } + } - } + /** + * Cancel sink {@link JoinTask}(s). + */ + protected void cancelSinks() { - /** - * Return a chunk of {@link IBindingSet}s from source. - * - * @return The next chunk -or- <code>null</code> iff the source is - * exhausted. - */ - protected IBindingSet[] nextChunk() throws InterruptedException { + if (log.isDebugEnabled()) + log.debug("joinOp=" + joinOp); - if (log.isDebugEnabled()) - log.debug("joinOp=" + joinOp); + sink.reset(); - while (!source.isExhausted()) { + if (sink.getFuture() != null) { - halted(); + sink.getFuture().cancel(true/* mayInterruptIfRunning */); - // note: uses timeout to avoid blocking w/o testing [halt]. - if (source.hasNext(10, TimeUnit.MILLISECONDS)) { + } - // read the chunk. - final IBindingSet[] chunk = source.next(); + if (sink2 != null) { - stats.chunksIn.increment(); - stats.unitsIn.add(chunk.length); + sink2.reset(); - if (log.isDebugEnabled()) - log.debug("Read chunk from source: chunkSize=" - + chunk.length + ", joinOp=" + joinOp); + if (sink2.getFuture() != null) { - return chunk; + sink2.getFuture().cancel(true/* mayInterruptIfRunning */); - } + } - } + } - /* - * Termination condition: the source is exhausted. - */ + } - if (log.isDebugEnabled()) - log.debug("Source exhausted: joinOp=" + joinOp); + /** + * Return a chunk of {@link IBindingSet}s from source. + * + * @return The next chunk -or- <code>null</code> iff the source is + * exhausted. + */ + protected IBindingSet[] nextChunk() throws InterruptedException { - return null; + if (log.isDebugEnabled()) + log.debug("joinOp=" + joinOp); - } + while (!source.isExhausted()) { - /** - * Class consumes a chunk of binding set executing a nested indexed join - * until canceled, interrupted, or all the binding sets are exhausted. - * For each {@link IBindingSet} in the chunk, an {@link AccessPathTask} - * is created which will consume that {@link IBindingSet}. The - * {@link AccessPathTask}s are sorted based on their - * <code>fromKey</code> so as to order the execution of those tasks in a - * manner that will maximize the efficiency of index reads. The ordered - * {@link AccessPathTask}s are then submitted to the caller's - * {@link Executor} or run in the caller's thread if the executor is - * <code>null</code>. - * - * @author <a href="mailto:tho...@us...">Bryan - * Thompson</a> - * @version $Id$ - */ - protected class BindingSetConsumerTask implements Callable<Void> { + halted(); - private final Executor executor; - private final IBindingSet[] chunk; + // note: uses timeout to avoid blocking w/o testing [halt]. + if (source.hasNext(10, TimeUnit.MILLISECONDS)) { - /** - * - * @param executor - * The service that will execute the generated - * {@link AccessPathTask}s -or- <code>null</code> IFF you - * want the {@link AccessPathTask}s to be executed in the - * caller's thread. - * @param chunk - * A chunk of binding sets from the upstream producer. - */ - public BindingSetConsumerTask(final Executor executor, - final IBindingSet[] chunk) { + // read the chunk. + final IBindingSet[] chunk = source.next(); - if (chunk == null) - throw new IllegalArgumentException(); - - this.executor = executor; - - this.chunk = chunk; + stats.chunksIn.increment(); + stats.unitsIn.add(chunk.length); - } + if (log.isDebugEnabled()) + log.debug("Read chunk from source: chunkSize=" + + chunk.length + ", joinOp=" + joinOp); - /** - * Read chunks from one or more sources until canceled, interrupted, - * or all sources are exhausted and submits {@link AccessPathTask}s - * to the caller's {@link ExecutorService} -or- executes those tasks - * in the caller's thread if no {@link ExecutorService} was provided - * to the ctor. - * <p> - * Note: When running with an {@link ExecutorService}, the caller is - * responsible for waiting on that {@link ExecutorService} until the - * {@link AccessPathTask}s to complete and must verify all tasks - * completed successfully. - * - * @return <code>null</code> - * - * @throws BufferClosedException - * if there is an attempt to output a chunk of - * {@link IBindingSet}s or {@link ISolution}s and the - * output buffer is an {@link IBlockingBuffer} (true for - * all join dimensions exception the lastJoin and also - * true for query on the lastJoin) and that - * {@link IBlockingBuffer} has been closed. - */ + return chunk; + + } + + } + + /* + * Termination condition: the source is exhausted. + */ + + if (log.isDebugEnabled()) + log.debug("Source exhausted: joinOp=" + joinOp); + + return null; + + } + + /** + * Class consumes a chunk of binding set executing a nested indexed join + * until canceled, interrupted, or all the binding sets are exhausted. + * For each {@link IBindingSet} in the chunk, an {@link AccessPathTask} + * is created which will consume that {@link IBindingSet}. The + * {@link AccessPathTask}s are sorted based on their + * <code>fromKey</code> so as to order the execution of those tasks in a + * manner that will maximize the efficiency of index reads. The ordered + * {@link AccessPathTask}s are then submitted to the caller's + * {@link Executor} or run in the caller's thread if the executor is + * <code>null</code>. + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + */ + protected class BindingSetConsumerTask implements Callable<Void> { + + private final Executor executor; + private final IBindingSet[] chunk; + + /** + * + * @param executor + * The service that will execute the generated + * {@link AccessPathTask}s -or- <code>null</code> IFF you + * want the {@link AccessPathTask}s to be executed in the + * caller's thread. + * @param chunk + * A chunk of binding sets from the upstream producer. + */ + public BindingSetConsumerTask(final Executor executor, + final IBindingSet[] chunk) { + + if (chunk == null) + throw new IllegalArgumentException(); + + this.executor = executor; + + this.chunk = chunk; + + } + + /** + * Read chunks from one or more sources until canceled, interrupted, + * or all sources are exhausted and submits {@link AccessPathTask}s + * to the caller's {@link ExecutorService} -or- executes those tasks + * in the caller's thread if no {@link ExecutorService} was provided + * to the ctor. + * <p> + * Note: When running with an {@link ExecutorService}, the caller is + * responsible for waiting on that {@link ExecutorService} until the + * {@link AccessPathTask}s to complete and must verify all tasks + * completed successfully. + * + * @return <code>null</code> + * + * @throws BufferClosedException + * if there is an attempt to output a chunk of + * {@link IBindingSet}s or {@link ISolution}s and the + * output buffer is an {@link IBlockingBuffer} (true for + * all join dimensions exception the lastJoin and also + * true for query on the lastJoin) and that + * {@link IBlockingBuffer} has been closed. + */ public Void call() throws Exception { - try { + try { - if (chunk.length == 1) { - - // fast path if the chunk has a single binding set. - runOneTask(); - - return null; - - } + if (chunk.length == 1) { + // fast path if the chunk has a single binding set. + runOneTask(); + + return null; + + } + /* * Generate (and optionally coalesce) the access path tasks. */ @@ -1031,33 +1045,33 @@ /* * Reorder those tasks for better index read performance. */ - reorderTasks(tasks); + reorderTasks(tasks); - /* - * Execute the tasks (either in the caller's thread or on - * the supplied service). - */ - executeTasks(tasks); + /* + * Execute the tasks (either in the caller's thread or on + * the supplied service). + */ + executeTasks(tasks); - return null; + return null; - } catch (Throwable t) { + } catch (Throwable t) { - halt(t); + halt(t); - throw new RuntimeException(t); + throw new RuntimeException(t); - } + } - } + } - /** - * There is exactly one {@link IBindingSet} in the chunk, so run - * exactly one {@link AccessPathTask}. - * - * @throws Exception - */ - private void runOneTask() throws Exception { + /** + * There is exactly one {@link IBindingSet} in the chunk, so run + * exactly one {@link AccessPathTask}. + * + * @throws Exception + */ + private void runOneTask() throws Exception { if (chunk.length != 1) throw new AssertionError(); @@ -1094,10 +1108,11 @@ * * @param chunk * The chunk. - * + * * @return The tasks to process that chunk. */ - protected AccessPathTask[] generateAccessPaths(final IBindingSet[] chunk) { + protected AccessPathTask[] generateAccessPaths( + final IBindingSet[] chunk) { final AccessPathTask[] tasks; @@ -1122,7 +1137,7 @@ /* * Do not coalesce access paths. */ - + tasks = new JoinTask.AccessPathTask[chunk.length]; for (int i = 0; i < chunk.length; i++) { @@ -1157,272 +1172,272 @@ } return tasks; - - } - - /** - * Populates a map of asBound predicates paired to a set of - * bindingSets. - * <p> - * Note: The {@link AccessPathTask} will apply each bindingSet to - * each element visited by the {@link IAccessPath} obtained for the - * asBound {@link IPredicate}. This has the natural consequence of - * eliminating subqueries within the chunk. - * - * @param chunk - * A chunk of bindingSets from the source join dimension. - * - * @return A map which pairs the distinct asBound predicates to the - * bindingSets in the chunk from which the predicate was - * generated. - */ - protected Map<HashedPredicate<E>, Collection<IBindingSet>> combineBindingSets( - final IBindingSet[] chunk) { - if (log.isDebugEnabled()) - log.debug("chunkSize=" + chunk.length); + } - final Map<HashedPredicate<E>, Collection<IBindingSet>> map = new LinkedHashMap<HashedPredicate<E>, Collection<IBindingSet>>( - chunk.length); + /** + * Populates a map of asBound predicates paired to a set of + * bindingSets. + * <p> + * Note: The {@link AccessPathTask} will apply each bindingSet to + * each element visited by the {@link IAccessPath} obtained for the + * asBound {@link IPredicate}. This has the natural consequence of + * eliminating subqueries within the chunk. + * + * @param chunk + * A chunk of bindingSets from the source join dimension. + * + * @return A map which pairs the distinct asBound predicates to the + * bindingSets in the chunk from which the predicate was + * generated. + */ + protected Map<HashedPredicate<E>, Collection<IBindingSet>> combineBindingSets( + final IBindingSet[] chunk) { - for (IBindingSet bindingSet : chunk) { + if (log.isDebugEnabled()) + log.debug("chunkSize=" + chunk.length); - halted(); + final Map<HashedPredicate<E>, Collection<IBindingSet>> map = new LinkedHashMap<HashedPredicate<E>, Collection<IBindingSet>>( + chunk.length); - // constrain the predicate to the given bindings. - IPredicate<E> asBound = predicate.asBound(bindingSet); + for (IBindingSet bindingSet : chunk) { - if (partitionId != -1) { + halted(); - /* - * Constrain the predicate to the desired index - * partition. - * - * Note: we do this for scale-out joins since the access - * path will be evaluated by a JoinTask dedicated to - * this index partition, which is part of how we give - * the JoinTask to gain access to the local index object - * for an index partition. - */ + // constrain the predicate to the given bindings. + IPredicate<E> asBound = predicate.asBound(bindingSet); - asBound = asBound.setPartitionId(partitionId); + if (partitionId != -1) { - } + /* + * Constrain the predicate to the desired index + * partition. + * + * Note: we do this for scale-out joins since the access + * path will be evaluated by a JoinTask dedicated to + * this index partition, which is part of how we give + * the JoinTask to gain access to the local index object + * for an index partition. + */ - // lookup the asBound predicate in the map. - final HashedPredicate<E> hashedPred = new HashedPredicate<E>(asBound); - Collection<IBindingSet> values = map.get(hashedPred); + asBound = asBound.setPartitionId(partitionId); - if (values == null) { + } - /* - * This is the first bindingSet for this asBound - * predicate. We create a collection of bindingSets to - * be paired with that predicate and put the collection - * into the map using that predicate as the key. - */ + // lookup the asBound predicate in the map. + final HashedPredicate<E> hashedPred = new HashedPredicate<E>( + asBound); + Collection<IBindingSet> values = map.get(hashedPred); - values = new LinkedList<IBindingSet>(); + if (values == null) { - map.put(hashedPred, values); + /* + * This is the first bindingSet for this asBound + * predicate. We create a collection of bindingSets to + * be paired with that predicate and put the collection + * into the map using that predicate as the key. + */ - } else { + values = new LinkedList<IBindingSet>(); - // more than one bindingSet will use the same access - // path. - stats.accessPathDups.increment(); + map.put(hashedPred, values); - } + } else { - /* - * Add the bindingSet to the collection of bindingSets - * paired with the asBound predicate. - */ + // more than one bindingSet will use the same access + // path. + stats.accessPathDups.increment(); - values.add(bindingSet); + } - } + /* + * Add the bindingSet to the collection of bindingSets + * paired with the asBound predicate. + */ - if (log.isDebugEnabled()) - log.debug("chunkSize=" + chunk.length - + ", #distinct predicates=" + map.size()); + values.add(bindingSet); - return map; + } - } + if (log.isDebugEnabled()) + log.debug("chunkSize=" + chunk.length + + ", #distinct predicates=" + map.size()); - /** - * Creates an {@link AccessPathTask} for each {@link IBindingSet} in - * the given chunk. - * - * @param chunk - * A chunk of {@link IBindingSet}s from one or more - * source {@link JoinTask}s. - * - * @return A chunk of {@link AccessPathTask} in a desirable - * execution order. - * - * @throws Exception - */ - protected AccessPathTask[] getAccessPathTasks( - final Map<HashedPredicate<E>, Collection<IBindingSet>> map) { + return map; - final int n = map.size(); + } - if (log.isDebugEnabled()) - log.debug("#distinct predicates=" + n); + /** + * Creates an {@link AccessPathTask} for each {@link IBindingSet} in + * the given chunk. + * + * @param chunk + * A chunk of {@link IBindingSet}s from one or more + * source {@link JoinTask}s. + * + * @return A chunk of {@link AccessPathTask} in a desirable + * execution order. + * + * @throws Exception + */ + protected AccessPathTask[] getAccessPathTasks( + final Map<HashedPredicate<E>, Collection<IBindingSet>> map) { - final AccessPathTask[] tasks = new JoinTask.AccessPathTask[n]; + final int n = map.size(); - final Iterator<Map.Entry<HashedPredicate<E>, Collection<IBindingSet>>> itr ... [truncated message content] |