From: <tho...@us...> - 2011-02-09 17:00:10
|
Revision: 4185 http://bigdata.svn.sourceforge.net/bigdata/?rev=4185&view=rev Author: thompsonbry Date: 2011-02-09 17:00:01 +0000 (Wed, 09 Feb 2011) Log Message: ----------- More work on the GROUP_BY operator. Defined various aggregate operators (MIN, MAX, SUM, COUNT, etc). They all need unit tests. The semantics of many of these operators needs to be reviewed. Defined BIND(var,expr) operator, which binds the variable to the result of evaluating the value expression as side-effect. Modified the ORDER_BY stress test to verify the ordering imposed. Modified DistinctBindingSetOp to pass the hash map as shared state (it was only distinct for each invocation). Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IValueExpression.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/DistinctBindingSetOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestDeepCopy.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine_SortOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestMemorySortOp.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/TestAll.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Bind.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IAggregate.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/GroupByOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/MemoryGroupByOp.java 
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestBind.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine_DistinctOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine_GroupByOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestMemoryGroupByOp.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/COUNT.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/GROUP_CONCAT.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/MAX.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/MIN.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/SAMPLE.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/SUM.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/aggregate/ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/aggregate/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/aggregate/TestCOUNT.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/aggregate/TestGROUP_CONCAT.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/aggregate/TestMAX.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/aggregate/TestMIN.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/aggregate/TestSAMPLE.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/aggregate/TestSUM.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2011-02-08 17:50:40 UTC (rev 
4184) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2011-02-09 17:00:01 UTC (rev 4185) @@ -27,11 +27,15 @@ */ package com.bigdata.bop; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.log4j.Logger; import com.bigdata.bop.engine.BOpStats; import com.bigdata.bop.engine.IChunkMessage; +import com.bigdata.bop.engine.IQueryClient; import com.bigdata.bop.engine.IRunningQuery; +import com.bigdata.bop.engine.QueryEngine; import com.bigdata.relation.accesspath.IAccessPath; import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.IBlockingBuffer; @@ -59,6 +63,39 @@ private final IBlockingBuffer<E[]> sink2; + private final AtomicBoolean lastInvocation = new AtomicBoolean(false); + + /** + * Set by the {@link QueryEngine} when the criteria specified by + * {@link #isLastInvocation()} are satisfied. + */ + public void setLastInvocation() { + lastInvocation.set(true); + } + + /** + * <code>true</code> iff this is the last invocation of the operator. The + * property is only set to <code>true</code> for operators which: + * <ol> + * <li>{@link BOp.Annotations#EVALUATION_CONTEXT} is + * {@link BOpEvaluationContext#CONTROLLER}</li> + * <li>{@link PipelineOp.Annotations#THREAD_SAFE} is <code>false</code></li> + * </ol> + * Under these circumstances, it is possible for the {@link IQueryClient} to + * atomically decide that a specific invocation of the operator task for the + * query will be the last invocation for that task. This is not possible if + * the operator allows concurrent evaluation tasks. Sharded operators are + * intrinsically concurrent since they can evaluate at each shard in + * parallel. This is why the evaluation context is locked to the query + * controller. In addition, the operator must declare that it is NOT thread + * safe in order for the query engine to serialize its evaluation tasks. 
+ * + * @return + */ + public boolean isLastInvocation() { + return lastInvocation.get(); + } + /** * The interface for a running query. * <p> Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Bind.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Bind.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Bind.java 2011-02-09 17:00:01 UTC (rev 4185) @@ -0,0 +1,66 @@ +package com.bigdata.bop; + +import java.util.Map; + +/** + * Operator causes a variable to be bound to the result of its evaluation as a + * side-effect. + * + * @author thompsonbry + */ +public class Bind<E> extends ImmutableBOp implements IValueExpression<E> { + + /** + * + */ + private static final long serialVersionUID = 1L; + + /** + * Required deep copy constructor. + */ + public Bind(BOpBase op) { + super(op); + } + + /** + * @param var + * The {@link IVariable} which will be bound to result of + * evaluating the associated value expression. + * @param expr + * The {@link IValueExpression} to be evaluated. + */ + public Bind(IVariable<E> var, IValueExpression<E> expr) { + + this(new BOp[] { var, expr }, null/* annotations */); + + } + + /** + * Required shallow copy constructor. + * @param args + * @param annotations + */ + public Bind(BOp[] args, Map<String, Object> annotations) { + super(args, annotations); + } + + @SuppressWarnings("unchecked") + @Override + public E get(final IBindingSet bindingSet) { + + final IVariable<E> var = (IVariable<E>) get(0); + + final IValueExpression<E> expr = (IValueExpression<E>) get(1); + + // evaluate the value expression. + E val = expr.get(bindingSet); + + // bind the variable as a side-effect. 
+ bindingSet.set(var, new Constant<E>(val)); + + // return the evaluated value + return val; + + } + +} Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IAggregate.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IAggregate.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IAggregate.java 2011-02-09 17:00:01 UTC (rev 4185) @@ -0,0 +1,16 @@ +package com.bigdata.bop; + +/** + * An aggregate operator, such as SUM, COUNT, MIN, MAX, etc. + * + * @author thompsonbry + */ +public interface IAggregate<E> extends IValueExpression<E>{ + + /** + * Return the current value of the aggregate (this has a side-effect on the + * internal state of the {@link IAggregate} operator). + */ + E get(IBindingSet bset); + +} Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IValueExpression.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IValueExpression.java 2011-02-08 17:50:40 UTC (rev 4184) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IValueExpression.java 2011-02-09 17:00:01 UTC (rev 4185) @@ -2,6 +2,11 @@ import java.io.Serializable; +/** + * An expression which may be evaluated to a value. 
+ * + * @author mrpersonick + */ public interface IValueExpression<E> extends BOp, Serializable { /** Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2011-02-08 17:50:40 UTC (rev 4184) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2011-02-09 17:00:01 UTC (rev 4185) @@ -109,6 +109,10 @@ * @todo Unit tests for {@link ChunkedRunningQuery} to verify that it * eventually schedules operator tasks which were deferred to * prevent concurrent evaluation. + * + * @todo This is currently not used. However, it could simplify the + * logic for operators, such as SLICE, which otherwise depend on + * {@link #SHARED_STATE} to provide their own synchronization. */ String THREAD_SAFE = PipelineOp.class.getName() + ".threadSafe"; @@ -334,7 +338,27 @@ return getProperty(PipelineOp.Annotations.PIPELINED, PipelineOp.Annotations.DEFAULT_PIPELINED); } - + + /** + * Return <code>true</code> iff concurrent invocations of the operator are + * permitted. + * <p> + * Note: Operators which are not thread-safe still permit concurrent + * evaluation for <em>distinct</em> partitions. In order to ensure that all + * invocations of the operator within a query are serialized (no more than + * one concurrent invocation) you must also specify + * {@link BOpEvaluationContext#CONTROLLER}. 
+ * + * @see Annotations#THREAD_SAFE + * @see BOp.Annotations#EVALUATION_CONTEXT + */ + public boolean isThreadSafe() { + + return getProperty(Annotations.THREAD_SAFE, + Annotations.DEFAULT_THREAD_SAFE); + + } + /** * Return <code>true</code> iff {@link #newStats()} must be shared across * all invocations of {@link #eval(BOpContext)} for this operator for a Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/DistinctBindingSetOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/DistinctBindingSetOp.java 2011-02-08 17:50:40 UTC (rev 4184) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/DistinctBindingSetOp.java 2011-02-09 17:00:01 UTC (rev 4185) @@ -14,13 +14,17 @@ import com.bigdata.bop.IConstant; import com.bigdata.bop.IVariable; import com.bigdata.bop.PipelineOp; -import com.bigdata.bop.bindingSet.HashBindingSet; +import com.bigdata.bop.bindingSet.ListBindingSet; import com.bigdata.bop.engine.BOpStats; import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.IBlockingBuffer; /** * A pipelined DISTINCT operator based on a hash table. + * <p> + * Note: This implementation is a pipelined operator which inspects each chunk + * of solutions as they arrive and those solutions which are distinct for each + * chunk processed. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id: DistinctElementFilter.java 3466 2010-08-27 14:28:04Z @@ -58,8 +62,23 @@ public DistinctBindingSetOp(final BOp[] args, final Map<String, Object> annotations) { - super(args, annotations); + super(args, annotations); + switch (getEvaluationContext()) { + case CONTROLLER: + break; + default: + throw new UnsupportedOperationException( + Annotations.EVALUATION_CONTEXT + "=" + + getEvaluationContext()); + } + + // shared state is used to share the hash table. 
+ if (isSharedState()) { + throw new UnsupportedOperationException(Annotations.SHARED_STATE + + "=" + isSharedState()); + } + } /** @@ -101,6 +120,12 @@ } + public BOpStats newStats() { + + return new DistinctStats(this); + + } + public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { return new FutureTask<Void>(new DistinctTask(this, context)); @@ -145,6 +170,37 @@ return true; } } + + /** + * Extends {@link BOpStats} to provide the shared state for the distinct + * solution groups across multiple invocations of the DISTINCT operator. + */ + private static class DistinctStats extends BOpStats { + + /** + * + */ + private static final long serialVersionUID = 1L; + + /** + * A concurrent map whose keys are the bindings on the specified + * variables (the keys and the values are the same since the map + * implementation does not allow <code>null</code> values). + * <p> + * Note: The map is shared state and can not be discarded or cleared + * until the last invocation!!! + */ + private final ConcurrentHashMap<Solution, Solution> map; + + public DistinctStats(final DistinctBindingSetOp op) { + + this.map = new ConcurrentHashMap<Solution, Solution>( + op.getInitialCapacity(), op.getLoadFactor(), + op.getConcurrencyLevel()); + + } + + } /** * Task executing on the node. @@ -153,12 +209,12 @@ private final BOpContext<IBindingSet> context; - /** - * A concurrent map whose keys are the bindings on the specified - * variables (the keys and the values are the same since the map - * implementation does not allow <code>null</code> values). - */ - private /*final*/ ConcurrentHashMap<Solution, Solution> map; + /** + * A concurrent map whose keys are the bindings on the specified + * variables (the keys and the values are the same since the map + * implementation does not allow <code>null</code> values). + */ + private final ConcurrentHashMap<Solution, Solution> map; /** * The variables used to impose a distinct constraint. 
@@ -178,9 +234,8 @@ if (vars.length == 0) throw new IllegalArgumentException(); - this.map = new ConcurrentHashMap<Solution, Solution>( - op.getInitialCapacity(), op.getLoadFactor(), - op.getConcurrencyLevel()); + // The map is shared state across invocations of this operator task. + this.map = ((DistinctStats) context.getStats()).map; } @@ -235,6 +290,7 @@ stats.chunksIn.increment(); stats.unitsIn.add(a.length); + // The distinct solutions accepted from this chunk. final List<IBindingSet> accepted = new LinkedList<IBindingSet>(); int naccepted = 0; @@ -243,14 +299,26 @@ // System.err.println("considering: " + bset); + /* + * Test to see if this solution is distinct from those + * already seen. + */ final IConstant<?>[] vals = accept(bset); if (vals != null) { + /* + * This is a distinct solution. Copy only the + * variables used to select distinct solutions into + * a new binding set and add that to the set of + * [accepted] binding sets which will be emitted by + * this operator. + */ + // System.err.println("accepted: " // + Arrays.toString(vals)); - final HashBindingSet tmp = new HashBindingSet(); + final ListBindingSet tmp = new ListBindingSet(); for (int i = 0; i < vars.length; i++) { @@ -268,12 +336,19 @@ if (naccepted > 0) { + /* + * At least one solution was accepted as distinct, so + * copy the selected solutions to the output of the + * operator. + */ + final IBindingSet[] b = accepted .toArray(new IBindingSet[naccepted]); // System.err.println("output: " // + Arrays.toString(b)); + // copy the distinct solutions to the output. sink.add(b); // stats.unitsOut.add(naccepted); @@ -285,6 +360,23 @@ sink.flush(); + if(context.isLastInvocation()) { + + /* + * Discard the map. + * + * Note: The map can not be discarded (or cleared) until the + * last invocation. However, we only get the benefit of the + * lastInvocation signal if the operator is single threaded + * and running on the query controller. 
That is not a + * requirement for this DISTINCT implementation, so the map + * is not going to be cleared until the query goes out of + * scope and is swept by GC. + */ + map.clear(); + + } + // done. return null; @@ -292,9 +384,6 @@ sink.close(); - // discard the map. - map = null; - } } Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/GroupByOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/GroupByOp.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/GroupByOp.java 2011-02-09 17:00:01 UTC (rev 4185) @@ -0,0 +1,111 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Sep 4, 2010 + */ + +package com.bigdata.bop.solutions; + +import java.util.Map; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.IConstraint; +import com.bigdata.bop.IValueExpression; +import com.bigdata.bop.PipelineOp; + +/** + * Base class for operators which perform aggregation operations on binding + * sets. 
+ * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id: SortOp.java 3665 2010-09-28 16:53:22Z thompsonbry $ + */ +abstract public class GroupByOp extends PipelineOp { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public interface Annotations extends PipelineOp.Annotations { + + /** + * The ordered set of variables declared by {@link #COMPUTE} which are + * projected out of the group by operator. + */ + String SELECT = GroupByOp.class.getName() + ".select"; + + /** + * The ordered set of {@link IValueExpression}s which are to be + * computed. + * + * TODO This really needs to be VAR := EXPR. EXPR can only reference the + * source variables or variables declared earlier in the ordered + * collection. If an EXPR references a source variable, then it must + * wrap that source variable with an aggregation operator (SUM, COUNT, + * MIN, MAX, AVG, GROUP_CONCAT, or SAMPLE). Only source variables and + * constants may appear as operands of aggregation operators. [We need a + * BIND() operator for this, which might wind up being the same as a + * LET.] + * + * TODO Decide how we will handle AVG. + */ + String COMPUTE = GroupByOp.class.getName() + ".compute"; + + /** + * The ordered set of one or more variables defining the aggregation + * groups (required). The variables named in this collection MUST be + * variables declared for the incoming solutions. + */ + String GROUP_BY = GroupByOp.class.getName() + ".groupBy"; + + /** + * An {@link IConstraint}[] applied to the aggregated solutions + * (optional). The {@link IConstraint}s MAY NOT include aggregation + * operators and may only reference variables declared by + * {@link #COMPUTE}. + * + * TODO Should be the BEV of an {@link IValueExpression}, which might or + * might not be an {@link IConstraint}. 
+ */ + String HAVING = GroupByOp.class.getName() + ".having"; + + } + + /** + * @param op + */ + public GroupByOp(final GroupByOp op) { + super(op); + } + + /** + * @param args + * @param annotations + */ + public GroupByOp(final BOp[] args, final Map<String, Object> annotations) { + super(args, annotations); + } + +} Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/MemoryGroupByOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/MemoryGroupByOp.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/MemoryGroupByOp.java 2011-02-09 17:00:01 UTC (rev 4185) @@ -0,0 +1,551 @@ +package com.bigdata.bop.solutions; + +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.FutureTask; + +import org.apache.log4j.Logger; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpContext; +import com.bigdata.bop.BOpUtility; +import com.bigdata.bop.ConcurrentHashMapAnnotations; +import com.bigdata.bop.IAggregate; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IConstant; +import com.bigdata.bop.IConstraint; +import com.bigdata.bop.IValueExpression; +import com.bigdata.bop.IVariable; +import com.bigdata.bop.bindingSet.ListBindingSet; +import com.bigdata.bop.engine.BOpStats; +import com.bigdata.relation.accesspath.IAsynchronousIterator; +import com.bigdata.relation.accesspath.IBlockingBuffer; + +/** + * An in-memory GROUP_BY for binding sets. + * <p> + * Note: This implementation is a pipelined operator which aggregates each chunk + * of solutions as they arrive and outputs empty messages (containing no + * solutions) until the last chunk is consumed. 
This operator relies on + * {@link BOpContext#isLastInvocation()} in order to decide when to write its + * output solutions, which requires the operator to (a) be evaluated on the + * controller and (b) declare itself as NOT thread-safe. In addition, the + * operator must be marked as SHARED_STATE := true such that the hash table + * associated with the {@link BOpStats} is shared across multiple invocations of + * this operator for a given query. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id: DistinctElementFilter.java 3466 2010-08-27 14:28:04Z + * thompsonbry $ + * + * @todo GROUP_BY implementation which depends on an ORDER_BY operator to setup + * the correct order and then performs the aggregations in a single pass + * over the ordered data. + * + * @todo GROUP_BY implementation using an HTree suitable for use when the #of + * groups is very large. The HTree would be associated with the allocation + * context for the (queryId,bopId(,shardId))). (The shardId would be used + * iff the GROUP_BY operator was hash partitioned across the nodes.) + * + * @todo In scale-out, we can hash partition the GROUP_BY operator over the + * nodes as long as all of the aggregation functions can be combined from + * the partitions. If AVG is used, then it needs to be replaced by SUM and + * COUNT in the GROUP_BY operator and the use of the AVG in the SELECT + * needs to be rewritten as (SUM(v)/COUNT(v)). + * + * @todo As a special twist, there can also be memory burdens, even with a small + * #of groups, when the aggregated solution data is very large and a + * GROUP_CONCAT function is specified such that it combines a large #of + * input solution bindings into a big string. + * + * FIXME How should we handle DISTINCT semantics for GROUP_BY? (I think + * that we just insert a {@link DistinctBindingSetOp} before the + * GROUP_BY). + * + * FIXME How should we handle nulls (missing values) during aggregation? 
+ * (It appears that nulls and type errors are generally handled by the + * aggregate operator ignoring the detail record). + * + * FIXME All of the {@link IAggregate} operators have a side-effect. In + * order for them to have isolated side-effects for distinct groups, they + * would have to either internalize a value map for the group or each + * group would have to use a distinct instance. If the latter, then + * provide for this on the operator, e.g., newInstance(), and document + * why. + */ +public class MemoryGroupByOp extends GroupByOp { + + /** + * + */ + private static final long serialVersionUID = 1L; + + private static final transient Logger log = Logger + .getLogger(MemoryGroupByOp.class); + + public interface Annotations extends GroupByOp.Annotations, + ConcurrentHashMapAnnotations { + + } + + /** + * Required deep copy constructor. + */ + public MemoryGroupByOp(final MemoryGroupByOp op) { + super(op); + } + + /** + * Required shallow copy constructor. + */ + public MemoryGroupByOp(final BOp[] args, + final Map<String, Object> annotations) { + + super(args, annotations); + + switch (getEvaluationContext()) { + case CONTROLLER: + break; + default: + throw new UnsupportedOperationException( + Annotations.EVALUATION_CONTEXT + "=" + + getEvaluationContext()); + } + + // shared state is used to share the hash table. + if (isSharedState()) { + throw new UnsupportedOperationException(Annotations.SHARED_STATE + + "=" + isSharedState()); + } + + // single threaded required for pipelining w/ isLastInvocation() hook. + if (isThreadSafe()) { + throw new UnsupportedOperationException(Annotations.THREAD_SAFE + + "=" + isThreadSafe()); + } + + // operator is pipelined, but relies on isLastEvaluation() hook. 
+ if (!isPipelined()) { + throw new UnsupportedOperationException(Annotations.PIPELINED + "=" + + isPipelined()); + } + + } + + /** + * @see Annotations#INITIAL_CAPACITY + */ + public int getInitialCapacity() { + + return getProperty(Annotations.INITIAL_CAPACITY, + Annotations.DEFAULT_INITIAL_CAPACITY); + + } + + /** + * @see Annotations#LOAD_FACTOR + */ + public float getLoadFactor() { + + return getProperty(Annotations.LOAD_FACTOR, + Annotations.DEFAULT_LOAD_FACTOR); + + } + + /** + * @see Annotations#CONCURRENCY_LEVEL + */ + public int getConcurrencyLevel() { + + return getProperty(Annotations.CONCURRENCY_LEVEL, + Annotations.DEFAULT_CONCURRENCY_LEVEL); + + } + + public BOpStats newStats() { + + return new GroupByStats(this); + + } + + public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { + + return new FutureTask<Void>(new GroupByTask(this, context)); + + } + + /** + * Wrapper used for the solution groups in the {@link ConcurrentHashMap}. + */ + private static class SolutionGroup { + + /** The precomputed hash code for {@link #vals}. */ + private final int hash; + + /** The values for the groupBy variables which define a distinct group. */ + private final IConstant<?>[] vals; + + /** + * The values for the variables which are being computed by the + * aggregation. The binding set is created when the {@link SolutionGroup} is + * first constructed. + * <p> + * Note: Updates to this binding set MUST be protected by synchronizing + * on {@link SolutionGroup}. 
+ */ + private final IBindingSet aggregatedBSet; + + public String toString() { + return super.toString() + // + "{group=" + Arrays.toString(vals) + // + ",solution=" + aggregatedBSet + // + "}"; + } + + public SolutionGroup(final IConstant<?>[] vals) { + this.vals = vals; + this.hash = java.util.Arrays.hashCode(vals); + this.aggregatedBSet = new ListBindingSet(); + } + + public int hashCode() { + return hash; + } + + public boolean equals(final Object o) { + if (this == o) + return true; + if (!(o instanceof SolutionGroup)) { + return false; + } + final SolutionGroup t = (SolutionGroup) o; + if (vals.length != t.vals.length) + return false; + for (int i = 0; i < vals.length; i++) { + // @todo verify that this allows for nulls with a unit test. + if (vals[i] == t.vals[i]) + continue; + if (vals[i] == null) + return false; + if (!vals[i].equals(t.vals[i])) + return false; + } + return true; + } + + /** + * Apply the {@link IValueExpression}s to compute the updated variable + * bindings in the {@link SolutionGroup}. + * + * @param bset + * An input solution. + * @param compute + * The ordered array of {@link IValueExpression}s which + * define the aggregated variables. + */ + public void aggregate(final IBindingSet bset, + final IValueExpression<?>[] compute) { + + /* + * @todo The aggregated variables are all undefined the first time a + * source binding set is presented and need to be initialized to an + * appropriate value. + */ + + // synchronize for visibility. + synchronized(this) { + } + + throw new UnsupportedOperationException(); + + } + + } // SolutionGroup + + /** + * Extends {@link BOpStats} to provide the shared state for the solution + * groups across multiple invocations of the GROUP_BY operator. 
+ */ + private static class GroupByStats extends BOpStats { + + /** + * + */ + private static final long serialVersionUID = 1L; + + /** + * A concurrent map whose keys are the bindings on the specified + * variables (the keys and the values are the same since the map + * implementation does not allow <code>null</code> values). + * <p> + * Note: The map is shared state and can not be discarded or cleared + * until the last invocation!!! + */ + private /*final*/ ConcurrentHashMap<SolutionGroup, SolutionGroup> map; + + public GroupByStats(final MemoryGroupByOp op) { + + this.map = new ConcurrentHashMap<SolutionGroup, SolutionGroup>( + op.getInitialCapacity(), op.getLoadFactor(), + op.getConcurrencyLevel()); + + } + + } + + /** + * Task executing on the node. + */ + static private class GroupByTask implements Callable<Void> { + + private final BOpContext<IBindingSet> context; + + /** + * A concurrent map whose keys are the bindings on the specified + * variables (the keys and the values are the same since the map + * implementation does not allow <code>null</code> values). + * <p> + * Note: The map is shared state and can not be discarded or cleared + * until the last invocation!!! + */ + private final ConcurrentHashMap<SolutionGroup, SolutionGroup> map; + + /** + * The ordered array of variables which define the distinct groups to + * be aggregated. + */ + private final IVariable<?>[] groupBy; + + /** + * The {@link IValueExpression}s used to compute each of the variables + * in the aggregated solutions. + */ + private final IValueExpression<?>[] compute; + + /** + * Optional constraints applied to the aggregated solutions. + */ + private final IConstraint[] having; + + /** + * Optional set of variables to be projected out of the GROUP_BY + * operator. When <code>null</code>, all variables will be projected + * out. 
+ */ + private final IVariable<?>[] select; + + GroupByTask(final MemoryGroupByOp op, + final BOpContext<IBindingSet> context) { + + this.context = context; + + // must be non-null, and non-empty array w/o dups. + this.groupBy = (IVariable[]) op + .getRequiredProperty(GroupByOp.Annotations.GROUP_BY); + + if (groupBy == null) + throw new IllegalArgumentException(); + + if (groupBy.length == 0) + throw new IllegalArgumentException(); + + /* + * Must be non-null, and non-empty array. Any variables in the + * source solutions may only appear within aggregation operators + * such as SUM, COUNT, etc. Variables declared in [compute] may be + * referenced inside the value expressions as long as they do not + * appear within an aggregation function, but they must be + * defined earlier in the ordered compute[]. The value expressions + * must include an assignment to the appropriate aggregate variable. + * + * FIXME This must include a LET or BIND to assign the computed + * value to the appropriate variable. + * + * FIXME verify references to unaggregated and aggregated variables. + */ + this.compute = (IValueExpression<?>[]) op + .getRequiredProperty(GroupByOp.Annotations.COMPUTE); + + if (compute == null) + throw new IllegalArgumentException(); + + if (compute.length == 0) + throw new IllegalArgumentException(); + + // may be null or empty[]. + this.having = (IConstraint[]) op + .getRequiredProperty(GroupByOp.Annotations.HAVING); + + /* + * The variables to project out of the GROUP_BY operator. This may + * be null, but not empty[]. + * + * TODO Variables may only appear once and must be distinct from the + * source variables. + */ + this.select = (IVariable[]) op + .getRequiredProperty(GroupByOp.Annotations.SELECT); + + if (select != null && select.length == 0) + throw new IllegalArgumentException(); + + // The map is shared state across invocations of this operator task. 
+ this.map = ((GroupByStats) context.getStats()).map; + + } + + /** + * Return the "row" for the groupBy variables. + * + * @param bset + * The binding set to be filtered. + * + * @return The distinct as bound values -or- <code>null</code> if the + * binding set duplicates a solution which was already accepted. + */ + private SolutionGroup accept(final IBindingSet bset) { + + final IConstant<?>[] r = new IConstant<?>[groupBy.length]; + + for (int i = 0; i < groupBy.length; i++) { + + /* + * Note: This allows null's. + * + * @todo write a unit test when some variables are not bound. + */ + r[i] = bset.get(groupBy[i]); + + } + + final SolutionGroup s = new SolutionGroup(r); + + map.putIfAbsent(s, s); + + return s; + + } + + public Void call() throws Exception { + + final BOpStats stats = context.getStats(); + + final boolean isLastInvocation = context.isLastInvocation(); + + final IAsynchronousIterator<IBindingSet[]> itr = context + .getSource(); + + final IBlockingBuffer<IBindingSet[]> sink = context.getSink(); + + try { + + /* + * Present each source solution in turn, identifying the group + * into which it falls and then applying the value expressions + * to update the aggregated variable bindings for that group. + */ + while (itr.hasNext()) { + + final IBindingSet[] a = itr.next(); + + stats.chunksIn.increment(); + stats.unitsIn.add(a.length); + + for (IBindingSet bset : a) { + + // identify the solution group. + final SolutionGroup solutionGroup = accept(bset); + + // aggregate the bindings + solutionGroup.aggregate(bset, compute); + + } + + } + + if (isLastInvocation) { + + /* + * Write aggregated solutions on the sink, applying the + * [having] filter to remove any solutions which do not + * satisfy its constraints. 
+ */ + + final List<IBindingSet> accepted = new LinkedList<IBindingSet>(); + + int naccepted = 0; + + for(SolutionGroup solutionGroup: map.values()) { + + synchronized(solutionGroup) { + + IBindingSet bset = solutionGroup.aggregatedBSet; + + // verify optional constraint(s) + if (having != null + && !BOpUtility.isConsistent(having, bset)) { + + // skip this group. + continue; + + } + + /* + * We will accept this solution group, so filter out + * any variables which are not being projected out + * of this operator. + */ + if (log.isDebugEnabled()) + log.debug("accepted: " + solutionGroup); + + // optionally strip off unnecessary variables. + bset = select == null ? bset : bset + .copy(select); + + accepted.add(bset); + + naccepted++; + + } + + } + + /* + * Output the aggregated bindings for the accepted + * solutions. + */ + if (naccepted > 0) { + + final IBindingSet[] b = accepted + .toArray(new IBindingSet[naccepted]); + + sink.add(b); + + // flush the output. + sink.flush(); + + // discard the map. + map.clear(); + + } + + } + + // done. + return null; + + } finally { + + sink.close(); + + } + + } // call() + + } // GroupByTask + +} Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestAll.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestAll.java 2011-02-08 17:50:40 UTC (rev 4184) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestAll.java 2011-02-09 17:00:01 UTC (rev 4185) @@ -79,6 +79,9 @@ // pure binding set operators. suite.addTest(com.bigdata.bop.bset.TestAll.suite()); + // bind(var,expr) + suite.addTestSuite(TestBind.class); + // index operators. 
suite.addTest(com.bigdata.bop.ndx.TestAll.suite()); Added: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestBind.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestBind.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestBind.java 2011-02-09 17:00:01 UTC (rev 4185) @@ -0,0 +1,72 @@ +/** + * + */ +package com.bigdata.bop; + +import com.bigdata.bop.bindingSet.ListBindingSet; + +import junit.framework.TestCase2; + +/** + * Unit tests for {@link Bind}. + * + * @author thompsonbry + * + * @todo Write a test where the {@link IValueExpression} given to bind is more + * complex than an {@link IVariable} or an {@link IConstant}. + */ +public class TestBind extends TestCase2 { + + /** + * + */ + public TestBind() { + } + + /** + * @param name + */ + public TestBind(String name) { + super(name); + } + + /** + * Unit test of bind(var,constant). + */ + public void test_bind_constant() { + + final IBindingSet bset = new ListBindingSet(); + + final IVariable<?> y = Var.var("y"); + + // verify bind() returns the value of the constant. + assertEquals(Integer.valueOf(12), new Bind(y, new Constant<Integer>( + Integer.valueOf(12))).get(bset)); + + // verify side-effect on the binding set. + assertEquals(new Constant<Integer>(Integer.valueOf(12)), bset.get(y)); + + } + + /** + * Unit test of bind(var,otherVar). + */ + public void test_bind_var() { + + final IBindingSet bset = new ListBindingSet(); + + final IVariable<?> x = Var.var("x"); + + final IVariable<?> y = Var.var("y"); + + bset.set(x, new Constant<Integer>(12)); + + // verify bind() returns the value of the other variable. + assertEquals(Integer.valueOf(12), new Bind(y, x).get(bset)); + + // verify side-effect on the binding set. 
+ assertEquals(new Constant<Integer>(Integer.valueOf(12)), bset.get(y)); + + } + +} Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestDeepCopy.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestDeepCopy.java 2011-02-08 17:50:40 UTC (rev 4184) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestDeepCopy.java 2011-02-09 17:00:01 UTC (rev 4185) @@ -85,6 +85,7 @@ Constant.class,// Var.class,// QuoteOp.class,// + Bind.class,// // com.bigdata.bop.constraint EQ.class,// NE.class,// Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestAll.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestAll.java 2011-02-08 17:50:40 UTC (rev 4184) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestAll.java 2011-02-09 17:00:01 UTC (rev 4185) @@ -110,17 +110,17 @@ * multiple chunks of solutions. */ - // stress test for SliceOp. + // stress test for SLICE suite.addTestSuite(TestQueryEngine_Slice.class); - // ORDER BY implementations. + // stress test for ORDER_BY suite.addTestSuite(TestQueryEngine_SortOp.class); - // @todo DISTINCT implementations. -// suite.addTestSuite(TestQueryEngine_SortOp.class); + // stress test for DISTINCT. + suite.addTestSuite(TestQueryEngine_DistinctOp.class); - // @todo GROUP BY implementations. -// suite.addTestSuite(TestQueryEngine_SortOp.class); + // stress test for GROUP_BY. 
+ suite.addTestSuite(TestQueryEngine_GroupByOp.class); return suite; Added: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine_DistinctOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine_DistinctOp.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine_DistinctOp.java 2011-02-09 17:00:01 UTC (rev 4185) @@ -0,0 +1,306 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Sep 1, 2010 + */ + +package com.bigdata.bop.engine; + +import java.util.Properties; +import java.util.Random; +import java.util.UUID; + +import junit.framework.TestCase2; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpEvaluationContext; +import com.bigdata.bop.Constant; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IConstant; +import com.bigdata.bop.IVariable; +import com.bigdata.bop.NV; +import com.bigdata.bop.PipelineOp; +import com.bigdata.bop.Var; +import com.bigdata.bop.bindingSet.ListBindingSet; +import com.bigdata.bop.bset.StartOp; +import com.bigdata.bop.solutions.ComparatorOp; +import com.bigdata.bop.solutions.ISortOrder; +import com.bigdata.bop.solutions.MemorySortOp; +import com.bigdata.bop.solutions.SliceOp; +import com.bigdata.bop.solutions.SortOrder; +import com.bigdata.journal.BufferMode; +import com.bigdata.journal.Journal; +import com.bigdata.relation.accesspath.IAsynchronousIterator; +import com.bigdata.relation.accesspath.ThickAsynchronousIterator; + +/** + * Test suite for DISTINCT solution operators when integrated with the query + * engine. This test suite is designed to examine cases where the DISTINCT + * operator will have to buffer multiple chunks of solutions before finally + * reporting the aggregated solutions. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id: TestQueryEngine2.java 3489 2010-09-01 18:27:35Z thompsonbry $ + * + * @todo Test each DISTINCT implementation here. 
+ */ +public class TestQueryEngine_DistinctOp extends TestCase2 { + + public Properties getProperties() { + + final Properties p = new Properties(super.getProperties()); + + p.setProperty(Journal.Options.BUFFER_MODE, BufferMode.Transient + .toString()); + + return p; + + } + + Journal jnl; + QueryEngine queryEngine; + + public void setUp() throws Exception { + + jnl = new Journal(getProperties()); + + queryEngine = new QueryEngine(jnl); + + queryEngine.init(); + + } + + public void tearDown() throws Exception { + + if (queryEngine != null) { + queryEngine.shutdownNow(); + queryEngine = null; + } + + if (jnl != null) { + jnl.destroy(); + jnl = null; + } + + } + + /** + * + */ + public TestQueryEngine_DistinctOp() { + } + + /** + * @param name + */ + public TestQueryEngine_DistinctOp(String name) { + super(name); + } + + public void testStressThreadSafe() throws Exception { + + for (int i = 0; i < 100; i++) { + + try { + + test_distinct_threadSafe(); + + } catch (Throwable t) { + + fail("Failed after " + i + " trials", t); + + } + + } + + } + + /** + * @todo Unit test for DISTINCT. How to judge correctness? + */ + public void test_distinct_threadSafe() throws Exception { + + final long timeout = 10000; // ms + + final int ntrials = 10000; + + final int poolSize = 10; + + doDistinctTest(10000/* maxInt */, timeout, ntrials, poolSize); + + } + + /** + * Return an {@link IAsynchronousIterator} that will read a single, chunk + * containing all of the specified {@link IBindingSet}s. + * + * @param bindingSetChunks + * the chunks of binding sets. + */ + protected ThickAsynchronousIterator<IBindingSet[]> newBindingSetIterator( + final IBindingSet[][] bindingSetChunks) { + + return new ThickAsynchronousIterator<IBindingSet[]>(bindingSetChunks); + + } + + /** + * + * @param timeout + * @param ntrials + * @param poolSize + * + * @return The #of successful trials. 
+ * + * @throws Exception + */ + protected void doDistinctTest(final int maxInt, + final long timeout, final int ntrials, final int poolSize) + throws Exception { + + fail("write test helper"); + + int ngiven = 0; + final IVariable<?> a = Var.var("a"); + final IBindingSet[][] chunks = new IBindingSet[ntrials][]; + { + final Random r = new Random(); + for (int i = 0; i < chunks.length; i++) { + // random non-zero chunk size + chunks[i] = new IBindingSet[r.nextInt(10) + 1]; + for (int j = 0; j < chunks[i].length; j++) { + final IBindingSet bset = new ListBindingSet(); + bset.set(a, new Constant<Integer>(r.nextInt(maxInt))); + chunks[i][j] = bset; + ngiven++; + } + } + } + + final int startId = 1; + final int sortId = 2; + + /* + * Note: The StartOp breaks up the initial set of chunks into multiple + * IChunkMessages, which results in multiple invocations of the SortOp. + */ + final PipelineOp startOp = new StartOp(new BOp[]{}, NV.asMap(new NV[]{// + new NV(SliceOp.Annotations.BOP_ID, startId),// + new NV(MemorySortOp.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.CONTROLLER),// + })); + + final PipelineOp query = new MemorySortOp(new BOp[] {startOp}, NV.asMap(new NV[] {// + new NV(SliceOp.Annotations.BOP_ID, sortId),// + new NV(MemorySortOp.Annotations.COMPARATOR, + new IntegerComparatorOp( + new ISortOrder[] { new SortOrder(a, + true) })),// + new NV(MemorySortOp.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.CONTROLLER),// + new NV(MemorySortOp.Annotations.PIPELINED, false),// + })); + + final UUID queryId = UUID.randomUUID(); + final IRunningQuery q = queryEngine.eval(queryId, query, + new LocalChunkMessage<IBindingSet>(queryEngine, queryId, + startId, -1/* partitionId */, + newBindingSetIterator(chunks))); + + // consume solutions. + int nsolutions = 0; + final IAsynchronousIterator<IBindingSet[]> itr = q.iterator(); + while (itr.hasNext()) { + nsolutions += itr.next().length; + } + + // wait for the query to terminate. 
+ q.get(); + + // Verify stats. + final BOpStats stats = (BOpStats) q.getStats().get(sortId); + if (log.isInfoEnabled()) + log.info(getClass().getName() + "." + getName() + " : " + stats); + assertNotNull(stats); + assertEquals(ngiven, nsolutions); + assertEquals(ngiven, stats.unitsIn.get()); + assertEquals(ngiven, stats.unitsOut.get()); + + } + + /** + * Helper class for comparing solution sets having variables which evaluate + * to {@link Integer} values. + */ + static private class IntegerComparatorOp extends ComparatorOp + { + + /** + * + */ + private static final long serialVersionUID = 1L; + + /** The sort order. */ + final private ISortOrder<?> [] _sors; + + public IntegerComparatorOp ( final ISortOrder<?> sors [] ) + { + super ( new BOp [] {}, NV.asMap ( new NV [] { new NV ( ComparatorOp.Annotations.ORDER, sors ) } ) ) ; + _sors = sors ; + } + + public int compare ( IBindingSet o1, IBindingSet o2 ) + { + for ( ISortOrder<?> sor : _sors ) + { + int ret = compare ( sor, o1, o2 ) ; + if ( 0 != ret ) + return ret ; + } + return 0 ; + } + + private int compare ( ISortOrder<?> sor, IBindingSet lhs, IBindingSet rhs ) + { + int compare = 0 ; + + IConstant<?> lhsv = lhs.get ( sor.getVariable () ) ; + IConstant<?> rhsv = rhs.get ( sor.getVariable () ) ; + + if ( null == lhsv && null == rhsv ) + return 0 ; + else if ( null == lhsv ) + compare = -1 ; + else if ( null == rhsv ) + compare = 1 ; + else + compare = ((Integer) lhsv.get()).compareTo(((Integer) rhsv + .get())) ; + + return compare * ( sor.isAscending () ? 
1 : -1 ) ; + } + + } + +} Added: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine_GroupByOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine_GroupByOp.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine_GroupByOp.java 2011-02-09 17:00:01 UTC (rev 4185) @@ -0,0 +1,306 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Sep 1, 2010 + */ + +package com.bigdata.bop.engine; + +import java.util.Properties; +import java.util.Random; +import java.util.UUID; + +import junit.framework.TestCase2; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpEvaluationContext; +import com.bigdata.bop.Constant; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IConstant; +import com.bigdata.bop.IVariable; +import com.bigdata.bop.NV; +import com.bigdata.bop.PipelineOp; +import com.bigdata.bop.Var; +import com.bigdata.bop.bindingSet.ListBindingSet; +import com.bigdata.bop.bset.StartOp; +import com.bigdata.bop.solutions.ComparatorOp; +import com.bigdata.bop.solutions.ISortOrder; +import com.bigdata.bop.solutions.MemorySortOp; +import com.bigdata.bop.solutions.SliceOp; +import com.bigdata.bop.solutions.SortOrder; +import com.bigdata.journal.BufferMode; +import com.bigdata.journal.Journal; +import com.bigdata.relation.accesspath.IAsynchronousIterator; +import com.bigdata.relation.accesspath.ThickAsynchronousIterator; + +/** + * Test suite for GROUP_BY operators when integrated with the query engine. This + * test suite is designed to examine cases where the GROUP_BY operator will have + * to buffer multiple chunks of solutions before finally reporting the aggregated + * solutions. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id: TestQueryEngine2.java 3489 2010-09-01 18:27:35Z thompsonbry $ + * + * @todo Test each GROUP_BY implementation here. 
+ */ +public class TestQueryEngine_GroupByOp extends TestCase2 { + + public Properties getProperties() { + + final Properties p = new Properties(super.getProperties()); + + p.setProperty(Journal.Options.BUFFER_MODE, BufferMode.Transient + .toString()); + + return p; + + } + + Journal jnl; + QueryEngine queryEngine; + + public void setUp() throws Exception { + + jnl = new Journal(getProperties()); + + queryEngine = new QueryEngine(jnl); + + queryEngine.init(); + + } + + public void tearDown() throws Exception { + + if (queryEngine != null) { + queryEngine.shutdownNow(); + queryEngine = null; + } + + if (jnl != null) { + jnl.destroy(); + jnl = null; + } + + } + + /** + * + */ + public TestQueryEngine_GroupByOp() { + } + + /** + * @param name + */ + public TestQueryEngine_GroupByOp(String name) { + super(name); + } + + public void testStressThreadSafe() throws Exception { + + for (int i = 0; i < 100; i++) { + + try { + + test_groupBy_threadSafe(); + + } catch (Throwable t) { + + fail("Failed after " + i + " trials", t); + + } + + } + + } + + /** + * @todo Unit test for GROUP BY. How to judge correctness? + */ + public void test_groupBy_threadSafe() throws Exception { + + final long timeout = 10000; // ms + + final int ntrials = 10000; + + final int poolSize = 10; + + doGroupByTest(10000/* maxInt */, timeout, ntrials, poolSize); + + } + + /** + * Return an {@link IAsynchronousIterator} that will read a single, chunk + * containing all of the specified {@link IBindingSet}s. + * + * @param bindingSetChunks + * the chunks of binding sets. + */ + protected ThickAsynchronousIterator<IBindingSet[]> newBindingSetIterator( + final IBindingSet[][] bindingSetChunks) { + + return new ThickAsynchronousIterator<IBindingSet[]>(bindingSetChunks); + + } + + /** + * + * @param timeout + * @param ntrials + * @param poolSize + * + * @return The #of successful trials. 
+ * + * @throws Exception + */ + protected void doGroupByTest(final int maxInt, + final long timeout, final int ntrials, final int poolSize) + throws Exception { + + fail("write test helper"); + + int ngiven = 0; + final IVariable<?> a = Var.var("a"); + final IBindingSet[][] chunks = new IBindingSet[ntrials][]; + { + final Random r = new Random(); + for (int i = 0; i < chunks.length; i++) { + // random non-zero chunk size + chunks[i] = new IBindingSet[r.nextInt(10) + 1]; + for (int j = 0; j < chunks[i].length; j++) { + final IBindingSet bset = new ListBindingSet(); + bset.set(a, new Constant<Integer>(r.nextInt(maxInt))); + chunks[i][j] = bset; + ngiven++; + } + } + } + + final int startId = 1; + final int sortId = 2; + + /* + * Note: The StartOp breaks up the initial set of chunks into multiple + * IChunkMessages, which results in multiple invocations of the SortOp. + */ + final PipelineOp startOp = new StartOp(new BOp[]{}, NV.asMap(new NV[]{// + new NV(SliceOp.Annotations.BOP_ID, startId),// + new NV(MemorySortOp.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.CONTROLLER),// + })); + + final PipelineOp query = new MemorySortOp(new BOp[] {startOp}, NV.asMap(new NV[] {// + new NV(SliceOp.Annotations.BOP_ID, sortId),// + new NV(MemorySortOp.Annotations.COMPARATOR, + new IntegerComparatorOp( + new ISortOrder[... [truncated message content] |