From: <tho...@us...> - 2010-11-08 21:28:27
|
Revision: 3914 http://bigdata.svn.sourceforge.net/bigdata/?rev=3914&view=rev Author: thompsonbry Date: 2010-11-08 21:28:21 +0000 (Mon, 08 Nov 2010) Log Message: ----------- Allowing joins with no operands (so you do not need to feed them with a StartOp). Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2010-11-08 21:27:47 UTC (rev 3913) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2010-11-08 21:28:21 UTC (rev 3914) @@ -271,23 +271,23 @@ super(args, annotations); - if (arity() != 1) - throw new IllegalArgumentException(); +// if (arity() != 1) +// throw new IllegalArgumentException(); - if (left() == null) - throw new IllegalArgumentException(); +// if (left() == null) +// throw new IllegalArgumentException(); } - /** - * The sole operand, which is the previous join in the pipeline join path. - */ - public PipelineOp left() { +// /** +// * The sole operand, which is the previous join in the pipeline join path. +// */ +// public PipelineOp left() { +// +// return (PipelineOp) get(0); +// +// } - return (PipelineOp) get(0); - - } - /** * {@inheritDoc} * This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2011-01-05 13:49:23
|
Revision: 4053 http://bigdata.svn.sourceforge.net/bigdata/?rev=4053&view=rev Author: thompsonbry Date: 2011-01-05 13:49:15 +0000 (Wed, 05 Jan 2011) Log Message: ----------- Mainly an (inadvertent) whitespace change. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2011-01-05 13:45:21 UTC (rev 4052) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2011-01-05 13:49:15 UTC (rev 4053) @@ -20,7 +20,7 @@ You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -*/ + */ /* * Created on Aug 18, 2010 */ @@ -104,65 +104,66 @@ * suite. */ public class PipelineJoin<E> extends PipelineOp implements - IShardwisePipelineOp<E> { + IShardwisePipelineOp<E> { - static private final transient Logger log = Logger.getLogger(PipelineJoin.class); + static private final transient Logger log = Logger + .getLogger(PipelineJoin.class); - /** + /** * */ - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 1L; - public interface Annotations extends PipelineOp.Annotations { + public interface Annotations extends PipelineOp.Annotations { /** * The {@link IPredicate} which is used to generate the * {@link IAccessPath}s during the join. */ String PREDICATE = PipelineJoin.class.getName() + ".predicate"; - - /** - * An optional {@link IVariable}[] identifying the variables to be - * retained in the {@link IBindingSet}s written out by the operator. - * All variables are retained unless this annotation is specified. - */ - String SELECT = PipelineJoin.class.getName() + ".select"; - - /** - * An optional {@link IConstraint}[] which places restrictions on the - * legal patterns in the variable bindings. - */ - String CONSTRAINTS = PipelineJoin.class.getName() + ".constraints"; - /** - * Marks the join as "optional" in the SPARQL sense. Binding sets which - * fail the join will be routed to the alternative sink as specified by - * either {@link PipelineOp.Annotations#ALT_SINK_REF} or - * {@link PipelineOp.Annotations#ALT_SINK_GROUP}. - * - * @see #DEFAULT_OPTIONAL - */ - String OPTIONAL = PipelineJoin.class.getName() + ".optional"; + /** + * An optional {@link IVariable}[] identifying the variables to be + * retained in the {@link IBindingSet}s written out by the operator. All + * variables are retained unless this annotation is specified. + */ + String SELECT = PipelineJoin.class.getName() + ".select"; - boolean DEFAULT_OPTIONAL = false; + /** + * An optional {@link IConstraint}[] which places restrictions on the + * legal patterns in the variable bindings. + */ + String CONSTRAINTS = PipelineJoin.class.getName() + ".constraints"; - /** - * The maximum parallelism with which the pipeline will consume the - * source {@link IBindingSet}[] chunk. - * <p> - * Note: When ZERO (0), everything will run in the caller's - * {@link Thread}, but there will still be one thread per pipeline join - * task which is executing concurrently against different source chunks. - * When GT ZERO (0), tasks will run on an {@link ExecutorService} with - * the specified maximum parallelism. - * - * @see #DEFAULT_MAX_PARALLEL - */ - String MAX_PARALLEL = PipelineJoin.class.getName() + ".maxParallel"; + /** + * Marks the join as "optional" in the SPARQL sense. Binding sets which + * fail the join will be routed to the alternative sink as specified by + * either {@link PipelineOp.Annotations#ALT_SINK_REF} or + * {@link PipelineOp.Annotations#ALT_SINK_GROUP}. + * + * @see #DEFAULT_OPTIONAL + */ + String OPTIONAL = PipelineJoin.class.getName() + ".optional"; - int DEFAULT_MAX_PARALLEL = 0; + boolean DEFAULT_OPTIONAL = false; /** + * The maximum parallelism with which the pipeline will consume the + * source {@link IBindingSet}[] chunk. + * <p> + * Note: When ZERO (0), everything will run in the caller's + * {@link Thread}, but there will still be one thread per pipeline join + * task which is executing concurrently against different source chunks. + * When GT ZERO (0), tasks will run on an {@link ExecutorService} with + * the specified maximum parallelism. + * + * @see #DEFAULT_MAX_PARALLEL + */ + String MAX_PARALLEL = PipelineJoin.class.getName() + ".maxParallel"; + + int DEFAULT_MAX_PARALLEL = 0; + + /** * When <code>true</code>, binding sets observed in the same chunk which * have the binding pattern on the variables for the access path will be * coalesced into a single access path (default @@ -172,7 +173,7 @@ * does NOT reduce the #of solutions generated. * <p> * This option can cause some error in the join hit ratio when it is - * estimated from a cutoff join. + * estimated from a cutoff join. * * @see PipelineJoinStats#getJoinHitRatio() * @@ -194,16 +195,16 @@ String LIMIT = PipelineJoin.class.getName() + ".limit"; long DEFAULT_LIMIT = Long.MAX_VALUE; - - } - /** - * Extended statistics for the join operator. - */ - public static class PipelineJoinStats extends BOpStats { + } - private static final long serialVersionUID = 1L; - + /** + * Extended statistics for the join operator. + */ + public static class PipelineJoinStats extends BOpStats { + + private static final long serialVersionUID = 1L; + /** * The #of duplicate access paths which were detected and filtered out. */ @@ -294,492 +295,505 @@ return 0; return ((double) out) / in; } - - /** - * The #of chunks read from an {@link IAccessPath}. - */ - public final CAT accessPathChunksIn = new CAT(); - - /** - * The #of elements read from an {@link IAccessPath}. - */ - public final CAT accessPathUnitsIn = new CAT(); -// /** -// * The maximum observed fan in for this join dimension (maximum #of -// * sources observed writing on any join task for this join dimension). -// * Since join tasks may be closed and new join tasks re-opened for the -// * same query, join dimension and index partition, and since each join -// * task for the same join dimension could, in principle, have a -// * different fan in based on the actual binding sets propagated this is -// * not necessarily the "actual" fan in for the join dimension. You would -// * have to track the #of distinct partitionId values to track that. -// */ -// public int fanIn; -// -// /** -// * The maximum observed fan out for this join dimension (maximum #of -// * sinks on which any join task is writing for this join dimension). -// * Since join tasks may be closed and new join tasks re-opened for the -// * same query, join dimension and index partition, and since each join -// * task for the same join dimension could, in principle, have a -// * different fan out based on the actual binding sets propagated this is -// * not necessarily the "actual" fan out for the join dimension. -// */ -// public int fanOut; + /** + * The #of chunks read from an {@link IAccessPath}. + */ + public final CAT accessPathChunksIn = new CAT(); - public void add(final BOpStats o) { + /** + * The #of elements read from an {@link IAccessPath}. + */ + public final CAT accessPathUnitsIn = new CAT(); - super.add(o); - - if (o instanceof PipelineJoinStats) { + // /** + // * The maximum observed fan in for this join dimension (maximum #of + // * sources observed writing on any join task for this join dimension). + // * Since join tasks may be closed and new join tasks re-opened for the + // * same query, join dimension and index partition, and since each join + // * task for the same join dimension could, in principle, have a + // * different fan in based on the actual binding sets propagated this + // is + // * not necessarily the "actual" fan in for the join dimension. You + // would + // * have to track the #of distinct partitionId values to track that. + // */ + // public int fanIn; + // + // /** + // * The maximum observed fan out for this join dimension (maximum #of + // * sinks on which any join task is writing for this join dimension). + // * Since join tasks may be closed and new join tasks re-opened for the + // * same query, join dimension and index partition, and since each join + // * task for the same join dimension could, in principle, have a + // * different fan out based on the actual binding sets propagated this + // is + // * not necessarily the "actual" fan out for the join dimension. + // */ + // public int fanOut; - final PipelineJoinStats t = (PipelineJoinStats) o; + public void add(final BOpStats o) { - accessPathDups.add(t.accessPathDups.get()); + super.add(o); - accessPathCount.add(t.accessPathCount.get()); + if (o instanceof PipelineJoinStats) { - accessPathRangeCount.add(t.accessPathRangeCount.get()); + final PipelineJoinStats t = (PipelineJoinStats) o; - accessPathChunksIn.add(t.accessPathChunksIn.get()); + accessPathDups.add(t.accessPathDups.get()); - accessPathUnitsIn.add(t.accessPathUnitsIn.get()); + accessPathCount.add(t.accessPathCount.get()); - inputSolutions.add(t.inputSolutions.get()); + accessPathRangeCount.add(t.accessPathRangeCount.get()); - outputSolutions.add(t.outputSolutions.get()); + accessPathChunksIn.add(t.accessPathChunksIn.get()); -// if (t.fanIn > this.fanIn) { -// // maximum reported fanIn for this join dimension. -// this.fanIn = t.fanIn; -// } -// if (t.fanOut > this.fanOut) { -// // maximum reported fanOut for this join dimension. -// this.fanOut += t.fanOut; -// } + accessPathUnitsIn.add(t.accessPathUnitsIn.get()); - } - - } - - @Override - protected void toString(final StringBuilder sb) { - sb.append(",accessPathDups=" + accessPathDups.get()); - sb.append(",accessPathCount=" + accessPathCount.get()); - sb.append(",accessPathRangeCount=" + accessPathRangeCount.get()); - sb.append(",accessPathChunksIn=" + accessPathChunksIn.get()); - sb.append(",accessPathUnitsIn=" + accessPathUnitsIn.get()); - sb.append(",inputSolutions=" + inputSolutions.get()); - sb.append(",outputSolutions=" + outputSolutions.get()); - sb.append(",joinHitRatio=" + getJoinHitRatio()); - } - - } + inputSolutions.add(t.inputSolutions.get()); - /** - * Deep copy constructor. - * - * @param op - */ - public PipelineJoin(final PipelineJoin<E> op) { - super(op); - } - - /** - * Shallow copy vararg constructor. - * - * @param args - * @param annotations - */ - public PipelineJoin(final BOp[] args, NV... annotations) { + outputSolutions.add(t.outputSolutions.get()); - this(args, NV.asMap(annotations)); - - } + // if (t.fanIn > this.fanIn) { + // // maximum reported fanIn for this join dimension. + // this.fanIn = t.fanIn; + // } + // if (t.fanOut > this.fanOut) { + // // maximum reported fanOut for this join dimension. + // this.fanOut += t.fanOut; + // } - /** - * Shallow copy constructor. - * - * @param args - * @param annotations - */ - public PipelineJoin(final BOp[] args, final Map<String, Object> annotations) { + } - super(args, annotations); + } -// if (arity() != 1) -// throw new IllegalArgumentException(); + @Override + protected void toString(final StringBuilder sb) { + sb.append(",accessPathDups=" + accessPathDups.get()); + sb.append(",accessPathCount=" + accessPathCount.get()); + sb.append(",accessPathRangeCount=" + accessPathRangeCount.get()); + sb.append(",accessPathChunksIn=" + accessPathChunksIn.get()); + sb.append(",accessPathUnitsIn=" + accessPathUnitsIn.get()); + sb.append(",inputSolutions=" + inputSolutions.get()); + sb.append(",outputSolutions=" + outputSolutions.get()); + sb.append(",joinHitRatio=" + getJoinHitRatio()); + } -// if (left() == null) -// throw new IllegalArgumentException(); + } - } - -// /** -// * The sole operand, which is the previous join in the pipeline join path. -// */ -// public PipelineOp left() { -// -// return (PipelineOp) get(0); -// -// } + /** + * Deep copy constructor. + * + * @param op + */ + public PipelineJoin(final PipelineJoin<E> op) { + super(op); + } - /** - * {@inheritDoc} - * - * @see Annotations#PREDICATE - */ - @SuppressWarnings("unchecked") + /** + * Shallow copy vararg constructor. + * + * @param args + * @param annotations + */ + public PipelineJoin(final BOp[] args, NV... annotations) { + + this(args, NV.asMap(annotations)); + + } + + /** + * Shallow copy constructor. + * + * @param args + * @param annotations + */ + public PipelineJoin(final BOp[] args, final Map<String, Object> annotations) { + + super(args, annotations); + + // if (arity() != 1) + // throw new IllegalArgumentException(); + + // if (left() == null) + // throw new IllegalArgumentException(); + + } + + // /** + // * The sole operand, which is the previous join in the pipeline join path. + // */ + // public PipelineOp left() { + // + // return (PipelineOp) get(0); + // + // } + + /** + * {@inheritDoc} + * + * @see Annotations#PREDICATE + */ + @SuppressWarnings("unchecked") public IPredicate<E> getPredicate() { - + return (IPredicate<E>) getRequiredProperty(Annotations.PREDICATE); - - } - /** - * @see Annotations#CONSTRAINTS - */ - public IConstraint[] constraints() { + } - return getProperty(Annotations.CONSTRAINTS, null/* defaultValue */); + /** + * @see Annotations#CONSTRAINTS + */ + public IConstraint[] constraints() { - } + return getProperty(Annotations.CONSTRAINTS, null/* defaultValue */); - /** - * @see Annotations#OPTIONAL - */ - public boolean isOptional() { + } - return getProperty(Annotations.OPTIONAL, Annotations.DEFAULT_OPTIONAL); + /** + * @see Annotations#OPTIONAL + */ + public boolean isOptional() { - } + return getProperty(Annotations.OPTIONAL, Annotations.DEFAULT_OPTIONAL); - /** - * @see Annotations#MAX_PARALLEL - */ - public int getMaxParallel() { + } - return getProperty(Annotations.MAX_PARALLEL, Annotations.DEFAULT_MAX_PARALLEL); + /** + * @see Annotations#MAX_PARALLEL + */ + public int getMaxParallel() { - } + // return 5; + return getProperty(Annotations.MAX_PARALLEL, + Annotations.DEFAULT_MAX_PARALLEL); - /** - * @see Annotations#SELECT - */ - public IVariable<?>[] variablesToKeep() { + } - return getProperty(Annotations.SELECT, null/* defaultValue */); + /** + * @see Annotations#SELECT + */ + public IVariable<?>[] variablesToKeep() { - } + return getProperty(Annotations.SELECT, null/* defaultValue */); - @Override - public PipelineJoinStats newStats() { + } - return new PipelineJoinStats(); - - } - - public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { + @Override + public PipelineJoinStats newStats() { - return new FutureTask<Void>(new JoinTask<E>(this, context)); - - } + return new PipelineJoinStats(); - /** - * Pipeline join impl. - */ - private static class JoinTask<E> extends Haltable<Void> implements Callable<Void> { + } - /** - * The join that is being executed. - */ - final private PipelineJoin<?> joinOp; + public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { - /** - * The constraint (if any) specified for the join operator. - */ - final private IConstraint[] constraints; + return new FutureTask<Void>(new JoinTask<E>(this, context)); - /** - * The maximum parallelism with which the {@link JoinTask} will - * consume the source {@link IBindingSet}s. - * - * @see Annotations#MAX_PARALLEL - */ - final private int maxParallel; + } - /** - * The service used for executing subtasks (optional). - * - * @see #maxParallel - */ - final private Executor service; + /** + * Pipeline join impl. + */ + private static class JoinTask<E> extends Haltable<Void> implements + Callable<Void> { - /** - * True iff the {@link #predicate} operand is an optional pattern (aka if - * this is a SPARQL style left join). - */ - final private boolean optional; + /** + * The join that is being executed. + */ + final private PipelineJoin<?> joinOp; - /** - * The variables to be retained by the join operator. Variables not - * appearing in this list will be stripped before writing out the - * binding set onto the output sink(s). - */ - final private IVariable<?>[] variablesToKeep; + /** + * The constraint (if any) specified for the join operator. + */ + final private IConstraint[] constraints; - /** - * The source for the elements to be joined. - */ - final private IPredicate<E> predicate; + /** + * The maximum parallelism with which the {@link JoinTask} will consume + * the source {@link IBindingSet}s. + * + * @see Annotations#MAX_PARALLEL + */ + final private int maxParallel; - /** - * The relation associated with the {@link #predicate} operand. - */ - final private IRelation<E> relation; - - /** - * The partition identifier -or- <code>-1</code> if we are not reading - * on an index partition. - */ - final private int partitionId; - - /** - * The evaluation context. - */ - final private BOpContext<IBindingSet> context; + /** + * The service used for executing subtasks (optional). + * + * @see #maxParallel + */ + final private Executor service; - /** - * The statistics for this {@link JoinTask}. - */ - final private PipelineJoinStats stats; + /** + * True iff the {@link #predicate} operand is an optional pattern (aka + * if this is a SPARQL style left join). + */ + final private boolean optional; /** + * The variables to be retained by the join operator. Variables not + * appearing in this list will be stripped before writing out the + * binding set onto the output sink(s). + */ + final private IVariable<?>[] variablesToKeep; + + /** + * The source for the elements to be joined. + */ + final private IPredicate<E> predicate; + + /** + * The relation associated with the {@link #predicate} operand. + */ + final private IRelation<E> relation; + + /** + * The partition identifier -or- <code>-1</code> if we are not reading + * on an index partition. + */ + final private int partitionId; + + /** + * The evaluation context. + */ + final private BOpContext<IBindingSet> context; + +// /** +// * When <code>true</code>, the {@link #stats} will be tracked. This is +// * <code>false</code> unless logging is requested for {@link QueryLog} +// * or stats are explicitly request (e.g., to support cutoff joins). +// */ +// final private boolean trackStats; + + /** + * The statistics for this {@link JoinTask}. + */ + final private PipelineJoinStats stats; + + /** * An optional limit on the #of solutions to be produced. The limit is * ignored if it is {@link Long#MAX_VALUE}. */ - final private long limit; - - /** - * When <code>true</code> an attempt will be made to coalesce as-bound - * predicates which result in the same access path. - * - * @see Annotations#COALESCE_DUPLICATE_ACCESS_PATHS - */ - final boolean coalesceAccessPaths; - - /** - * Used to enforce the {@link Annotations#LIMIT} iff one is specified. - */ - final private AtomicLong exactOutputCount = new AtomicLong(); - - /** - * The source from which we read the binding set chunks. - * <p> - * Note: In keeping with the top-down evaluation of the operator tree - * the source should not be set until we begin to execute the - * {@link #left} operand and that should not happen until we are in - * {@link #call()} in order to ensure that the producer will be - * terminated if there is a problem setting up this join. Given that, it - * might need to be an atomic reference or volatile or the like. - */ - final private IAsynchronousIterator<IBindingSet[]> source; + final private long limit; - /** - * Where the join results are written. - * <p> - * Chunks of bindingSets are written pre-Thread unsynchronized buffers - * by {@link ChunkTask}. Those unsynchronized buffers overflow onto the - * per-JoinTask {@link #sink}, which is a thread-safe - * {@link IBlockingBuffer}. The downstream pipeline operator drains that - * {@link IBlockingBuffer} using its iterator(). When the {@link #sink} - * is closed and everything in it has been drained, then the downstream - * operator will conclude that no more bindingSets are available and it - * will terminate. - */ - final private IBlockingBuffer<IBindingSet[]> sink; + /** + * When <code>true</code> an attempt will be made to coalesce as-bound + * predicates which result in the same access path. + * + * @see Annotations#COALESCE_DUPLICATE_ACCESS_PATHS + */ + final boolean coalesceAccessPaths; - /** - * The alternative sink to use when the join is {@link #optional} AND - * {@link BOpContext#getSink2()} returns a distinct buffer for the - * alternative sink. The binding sets from the source are copied onto the - * alternative sink for an optional join if the join fails. Normally the - * {@link BOpContext#getSink()} can be used for both the joins which - * succeed and those which fail. The alternative sink is only necessary - * when the failed join needs to jump out of a join group rather than - * routing directly to the ancestor in the operator tree. - */ - final private IBlockingBuffer<IBindingSet[]> sink2; - - /** - * The thread-local buffer factory for the default sink. - */ - final private TLBFactory threadLocalBufferFactory; - - /** - * The thread-local buffer factory for the optional sink (iff the - * optional sink is defined). - */ - final private TLBFactory threadLocalBufferFactory2; + /** + * Used to enforce the {@link Annotations#LIMIT} iff one is specified. + */ + final private AtomicLong exactOutputCount = new AtomicLong(); - /** - * Instances of this class MUST be created in the appropriate execution - * context of the target {@link DataService} so that the federation and - * the joinNexus references are both correct and so that it has access - * to the local index object for the specified index partition. - * - * @param joinOp - * @param context - */ - public JoinTask(// - final PipelineJoin<E> joinOp,// - final BOpContext<IBindingSet> context - ) { + /** + * The source from which we read the binding set chunks. + * <p> + * Note: In keeping with the top-down evaluation of the operator tree + * the source should not be set until we begin to execute the + * {@link #left} operand and that should not happen until we are in + * {@link #call()} in order to ensure that the producer will be + * terminated if there is a problem setting up this join. Given that, it + * might need to be an atomic reference or volatile or the like. + */ + final private IAsynchronousIterator<IBindingSet[]> source; - if (joinOp == null) - throw new IllegalArgumentException(); - if (context == null) - throw new IllegalArgumentException(); + /** + * Where the join results are written. + * <p> + * Chunks of bindingSets are written pre-Thread unsynchronized buffers + * by {@link ChunkTask}. Those unsynchronized buffers overflow onto the + * per-JoinTask {@link #sink}, which is a thread-safe + * {@link IBlockingBuffer}. The downstream pipeline operator drains that + * {@link IBlockingBuffer} using its iterator(). When the {@link #sink} + * is closed and everything in it has been drained, then the downstream + * operator will conclude that no more bindingSets are available and it + * will terminate. + */ + final private IBlockingBuffer<IBindingSet[]> sink; - this.joinOp = joinOp; - this.predicate = joinOp.getPredicate(); - this.constraints = joinOp.constraints(); - this.maxParallel = joinOp.getMaxParallel(); - if (maxParallel < 0) - throw new IllegalArgumentException(Annotations.MAX_PARALLEL - + "=" + maxParallel); - if (maxParallel > 0) { - // shared service. - service = new LatchedExecutor(context.getIndexManager() - .getExecutorService(), maxParallel); - } else { - // run in the caller's thread. - service = null; - } - this.optional = joinOp.isOptional(); - this.variablesToKeep = joinOp.variablesToKeep(); - this.context = context; - this.relation = context.getRelation(predicate); - this.source = context.getSource(); - this.sink = context.getSink(); - this.sink2 = context.getSink2(); - this.partitionId = context.getPartitionId(); - this.stats = (PipelineJoinStats) context.getStats(); - this.limit = joinOp.getProperty(Annotations.LIMIT,Annotations.DEFAULT_LIMIT); + /** + * The alternative sink to use when the join is {@link #optional} AND + * {@link BOpContext#getSink2()} returns a distinct buffer for the + * alternative sink. The binding sets from the source are copied onto + * the alternative sink for an optional join if the join fails. Normally + * the {@link BOpContext#getSink()} can be used for both the joins which + * succeed and those which fail. The alternative sink is only necessary + * when the failed join needs to jump out of a join group rather than + * routing directly to the ancestor in the operator tree. + */ + final private IBlockingBuffer<IBindingSet[]> sink2; + + /** + * The thread-local buffer factory for the default sink. + */ + final private TLBFactory threadLocalBufferFactory; + + /** + * The thread-local buffer factory for the optional sink (iff the + * optional sink is defined). + */ + final private TLBFactory threadLocalBufferFactory2; + + /** + * Instances of this class MUST be created in the appropriate execution + * context of the target {@link DataService} so that the federation and + * the joinNexus references are both correct and so that it has access + * to the local index object for the specified index partition. + * + * @param joinOp + * @param context + */ + public JoinTask(// + final PipelineJoin<E> joinOp,// + final BOpContext<IBindingSet> context) { + + if (joinOp == null) + throw new IllegalArgumentException(); + if (context == null) + throw new IllegalArgumentException(); + + this.joinOp = joinOp; + this.predicate = joinOp.getPredicate(); + this.constraints = joinOp.constraints(); + this.maxParallel = joinOp.getMaxParallel(); + if (maxParallel < 0) + throw new IllegalArgumentException(Annotations.MAX_PARALLEL + + "=" + maxParallel); + if (maxParallel > 0) { + // shared service. + service = new LatchedExecutor(context.getIndexManager() + .getExecutorService(), maxParallel); + } else { + // run in the caller's thread. + service = null; + } + this.optional = joinOp.isOptional(); + this.variablesToKeep = joinOp.variablesToKeep(); + this.context = context; + this.relation = context.getRelation(predicate); + this.source = context.getSource(); + this.sink = context.getSink(); + this.sink2 = context.getSink2(); + this.partitionId = context.getPartitionId(); + this.stats = (PipelineJoinStats) context.getStats(); + this.limit = joinOp.getProperty(Annotations.LIMIT, + Annotations.DEFAULT_LIMIT); this.coalesceAccessPaths = joinOp.getProperty( Annotations.COALESCE_DUPLICATE_ACCESS_PATHS, Annotations.DEFAULT_COALESCE_DUPLICATE_ACCESS_PATHS); - - this.threadLocalBufferFactory = new TLBFactory(sink); - - this.threadLocalBufferFactory2 = sink2 == null ? null - : new TLBFactory(sink2); - if (log.isDebugEnabled()) - log.debug("joinOp=" + joinOp); + this.threadLocalBufferFactory = new TLBFactory(sink); - } + this.threadLocalBufferFactory2 = sink2 == null ? null + : new TLBFactory(sink2); - public String toString() { + if (log.isDebugEnabled()) + log.debug("joinOp=" + joinOp); - return getClass().getName() + "{ joinOp=" + joinOp + "}"; + } - } + public String toString() { - /** - * Runs the {@link JoinTask}. - * - * @return <code>null</code>. - */ - public Void call() throws Exception { + return getClass().getName() + "{ joinOp=" + joinOp + "}"; -// final long begin = System.currentTimeMillis(); - - if (log.isDebugEnabled()) - log.debug("joinOp=" + joinOp); + } - try { + /** + * Runs the {@link JoinTask}. + * + * @return <code>null</code>. + */ + public Void call() throws Exception { - /* - * Consume bindingSet chunks from the source JoinTask(s). - */ - consumeSource(); + // final long begin = System.currentTimeMillis(); - /* - * Flush and close the thread-local output buffers. - */ - threadLocalBufferFactory.flush(); - if (threadLocalBufferFactory2 != null) - threadLocalBufferFactory2.flush(); + if (log.isDebugEnabled()) + log.debug("joinOp=" + joinOp); - // flush the sync buffer - flushAndCloseBuffersAndAwaitSinks(); + try { - if (log.isDebugEnabled()) - log.debug("JoinTask done: joinOp=" + joinOp); + /* + * Consume bindingSet chunks from the source JoinTask(s). + */ + consumeSource(); - halted(); + /* + * Flush and close the thread-local output buffers. + */ + threadLocalBufferFactory.flush(); + if (threadLocalBufferFactory2 != null) + threadLocalBufferFactory2.flush(); - return null; + // flush the sync buffer + flushAndCloseBuffersAndAwaitSinks(); - } catch (Throwable t) { + if (log.isDebugEnabled()) + log.debug("JoinTask done: joinOp=" + joinOp); - /* - * This is used for processing errors and also if this task is - * interrupted (because the sink has been closed). - */ + halted(); - halt(t); + return null; - // reset the unsync buffers. - try { - // resetUnsyncBuffers(); - threadLocalBufferFactory.reset(); - if (threadLocalBufferFactory2 != null) - threadLocalBufferFactory2.reset(); - } catch (Throwable t2) { - log.error(t2.getLocalizedMessage(), t2); - } + } catch (Throwable t) { - // reset the sync buffer and cancel the sink JoinTasks. - try { - cancelSinks(); - } catch (Throwable t2) { - log.error(t2.getLocalizedMessage(), t2); - } + /* + * This is used for processing errors and also if this task is + * interrupted (because the sink has been closed). + */ - /* - * Close source iterators, which will cause any source JoinTasks - * that are still executing to throw a CancellationException - * when the Future associated with the source iterator is - * cancelled. - */ - try { - closeSources(); - } catch (Throwable t2) { - log.error(t2.getLocalizedMessage(), t2); - } + halt(t); - throw new RuntimeException(t); + // reset the unsync buffers. + try { + // resetUnsyncBuffers(); + threadLocalBufferFactory.reset(); + if (threadLocalBufferFactory2 != null) + threadLocalBufferFactory2.reset(); + } catch (Throwable t2) { + log.error(t2.getLocalizedMessage(), t2); + } -// } finally { -// -// stats.elapsed.add(System.currentTimeMillis() - begin); - - } + // reset the sync buffer and cancel the sink JoinTasks. + try { + cancelSinks(); + } catch (Throwable t2) { + log.error(t2.getLocalizedMessage(), t2); + } - } + /* + * Close source iterators, which will cause any source JoinTasks + * that are still executing to throw a CancellationException + * when the Future associated with the source iterator is + * cancelled. + */ + try { + closeSources(); + } catch (Throwable t2) { + log.error(t2.getLocalizedMessage(), t2); + } - /** - * Consume {@link IBindingSet} chunks from the {@link #source}. - * - * @throws Exception - */ - protected void consumeSource() throws Exception { + throw new RuntimeException(t); - IBindingSet[] chunk; + // } finally { + // + // stats.elapsed.add(System.currentTimeMillis() - begin); + } + + } + + /** + * Consume {@link IBindingSet} chunks from the {@link #source}. + * + * @throws Exception + */ + protected void consumeSource() throws Exception { + + IBindingSet[] chunk; + while (!isDone() && (chunk = nextChunk()) != null) { if (chunk.length == 0) { @@ -789,240 +803,240 @@ */ continue; } - - /* - * Consume the chunk until done using either the caller's thread - * or the executor service as appropriate to run subtasks. - */ - if (chunk.length <= 1) { - - /* - * Run on the caller's thread anyway since there is just one - * binding set to be consumed. - */ - - new BindingSetConsumerTask(null/* service */, chunk).call(); - - } else { - - /* - * Run subtasks on either the caller's thread or the shared - * executed service depending on the configured value of - * [maxParallel]. - */ - - new BindingSetConsumerTask(service, chunk).call(); - - } - - } - } + /* + * Consume the chunk until done using either the caller's thread + * or the executor service as appropriate to run subtasks. + */ + if (chunk.length <= 1) { - /** - * Closes the {@link #source} specified to the ctor. - */ - protected void closeSources() { + /* + * Run on the caller's thread anyway since there is just one + * binding set to be consumed. + */ - if (log.isInfoEnabled()) - log.info(toString()); + new BindingSetConsumerTask(null/* service */, chunk) + .call(); - source.close(); + } else { - } + /* + * Run subtasks on either the caller's thread or the shared + * executed service depending on the configured value of + * [maxParallel]. + */ - /** - * Flush and close all output buffers and await sink {@link JoinTask} - * (s). - * <p> - * Note: You MUST close the {@link BlockingBuffer} from which each sink - * reads <em>before</em> invoking this method in order for those sinks - * to terminate. Otherwise the source {@link IAsynchronousIterator}(s) - * on which the sink is reading will remain open and the sink will never - * decide that it has exhausted its source(s). - * - * @throws InterruptedException - * @throws ExecutionException - */ - protected void flushAndCloseBuffersAndAwaitSinks() - throws InterruptedException, ExecutionException { + new BindingSetConsumerTask(service, chunk).call(); - if (log.isDebugEnabled()) - log.debug("joinOp=" + joinOp); + } - /* - * Close the thread-safe output buffer. For any JOIN except the - * last, this buffer will be the source for one or more sink - * JoinTasks for the next join dimension. Once this buffer is - * closed, the asynchronous iterator draining the buffer will - * eventually report that there is nothing left for it to process. - * - * Note: This is a BlockingBuffer. BlockingBuffer#flush() is a NOP. - */ + } - sink.flush(); - sink.close(); - - if(sink2!=null) { - sink2.flush(); - sink2.close(); - } - - } + } - /** - * Cancel sink {@link JoinTask}(s). - */ - protected void cancelSinks() { + /** + * Closes the {@link #source} specified to the ctor. + */ + protected void closeSources() { - if (log.isDebugEnabled()) - log.debug("joinOp=" + joinOp); + if (log.isInfoEnabled()) + log.info(toString()); - sink.reset(); + source.close(); - if (sink.getFuture() != null) { + } - sink.getFuture().cancel(true/* mayInterruptIfRunning */); + /** + * Flush and close all output buffers and await sink {@link JoinTask} + * (s). + * <p> + * Note: You MUST close the {@link BlockingBuffer} from which each sink + * reads <em>before</em> invoking this method in order for those sinks + * to terminate. Otherwise the source {@link IAsynchronousIterator}(s) + * on which the sink is reading will remain open and the sink will never + * decide that it has exhausted its source(s). + * + * @throws InterruptedException + * @throws ExecutionException + */ + protected void flushAndCloseBuffersAndAwaitSinks() + throws InterruptedException, ExecutionException { - } + if (log.isDebugEnabled()) + log.debug("joinOp=" + joinOp); - if (sink2 != null) { - - sink2.reset(); + /* + * Close the thread-safe output buffer. For any JOIN except the + * last, this buffer will be the source for one or more sink + * JoinTasks for the next join dimension. Once this buffer is + * closed, the asynchronous iterator draining the buffer will + * eventually report that there is nothing left for it to process. + * + * Note: This is a BlockingBuffer. BlockingBuffer#flush() is a NOP. + */ - if (sink2.getFuture() != null) { + sink.flush(); + sink.close(); - sink2.getFuture().cancel(true/* mayInterruptIfRunning */); + if (sink2 != null) { + sink2.flush(); + sink2.close(); + } - } - - } + } - } + /** + * Cancel sink {@link JoinTask}(s). + */ + protected void cancelSinks() { - /** - * Return a chunk of {@link IBindingSet}s from source. - * - * @return The next chunk -or- <code>null</code> iff the source is - * exhausted. - */ - protected IBindingSet[] nextChunk() throws InterruptedException { + if (log.isDebugEnabled()) + log.debug("joinOp=" + joinOp); - if (log.isDebugEnabled()) - log.debug("joinOp=" + joinOp); + sink.reset(); - while (!source.isExhausted()) { + if (sink.getFuture() != null) { - halted(); + sink.getFuture().cancel(true/* mayInterruptIfRunning */); - // note: uses timeout to avoid blocking w/o testing [halt]. - if (source.hasNext(10, TimeUnit.MILLISECONDS)) { + } - // read the chunk. - final IBindingSet[] chunk = source.next(); + if (sink2 != null) { - stats.chunksIn.increment(); - stats.unitsIn.add(chunk.length); + sink2.reset(); - if (log.isDebugEnabled()) - log.debug("Read chunk from source: chunkSize=" - + chunk.length + ", joinOp=" + joinOp); + if (sink2.getFuture() != null) { - return chunk; + sink2.getFuture().cancel(true/* mayInterruptIfRunning */); - } + } - } + } - /* - * Termination condition: the source is exhausted. - */ + } - if (log.isDebugEnabled()) - log.debug("Source exhausted: joinOp=" + joinOp); + /** + * Return a chunk of {@link IBindingSet}s from source. + * + * @return The next chunk -or- <code>null</code> iff the source is + * exhausted. + */ + protected IBindingSet[] nextChunk() throws InterruptedException { - return null; + if (log.isDebugEnabled()) + log.debug("joinOp=" + joinOp); - } + while (!source.isExhausted()) { - /** - * Class consumes a chunk of binding set executing a nested indexed join - * until canceled, interrupted, or all the binding sets are exhausted. - * For each {@link IBindingSet} in the chunk, an {@link AccessPathTask} - * is created which will consume that {@link IBindingSet}. The - * {@link AccessPathTask}s are sorted based on their - * <code>fromKey</code> so as to order the execution of those tasks in a - * manner that will maximize the efficiency of index reads. The ordered - * {@link AccessPathTask}s are then submitted to the caller's - * {@link Executor} or run in the caller's thread if the executor is - * <code>null</code>. - * - * @author <a href="mailto:tho...@us...">Bryan - * Thompson</a> - * @version $Id$ - */ - protected class BindingSetConsumerTask implements Callable<Void> { + halted(); - private final Executor executor; - private final IBindingSet[] chunk; + // note: uses timeout to avoid blocking w/o testing [halt]. + if (source.hasNext(10, TimeUnit.MILLISECONDS)) { - /** - * - * @param executor - * The service that will execute the generated - * {@link AccessPathTask}s -or- <code>null</code> IFF you - * want the {@link AccessPathTask}s to be executed in the - * caller's thread. - * @param chunk - * A chunk of binding sets from the upstream producer. - */ - public BindingSetConsumerTask(final Executor executor, - final IBindingSet[] chunk) { + // read the chunk. + final IBindingSet[] chunk = source.next(); - if (chunk == null) - throw new IllegalArgumentException(); - - this.executor = executor; - - this.chunk = chunk; + stats.chunksIn.increment(); + stats.unitsIn.add(chunk.length); - } + if (log.isDebugEnabled()) + log.debug("Read chunk from source: chunkSize=" + + chunk.length + ", joinOp=" + joinOp); - /** - * Read chunks from one or more sources until canceled, interrupted, - * or all sources are exhausted and submits {@link AccessPathTask}s - * to the caller's {@link ExecutorService} -or- executes those tasks - * in the caller's thread if no {@link ExecutorService} was provided - * to the ctor. - * <p> - * Note: When running with an {@link ExecutorService}, the caller is - * responsible for waiting on that {@link ExecutorService} until the - * {@link AccessPathTask}s to complete and must verify all tasks - * completed successfully. - * - * @return <code>null</code> - * - * @throws BufferClosedException - * if there is an attempt to output a chunk of - * {@link IBindingSet}s or {@link ISolution}s and the - * output buffer is an {@link IBlockingBuffer} (true for - * all join dimensions exception the lastJoin and also - * true for query on the lastJoin) and that - * {@link IBlockingBuffer} has been closed. - */ + return chunk; + + } + + } + + /* + * Termination condition: the source is exhausted. + */ + + if (log.isDebugEnabled()) + log.debug("Source exhausted: joinOp=" + joinOp); + + return null; + + } + + /** + * Class consumes a chunk of binding set executing a nested indexed join + * until canceled, interrupted, or all the binding sets are exhausted. + * For each {@link IBindingSet} in the chunk, an {@link AccessPathTask} + * is created which will consume that {@link IBindingSet}. The + * {@link AccessPathTask}s are sorted based on their + * <code>fromKey</code> so as to order the execution of those tasks in a + * manner that will maximize the efficiency of index reads. The ordered + * {@link AccessPathTask}s are then submitted to the caller's + * {@link Executor} or run in the caller's thread if the executor is + * <code>null</code>. + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + */ + protected class BindingSetConsumerTask implements Callable<Void> { + + private final Executor executor; + private final IBindingSet[] chunk; + + /** + * + * @param executor + * The service that will execute the generated + * {@link AccessPathTask}s -or- <code>null</code> IFF you + * want the {@link AccessPathTask}s to be executed in the + * caller's thread. + * @param chunk + * A chunk of binding sets from the upstream producer. + */ + public BindingSetConsumerTask(final Executor executor, + final IBindingSet[] chunk) { + + if (chunk == null) + throw new IllegalArgumentException(); + + this.executor = executor; + + this.chunk = chunk; + + } + + /** + * Read chunks from one or more sources until canceled, interrupted, + * or all sources are exhausted and submits {@link AccessPathTask}s + * to the caller's {@link ExecutorService} -or- executes those tasks + * in the caller's thread if no {@link ExecutorService} was provided + * to the ctor. + * <p> + * Note: When running with an {@link ExecutorService}, the caller is + * responsible for waiting on that {@link ExecutorService} until the + * {@link AccessPathTask}s to complete and must verify all tasks + * completed successfully. + * + * @return <code>null</code> + * + * @throws BufferClosedException + * if there is an attempt to output a chunk of + * {@link IBindingSet}s or {@link ISolution}s and the + * output buffer is an {@link IBlockingBuffer} (true for + * all join dimensions exception the lastJoin and also + * true for query on the lastJoin) and that + * {@link IBlockingBuffer} has been closed. + */ public Void call() throws Exception { - try { + try { - if (chunk.length == 1) { - - // fast path if the chunk has a single binding set. - runOneTask(); - - return null; - - } + if (chunk.length == 1) { + // fast path if the chunk has a single binding set. + runOneTask(); + + return null; + + } + /* * Generate (and optionally coalesce) the access path tasks. */ @@ -1031,33 +1045,33 @@ /* * Reorder those tasks for better index read performance. */ - reorderTasks(tasks); + reorderTasks(tasks); - /* - * Execute the tasks (either in the caller's thread or on - * the supplied service). - */ - executeTasks(tasks); + /* + * Execute the tasks (either in the caller's thread or on + * the supplied service). + */ + executeTasks(tasks); - return null; + return null; - } catch (Throwable t) { + } catch (Throwable t) { - halt(t); + halt(t); - throw new RuntimeException(t); + throw new RuntimeException(t); - } + } - } + } - /** - * There is exactly one {@link IBindingSet} in the chunk, so run - * exactly one {@link AccessPathTask}. - * - * @throws Exception - */ - private void runOneTask() throws Exception { + /** + * There is exactly one {@link IBindingSet} in the chunk, so run + * exactly one {@link AccessPathTask}. + * + * @throws Exception + */ + private void runOneTask() throws Exception { if (chunk.length != 1) throw new AssertionError(); @@ -1094,10 +1108,11 @@ * * @param chunk * The chunk. - * + * * @return The tasks to process that chunk. */ - protected AccessPathTask[] generateAccessPaths(final IBindingSet[] chunk) { + protected AccessPathTask[] generateAccessPaths( + final IBindingSet[] chunk) { final AccessPathTask[] tasks; @@ -1122,7 +1137,7 @@ /* * Do not coalesce access paths. */ - + tasks = new JoinTask.AccessPathTask[chunk.length]; for (int i = 0; i < chunk.length; i++) { @@ -1157,272 +1172,272 @@ } return tasks; - - } - - /** - * Populates a map of asBound predicates paired to a set of - * bindingSets. - * <p> - * Note: The {@link AccessPathTask} will apply each bindingSet to - * each element visited by the {@link IAccessPath} obtained for the - * asBound {@link IPredicate}. This has the natural consequence of - * eliminating subqueries within the chunk. - * - * @param chunk - * A chunk of bindingSets from the source join dimension. - * - * @return A map which pairs the distinct asBound predicates to the - * bindingSets in the chunk from which the predicate was - * generated. - */ - protected Map<HashedPredicate<E>, Collection<IBindingSet>> combineBindingSets( - final IBindingSet[] chunk) { - if (log.isDebugEnabled()) - log.debug("chunkSize=" + chunk.length); + } - final Map<HashedPredicate<E>, Collection<IBindingSet>> map = new LinkedHashMap<HashedPredicate<E>, Collection<IBindingSet>>( - chunk.length); + /** + * Populates a map of asBound predicates paired to a set of + * bindingSets. + * <p> + * Note: The {@link AccessPathTask} will apply each bindingSet to + * each element visited by the {@link IAccessPath} obtained for the + * asBound {@link IPredicate}. This has the natural consequence of + * eliminating subqueries within the chunk. + * + * @param chunk + * A chunk of bindingSets from the source join dimension. + * + * @return A map which pairs the distinct asBound predicates to the + * bindingSets in the chunk from which the predicate was + * generated. + */ + protected Map<HashedPredicate<E>, Collection<IBindingSet>> combineBindingSets( + final IBindingSet[] chunk) { - for (IBindingSet bindingSet : chunk) { + if (log.isDebugEnabled()) + log.debug("chunkSize=" + chunk.length); - halted(); + final Map<HashedPredicate<E>, Collection<IBindingSet>> map = new LinkedHashMap<HashedPredicate<E>, Collection<IBindingSet>>( + chunk.length); - // constrain the predicate to the given bindings. - IPredicate<E> asBound = predicate.asBound(bindingSet); + for (IBindingSet bindingSet : chunk) { - if (partitionId != -1) { + halted(); - /* - * Constrain the predicate to the desired index - * partition. - * - * Note: we do this for scale-out joins since the access - * path will be evaluated by a JoinTask dedicated to - * this index partition, which is part of how we give - * the JoinTask to gain access to the local index object - * for an index partition. - */ + // constrain the predicate to the given bindings. + IPredicate<E> asBound = predicate.asBound(bindingSet); - asBound = asBound.setPartitionId(partitionId); + if (partitionId != -1) { - } + /* + * Constrain the predicate to the desired index + * partition. + * + * Note: we do this for scale-out joins since the access + * path will be evaluated by a JoinTask dedicated to + * this index partition, which is part of how we give + * the JoinTask to gain access to the local index object + * for an index partition. + */ - // lookup the asBound predicate in the map. - final HashedPredicate<E> hashedPred = new HashedPredicate<E>(asBound); - Collection<IBindingSet> values = map.get(hashedPred); + asBound = asBound.setPartitionId(partitionId); - if (values == null) { + } - /* - * This is the first bindingSet for this asBound - * predicate. We create a collection of bindingSets to - * be paired with that predicate and put the collection - * into the map using that predicate as the key. - */ + // lookup the asBound predicate in the map. + final HashedPredicate<E> hashedPred = new HashedPredicate<E>( + asBound); + Collection<IBindingSet> values = map.get(hashedPred); - values = new LinkedList<IBindingSet>(); + if (values == null) { - map.put(hashedPred, values); + /* + * This is the first bindingSet for this asBound + * predicate. We create a collection of bindingSets to + * be paired with that predicate and put the collection + * into the map using that predicate as the key. + */ - } else { + values = new LinkedList<IBindingSet>(); - // more than one bindingSet will use the same access - // path. - stats.accessPathDups.increment(); + map.put(hashedPred, values); - } + } else { - /* - * Add the bindingSet to the collection of bindingSets - * paired with the asBound predicate. - */ + // more than one bindingSet will use the same access + // path. + stats.accessPathDups.increment(); - values.add(bindingSet); + } - } + /* + * Add the bindingSet to the collection of bindingSets + * paired with the asBound predicate. + */ - if (log.isDebugEnabled()) - log.debug("chunkSize=" + chunk.length - + ", #distinct predicates=" + map.size()); + values.add(bindingSet); - return map; + } - } + if (log.isDebugEnabled()) + log.debug("chunkSize=" + chunk.length + + ", #distinct predicates=" + map.size()); - /** - * Creates an {@link AccessPathTask} for each {@link IBindingSet} in - * the given chunk. - * - * @param chunk - * A chunk of {@link IBindingSet}s from one or more - * source {@link JoinTask}s. - * - * @return A chunk of {@link AccessPathTask} in a desirable - * execution order. - * - * @throws Exception - */ - protected AccessPathTask[] getAccessPathTasks( - final Map<HashedPredicate<E>, Collection<IBindingSet>> map) { + return map; - final int n = map.size(); + } - if (log.isDebugEnabled()) - log.debug("#distinct predicates=" + n); + /** + * Creates an {@link AccessPathTask} for each {@link IBindingSet} in + * the given chunk. + * + * @param chunk + * A chunk of {@link IBindingSet}s from one or more + * source {@link JoinTask}s. + * + * @return A chunk of {@link AccessPathTask} in a desirable + * execution order. + * + * @throws Exception + */ + protected AccessPathTask[] getAccessPathTasks( + final Map<HashedPredicate<E>, Collection<IBindingSet>> map) { - final AccessPathTask[] tasks = new JoinTask.AccessPathTask[n]; + final int n = map.size(); - final Iterator<Map.Entry<HashedPredicate<E>, Collection<IBindingSet>>> itr ... [truncated message content] |
From: <tho...@us...> - 2011-01-16 22:45:39
|
Revision: 4117 http://bigdata.svn.sourceforge.net/bigdata/?rev=4117&view=rev Author: thompsonbry Date: 2011-01-16 22:45:32 +0000 (Sun, 16 Jan 2011) Log Message: ----------- More work on [1]. [1] https://sourceforge.net/apps/trac/bigdata/ticket/230 Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2011-01-16 21:09:33 UTC (rev 4116) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2011-01-16 22:45:32 UTC (rev 4117) @@ -728,6 +728,7 @@ * This is used for processing errors and also if this task is * interrupted (because the sink has been closed). */ + // ensure query halts. halt(t); // reset the unsync buffers. @@ -759,7 +760,12 @@ log.error(t2.getLocalizedMessage(), t2); } - throw new RuntimeException(t); + if (getCause() != null) { + // abnormal termination. + throw new RuntimeException(t); + } + // normal termination - ignore exception. + return null; // } finally { // @@ -1040,9 +1046,16 @@ return null; } catch (Throwable t) { - - throw new RuntimeException(halt(t)); + // ensure query halts. + halt(t); + if (getCause() != null) { + // abnormal termination. + throw new RuntimeException(t); + } + // normal terminate - ignore exception. + return null; + } } @@ -1674,7 +1687,13 @@ } catch (Throwable t) { - throw new RuntimeException(halt(t)); + // ensure query halts. + halt(t); + if (getCause() != null) { + // abnormal termination. + throw new RuntimeException(t); + } + // normal termination - ignore exception. } finally { @@ -1880,7 +1899,13 @@ } catch (Throwable t) { - throw new RuntimeException(halt(t)); + // ensure query halts. + halt(t); + if (getCause() != null) { + // abnormal termination. + throw new RuntimeException(t); + } + // normal termination - ignore exception. } finally { @@ -2070,7 +2095,14 @@ } catch (Throwable t) { - throw new RuntimeException(halt(t)); + // ensure query halts. + halt(t); + if (getCause() != null) { + // abnormal termination. + throw new RuntimeException(t); + } + // normal termination - ignore exception. + return null; } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2011-01-20 15:53:55
|
Revision: 4137 http://bigdata.svn.sourceforge.net/bigdata/?rev=4137&view=rev Author: thompsonbry Date: 2011-01-20 15:53:49 +0000 (Thu, 20 Jan 2011) Log Message: ----------- Making the CONSTRAINT on a join apply to the original binding set when the predicate is optional. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2011-01-20 13:38:30 UTC (rev 4136) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2011-01-20 15:53:49 UTC (rev 4137) @@ -48,6 +48,7 @@ import com.bigdata.bop.BOp; import com.bigdata.bop.BOpContext; import com.bigdata.bop.BOpEvaluationContext; +import com.bigdata.bop.BOpUtility; import com.bigdata.bop.IBindingSet; import com.bigdata.bop.IConstraint; import com.bigdata.bop.IPredicate; @@ -1683,6 +1684,16 @@ final IBindingSet bs = bindingSets[bindex]; + if (constraints != null) { + + // verify constraint. + if(!BOpUtility.isConsistent(constraints, bs)) { + // skip solutions which fail the constraint. + continue; + } + + } + if (log.isTraceEnabled()) log .trace("Passing on solution which fails an optional join: " This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2011-01-20 21:40:13
|
Revision: 4146 http://bigdata.svn.sourceforge.net/bigdata/?rev=4146&view=rev Author: thompsonbry Date: 2011-01-20 21:40:07 +0000 (Thu, 20 Jan 2011) Log Message: ----------- Counting solutions which join before applying the filter fixed the 1st and 3rd of these queries. Also applying the filter to the original solution in the case where we output the original solution if the optional join failed fixes the middle query, so all three are good now. http://www.w3.org/2001/sw/DataAccess/tests/data-r2/bound/manifest#dawg-bound-query-001 http://www.w3.org/2001/sw/DataAccess/tests/data-r2/optional-filter/manifest#dawg-optional-filter-002 http://www.w3.org/2001/sw/DataAccess/tests/data-r2/optional-filter/manifest#dawg-optional-filter-003 Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2011-01-20 19:49:12 UTC (rev 4145) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2011-01-20 21:40:07 UTC (rev 4146) @@ -1684,16 +1684,11 @@ final IBindingSet bs = bindingSets[bindex]; - if (constraints != null) { - - // verify constraint. - if(!BOpUtility.isConsistent(constraints, bs)) { - // skip solutions which fail the constraint. - continue; - } - - } - + if(!BOpUtility.isConsistent(constraints, bs)) { + // Failed by the constraint on the join. + continue; + } + if (log.isTraceEnabled()) log .trace("Passing on solution which fails an optional join: " @@ -2077,6 +2072,21 @@ int bindex = 0; for (IBindingSet bset : bindingSets) { + // #of binding sets accepted. + naccepted++; + + /* #of elements accepted for this binding set. + * + * Note: We count binding sets as accepted before we + * apply the constraints. This has the effect that + * an optional join which produces solutions that + * are then rejected by a FILTER associated with the + * optional predicate WILL NOT pass on the original + * solution even if ALL solutions produced by the + * join are rejected by the filter. + */ + this.naccepted[bindex]++; + /* * Clone the binding set since it is tested for each * element visited. @@ -2098,15 +2108,12 @@ // Accept this binding set. unsyncBuffer.add(bset); - // #of binding sets accepted. - naccepted++; +// // #of binding sets accepted. +// naccepted++; // #of output solutions generated. stats.outputSolutions.increment(); - // #of elements accepted for this binding set. - this.naccepted[bindex]++; - } bindex++; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |