From: <tho...@us...> - 2011-03-31 17:11:35
|
Revision: 4357 http://bigdata.svn.sourceforge.net/bigdata/?rev=4357&view=rev Author: thompsonbry Date: 2011-03-31 17:11:26 +0000 (Thu, 31 Mar 2011) Log Message: ----------- This commits an implementation of an in-memory hash-join subquery operator. https://sourceforge.net/apps/trac/bigdata/ticket/272 Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IPredicate.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/constraint/INBinarySearch.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/constraint/INHashMap.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryHashJoinOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/constraint/TestINConstraint.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/AbstractSubqueryTestCase.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestSubqueryHashJoinOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestSubqueryOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestMemorySortOp.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/model/TestFactory.java Added Paths: ----------- 
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryJoinAnnotations.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/JoinAnnotations.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2011-03-31 07:50:59 UTC (rev 4356) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2011-03-31 17:11:26 UTC (rev 4357) @@ -27,6 +27,8 @@ */ package com.bigdata.bop; +import java.util.Iterator; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.log4j.Logger; @@ -332,35 +334,71 @@ } /** - * Copy the bound values from the element into a binding set using the - * caller's variable names. + * Copy each non-null binding from the source {@link IBindingSet} onto + * the target {@link IBindingSet}. * - * @param vars - * The ordered list of variables. - * @param e - * The element. - * @param bindingSet - * The binding set, which is modified as a side-effect. - * - * @todo This appears to be unused, in which case it should be dropped. + * @param src + * The source binding set. + * @param dst + * The target binding set, which is modified as a side-effect. + * + * @todo move to {@link BOpUtility}? + * @todo unit tests. + * @todo only copy / retain SELECTed variables within this method?
*/ - final public void bind(final IVariable<?>[] vars, final IElement e, - final IBindingSet bindingSet) { + @SuppressWarnings("unchecked") + static public void copyValues(final IBindingSet src, final IBindingSet dst) { - for (int i = 0; i < vars.length; i++) { + final Iterator<Map.Entry<IVariable, IConstant>> itr = src.iterator(); - final IVariable<?> var = vars[i]; + while (itr.hasNext()) { - @SuppressWarnings("unchecked") - final Constant<?> newval = new Constant(e.get(i)); + final Map.Entry<IVariable, IConstant> e = itr.next(); - bindingSet.set(var, newval); + final IVariable<?> var = (IVariable<?>) e.getKey(); + final IConstant<?> val = e.getValue(); + + if (val != null) { + + dst.set(var, val); + + } + } } // /** +// * Copy the bound values from the element into a binding set using the +// * caller's variable names. +// * +// * @param vars +// * The ordered list of variables. +// * @param e +// * The element. +// * @param bindingSet +// * The binding set, which is modified as a side-effect. +// * +// * @todo This appears to be unused, in which case it should be dropped. +// */ +// final public void bind(final IVariable<?>[] vars, final IElement e, +// final IBindingSet bindingSet) { +// +// for (int i = 0; i < vars.length; i++) { +// +// final IVariable<?> var = vars[i]; +// +// @SuppressWarnings("unchecked") +// final Constant<?> newval = new Constant(e.get(i)); +// +// bindingSet.set(var, newval); +// +// } +// +// } + +// /** // * Cancel the running query (normal termination). 
// * <p> // * Note: This method provides a means for an operator to indicate that the Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IPredicate.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IPredicate.java 2011-03-31 07:50:59 UTC (rev 4356) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IPredicate.java 2011-03-31 17:11:26 UTC (rev 4357) @@ -97,10 +97,13 @@ // String KEY_ORDER = "keyOrder"; /** - * <code>true</code> iff the predicate has SPARQL optional semantics. + * <code>true</code> iff the predicate has SPARQL optional semantics + * (default {@value #DEFAULT_OPTIONAL}). */ - String OPTIONAL = (IPredicate.class.getName() + ".optional").intern(); + String OPTIONAL = IPredicate.class.getName() + ".optional"; + boolean DEFAULT_OPTIONAL = false; + // /** // * Constraints on the elements read from the relation. // * Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2011-03-31 07:50:59 UTC (rev 4356) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2011-03-31 17:11:26 UTC (rev 4357) @@ -27,7 +27,6 @@ package com.bigdata.bop; -import java.nio.ByteBuffer; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; @@ -38,8 +37,6 @@ import com.bigdata.bop.engine.IChunkMessage; import com.bigdata.bop.engine.QueryEngine; import com.bigdata.bop.solutions.SliceOp; -import com.bigdata.bop.solutions.SortOp.Annotations; -import com.bigdata.relation.accesspath.IAsynchronousIterator; /** * Abstract base class for pipeline operators where the data moving along the @@ -72,17 +69,14 @@ * the ancestor in the operator tree which serves as the default sink * for binding sets (optional, default is the 
parent). */ - String SINK_REF = (PipelineOp.class.getName() + ".sinkRef").intern(); + String SINK_REF = PipelineOp.class.getName() + ".sinkRef"; /** * The value of the annotation is the {@link BOp.Annotations#BOP_ID} of * the ancestor in the operator tree which serves as the alternative * sink for binding sets (default is no alternative sink). - * - * @see #ALT_SINK_GROUP */ - String ALT_SINK_REF = (PipelineOp.class.getName() + ".altSinkRef") - .intern(); + String ALT_SINK_REF = PipelineOp.class.getName() + ".altSinkRef"; /** * The value reported by {@link PipelineOp#isSharedState()} (default @@ -97,8 +91,7 @@ * When <code>true</code>, the {@link QueryEngine} will impose the * necessary constraints when the operator is evaluated. */ - String SHARED_STATE = (PipelineOp.class.getName() + ".sharedState") - .intern(); + String SHARED_STATE = PipelineOp.class.getName() + ".sharedState"; boolean DEFAULT_SHARED_STATE = false; @@ -118,7 +111,7 @@ * have less effect and performance tends to be best around a modest * value (10) for those annotations. */ - String MAX_PARALLEL = (PipelineOp.class.getName() + ".maxParallel").intern(); + String MAX_PARALLEL = PipelineOp.class.getName() + ".maxParallel"; /** * @see #MAX_PARALLEL @@ -138,8 +131,8 @@ * data to be assigned to an evaluation task is governed by * {@link #MAX_MEMORY} instead. */ - String MAX_MESSAGES_PER_TASK = (PipelineOp.class.getName() - + ".maxMessagesPerTask").intern(); + String MAX_MESSAGES_PER_TASK = PipelineOp.class.getName() + + ".maxMessagesPerTask"; /** * @see #MAX_MESSAGES_PER_TASK @@ -153,8 +146,8 @@ * amount of data which can be buffered on the JVM heap during pipelined * query evaluation. 
*/ - String PIPELINE_QUEUE_CAPACITY = (PipelineOp.class.getName() - + ".pipelineQueueCapacity").intern(); + String PIPELINE_QUEUE_CAPACITY = PipelineOp.class.getName() + + ".pipelineQueueCapacity"; /** * @see #PIPELINE_QUEUE_CAPACITY @@ -166,49 +159,44 @@ * <code>false</code> the operator will use either "at-once" or * "blocked" evaluation depending on how it buffers its data for * evaluation. - * <p> - * When <code>false</code>, operator "at-once" evaluation will be used - * if either (a) the operator is buffering data on the Java heap; or (b) - * the operator is buffering data on the native heap and the amount of - * buffered data does not exceed the specified value for - * {@link #MAX_MEMORY}. For convenience, you may specify - * {@link Integer#MAX_VALUE} for {@link #MAX_MEMORY} to indicate that - * "at-once" evaluation is required. - * <p> - * When data are buffered on the Java heap, "at-once" evaluation is - * implied and the data will be made available to the operator as a - * single {@link IAsynchronousIterator} when the operator is invoked. - * <p> - * When {@link #MAX_MEMORY} is positive, data are marshaled in - * {@link ByteBuffer}s and the operator will be invoked once either (a) - * its memory threshold for the buffered data has been exceeded; or (b) - * no predecessor of the operator is running (or can be triggered) -and- - * all inputs for the operator have been materialized on this node. Note - * that some operators DO NOT support multiple pass evaluation - * semantics. Such operators MUST throw an exception if the value of - * this annotation could result in multiple evaluation passes. 
+ * + * @see PipelineOp#isPipelined() */ - String PIPELINED = (PipelineOp.class.getName() + ".pipelined").intern(); + String PIPELINED = PipelineOp.class.getName() + ".pipelined"; /** * @see #PIPELINED */ boolean DEFAULT_PIPELINED = true; - /** - * For non-{@link #PIPELINED} operators, this non-negative value - * specifies the maximum #of bytes which the operator may buffer on the - * native heap before evaluation of the operator is triggered -or- ZERO - * (0) if the operator buffers the data on the Java heap (default - * {@value #DEFAULT_MAX_MEMORY}). For a sharded operation, the value is - * the maximum #of bytes which may be buffered per shard. - */ - String MAX_MEMORY = (PipelineOp.class.getName() + ".maxMemory").intern(); + /** + * This annotation is only used for non-{@link #PIPELINED} operators and + * specifies the maximum #of bytes which the operator may buffer on the + * native heap before evaluation of the operator is triggered (default + * {@value #DEFAULT_MAX_MEMORY}). + * <p> + * If an operator buffers its data on the Java heap then this MUST be + * ZERO (0L). At-once evaluation is always used for non-pipelined + * operators which buffer their data on the Java heap as there is no + * ready mechanism to bound their heap demand. + * <p> + * If an operator buffers its data on the native heap, then this MUST be + * some positive value which specifies the maximum #of bytes which may + * be buffered before the operator is evaluated. At-once evaluation for + * operators which buffer their data on the native heap is indicated + * with the value {@link Long#MAX_VALUE}. + * <p> + * Note: For a sharded operation, the value is the maximum #of bytes + * which may be buffered per shard. 
+ * + * @see PipelineOp#isPipelined() + */ + String MAX_MEMORY = PipelineOp.class.getName() + ".maxMemory"; /** * @see #MAX_MEMORY */ - int DEFAULT_MAX_MEMORY = 0; + long DEFAULT_MAX_MEMORY = 0L; } @@ -457,24 +445,46 @@ } - /** - * Assert that this operator is annotated as an "at-once" operator which - * buffers its data on the java heap. - */ + /** + * Assert that this operator is annotated as an "at-once" operator which + * buffers its data on the java heap. The requirements are: + * + * <pre> + * PIPELINED := false + * MAX_MEMORY := 0 + * </pre> + * + * When the operator is not pipelined then it is either "blocked" or + * "at-once". When MAX_MEMORY is ZERO, the operator will buffer its data on + * the Java heap. All operators which buffer data on the java heap will + * buffer an unbounded amount of data and are therefore "at-once" rather + * than "blocked". Operators which buffer their data on the native heap may + * support either "blocked" and/or "at-once" evaluation, depending on the + * operators. E.g., a hash join can be either "blocked" or "at-once" while + * an ORDER-BY is always "at-once". + */ protected void assertAtOnceJavaHeapOp() { - // operator may not be broken into multiple tasks. - if (getMaxParallel() != 1) { - throw new UnsupportedOperationException(Annotations.MAX_PARALLEL - + "=" + getMaxParallel()); - } - // operator is "at-once" (not pipelined). if (isPipelined()) { throw new UnsupportedOperationException(Annotations.PIPELINED + "=" + isPipelined()); } - } +// // operator may not be broken into multiple tasks. 
+// if (getMaxParallel() != 1) { +// throw new UnsupportedOperationException(Annotations.MAX_PARALLEL +// + "=" + getMaxParallel()); +// } + // operator must buffer its data on the Java heap + final long maxMemory = getProperty(Annotations.MAX_MEMORY, + Annotations.DEFAULT_MAX_MEMORY); + + if (maxMemory != 0L) + throw new UnsupportedOperationException(Annotations.MAX_MEMORY + + "=" + maxMemory); + + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java 2011-03-31 07:50:59 UTC (rev 4356) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java 2011-03-31 17:11:26 UTC (rev 4357) @@ -225,7 +225,7 @@ final public boolean isOptional() { - return (Boolean) getProperty(Annotations.OPTIONAL,Boolean.FALSE); + return (Boolean) getProperty(Annotations.OPTIONAL,Annotations.DEFAULT_OPTIONAL); } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/constraint/INBinarySearch.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/constraint/INBinarySearch.java 2011-03-31 07:50:59 UTC (rev 4356) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/constraint/INBinarySearch.java 2011-03-31 17:11:26 UTC (rev 4357) @@ -176,7 +176,7 @@ if (x == null) { - // not yet bound : @todo should this reject an unbound variable? 
+ // not yet bound : FIXME Modify to return false - variables must be bound return true; } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/constraint/INHashMap.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/constraint/INHashMap.java 2011-03-31 07:50:59 UTC (rev 4356) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/constraint/INHashMap.java 2011-03-31 17:11:26 UTC (rev 4357) @@ -159,7 +159,7 @@ if (x == null) { - // not yet bound. + // not yet bound : FIXME Modify to return false - variables must be bound return true; } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryHashJoinOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryHashJoinOp.java 2011-03-31 07:50:59 UTC (rev 4356) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryHashJoinOp.java 2011-03-31 17:11:26 UTC (rev 4357) @@ -27,24 +27,36 @@ package com.bigdata.bop.controller; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.concurrent.Callable; -import java.util.concurrent.Executor; import java.util.concurrent.FutureTask; +import org.apache.log4j.Logger; + import com.bigdata.bop.BOp; import com.bigdata.bop.BOpContext; import com.bigdata.bop.BOpUtility; import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IConstant; import com.bigdata.bop.IConstraint; import com.bigdata.bop.IPredicate; import com.bigdata.bop.IVariable; import com.bigdata.bop.NV; import com.bigdata.bop.PipelineOp; +import com.bigdata.bop.engine.BOpStats; import com.bigdata.bop.engine.IRunningQuery; import com.bigdata.bop.engine.QueryEngine; -import com.bigdata.bop.join.PipelineJoin; +import 
com.bigdata.bop.join.PipelineJoin.PipelineJoinStats; +import com.bigdata.counters.CAT; +import com.bigdata.relation.accesspath.AbstractUnsynchronizedArrayBuffer; +import com.bigdata.relation.accesspath.IAccessPath; import com.bigdata.relation.accesspath.IAsynchronousIterator; +import com.bigdata.relation.accesspath.IBlockingBuffer; +import com.bigdata.relation.accesspath.UnsyncLocalOutputBuffer; /** * Hash join with subquery. @@ -74,58 +86,57 @@ */ private static final long serialVersionUID = 1L; - public interface Annotations extends PipelineOp.Annotations { + static private final transient Logger log = Logger + .getLogger(SubqueryHashJoinOp.class); + public interface Annotations extends SubqueryJoinAnnotations { + /** * The join variables (required). This is an {@link IVariable}[] with * at least one variable. The order of the entries is used when forming * the as-bound keys for the hash table. Duplicate elements and null * elements are not permitted. */ - String JOIN_VARS = SubqueryHashJoinOp.class.getName() + ".subquery"; - - /** - * The subquery to be evaluated (required). This should be a - * {@link PipelineOp}. (It is basically the equivalent of the - * {@link IPredicate} for a {@link PipelineJoin}). - */ - String SUBQUERY = SubqueryHashJoinOp.class.getName() + ".subquery"; + String JOIN_VARS = SubqueryHashJoinOp.class.getName() + ".joinVars"; - /** - * An optional {@link IVariable}[] identifying the variables to be - * retained in the {@link IBindingSet}s written out by the operator. All - * variables are retained unless this annotation is specified. - * - * @todo This should be on {@link SubqueryOp} as well. - */ - String SELECT = SubqueryHashJoinOp.class.getName() + ".select"; +// /** +// * The subquery to be evaluated (required). This should be a +// * {@link PipelineOp}. (It is basically the equivalent of the +// * {@link IPredicate} for a {@link PipelineJoin}). 
+// */ +// String SUBQUERY = SubqueryHashJoinOp.class.getName() + ".subquery"; +// +// /** +// * When <code>true</code> the subquery has optional semantics (if the +// * subquery fails, the original binding set will be passed along to the +// * downstream sink anyway) (default {@value #DEFAULT_OPTIONAL}). +// * +// * @todo This is somewhat in conflict with how we mark optional +// * predicates to support the RTO. The OPTIONAL marker might +// * need to be moved onto the subquery. +// */ +// String OPTIONAL = SubqueryHashJoinOp.class.getName() + ".optional"; +// +// boolean DEFAULT_OPTIONAL = false; - /** - * An {@link IConstraint}[] which places restrictions on the legal - * patterns in the variable bindings (optional). - * - * @todo This should be on {@link SubqueryOp} as well. - */ - String CONSTRAINTS = SubqueryHashJoinOp.class.getName() + ".constraints"; +// /** +// * An optional {@link IVariable}[] identifying the variables to be +// * retained in the {@link IBindingSet}s written out by the operator. All +// * variables are retained unless this annotation is specified. +// * +// * FIXME This should be on {@link SubqueryOp} as well. +// */ +// String SELECT = SubqueryHashJoinOp.class.getName() + ".select"; +// +// /** +// * An {@link IConstraint}[] which places restrictions on the legal +// * patterns in the variable bindings (optional). +// * +// * FIXME This should be on {@link SubqueryOp} as well. +// */ +// String CONSTRAINTS = SubqueryHashJoinOp.class.getName() + ".constraints"; - /** - * When <code>true</code> the subquery has optional semantics (if the - * subquery fails, the original binding set will be passed along to the - * downstream sink anyway) (default {@value #DEFAULT_OPTIONAL}). 
- */ - String OPTIONAL = SubqueryHashJoinOp.class.getName() + ".optional"; - - boolean DEFAULT_OPTIONAL = false; - } - -// /** -// * @see Annotations#MAX_PARALLEL -// */ -// public int getMaxParallel() { -// return getProperty(Annotations.MAX_PARALLEL, -// Annotations.DEFAULT_MAX_PARALLEL); -// } /** * Deep copy constructor. @@ -145,10 +156,6 @@ super(args, annotations); -// if (!getEvaluationContext().equals(BOpEvaluationContext.CONTROLLER)) -// throw new IllegalArgumentException(Annotations.EVALUATION_CONTEXT -// + "=" + getEvaluationContext()); - final IVariable<?>[] joinVars = (IVariable[]) getRequiredProperty(Annotations.JOIN_VARS); if (joinVars.length == 0) @@ -165,22 +172,6 @@ assertAtOnceJavaHeapOp(); -// if (!getProperty(Annotations.CONTROLLER, Annotations.DEFAULT_CONTROLLER)) -// throw new IllegalArgumentException(Annotations.CONTROLLER); - -// // The id of this operator (if any). -// final Integer thisId = (Integer)getProperty(Annotations.BOP_ID); -// -// for(BOp op : args) { -// -// final Integer sinkId = (Integer) op -// .getRequiredProperty(Annotations.SINK_REF); -// -// if(sinkId.equals(thisId)) -// throw new RuntimeException("Operand may not target ") -// -// } - } public SubqueryHashJoinOp(final BOp[] args, NV... annotations) { @@ -195,22 +186,219 @@ } + @Override + public HashJoinStats newStats() { + + return new HashJoinStats(); + + } + /** - * Evaluates the arguments of the operator as subqueries. The arguments are - * evaluated in order. An {@link Executor} with limited parallelism to - * evaluate the arguments. If the controller operator is interrupted, then - * the subqueries are cancelled. If a subquery fails, then all subqueries - * are cancelled. + * Extended statistics for the join operator. + * + * TODO Make use of additional stats, verify in unit tests, and then trim + * the unused stats. 
+ */ + public static class HashJoinStats extends BOpStats { + + private static final long serialVersionUID = 1L; + + /** + * The running sum of the range counts of the accepted as-bound access + * paths. + */ + public final CAT accessPathRangeCount = new CAT(); + + /** + * The #of input solutions consumed (not just accepted). + * <p> + * This counter is highly correlated with {@link BOpStats#unitsIn} but + * is incremented only when we begin evaluation of the + * {@link IAccessPath} associated with a specific input solution. + * <p> + * When {@link Annotations#COALESCE_DUPLICATE_ACCESS_PATHS} is + * <code>true</code>, multiple input binding sets can be mapped onto the + * same {@link IAccessPath} and this counter will be incremented by the + * #of such input binding sets. + */ + public final CAT inputSolutions = new CAT(); + + /** + * The #of output solutions generated. This is incremented as soon as + * the solution is produced and is used by {@link #getJoinHitRatio()}. + * Of necessity, updates to {@link #inputSolutions} slightly lead + * updates to {@link #outputSolutions}. + */ + public final CAT outputSolutions = new CAT(); + + /** + * The estimated join hit ratio. This is computed as + * + * <pre> + * outputSolutions / inputSolutions + * </pre> + * + * It is ZERO (0) when {@link #inputSolutions} is ZERO (0). + * <p> + * The join hit ratio is always accurate when the join is fully + * executed. However, when a cutoff join is used to estimate the join + * hit ratio a measurement error can be introduced into the join hit + * ratio if {@link Annotations#COALESCE_DUPLICATE_ACCESS_PATHS} is + * <code>true</code>, {@link Annotations#MAX_PARALLEL} is GT ONE (1), + * or {@link Annotations#MAX_PARALLEL_CHUNKS} is GT ZERO (0). + * <p> + * When access paths are coalesced, there is an inner loop over + * the input solutions mapped onto the same access path.
 This inner loop + * then causes {@link #inputSolutions} to be incremented + * by the #of coalesced access paths <em>before</em> any + * {@link #outputSolutions} are counted. Coalescing access paths + * therefore can cause the join hit ratio to be underestimated as there + * may appear to be more input solutions consumed than were actually + * applied to produce output solutions if the join was cutoff while + * processing a set of input solutions which were identified as using + * the same as-bound access path. + * <p> + * The worst case can introduce substantial error into the estimated + * join hit ratio. Consider a cutoff of <code>100</code>. If one input + * solution generates 100 output solutions and two input solutions are + * mapped onto the same access path, then the input count will be 2 and + * the output count will be 100, which gives a reported join hit ratio + * of <code>100/2</code> when the actual join hit ratio is + * <code>100/1</code>. + * <p> + * A similar problem can occur if {@link Annotations#MAX_PARALLEL} or + * {@link Annotations#MAX_PARALLEL_CHUNKS} is GT ONE (1) since input + * count can be incremented by the #of threads before any output + * solutions are generated. Estimation error can also occur if multiple + * join tasks are run in parallel for different chunks of input + * solutions. + */ + public double getJoinHitRatio() { + final long in = inputSolutions.get(); + final long out = outputSolutions.get(); + if (in == 0) + return 0; + return ((double) out) / in; + } + + /** + * The #of chunks read from an {@link IAccessPath}. + */ + public final CAT accessPathChunksIn = new CAT(); + + /** + * The #of elements read from an {@link IAccessPath}.
+ */ + public final CAT accessPathUnitsIn = new CAT(); + + public void add(final BOpStats o) { + + super.add(o); + + if (o instanceof HashJoinStats) { + + final HashJoinStats t = (HashJoinStats) o; + + accessPathRangeCount.add(t.accessPathRangeCount.get()); + + accessPathChunksIn.add(t.accessPathChunksIn.get()); + + accessPathUnitsIn.add(t.accessPathUnitsIn.get()); + + inputSolutions.add(t.inputSolutions.get()); + + outputSolutions.add(t.outputSolutions.get()); + } + + } + + @Override + protected void toString(final StringBuilder sb) { + sb.append(",accessPathRangeCount=" + accessPathRangeCount.get()); + sb.append(",accessPathChunksIn=" + accessPathChunksIn.get()); + sb.append(",accessPathUnitsIn=" + accessPathUnitsIn.get()); + sb.append(",inputSolutions=" + inputSolutions.get()); + sb.append(",outputSolutions=" + outputSolutions.get()); + sb.append(",joinHitRatio=" + getJoinHitRatio()); + } + + } + + /** + * Evaluation task. + */ private static class ControllerTask implements Callable<Void> { private final BOpContext<IBindingSet> context; - private final boolean optional; + + /** + * The operator which is being evaluated. + */ + private final SubqueryHashJoinOp joinOp; + + /** + * The join variables. + * + * @see SubqueryHashJoinOp.Annotations#JOIN_VARS + */ + private final IVariable<?>[] joinVars; + + /** + * The variables to be retained by the join operator. Variables not + * appearing in this list will be stripped before writing out the + * binding set onto the output sink(s). + * + * @see SubqueryHashJoinOp.Annotations#SELECT + */ + final private IVariable<?>[] selectVars; + + /** + * An array of constraints to be applied to the generated solutions + * (optional). + * + * @see SubqueryHashJoinOp.Annotations#CONSTRAINTS + */ + final private IConstraint[] constraints; + + /** + * The subquery to be evaluated. 
+ * + * @see SubqueryHashJoinOp.Annotations#SUBQUERY + */ private final PipelineOp subquery; - public ControllerTask(final SubqueryHashJoinOp controllerOp, final BOpContext<IBindingSet> context) { + /** + * <code>true</code> iff the subquery has OPTIONAL semantics. + * + * @see IPredicate.Annotations#OPTIONAL + */ + private final boolean optional; - if (controllerOp == null) + /** + * Where the join results are written. + * <p> + * Solutions are written on a {@link UnsyncLocalOutputBuffer}, which + * converts them into chunks. That {@link UnsyncLocalOutputBuffer} + * overflows onto the {@link #sink}. + */ + final private IBlockingBuffer<IBindingSet[]> sink; + + /** + * The alternative sink to use when the join is {@link #optional} AND + * {@link BOpContext#getSink2()} returns a distinct buffer for the + * alternative sink. The binding sets from the source are copied onto + * the alternative sink for an optional join if the join fails. Normally + * the {@link BOpContext#getSink()} can be used for both the joins which + * succeed and those which fail. The alternative sink is only necessary + * when the failed join needs to jump out of a join group rather than + * routing directly to the ancestor in the operator tree.
+ */ + final private IBlockingBuffer<IBindingSet[]> sink2; + + public ControllerTask(final SubqueryHashJoinOp joinOp, + final BOpContext<IBindingSet> context) { + + if (joinOp == null) throw new IllegalArgumentException(); if (context == null) @@ -218,207 +406,446 @@ this.context = context; - this.optional = controllerOp.getProperty(Annotations.OPTIONAL, + this.joinOp = joinOp; + + this.joinVars = (IVariable<?>[]) joinOp + .getRequiredProperty(Annotations.JOIN_VARS); + + this.selectVars = (IVariable<?>[]) joinOp + .getProperty(Annotations.SELECT); + + this.constraints = joinOp.getProperty( + Annotations.CONSTRAINTS, null/* defaultValue */); + + this.subquery = (PipelineOp) joinOp + .getRequiredProperty(Annotations.SUBQUERY); + + this.optional = joinOp.getProperty(Annotations.OPTIONAL, Annotations.DEFAULT_OPTIONAL); - this.subquery = (PipelineOp) controllerOp - .getRequiredProperty(Annotations.SUBQUERY); + this.sink = context.getSink(); + + this.sink2 = context.getSink2(); } - /** - * Evaluate the subquery. - * - * @todo Support limited parallelism for each binding set read from the - * source. We will need to keep track of the running subqueries in - * order to wait on them before returning from this method and in - * order to cancel them if something goes wrong. - */ public Void call() throws Exception { - + + if (log.isDebugEnabled()) + log.debug("Evaluating subquery hash join: " + joinOp); + + final HashJoinStats stats = (HashJoinStats) context.getStats(); + + final QueryEngine queryEngine = context.getRunningQuery() + .getQueryEngine(); + try { - final IAsynchronousIterator<IBindingSet[]> sitr = context - .getSource(); - - while(sitr.hasNext()) { - - final IBindingSet[] chunk = sitr.next(); - - for(IBindingSet bset : chunk) { + /* + * Materialize the binding sets and populate a hash map. 
+ */ + final IBindingSet[] all = BOpUtility.toArray(context + .getSource(), stats); - final IRunningQuery runningSubquery = new SubqueryTask( - bset, subquery, context).call(); + if (log.isDebugEnabled()) + log.debug("Materialized: " + all.length + + " source solutions."); - if (!runningSubquery.isDone()) { + final Map<Key, Bucket> map = new LinkedHashMap<Key, Bucket>(); - throw new AssertionError("Future not done: " - + runningSubquery.toString()); - - } + for (IBindingSet bset : all) { + final Key key = makeKey(bset); + + Bucket b = map.get(key); + + if(b == null) { + + map.put(key, b = new Bucket(bset)); + + } else { + + b.add(bset); + } - + } - - // Now that we know the subqueries ran Ok, flush the sink. - context.getSink().flush(); - - // Done. - return null; - } finally { + if (log.isDebugEnabled()) + log.debug("There are : " + map.size() + + " distinct combinations of the join vars: " + + Arrays.toString(joinVars)); - context.getSource().close(); + /* + * Run the subquery once. + * + * TODO We may want to use hash-joins at a position other than + * the head of the query plan, in which case we would invoke the + * hash join once per input binding set and the input bindings + * would be passed into the subquery. [I do not believe that + * this can be reconciled with "at-once" evaluation] + */ - context.getSink().close(); + final IRunningQuery runningSubquery = queryEngine + .eval((PipelineOp) subquery); - if (context.getSink2() != null) - context.getSink2().close(); + try { - } - - } + if (log.isDebugEnabled()) + log.debug("Running subquery..."); + + /* + * For each solution for the subquery, probe the hash map. + * If there is a hit, output the cross product of the + * solution with the solutions in the map having the same + * as-bound values for their join vars. + * + * When outputting a solution, first test the constraints. + * If they are satisfied, then output the SELECTed + * variables. + */ - /** - * Run a subquery. 
- * - * @author <a href="mailto:tho...@us...">Bryan - * Thompson</a> - */ - private class SubqueryTask implements Callable<IRunningQuery> { + final UnsyncLocalOutputBuffer<IBindingSet> unsyncBuffer = new UnsyncLocalOutputBuffer<IBindingSet>( + joinOp.getChunkCapacity(), sink); - /** - * The evaluation context for the parent query. - */ - private final BOpContext<IBindingSet> parentContext; + // The iterator draining the subquery + final IAsynchronousIterator<IBindingSet[]> subquerySolutionItr = runningSubquery + .iterator(); - /** - * The source binding set. This will be copied to the output if - * there are no solutions for the subquery (optional join - * semantics). - */ - private final IBindingSet bset; - - /** - * The root operator for the subquery. - */ - private final BOp subQueryOp; + while (subquerySolutionItr.hasNext()) { - public SubqueryTask(final IBindingSet bset, final BOp subQuery, - final BOpContext<IBindingSet> parentContext) { + final IBindingSet[] chunk = subquerySolutionItr.next(); - this.bset = bset; - - this.subQueryOp = subQuery; + if (log.isDebugEnabled()) + log.debug("Considering chunk of " + chunk.length + + " solutions from the subquery"); - this.parentContext = parentContext; + for (IBindingSet subquerySolution : chunk) { - } +// stats.accessPathUnitsIn.increment(); - public IRunningQuery call() throws Exception { + if (log.isDebugEnabled()) + log.debug("Considering " + subquerySolution); - // The subquery - IRunningQuery runningSubquery = null; - // The iterator draining the subquery - IAsynchronousIterator<IBindingSet[]> subquerySolutionItr = null; - try { + final Key key = makeKey(subquerySolution); - final QueryEngine queryEngine = parentContext.getRunningQuery() - .getQueryEngine(); + // Probe the hash map. 
+ Bucket b = map.get(key); - runningSubquery = queryEngine.eval((PipelineOp) subQueryOp, - bset); + if (b == null) + continue; + + for(SolutionHit src : b.solutions) { - long ncopied = 0L; - try { - - // Iterator visiting the subquery solutions. - subquerySolutionItr = runningSubquery.iterator(); + /* + * #of elements accepted for this binding set. + * + * Note: We count binding sets as accepted + * before we apply the constraints. This has the + * effect that an optional join which produces + * solutions that are then rejected by a FILTER + * associated with the optional predicate WILL + * NOT pass on the original solution even if ALL + * solutions produced by the join are rejected + * by the filter. + */ + src.nhits++; - // Copy solutions from the subquery to the query. - ncopied = BOpUtility.copy(subquerySolutionItr, - parentContext.getSink(), null/* sink2 */, - null/* constraints */, null/* stats */); + if (log.isDebugEnabled()) + log.debug("Join with " + src); - // wait for the subquery to halt / test for errors. - runningSubquery.get(); - - } catch (InterruptedException ex) { + /* + * Clone the source binding set since it is + * tested for each element visited. + */ + IBindingSet bset = src.solution.clone(); - // this thread was interrupted, so cancel the subquery. - runningSubquery - .cancel(true/* mayInterruptIfRunning */); + // propagate bindings from the subquery + BOpContext + .copyValues(subquerySolution/* src */, + bset/* dst */); - // rethrow the exception. - throw ex; - - } - - if (ncopied == 0L && optional) { + if (log.isDebugEnabled()) + log.debug("Joined solution: " + bset); + if (constraints != null + && !BOpUtility.isConsistent( + constraints, bset)) { + + // solution rejected by constraint(s). + + if (log.isDebugEnabled()) + log.debug("Join fails constraint(s): " + + bset); + + continue; + + } + + // strip off unnecessary variables. + bset = selectVars == null ? 
bset : bset + .copy(selectVars); + + if (log.isDebugEnabled()) + log.debug("Output solution: " + bset); + + // Accept this binding set. + unsyncBuffer.add(bset); + + // #of output solutions generated. + stats.outputSolutions.increment(); + + } + + } + + } + + if (optional) { + /* - * Since there were no solutions for the subquery, copy - * the original binding set to the default sink. - * - * @todo If we add a CONSTRAINTS annotation to the - * SubqueryOp then we need to make sure that it is - * applied to all solutions copied out of the subquery. + * Note: when NO subquery solutions joined for a given + * source binding set AND the subquery is OPTIONAL then + * we output the _original_ binding set to the sink join + * task(s), but the original binding set still must pass + * any constraint on the join. */ - parentContext.getSink().add(new IBindingSet[]{bset}); - - } + // Thread-local buffer iff optional sink is in use. + final AbstractUnsynchronizedArrayBuffer<IBindingSet> unsyncBuffer2 = sink2 == null ? null + : new UnsyncLocalOutputBuffer<IBindingSet>( + joinOp.getChunkCapacity(), sink2); + + for(Bucket b : map.values()) { + + for(SolutionHit hit : b.solutions) { + + if (hit.nhits > 0) + continue; + + final IBindingSet bs = hit.solution; + + if (log.isDebugEnabled()) + log.debug("Optional solution: " + bs); + + if (constraints != null) { + if (!BOpUtility.isConsistent(constraints, + bs)) { + + // Failed by the constraint on the join. + + if (log.isDebugEnabled()) + log + .debug("Optional solution failed by constraints: " + + hit); + + continue; + } + } + + if (log.isTraceEnabled()) + log.trace("Output optional solution: " + bs); + + if (unsyncBuffer2 == null) { + // use the default sink. + unsyncBuffer.add(bs); + } else { + // use the alternative sink. + unsyncBuffer2.add(bs); + } + + stats.outputSolutions.increment(); + + } + + } + + if (sink2 != null) { + unsyncBuffer2.flush(); + sink2.flush(); + } + + } // if(optional) - // done. 
- return runningSubquery; + /* + * Flush the output. + */ + unsyncBuffer.flush(); + sink.flush(); } catch (Throwable t) { - if (runningSubquery == null - || runningSubquery.getCause() != null) { - /* - * If things fail before we start the subquery, or if a - * subquery fails (due to abnormal termination), then - * propagate the error to the parent and rethrow the - * first cause error out of the subquery. - * - * Note: IHaltable#getCause() considers exceptions - * triggered by an interrupt to be normal termination. - * Such exceptions are NOT propagated here and WILL NOT - * cause the parent query to terminate. - */ - throw new RuntimeException(ControllerTask.this.context - .getRunningQuery().halt( - runningSubquery == null ? t - : runningSubquery.getCause())); + if (runningSubquery.getCause() != null) { + /* + * If a subquery fails (due to abnormal termination), + * then propagate the error to the parent and rethrow + * the first cause error out of the subquery. + * + * Note: IHaltable#getCause() considers exceptions + * triggered by an interrupt to be normal termination. + * Such exceptions are NOT propagated here and WILL NOT + * cause the parent query to terminate. + */ + + throw new RuntimeException(runningSubquery.getCause()); + } - - return runningSubquery; - + } finally { - try { + runningSubquery.cancel(true/* mayInterruptIfRunning */); - // ensure subquery is halted. - if (runningSubquery != null) - runningSubquery - .cancel(true/* mayInterruptIfRunning */); - - } finally { + } - // ensure the subquery solution iterator is closed. - if (subquerySolutionItr != null) - subquerySolutionItr.close(); + // done. + return null; - } - - } + } finally { + sink.close(); + if (sink2 != null) + sink2.close(); + } - } // SubqueryTask + } + /** + * Return an array of constants corresponding to the as-bound values of + * the join variables for the given solution. + * + * @param bset + * The solution. 
+ * + * @return The as-bound values for the join variables for that solution. + */ + private Key makeKey(final IBindingSet bset) { + + final IConstant<?>[] vals = new IConstant<?>[joinVars.length]; + + for (int i = 0; i < joinVars.length; i++) { + + final IVariable<?> v = joinVars[i]; + + vals[i] = bset.get(v); + + } + + return new Key(vals); + + } + } // ControllerTask + /** + * Wrapper for the keys in the hash table. This is necessary for the hash + * table to compare the keys as equal and also provides a efficiencies in + * the hash code and equals() methods. + */ + private static class Key { + + private final int hash; + + private final IConstant<?>[] vals; + + public Key(final IConstant<?>[] vals) { + this.vals = vals; + this.hash = java.util.Arrays.hashCode(vals); + } + + public int hashCode() { + return hash; + } + + public boolean equals(final Object o) { + if (this == o) + return true; + if (!(o instanceof Key)) { + return false; + } + final Key t = (Key) o; + if (vals.length != t.vals.length) + return false; + for (int i = 0; i < vals.length; i++) { + if (vals[i] == t.vals[i]) + continue; + if (vals[i] == null) + return false; + if (!vals[i].equals(t.vals[i])) + return false; + } + return true; + } + } + + /** + * An input solution and a hit counter. + */ + private static class SolutionHit { + + /** + * The input solution. + */ + final public IBindingSet solution; + + /** + * The #of hits on that input solution when processing the join against + * the subquery. + */ + public int nhits = 0; + + private SolutionHit(final IBindingSet solution) { + + if(solution == null) + throw new IllegalArgumentException(); + + this.solution = solution; + + } + + public String toString() { + + return getClass().getName() + "{nhits=" + nhits + ",solution=" + + solution + "}"; + + } + + } // class SolutionHit + + /** + * A group of solutions having the same as-bound values for their join vars. 
+ * Each solution is paired with a hit counter so we can support OPTIONAL + * semantics for the join. + */ + private static class Bucket { + + /** + * A set of solutions (and their hit counters) which have the same + * as-bound values for the join variables. + */ + final List<SolutionHit> solutions = new LinkedList<SolutionHit>(); + + public String toString() { + return super.toString() + // + "{#solutions=" + solutions.size() + // + "}"; + } + + public Bucket(final IBindingSet solution) { + + add(solution); + + } + + public void add(final IBindingSet solution) { + + if (solution == null) + throw new IllegalArgumentException(); + + solutions.add(new SolutionHit(solution)); + + } + + } // Bucket + } Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryJoinAnnotations.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryJoinAnnotations.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryJoinAnnotations.java 2011-03-31 17:11:26 UTC (rev 4357) @@ -0,0 +1,64 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2011. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Mar 31, 2011 + */ + +package com.bigdata.bop.controller; + +import com.bigdata.bop.IPredicate; +import com.bigdata.bop.PipelineOp; +import com.bigdata.bop.join.JoinAnnotations; +import com.bigdata.bop.join.PipelineJoin; + +/** + * Annotations for joins against a subquery. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public interface SubqueryJoinAnnotations extends JoinAnnotations { + + /** + * The subquery to be evaluated for each binding sets presented to the + * {@link SubqueryOp} (required). This should be a {@link PipelineOp}. (It + * is basically the equivalent of the {@link IPredicate} for a + * {@link PipelineJoin}). + */ + String SUBQUERY = SubqueryOp.class.getName() + ".subquery"; + + /** + * When <code>true</code> the subquery has optional semantics (if the + * subquery fails, the original binding set will be passed along to the + * downstream sink anyway) (default {@value #DEFAULT_OPTIONAL}). + * + * @todo This is somewhat in conflict with how we mark optional predicates + * to support the RTO. The OPTIONAL marker might need to be moved onto + * the subquery. 
+ */ + String OPTIONAL = SubqueryOp.class.getName() + ".optional"; + + boolean DEFAULT_OPTIONAL = false; + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryJoinAnnotations.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryOp.java 2011-03-31 07:50:59 UTC (rev 4356) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryOp.java 2011-03-31 17:11:26 UTC (rev 4357) @@ -29,19 +29,17 @@ import java.util.Map; import java.util.concurrent.Callable; -import java.util.concurrent.Executor; import java.util.concurrent.FutureTask; import com.bigdata.bop.BOp; import com.bigdata.bop.BOpContext; import com.bigdata.bop.BOpUtility; import com.bigdata.bop.IBindingSet; -import com.bigdata.bop.IPredicate; import com.bigdata.bop.NV; import com.bigdata.bop.PipelineOp; import com.bigdata.bop.engine.IRunningQuery; import com.bigdata.bop.engine.QueryEngine; -import com.bigdata.bop.join.PipelineJoin; +import com.bigdata.bop.join.JoinAnnotations; import com.bigdata.relation.accesspath.IAsynchronousIterator; /** @@ -65,6 +63,9 @@ * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * * @see AbstractSubqueryOp + * + * FIXME Support {@link JoinAnnotations#SELECT} + * FIXME Support {@link JoinAnnotations#CONSTRAINTS} */ public class SubqueryOp extends PipelineOp { @@ -73,25 +74,8 @@ */ private static final long serialVersionUID = 1L; - public interface Annotations extends PipelineOp.Annotations { + public interface Annotations extends SubqueryJoinAnnotations { - /** - * The subquery to be evaluated for each binding sets presented to the - * {@link SubqueryOp} (required). 
This should be a {@link PipelineOp}. - * (It is basically the equivalent of the {@link IPredicate} for a - * {@link PipelineJoin}). - */ - String SUBQUERY = (SubqueryOp.class.getName() + ".subquery").intern(); - - /** - ... [truncated message content] |