From: <tho...@us...> - 2011-02-17 22:58:16
|
Revision: 4207 http://bigdata.svn.sourceforge.net/bigdata/?rev=4207&view=rev Author: thompsonbry Date: 2011-02-17 22:58:07 +0000 (Thu, 17 Feb 2011) Log Message: ----------- Redo of failed commit. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Bind.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IVariable.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Var.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregate/AggregateBase.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregate/IAggregate.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/AbstractSubqueryOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/PartitionedJoinGroup.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/Steps.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/Union.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/GroupByOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/MemoryGroupByOp.java 
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/MemorySortOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestVar.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestSubqueryOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestUnion.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine_DistinctOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine_GroupByOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine_Slice.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine_SortOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestMemoryGroupByOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestMemorySortOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestSliceOp.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/COUNT.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/GROUP_CONCAT.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/MAX.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/MIN.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/SAMPLE.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/SUM.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPOPredicate.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/TestBOpUtility.java 
branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/joinGraph/AbstractJoinGraphTestCase.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/joinGraph/TestJoinGraphOnBSBMData.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/QueryHints.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/Rule2BOpUtility.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/NanoSparqlClient.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/NanoSparqlServer.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sop/SOp2BOpUtility.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestQueryHints.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/GroupByStagedOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/GroupByUtility.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/htree/raba/ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestGroupByUtility.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/RunQuery.java Removed Paths: ------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngineTestAnnotations.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2011-02-17 13:38:48 UTC (rev 4206) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2011-02-17 22:58:07 UTC (rev 4207) @@ -246,6 +246,36 @@ return args[index]; } + +// /** +// * Return a new {@link BOp} where the specified argument has been replaced +// * by the given value. 
This is a copy-on-write operation. The original +// * {@link BOp} is NOT modified by this method. +// * +// * @param index +// * The index of the argument whose value will be changed. +// * @param arg +// * The new value for that argument. +// * +// * @return A new operator in which the given argument has been replaced. +// * +// * @throws IndexOutOfBoundsException +// * unless <i>index</i> is in (0:{@link #arity()}]. +// * @throws IllegalArgumentException +// * if <i>arg</i> is <code>null</code>. +// */ +// public BOp setArg(final int index,final BOp arg) { +// +// if(arg == null) +// throw new IllegalArgumentException(); +// +// final BOpBase tmp = this.clone(); +// +// tmp._set(index, arg); +// +// return tmp; +// +// } /** * Set the value of an operand. @@ -264,7 +294,7 @@ * * @todo thread safety and visibility.... */ - final protected void set(final int index, final BOp op) { + final protected void _set(final int index, final BOp op) { this.args[index] = op; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2011-02-17 13:38:48 UTC (rev 4206) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2011-02-17 22:58:07 UTC (rev 4207) @@ -79,7 +79,8 @@ * <ol> * <li>{@link BOp.Annotations#EVALUATION_CONTEXT} is * {@link BOpEvaluationContext#CONTROLLER}</li> - * <li>{@link PipelineOp.Annotations#THREAD_SAFE} is <code>false</code></li> + * <li>{@link PipelineOp.Annotations#MAX_PARALLEL} is <code>1</code></li> + * <li>{@link PipelineOp.Annotations#PIPELINED} is <code>true</code></li> * </ol> * Under these circumstances, it is possible for the {@link IQueryClient} to * atomically decide that a specific invocation of the operator task for the @@ -90,7 +91,8 @@ * controller. 
In addition, the operator must declare that it is NOT thread * safe in order for the query engine to serialize its evaluation tasks. * - * @return + * @todo This should be a ctor parameter. We just have to update the test + * suites for the changed method signature. */ public boolean isLastInvocation() { return lastInvocation.get(); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Bind.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Bind.java 2011-02-17 13:38:48 UTC (rev 4206) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Bind.java 2011-02-17 22:58:07 UTC (rev 4207) @@ -24,7 +24,7 @@ /** * @param var - * The {@link IVariable} which will be bound to result of + * The {@link IVariable} which will be bound to the result of * evaluating the associated value expression. * @param expr * The {@link IValueExpression} to be evaluated. @@ -44,23 +44,42 @@ super(args, annotations); } + /** + * Return the variable which will be bound to the result of evaluating the + * associated value expression. + */ @SuppressWarnings("unchecked") - @Override + public IVariable<E> getVar() { + + return (IVariable<E>) get(0); + + } + + /** + * Return the value expression. + */ + @SuppressWarnings("unchecked") + public IValueExpression<E> getExpr() { + + return (IValueExpression<E>) get(1); + + } + public E get(final IBindingSet bindingSet) { - - final IVariable<E> var = (IVariable<E>) get(0); - final IValueExpression<E> expr = (IValueExpression<E>) get(1); + final IVariable<E> var = getVar(); + final IValueExpression<E> expr = getExpr(); + // evaluate the value expression. E val = expr.get(bindingSet); - + // bind the variable as a side-effect. 
bindingSet.set(var, new Constant<E>(val)); - + // return the evaluated value return val; - + } } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IVariable.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IVariable.java 2011-02-17 13:38:48 UTC (rev 4206) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IVariable.java 2011-02-17 22:58:07 UTC (rev 4207) @@ -41,5 +41,10 @@ * {@link IVariableOrConstant#getName()} */ int hashCode(); + + /** + * Return <code>true</code> iff this is the special variable <code>*</code> + */ + boolean isWildcard(); } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2011-02-17 13:38:48 UTC (rev 4206) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2011-02-17 22:58:07 UTC (rev 4207) @@ -35,7 +35,7 @@ import org.apache.log4j.Logger; import com.bigdata.bop.engine.BOpStats; -import com.bigdata.bop.engine.ChunkedRunningQuery; +import com.bigdata.bop.engine.IChunkMessage; import com.bigdata.bop.engine.QueryEngine; import com.bigdata.bop.solutions.SliceOp; import com.bigdata.relation.accesspath.IAsynchronousIterator; @@ -101,24 +101,65 @@ boolean DEFAULT_SHARED_STATE = false; /** - * Annotation may be used to indicate operators which are not thread - * safe (default {@value #DEFAULT_THREAD_SAFE}). Concurrent invocations - * of the evaluation task will not be scheduled for a given shard for an - * operator which is not thread safe. - * - * @todo Unit tests for {@link ChunkedRunningQuery} to verify that it - * eventually schedules operator tasks which were deferred to - * prevent concurrent evaluation. - * - * @todo This is currently not used. 
However, it could simplify the - * logic for operators, such as SLICE, which otherwise depend on - * {@link #SHARED_STATE} to provide their own synchronization. + * This option may be used to place an optional limit on the #of + * concurrent tasks which may run for the same (bopId,shardId) for a + * given query (default {@value #DEFAULT_MAX_PARALLEL}). The query is + * guaranteed to make progress as long as this is some positive integer. + * While limiting this value can limit the concurrency with which + * certain operators are evaluated and that can have a negative effect + * on the throughput, it controls both the demand on the JVM heap and + * the #of threads consumed. + * <p> + * Note: {@link #MAX_PARALLEL} is the annotation for pipelined joins + * which has the strongest effect on performance. Changes to both + * {@link #MAX_MESSAGES_PER_TASK} and {@link #PIPELINE_QUEUE_CAPACITY} + * have less effect and performance tends to be best around a modest + * value (10) for those annotations. */ - String THREAD_SAFE = PipelineOp.class.getName() + ".threadSafe"; + String MAX_PARALLEL = PipelineOp.class.getName() + ".maxParallel"; - boolean DEFAULT_THREAD_SAFE = true; + /** + * @see #MAX_PARALLEL + */ + int DEFAULT_MAX_PARALLEL = 5; /** + * For a pipelined operator, this is the maximum number of messages that + * will be assigned to a single invocation of the evaluation task for + * that operator (default {@value #DEFAULT_MAX_MESSAGES_PER_TASK}). By + * default the {@link QueryEngine} MAY (and generally does) combine + * multiple {@link IChunkMessage}s from the work queue of an operator + * for each evaluation pass made for that operator. When ONE (1), each + * {@link IChunkMessage} will be assigned to a new evaluation task for + * the operator. The value of this annotation must be a positive + * integer. If the operator is not-pipelined, then the maximum amount of + * data to be assigned to an evaluation task is governed by + * {@link #MAX_MEMORY} instead. 
+ */ + String MAX_MESSAGES_PER_TASK = PipelineOp.class.getName() + + ".maxMessagesPerTask"; + + /** + * @see #MAX_MESSAGES_PER_TASK + */ + int DEFAULT_MAX_MESSAGES_PER_TASK = 10; + + /** + * For pipelined operators, this is the capacity of the input queue for + * that operator. Producers will block if the input queue for the target + * operator is at its capacity. This provides an important limit on the + * amount of data which can be buffered on the JVM heap during pipelined + * query evaluation. + */ + String PIPELINE_QUEUE_CAPACITY = PipelineOp.class.getName() + + ".pipelineQueueCapacity"; + + /** + * @see #PIPELINE_QUEUE_CAPACITY + */ + int DEFAULT_PIPELINE_QUEUE_CAPACITY = 10; + + /** * Annotation used to mark pipelined (aka vectored) operators. When * <code>false</code> the operator will use either "at-once" or * "blocked" evaluation depending on how it buffers its data for @@ -126,6 +167,9 @@ */ String PIPELINED = PipelineOp.class.getName() + ".pipelined"; + /** + * @see #PIPELINED + */ boolean DEFAULT_PIPELINED = true; /** @@ -159,87 +203,11 @@ */ String MAX_MEMORY = PipelineOp.class.getName() + ".maxMemory"; + /** + * @see #MAX_MEMORY + */ int DEFAULT_MAX_MEMORY = 0; -// /** -// * Annotation used to mark a set of (non-optional) joins which may be -// * freely reordered by the query optimizer in order to minimize the -// * amount of work required to compute the solutions. -// * <p> -// * Note: Optional joins MAY NOT appear within a join graph. Optional -// * joins SHOULD be evaluated as part of the "tail plan" following the -// * join graph, but before operations such as SORT, DISTINCT, etc. When -// * the query plan includes {@link #CONDITIONAL_GROUP}s, those groups -// * include a leading {@link #JOIN_GRAPH} (required joins) followed by -// * zero or more optional joins. -// */ -// String JOIN_GRAPH = PipelineOp.class.getName() + ".joinGraph"; - -// /** -// * Annotation used to mark a set of operators belonging to a conditional -// * binding group. 
Bindings within with the group will be discarded if -// * any required operator in the group fails. For example, if a binding -// * set exits via the alternative sink for a required join then any -// * conditional bindings within the group will be discarded. -// * <p> -// * Together with {@link #ALT_SINK_GROUP}, the {@link #CONDITIONAL_GROUP} -// * annotation provides the information necessary in order to decide the -// * re-entry point in the query plan when a join within an conditional -// * binding group fails. -// * <p> -// * The {@link #CONDITIONAL_GROUP} annotation controls the -// * {@link IBindingSet#push()} and {@link IBindingSet#pop(boolean)} of -// * individual solutions as they propagate through the pipeline. When a -// * pipeline starts, the {@link IBindingSet} stack contains only the top -// * level symbol table (i.e., name/value bindings). When an intermediate -// * solution enters a {@link PipelineOp} marked as belonging to a -// * {@link #CONDITIONAL_GROUP}, a new symbol table is -// * {@link IBindingSet#push() pushed} onto the stack for that solution. -// * If the solution leaves the optional join group via the default sink, -// * then the symbol table is "saved" when it is -// * {@link IBindingSet#pop(boolean) popped} off of the stack. If the -// * solution leaves the join group via the alternative sink, then the -// * symbol table is discarded when it is {@link IBindingSet#pop(boolean) -// * popped} off of the stack. This provides for conditional binding of -// * variables within the operators of the group. -// * <p> -// * The value of the {@link #CONDITIONAL_GROUP} is an {@link Integer} -// * which uniquely identifies the group within the query. -// * -// * @deprecated The binding set stack push/pop mechanisms are not -// * sufficient to support optional join groups. This -// * annotation will be removed unless it proves valuable for -// * marking the elements of a join group, in which case the -// * javadoc needs to be updated. 
-// */ -// String CONDITIONAL_GROUP = PipelineOp.class.getName() + ".conditionalGroup"; - -// /** -// * Annotation used to designate the target when a required operator -// * within an {@link #CONDITIONAL_GROUP} fails. The value of this -// * annotation must be the {@link #CONDITIONAL_GROUP} identifier -// * corresponding to the next conditional binding group in the query -// * plan. If there is no such group, then the {@link #ALT_SINK_REF} -// * should be used instead to specify the target operator in the -// * pipeline, e.g., a {@link SliceOp}. -// * <p> -// * The target {@link #CONDITIONAL_GROUP} is specified (rather than the -// * bopId of the target join) since the non-optional joins in the target -// * {@link #CONDITIONAL_GROUP} be reordered by the query optimizer. The -// * entry point for solutions redirected to the {@link #ALT_SINK_GROUP} -// * is therefore the first operator in the target -// * {@link #CONDITIONAL_GROUP}. This decouples the routing decisions from -// * the join ordering decisions. -// * -// * @see #CONDITIONAL_GROUP -// * @see #ALT_SINK_REF -// * -// * @deprecated The binding set stack push/pop mechanisms are not -// * sufficient to support optional join groups. This -// * annotation will be removed. -// */ -// String ALT_SINK_GROUP = PipelineOp.class.getName() + ".altSinkGroup"; - } /** @@ -261,13 +229,19 @@ final Map<String, Object> annotations) { super(args, annotations); - + + if (getMaxParallel() < 1) + throw new IllegalArgumentException(Annotations.MAX_PARALLEL + "=" + + getMaxParallel()); + + // @todo range check the rest of the annotations. 
+ } /** * @see BufferAnnotations#CHUNK_CAPACITY */ - public int getChunkCapacity() { + final public int getChunkCapacity() { return getProperty(Annotations.CHUNK_CAPACITY, Annotations.DEFAULT_CHUNK_CAPACITY); @@ -277,7 +251,7 @@ /** * @see BufferAnnotations#CHUNK_OF_CHUNKS_CAPACITY */ - public int getChunkOfChunksCapacity() { + final public int getChunkOfChunksCapacity() { return getProperty(Annotations.CHUNK_OF_CHUNKS_CAPACITY, Annotations.DEFAULT_CHUNK_OF_CHUNKS_CAPACITY); @@ -287,7 +261,7 @@ /** * @see BufferAnnotations#CHUNK_TIMEOUT */ - public long getChunkTimeout() { + final public long getChunkTimeout() { return getProperty(Annotations.CHUNK_TIMEOUT, Annotations.DEFAULT_CHUNK_TIMEOUT); @@ -334,31 +308,49 @@ * @see Annotations#PIPELINED * @see Annotations#MAX_MEMORY */ - public boolean isPipelined() { - return getProperty(PipelineOp.Annotations.PIPELINED, + final public boolean isPipelined() { + + return getProperty(PipelineOp.Annotations.PIPELINED, PipelineOp.Annotations.DEFAULT_PIPELINED); + } +// /** +// * Return <code>true</code> iff concurrent invocations of the operator are +// * permitted. +// * <p> +// * Note: Operators which are not thread-safe still permit concurrent +// * evaluation for <em>distinct</em> partitions. In order to ensure that all +// * invocations of the operator within a query are serialized (no more than +// * one concurrent invocation) you must also specify +// * {@link BOpEvaluationContext#CONTROLLER}. +// * +// * @see Annotations#THREAD_SAFE +// * @see BOp.Annotations#EVALUATION_CONTEXT +// */ +// public boolean isThreadSafe() { +// +// return getProperty(Annotations.THREAD_SAFE, +// Annotations.DEFAULT_THREAD_SAFE); +// +// } + /** - * Return <code>true</code> iff concurrent invocations of the operator are - * permitted. - * <p> - * Note: Operators which are not thread-safe still permit concurrent - * evaluation for <em>distinct</em> partitions. 
In order to ensure that all - * invocations of the operator within a query are serialized (no more than - * one concurrent invocation) you must also specify - * {@link BOpEvaluationContext#CONTROLLER}. + * The maximum parallelism with which tasks may be evaluated for this + * operator (this is a per-shard limit in scale-out). A value of ONE (1) + * indicates that at most ONE (1) instance of this task may be executing in + * parallel for a given shard and may be used to indicate that the operator + * evaluation task is not thread-safe. * - * @see Annotations#THREAD_SAFE - * @see BOp.Annotations#EVALUATION_CONTEXT + * @see Annotations#MAX_PARALLEL */ - public boolean isThreadSafe() { + final public int getMaxParallel() { - return getProperty(Annotations.THREAD_SAFE, - Annotations.DEFAULT_THREAD_SAFE); - - } + return getProperty(PipelineOp.Annotations.MAX_PARALLEL, + PipelineOp.Annotations.DEFAULT_MAX_PARALLEL); + } + /** * Return <code>true</code> iff {@link #newStats()} must be shared across * all invocations of {@link #eval(BOpContext)} for this operator for a @@ -366,7 +358,7 @@ * * @see Annotations#SHARED_STATE */ - public boolean isSharedState() { + final public boolean isSharedState() { return getProperty(Annotations.SHARED_STATE, Annotations.DEFAULT_SHARED_STATE); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Var.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Var.java 2011-02-17 13:38:48 UTC (rev 4206) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Var.java 2011-02-17 22:58:07 UTC (rev 4207) @@ -124,6 +124,12 @@ } + public boolean isWildcard() { + + return name.length() == 1 && name.charAt(0) == '*'; + + } + // public int compareTo(IVariableOrConstant arg0) { // // // order vars before ids Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregate/AggregateBase.java 
=================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregate/AggregateBase.java 2011-02-17 13:38:48 UTC (rev 4206) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregate/AggregateBase.java 2011-02-17 22:58:07 UTC (rev 4207) @@ -4,11 +4,11 @@ import com.bigdata.bop.BOp; import com.bigdata.bop.BOpBase; +import com.bigdata.bop.IBindingSet; import com.bigdata.bop.IValueExpression; import com.bigdata.bop.ImmutableBOp; import com.bigdata.bop.NV; import com.bigdata.bop.Var; -import com.bigdata.bop.BOp.Annotations; /** * Abstract base class for aggregate functions. @@ -17,21 +17,100 @@ * * @param <E> */ -abstract public class AggregateBase<E> extends ImmutableBOp implements IAggregate<E> { +public class AggregateBase<E> extends ImmutableBOp implements IAggregate<E> { /** * */ private static final long serialVersionUID = 1L; + /** + * A type safe enumeration of well known aggregate functions. + */ + static public enum FunctionCode { + + /** + * The count of the #of computed value expressions within the solution + * group. In combination with the special keyword DISTINCT, this is the + * #of distinct values from the computed value expression within the + * solution group. When given with the special variable <code>*</code>, + * this is the count of the #of solutions (or distinct solutions if also + * combined with DISTINCT) within the group. + */ + COUNT(0), + + /** + * The sum of the computed value expressions within the solution group. + * In combination with the special keyword DISTINCT, this is the sum of + * the distinct values from the computed value expressions within the + * solution group. + */ + SUM(1), + + /** + * The average is defined as + * <code>AVG(expr) := SUM(expr)/COUNT(expr)</code>. Note that both SUM + * and COUNT can be hash partitioned over a cluster, so it often makes + * sense to rewrite AVG(expr) internally in terms of COUNT and SUM. 
This + * may be combined with DISTINCT. + */ + AVG(2), + + /** + * MIN(expr) is the minimum observed value for the computed value + * expressions according to the ordering semantics of + * <code>ORDER BY expr ASC</code>. This may be combined with DISTINCT. + */ + MIN(3), + + /** + * MAX(expr) is the maximum observed value for the computed value + * expressions according to the ordering semantics of + * <code>ORDER BY expr ASC</code>. This may be combined with DISTINCT. + */ + MAX(4), + + /** + * The combined values of the computed value expressions as a string. + * This may be combined with DISTINCT. + */ + GROUP_CONCAT(5), + + /** + * This evaluates to an arbitrary value of the computed value + * expressions. This may be combined with DISTINCT to sample from the + * distinct computed values. While the implementation is not required to + * choose randomly among the values to be sampled, random sampling may + * prove more useful to some applications. + */ + SAMPLE(6); + + private FunctionCode(int code) { + this.code = code; + } + + final private int code; + + public int getCode() { + return code; + } + + } + public interface Annotations extends ImmutableBOp.Annotations { /** + * The aggregate function identifier ({@link FunctionCode#COUNT}, + * {@link FunctionCode#SUM}, etc). + */ + String FUNCTION_CODE = AggregateBase.class.getName() + ".functionCode"; + + /** * Optional boolean property indicates whether the aggregate applies to * the distinct within group solutions (default * {@value #DEFAULT_DISTINCT}). */ - String DISTINCT = AggregateBase.class.getName()+".distinct"; + String DISTINCT = AggregateBase.class.getName() + ".distinct"; boolean DEFAULT_DISTINCT = false; @@ -41,36 +120,40 @@ super(op); } + /** + * Core shallow copy constructor. The <i>distinct</i> option is modeled + * using {@link Annotations#DISTINCT}. The <i>expr</i> is modeled as the + * first argument for the aggregate function. 
+ * + * @param args + * @param annotations + */ public AggregateBase(BOp[] args, Map<String, Object> annotations) { super(args, annotations); - if (!isWildcardAllowed() && getExpression() == Var.var("*")) { - - /* - * Only COUNT may use the wildcard '*' variable. - */ - - throw new UnsupportedOperationException("'*' not permitted."); - - } - } /** - * + * @param functionCode + * The type safe value identifying the desired aggregate + * function. * @param distinct * <code>true</code> iff the keyword DISTINCT was used, for * example <code>COUNT(DISTINCT y)</code> * @param expr * The value expression to be computed, for example * <code>x</code> in <code>COUNT(DISTINCT x)</code> or - * <code>y+x</code> in <code>MIN(x+y)</code>. + * <code>y+x</code> in <code>MIN(x+y)</code>. Note that only + * COUNT may be used with the special variable <code>*</code>. */ - public AggregateBase(final boolean distinct, final IValueExpression<E> expr) { + public AggregateBase(final FunctionCode functionCode, + final boolean distinct, final IValueExpression<E> expr) { - this(new BOp[] { expr }, distinct ? NV.asMap(new NV( - Annotations.DISTINCT, true)) : null); + this(new BOp[] { expr }, NV.asMap(// + new NV(Annotations.FUNCTION_CODE, functionCode), // + new NV(Annotations.DISTINCT, distinct))// + ); } @@ -87,15 +170,33 @@ } + public boolean isWildcard() { + + return get(0).equals(Var.var("*")); + + } + /** - * Return <code>true</code> iff the {@link IValueExpression} may be the - * special variable <code>*</code>. The default implementation always - * returns <code>false</code>. + * Operation is not implemented by this class and must be overridden if the + * {@link AggregateBase} is to be directly evaluated. However, note that the + * computation of aggregate functions is often based on hard coded + * recognition of the appropriate function code. 
+ */ - public boolean isWildcardAllowed() { + public E get(IBindingSet bset) { + throw new UnsupportedOperationException(); + } - return false; - + public AggregateBase<E> setExpression(final IValueExpression<E> newExpr) { + + if (newExpr == null) + throw new IllegalArgumentException(); + + final AggregateBase<E> tmp = (AggregateBase<E>) this.clone(); + + tmp._set(0, newExpr); + + return tmp; + } - + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregate/IAggregate.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregate/IAggregate.java 2011-02-17 13:38:48 UTC (rev 4206) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregate/IAggregate.java 2011-02-17 22:58:07 UTC (rev 4207) @@ -7,6 +7,14 @@ * An aggregate operator, such as SUM, COUNT, MIN, MAX, etc. * * @author thompsonbry + * + * @todo In order to assign nice labels to select expressions we need to know + * (or be able to generate) the original syntactic expression, e.g., + * <code>i+j</code> or <code>SUM(i*2)+j</code>. The textual value of these + * expressions will be used as if they were variable names. Since a + * subquery could be part of a SELECT expression, this means that we need + * to be able to do this for any SPARQL query construct. I do not believe + * that openrdf currently supports this. */ public interface IAggregate<E> extends IValueExpression<E>{ @@ -29,11 +37,20 @@ * </pre> */ boolean isDistinct(); + + /** + * Return <code>true</code> iff the {@link IValueExpression} is the special + * variable <code>*</code> (but note that this is only allowed for COUNT). + */ + boolean isWildcard(); /** * Return the {@link IValueExpression} to be computed by the aggregate. For - * <code>COUNT</code> this may be the special variable <code>*</code>, which - * is interpreted to mean all variables declared in the source solutions. 
+ * example, if the aggregate function is <code>SUM(i+2)</code>, then this + * expression would be <code>i+2</code>. For <code>COUNT</code> this may be + * the special variable <code>*</code>, which is interpreted to mean all + * variables declared in the source solutions. The "DISTINCT" keyword is + * reported separately by {@link #isDistinct()}. */ IValueExpression<E> getExpression(); @@ -42,5 +59,16 @@ * internal state of the {@link IAggregate} operator). */ E get(IBindingSet bset); - + + /** + * Return a new {@link IAggregate} where the expression has been replaced by + * the given expression (copy-on-write). + * + * @param newExpr + * The new expression. + * + * @return The new {@link IAggregate}. + */ + IAggregate<E> setExpression(IValueExpression<E> newExpr); + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java 2011-02-17 13:38:48 UTC (rev 4206) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java 2011-02-17 22:58:07 UTC (rev 4207) @@ -349,7 +349,7 @@ continue; } - tmp.set(i, val.clone()); + tmp._set(i, val.clone()); // modified = true; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/AbstractSubqueryOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/AbstractSubqueryOp.java 2011-02-17 13:38:48 UTC (rev 4206) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/AbstractSubqueryOp.java 2011-02-17 22:58:07 UTC (rev 4207) @@ -59,13 +59,6 @@ * <p> * Note: This operator must execute on the query controller. * <p> - * The {@link PipelineOp.Annotations#SINK_REF} of each child operand should be - * overridden to specify the parent of the this operator. 
If you fail to do - * this, then the intermediate results of the subqueries will be routed to this - * operator. This may cause unnecessary network traffic when running against the - * {@link IBigdataFederation}. It may also cause the query to block if the - * buffer capacity is limited. - * <p> * If you want to route intermediate results from other computations into * subqueries, then consider a {@link Tee} pattern instead. * <p> @@ -73,14 +66,12 @@ * * <pre> * SLICE[1]( - * UNION[2]([...],{subqueries=[a{sinkRef=1},b{sinkRef=1},c{sinkRef=1}]}) + * UNION[2]([...],{subqueries=[a,b,c]}) * ) * </pre> * * Will run the subqueries <i>a</i>, <i>b</i>, and <i>c</i> in parallel. Each - * subquery will be run once for each source {@link IBindingSet}. The output of - * those subqueries is explicitly routed to the SLICE operator using - * {@link PipelineOp.Annotations#SINK_REF} for efficiency in scale-out. + * subquery will be run once for each source {@link IBindingSet}. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ @@ -109,19 +100,19 @@ * The maximum parallelism with which the subqueries will be evaluated * (default is unlimited). 
*/ - String MAX_PARALLEL = AbstractSubqueryOp.class.getName() - + ".maxParallel"; + String MAX_PARALLEL_SUBQUERIES = AbstractSubqueryOp.class.getName() + + ".maxParallelSubqueries"; - int DEFAULT_MAX_PARALLEL = Integer.MAX_VALUE; + int DEFAULT_MAX_PARALLEL_SUBQUERIES = Integer.MAX_VALUE; } /** - * @see Annotations#MAX_PARALLEL + * @see Annotations#MAX_PARALLEL_SUBQUERIES */ - public int getMaxParallel() { - return getProperty(Annotations.MAX_PARALLEL, - Annotations.DEFAULT_MAX_PARALLEL); + public int getMaxParallelSubqueries() { + return getProperty(Annotations.MAX_PARALLEL_SUBQUERIES, + Annotations.DEFAULT_MAX_PARALLEL_SUBQUERIES); } /** @@ -207,8 +198,8 @@ this.subqueries = (BOp[]) controllerOp .getRequiredProperty(Annotations.SUBQUERIES); - this.nparallel = controllerOp.getProperty(Annotations.MAX_PARALLEL, - Annotations.DEFAULT_MAX_PARALLEL); + this.nparallel = controllerOp.getProperty(Annotations.MAX_PARALLEL_SUBQUERIES, + Annotations.DEFAULT_MAX_PARALLEL_SUBQUERIES); this.executor = new LatchedExecutor(context.getIndexManager() .getExecutorService(), nparallel); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java 2011-02-17 13:38:48 UTC (rev 4206) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java 2011-02-17 22:58:07 UTC (rev 4207) @@ -1080,8 +1080,10 @@ new NV(BOp.Annotations.BOP_ID, joinId),// // @todo Why not use a factory which avoids bopIds already in use? new NV(PipelineJoin.Annotations.PREDICATE, vTarget.pred.setBOpId(3)), - // disallow parallel evaluation. - new NV(PipelineJoin.Annotations.MAX_PARALLEL,0), + // disallow parallel evaluation of tasks. + new NV(PipelineJoin.Annotations.MAX_PARALLEL,1), + // disallow parallel evaluation of chunks. 
+ new NV(PipelineJoin.Annotations.MAX_PARALLEL_CHUNKS,0), // disable access path coalescing new NV(PipelineJoin.Annotations.COALESCE_DUPLICATE_ACCESS_PATHS,false), // cutoff join. Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/PartitionedJoinGroup.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/PartitionedJoinGroup.java 2011-02-17 13:38:48 UTC (rev 4206) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/PartitionedJoinGroup.java 2011-02-17 22:58:07 UTC (rev 4207) @@ -713,7 +713,9 @@ .asMap(new NV[] { new NV(JoinGraph.Annotations.BOP_ID, idFactory.nextId()), // new NV(JoinGraph.Annotations.EVALUATION_CONTEXT, - BOpEvaluationContext.CONTROLLER) }) // + BOpEvaluationContext.CONTROLLER),// + new NV(PipelineOp.Annotations.SHARED_STATE,true),// + }) // ); return queryOp; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/Steps.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/Steps.java 2011-02-17 13:38:48 UTC (rev 4206) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/Steps.java 2011-02-17 22:58:07 UTC (rev 4207) @@ -74,8 +74,8 @@ super(args, annotations); - if (getMaxParallel() != 1) - throw new IllegalArgumentException(Annotations.MAX_PARALLEL + "=" + if (getMaxParallelSubqueries() != 1) + throw new IllegalArgumentException(Annotations.MAX_PARALLEL_SUBQUERIES + "=" + getMaxParallel()); } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/Union.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/Union.java 2011-02-17 13:38:48 UTC (rev 4206) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/Union.java 
2011-02-17 22:58:07 UTC (rev 4207) @@ -42,8 +42,7 @@ * * Will run the subqueries <i>a</i>, <i>b</i>, and <i>c</i> in parallel for each * source {@link IBindingSet}. The output of those subqueries will be routed to - * the UNION operator (their parent) unless the subqueries explicitly override - * this behavior using {@link PipelineOp.Annotations#SINK_REF}. + * the UNION operator (their parent). * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java 2011-02-17 13:38:48 UTC (rev 4206) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java 2011-02-17 22:58:07 UTC (rev 4207) @@ -405,6 +405,33 @@ } + /** + * Return the {@link BOp} having the specified id. + * + * @param bopId + * The {@link BOp} identifier. + * + * @return The {@link BOp}. + * + * @throws IllegalArgumentException + * if there is no {@link BOp} with that identifier declared in + * this query. + */ + final public BOp getBOp(final int bopId) { + + final BOp bop = getBOpIndex().get(bopId); + + if (bop == null) { + + throw new IllegalArgumentException("Not found: id=" + bopId + + ", query=" + query); + + } + + return bop; + + } + /** * @param queryEngine * The {@link QueryEngine} on which the query is running. In @@ -620,6 +647,9 @@ try { + if(log.isInfoEnabled())//FIXME TRACE + log.info(msg.toString()); + if (runState.startOp(msg)) { /* @@ -673,10 +703,13 @@ if (!queryId.equals(msg.queryId)) throw new IllegalArgumentException(); - lock.lock(); + lock.lock(); try { + if(log.isInfoEnabled())//FIXME TRACE + log.info(msg.toString()); + // update per-operator statistics. 
final BOpStats tmp = statsMap.putIfAbsent(msg.bopId, msg.taskStats); @@ -1129,6 +1162,21 @@ } + /** + * Return the textual representation of the {@link RunState} of this query. + * <p> + * Note: Exposed for log messages in derived classes since {@link #runState} + * is private. + */ + protected String runStateString() { + lock.lock(); + try { + return runState.toString(); + } finally { + lock.unlock(); + } + } + public String toString() { final StringBuilder sb = new StringBuilder(getClass().getName()); sb.append("{queryId=" + queryId); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java 2011-02-17 13:38:48 UTC (rev 4206) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java 2011-02-17 22:58:07 UTC (rev 4207) @@ -37,7 +37,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicInteger; @@ -45,6 +44,7 @@ import com.bigdata.bop.BOp; import com.bigdata.bop.BOpContext; +import com.bigdata.bop.BOpEvaluationContext; import com.bigdata.bop.BOpUtility; import com.bigdata.bop.IBindingSet; import com.bigdata.bop.NoSuchBOpException; @@ -57,6 +57,7 @@ import com.bigdata.relation.accesspath.IBlockingBuffer; import com.bigdata.relation.accesspath.IMultiSourceAsynchronousIterator; import com.bigdata.relation.accesspath.MultiSourceSequentialAsynchronousIterator; +import com.bigdata.rwstore.sector.IMemoryManager; import com.bigdata.service.IBigdataFederation; import com.bigdata.util.InnerCause; import com.bigdata.util.concurrent.Memoizer; @@ -72,13 +73,17 @@ * distribution of the shards. 
This evaluation strategy is compatible with both * the {@link Journal} (aka standalone) and the {@link IBigdataFederation} (aka * clustered or scale-out). + * <p> + * Note: The challenge with this implementation is managing the amount of data + * buffered on the JVM heap without introducing control structures which can + * result in deadlock or starvation. This has been addressed to a large extent + * by sharing a lock between this class and the per-operator input work queues + * using a modified version of the JSR 166 classes. For high volume operator at + * once evaluation, we need to buffer the data on the native process heap using + * the {@link IMemoryManager}. * - * @todo The challenge with this implementation is managing the amount of data - * buffered on the JVM heap without introducing control structures which - * can result in deadlock or starvation. One way to manage this is to move - * the data off of the JVM heap onto direct ByteBuffers and then - * potentially spilling blocks to disk, e.g., using an RWStore based cache - * pattern. + * @todo {@link IMemoryManager} integration and support + * {@link PipelineOp.Annotations#MAX_MEMORY}. */ public class ChunkedRunningQuery extends AbstractRunningQuery { @@ -90,96 +95,6 @@ */ private final static Logger chunkTaskLog = Logger .getLogger(ChunkTask.class); - -// /** -// * The maximum number of operator tasks which may be concurrently executed -// * for a given (bopId,shardId). -// * -// * @see QueryEngineTestAnnotations#MAX_CONCURRENT_TASKS_PER_OPERATOR_AND_SHARD -// */ -// final private int maxConcurrentTasksPerOperatorAndShard; - -// /** -// * The maximum #of concurrent tasks for this query across all operators and -// * shards. -// * -// * Note: This is not a safe option and MUST be removed. It is possible for -// * N-1 tasks to backup with the Nth task not running due to concurrent -// * execution of some of the N-t tasks.
-// */ -// final private int maxConcurrentTasks = 10; - - /* - * FIXME Explore the use of this semaphore to limit the maximum #of messages - * further. (Note that placing a limit on messages would allow us to buffer - * potentially many chunks. That could be solved by making LocalChunkMessage - * transparent in terms of the #of chunks or _binding_sets_ which it is - * carrying, but let's take this one step at a time). - * - * The first issue is ensuring that the query continue to make progress when - * a semaphore with a limited #of permits is introduced. This is because the - * ChunkFutureTask only attempts to schedule the next task for a given - * (bopId,shardId) but we could have failed to accept outstanding work for - * any of a number of operator/shard combinations. Likewise, the QueryEngine - * tells the RunningQuery to schedule work each time a message is dropped - * onto the QueryEngine, but the signal to execute more work is lost if the - * permits were not available immediately. - * - * One possibility would be to have a delayed retry. Another would be to - * have ChunkTaskFuture try to run *any* messages, not just messages for the - * same (bopId,shardId). - * - * Also, when scheduling work, there needs to be some bias towards the - * downstream operators in the query plan in order to ensure that they get a - * chance to clear work from upstream operators. This suggests that we might - * carry an order[] and use it to scan the work queue -- or make the work - * queue a priority heap using the order[] to place a primary sort over the - * bopIds in terms of the evaluation order and letting the shardIds fall in - * increasing shard order so we have a total order for the priority heap (a - * total order may also require a tie breaker, but I think that the priority - * heap allows ties). - * - * This concept of memory overhead and permits would be associated with the - * workload waiting on a given node for processing. 
(In scale-out, we do not - * care how much data is moving in the cluster, only how much data is - * challenging an individual machine). - * - * This emphasize again why we need to get the data off of the Java heap. - * - * The same concept should apply for chained buffers. Maybe one way to do - * this is to allocate a fixed budget to each query for the Java heap and - * the C heap and then the query blocks or goes to disk. - */ -// /** -// * The maximum number of binding sets which may be outstanding before a task -// * which is producing binding sets will block. This value may be used to -// * limit the memory demand of a query in which some operators produce -// * binding sets faster than other operators can consume them. -// * -// * @todo This could be generalized to consider the Java heap separately from -// * the native heap as we get into the use of native ByteBuffers to -// * buffer intermediate results. -// * -// * @todo This is expressed in terms of messages and not {@link IBindingSet}s -// * because the {@link LocalChunkMessage} does not self-report the #of -// * {@link IBindingSet}s (or chunks). [It should really be bytes on the -// * heap even if we can count binding sets and #s of bindings, but we -// * do not serialize all binding sets so we have to have one measure -// * for serialized and one measure for live objects.] -// */ -// final private int maxOutstandingMessageCount = 100; -// -// /** -// * A counting semaphore used to limit the #of outstanding binding set chunks -// * which may be buffered before a producer will block when trying to emit -// * another chunk. 
-// * -// * @see HandleChunkBuffer#outputChunk(IBindingSet[]) -// * @see #scheduleNext(BSBundle) -// * -// * @see #maxOutstandingMessageCount -// */ -// final private Semaphore outstandingMessageSemaphore = new Semaphore(maxOutstandingMessageCount); /** * A collection of (bopId,partitionId) keys mapped onto a collection of @@ -188,58 +103,39 @@ */ private final ConcurrentHashMap<BSBundle, ConcurrentHashMap<ChunkFutureTask,ChunkFutureTask>> operatorFutures; - /** - * A map of unbounded work queues for each (bopId,partitionId). Empty queues - * are removed from the map. - * <p> - * The map is guarded by the {@link #lock}. - */ + /** + * A map of unbounded work queues for each (bopId,partitionId). Empty queues + * are removed from the map. + * <p> + * The map is guarded by the {@link #lock}. + * + * FIXME Either this and/or {@link #operatorFutures} must be a weak value + * map in order to ensure that entries are eventually cleared in scale-out + * where the #of entries can potentially be very large since they are per + * (bopId,shardId). While these maps were initially declared as + * {@link ConcurrentHashMap} instances, if we remove entries once the + * map/queue entry is empty, this appears to open a concurrency hole which + * does not exist if we leave entries with empty map/queue values in the + * map. Changing to a weak value map should provide the necessary pruning of + * unused entries without opening up this concurrency hole. + */ private final Map<BSBundle, BlockingQueue<IChunkMessage<IBindingSet>>> operatorQueues; - -// /** -// * When running in stand alone, we can chain together the operators and have -// * much higher throughput. Each operator has an {@link BlockingBuffer} which -// * is essentially its input queue. The operator will drain its input queue -// * using {@link BlockingBuffer#iterator()}. 
-// * <p> -// * Each operator closes its {@link IBlockingBuffer} sink(s) once its own -// * source has been closed and it has finished processing that source. Since -// * multiple producers can target the same operator, we need a means to -// * ensure that the source for the target operator is not closed until each -// * producer which targets that operator has closed its corresponding sink. -// * <p> -// * In order to support this many-to-one producer/consumer pattern, we wrap -// * the input queue (a {@link BlockingBuffer}) for each operator having -// * multiple sources with a {@link MultiplexBlockingBuffer}. This class gives -// * each producer their own view on the underlying {@link BlockingBuffer}. -// * The underlying {@link BlockingBuffer} will not be closed until all -// * source(s) have closed their view of that buffer. This collection keeps -// * track of the {@link MultiplexBlockingBuffer} wrapping the -// * {@link BlockingBuffer} which is the input queue for each operator. -// * <p> -// * The input queues themselves are {@link BlockingBuffer} objects. Those -// * objects are available from this map using -// * {@link MultiplexBlockingBuffer#getBackingBuffer()}. These buffers are -// * pre-allocated by {@link #populateInputBufferMap(BOp)}. -// * {@link #startTasks(BOp)} is responsible for starting the operator tasks -// * in a "back-to-front" order. {@link #startQuery(IChunkMessage)} kicks off -// * the query and invokes {@link #startTasks(BOp)} to chain the input queues -// * and output queues together (when so chained, the output queues are skins -// * over the input queues obtained from {@link MultiplexBlockingBuffer}). -// * -// * FIXME The inputBufferMap will let us construct consumer producer chains -// * where the consumer _waits_ for all producer(s) which target the consumer -// * to close the sink associated with that consumer. 
Unlike when attaching an -// * {@link IChunkMessage} to an already running operator, the consumer will -// * NOT terminate (due to lack up input) until each running producer -// * terminating that consumer terminates. This will improve concurrency, -// * result in fewer task instances, and have better throughput than attaching -// * a chunk to an already running task. However, in scale-out we will have -// * tasks running on different nodes so we can not always chain together the -// * producer and consumer in this tightly integrated manner. -// */ -// final private ConcurrentHashMap<Integer/*operator*/, MultiplexBlockingBuffer<IBindingSet[]>/*inputQueue*/> inputBufferMap; + /** + * FIXME It appears that this is Ok based on a single unit test known to + * fail when {@link #removeMapOperatorQueueEntries} is <code>true</code>, + * but I expect that a similar concurrency problem could also exist for the + * {@link #operatorFutures} even though it does not produce a deadlock. + */ + static private final boolean removeMapOperatorFutureEntries = false; + + /** + * FIXME See operatorQueues for why removing the map entries appears to + * cause problems. This problem is demonstrated by + * TestQueryEngine#test_query_slice_noLimit() when + * {@link PipelineOp.Annotations#PIPELINE_QUEUE_CAPACITY} is ONE (1).
+ */ + static private final boolean removeMapOperatorQueueEntries = false; // /** // * The chunks available for immediate processing (they must have been @@ -285,286 +181,12 @@ super(queryEngine, queryId, controller, clientProxy, query); -//// combineReceivedChunks = query.getProperty( -//// QueryEngineTestAnnotations.COMBINE_RECEIVED_CHUNKS, -//// QueryEngineTestAnnotations.DEFAULT_COMBINE_RECEIVED_CHUNKS); - -// this.maxConcurrentTasksPerOperatorAndShard = 300; -// this.maxConcurrentTasksPerOperatorAndShard = query -// .getProperty( -// QueryEngineTestAnnotations.MAX_CONCURRENT_TASKS_PER_OPERATOR_AND_SHARD, -// QueryEngineTestAnnotations.DEFAULT_MAX_CONCURRENT_TASKS_PER_OPERATOR_AND_SHARD); - this.operatorFutures = new ConcurrentHashMap<BSBundle, ConcurrentHashMap<ChunkFutureTask,ChunkFutureTask>>(); this.operatorQueues = new ConcurrentHashMap<BSBundle, BlockingQueue<IChunkMessage<IBindingSet>>>(); -// /* -// * Setup the BOpStats object for each pipeline operator in the query. -// */ -// if (controller) { -// -//// runState = new RunState(this); -// -//// statsMap = new ConcurrentHashMap<Integer, BOpStats>(); -//// -//// populateStatsMap(query); -// -//// /* -//// * FIXME Review the concept of mutation queries. It used to be that -//// * queries could only either read or write. Now we have access paths -//// * which either read or write and each query could use zero or more -//// * such access paths. -//// */ -//// if (true/*!query.isMutation()*/) { -//// -//// // read-only query. -//// -//// final BOpStats queryStats = statsMap.get(query.getId()); -// -//// queryBuffer = new BlockingBufferWithStats<IBindingSet[]>(query, -//// queryStats); -//// -//// queryIterator = new QueryResultIterator<IBindingSet[]>(this, -//// queryBuffer.iterator()); -// -//// } else { -//// -//// // Note: Not used for mutation queries. -//// queryBuffer = null; -//// queryIterator = null; -// -// } -// -// } else { -// -//// runState = null; // Note: only on the query controller. 
-//// statsMap = null; // Note: only on the query controller. -//// queryBuffer = null; // Note: only on the query controller. -//// queryIterator = null; // Note: only when queryBuffer is defined. -// -// } - } -// /** -// * Take a chunk generated by some pass over an operator and make it -// * available to the target operator. How this is done depends on whether the -// * query is running against a standalone database or the scale-out database. -// * <p> -// * Note: The return value is used as part of the termination criteria for -// * the query. -// * <p> -// * The default implementation supports a standalone database. The generated -// * chunk is left on the Java heap and handed off synchronously using -// * {@link QueryEngine#acceptChunk(IChunkMessage)}. That method will queue -// * the chunk for asynchronous processing. -// * -// * @param bop -// * The operator which wrote on the sink. -// * @param sinkId -// * The identifier of the target operator. -// * @param sink -// * The intermediate results to be passed to that target operator. -// * -// * @return The #of {@link IChunkMessage} sent. This will always be ONE (1) -// * for scale-up. For scale-out, there will be at least one -// * {@link IChunkMessage} per index partition over which the -// * intermediate results were mapped. -// */ -// protected <E> int handleOutputChunk(final BOp bop, final int sinkId, -// final IBlockingBuffer<IBindingSet[]> sink) { -// -// if (bop == null) -// throw new IllegalArgumentException(); -// -// if (sink == null) -// throw new IllegalArgumentException(); -// -// if (inputBufferMap != null && inputBufferMap.get(sinkId) != null) { -// /* -// * FIXME The sink is just a wrapper for t... [truncated message content] |