From: <tho...@us...> - 2010-09-05 18:16:10
|
Revision: 3509 http://bigdata.svn.sourceforge.net/bigdata/?rev=3509&view=rev Author: thompsonbry Date: 2010-09-05 18:16:01 +0000 (Sun, 05 Sep 2010) Log Message: ----------- Added a "slice" operator and a basic unit test for that operator. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/IQueryOptions.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/QueryOptions.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/NestedSubqueryWithJoinThreadsTask.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/RuleStats.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/pipeline/JoinMasterTask.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bset/TestConditionalRoutingOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/RunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/relation/rule/TestSlice.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/rules/TestSlice.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/ComparatorOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/DistinctBindingSetOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/ISortOrder.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/MemorySortOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SortOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SortOrder.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SparqlBindingSetComparatorOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/package.html branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/ISlice.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/Slice.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestDistinctBindingSets.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestMemorySortOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestSPARQLBindingSetComparatorOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestSliceOp.java Removed Paths: ------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/ComparatorOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/DistinctBindingSetOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/ISlice.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/ISortOrder.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/MemorySortOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/Slice.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/SortOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/SortOrder.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/SparqlBindingSetComparatorOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/package.html branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/package.html branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/aggregation/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/aggregation/TestDistinctBindingSets.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/aggregation/TestMemorySortOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/aggregation/TestSPARQLBindingSetComparatorOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestDistinctBindingSets.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestSortBindingSets.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-09-05 17:02:34 UTC (rev 3508) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-09-05 18:16:01 UTC (rev 3509) @@ -32,6 +32,8 @@ import org.apache.log4j.Logger; import com.bigdata.bop.engine.BOpStats; +import com.bigdata.bop.engine.QueryEngine; +import com.bigdata.bop.solutions.SliceOp; import com.bigdata.btree.IIndex; import com.bigdata.btree.ILocalBTreeView; import com.bigdata.btree.IRangeQuery; @@ -652,6 +654,24 @@ } + /** + * Cancel the running query (normal termination). + * <p> + * Note: This method provides a means for an operator to indicate that the + * query should halt immediately. It used used by {@link SliceOp}, which + * needs to terminate the entire query once the slice has been satisfied. + * (If {@link SliceOp} just jumped out of its own evaluation loop then the + * query would not produce more results, but it would continue to run and + * the over produced results would just be thrown away.) + * <p> + * Note: When an individual {@link BOp} evaluation throws an exception, the + * {@link QueryEngine} will catch that exception and halt query evaluation + * with that thrown cause. + */ + public void halt() { + + } + /* * I've replaced this with AbstractSplitter for the moment. */ Deleted: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/ComparatorOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/ComparatorOp.java 2010-09-05 17:02:34 UTC (rev 3508) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/ComparatorOp.java 2010-09-05 18:16:01 UTC (rev 3509) @@ -1,77 +0,0 @@ -/** - -Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -*/ -/* - * Created on Sep 4, 2010 - */ - -package com.bigdata.bop.aggregation; - -import java.util.Comparator; -import java.util.Map; - -import com.bigdata.bop.BOp; -import com.bigdata.bop.BOpBase; -import com.bigdata.bop.IBindingSet; - -/** - * Base class for operators which impose a sort order on binding sets. - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ - */ -abstract public class ComparatorOp extends BOpBase implements - Comparator<IBindingSet> { - - /** - * - */ - private static final long serialVersionUID = 1L; - - public interface Annotations extends BOp.Annotations { - - /** - * An {@link ISortOrder}[] specifying the variables on which the sort - * will be imposed and the order (ascending or descending) for each - * variable. - */ - String ORDER = ComparatorOp.class.getName() + ".order"; - - } - - /** - * @param op - */ - public ComparatorOp(BOpBase op) { - super(op); - } - - /** - * @param args - * @param annotations - */ - public ComparatorOp(BOp[] args, Map<String, Object> annotations) { - super(args, annotations); - } - -} Deleted: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/DistinctBindingSetOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/DistinctBindingSetOp.java 2010-09-05 17:02:34 UTC (rev 3508) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/DistinctBindingSetOp.java 2010-09-05 18:16:01 UTC (rev 3509) @@ -1,324 +0,0 @@ -package com.bigdata.bop.aggregation; - -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.FutureTask; - -import com.bigdata.bop.BOp; -import com.bigdata.bop.BOpContext; -import com.bigdata.bop.BindingSetPipelineOp; -import com.bigdata.bop.HashBindingSet; -import com.bigdata.bop.IBindingSet; -import com.bigdata.bop.IConstant; -import com.bigdata.bop.IVariable; -import com.bigdata.bop.engine.BOpStats; -import com.bigdata.relation.accesspath.IAsynchronousIterator; -import com.bigdata.relation.accesspath.IBlockingBuffer; - -/** - * A pipelined DISTINCT operator based on a hash table. - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id: DistinctElementFilter.java 3466 2010-08-27 14:28:04Z - * thompsonbry $ - */ -public class DistinctBindingSetOp extends BindingSetPipelineOp { - - /** - * - */ - private static final long serialVersionUID = 1L; - - public interface Annotations extends BindingSetPipelineOp.Annotations { - - /** - * The initial capacity of the {@link ConcurrentHashMap} used to impose - * the distinct constraint. - * - * @see #DEFAULT_INITIAL_CAPACITY - */ - String INITIAL_CAPACITY = DistinctBindingSetOp.class.getName()+".initialCapacity"; - - int DEFAULT_INITIAL_CAPACITY = 16; - - /** - * The load factor of the {@link ConcurrentHashMap} used to impose - * the distinct constraint. - * - * @see #DEFAULT_LOAD_FACTOR - */ - String LOAD_FACTOR = DistinctBindingSetOp.class.getName()+".loadFactor"; - - float DEFAULT_LOAD_FACTOR = .75f; - - /** - * The concurrency level of the {@link ConcurrentHashMap} used to impose - * the distinct constraint. - * - * @see #DEFAULT_CONCURRENCY_LEVEL - */ - String CONCURRENCY_LEVEL = DistinctBindingSetOp.class.getName()+".concurrencyLevel"; - - int DEFAULT_CONCURRENCY_LEVEL = 16; - - /** - * The variables on which the distinct constraint will be imposed. - * Binding sets with distinct values for the specified variables will be - * passed on. - */ - String VARIABLES = DistinctBindingSetOp.class.getName() + ".variables"; - - } - - /** - * Required deep copy constructor. - */ - public DistinctBindingSetOp(final DistinctBindingSetOp op) { - super(op); - } - - /** - * Required shallow copy constructor. - */ - public DistinctBindingSetOp(final BOp[] args, - final Map<String, Object> annotations) { - - super(args, annotations); - - } - - /** - * @see Annotations#INITIAL_CAPACITY - */ - public int getInitialCapacity() { - - return getProperty(Annotations.INITIAL_CAPACITY, - Annotations.DEFAULT_INITIAL_CAPACITY); - - } - - /** - * @see Annotations#LOAD_FACTOR - */ - public float getLoadFactor() { - - return getProperty(Annotations.LOAD_FACTOR, - Annotations.DEFAULT_LOAD_FACTOR); - - } - - /** - * @see Annotations#CONCURRENCY_LEVEL - */ - public int getConcurrencyLevel() { - - return getProperty(Annotations.CONCURRENCY_LEVEL, - Annotations.DEFAULT_CONCURRENCY_LEVEL); - - } - - /** - * @see Annotations#VARIABLES - */ - public IVariable<?>[] getVariables() { - - return getRequiredProperty(Annotations.VARIABLES); - - } - - public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { - - return new FutureTask<Void>(new DistinctTask(this, context)); - - } - - /** - * Wrapper used for the as bound solutions in the {@link ConcurrentHashMap}. - */ - private static class Solution { - private final int hash; - - private final IConstant<?>[] vals; - - public Solution(final IConstant<?>[] vals) { - this.vals = vals; - this.hash = java.util.Arrays.hashCode(vals); - } - - public int hashCode() { - return hash; - } - - public boolean equals(final Object o) { - if (this == o) - return true; - if (!(o instanceof Solution)) { - return false; - } - final Solution t = (Solution) o; - if (vals.length != t.vals.length) - return false; - for (int i = 0; i < vals.length; i++) { - // @todo verify that this allows for nulls with a unit test. - if (vals[i] == t.vals[i]) - continue; - if (vals[i] == null) - return false; - if (!vals[i].equals(t.vals[i])) - return false; - } - return true; - } - } - - /** - * Task executing on the node. - */ - static private class DistinctTask implements Callable<Void> { - - private final BOpContext<IBindingSet> context; - - /** - * A concurrent map whose keys are the bindings on the specified - * variables (the keys and the values are the same since the map - * implementation does not allow <code>null</code> values). - */ - private /*final*/ ConcurrentHashMap<Solution, Solution> map; - - /** - * The variables used to impose a distinct constraint. - */ - private final IVariable<?>[] vars; - - DistinctTask(final DistinctBindingSetOp op, - final BOpContext<IBindingSet> context) { - - this.context = context; - - this.vars = op.getVariables(); - - if (vars == null) - throw new IllegalArgumentException(); - - if (vars.length == 0) - throw new IllegalArgumentException(); - - this.map = new ConcurrentHashMap<Solution, Solution>( - op.getInitialCapacity(), op.getLoadFactor(), - op.getConcurrencyLevel()); - - } - - /** - * If the bindings are distinct for the configured variables then return - * those bindings. - * - * @param bset - * The binding set to be filtered. - * - * @return The distinct as bound values -or- <code>null</code> if the - * binding set duplicates a solution which was already accepted. - */ - private IConstant<?>[] accept(final IBindingSet bset) { - - final IConstant<?>[] r = new IConstant<?>[vars.length]; - - for (int i = 0; i < vars.length; i++) { - - /* - * Note: This allows null's. - * - * @todo write a unit test when some variables are not bound. - */ - r[i] = bset.get(vars[i]); - - } - - final Solution s = new Solution(r); - - final boolean distinct = map.putIfAbsent(s, s) == null; - - return distinct ? r : null; - - } - - public Void call() throws Exception { - - final BOpStats stats = context.getStats(); - - final IAsynchronousIterator<IBindingSet[]> itr = context - .getSource(); - - final IBlockingBuffer<IBindingSet[]> sink = context.getSink(); - - try { - - while (itr.hasNext()) { - - final IBindingSet[] a = itr.next(); - - stats.chunksIn.increment(); - stats.unitsIn.add(a.length); - - final List<IBindingSet> accepted = new LinkedList<IBindingSet>(); - - int naccepted = 0; - - for (IBindingSet bset : a) { - -// System.err.println("considering: " + bset); - - final IConstant<?>[] vals = accept(bset); - - if (vals != null) { - -// System.err.println("accepted: " -// + Arrays.toString(vals)); - - accepted.add(new HashBindingSet(vars, vals)); - - naccepted++; - - } - - } - - if (naccepted > 0) { - - final IBindingSet[] b = accepted - .toArray(new IBindingSet[naccepted]); - -// System.err.println("output: " -// + Arrays.toString(b)); - - sink.add(b); - - stats.unitsOut.add(naccepted); - stats.chunksOut.increment(); - - } - - } - - sink.flush(); - - // done. - return null; - - } finally { - - sink.close(); - - // discard the map. - map = null; - - } - - } - - } - -} Deleted: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/ISlice.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/ISlice.java 2010-09-05 17:02:34 UTC (rev 3508) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/ISlice.java 2010-09-05 18:16:01 UTC (rev 3509) @@ -1,69 +0,0 @@ -/* - -Copyright (C) SYSTAP, LLC 2006-2008. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -*/ -/* - * Created on Sep 24, 2008 - */ - -package com.bigdata.bop.aggregation; - -import java.io.Serializable; - -import com.bigdata.relation.accesspath.IAccessPath; - -/** - * Indicates the first solution to be returned to the caller (offset) and the - * #of solutions to be returned (limit). - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ - */ -public interface ISlice extends Serializable { - - /** - * The first solution to be returned to the caller. A value of ZERO (0) - * indicates that all solutions should be returned. - */ - public long getOffset(); - - /** - * The maximum #of solutions to be returned to the caller. A value of - * {@link Long#MAX_VALUE} indicates that there is no limit. - * - * @todo modify to be consistent with - * {@link IAccessPath#iterator(long, long, int)} where a limit of ZERO - * (0L) is interpreted as NO limit and a limit of - * {@link Long#MAX_VALUE} is interpreted as ZERO (0L) (that is, also - * no limit). - */ - public long getLimit(); - - /** - * The index of the last solution that we will generate (OFFSET + LIMIT). If - * OFFSET + LIMIT would be greater than {@link Long#MAX_VALUE}, then use - * {@link Long#MAX_VALUE} instead. - */ - public long getLast(); - -} Deleted: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/ISortOrder.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/ISortOrder.java 2010-09-05 17:02:34 UTC (rev 3508) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/ISortOrder.java 2010-09-05 18:16:01 UTC (rev 3509) @@ -1,55 +0,0 @@ -/* - -Copyright (C) SYSTAP, LLC 2006-2008. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -*/ -/* - * Created on Sep 24, 2008 - */ - -package com.bigdata.bop.aggregation; - -import java.io.Serializable; - -import com.bigdata.bop.IVariable; - -/** - * A variable and an order that will be imposed on the values for that variable. - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ - */ -public interface ISortOrder<E> extends Serializable { - - /** - * The variable whose values will be sorted. - */ - IVariable<E> getVariable(); - - /** - * <code>true</code> iff the values will be placed into an ascending sort - * and <code>false</code> if the values will be placed into a descending - * sort. - */ - boolean isAscending(); - -} Deleted: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/MemorySortOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/MemorySortOp.java 2010-09-05 17:02:34 UTC (rev 3508) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/MemorySortOp.java 2010-09-05 18:16:01 UTC (rev 3509) @@ -1,112 +0,0 @@ -package com.bigdata.bop.aggregation; - -import java.util.Arrays; -import java.util.Comparator; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.FutureTask; - -import com.bigdata.bop.BOp; -import com.bigdata.bop.BOpContext; -import com.bigdata.bop.BOpUtility; -import com.bigdata.bop.IBindingSet; -import com.bigdata.bop.engine.BOpStats; -import com.bigdata.relation.accesspath.IBlockingBuffer; - -/** - * An in-memory merge sort for binding sets. - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id: DistinctElementFilter.java 3466 2010-08-27 14:28:04Z - * thompsonbry $ - * - * @todo unit tests. - * @todo do an external merge sort operator. - */ -public class MemorySortOp extends SortOp { - - /** - * - */ - private static final long serialVersionUID = 1L; - - /** - * Required deep copy constructor. - */ - public MemorySortOp(final MemorySortOp op) { - super(op); - } - - /** - * Required shallow copy constructor. - */ - public MemorySortOp(final BOp[] args, - final Map<String, Object> annotations) { - - super(args, annotations); - - } - - public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { - - return new FutureTask<Void>(new SortTask(this, context)); - - } - - /** - * Task executing on the node. - */ - static private class SortTask implements Callable<Void> { - - private final BOpContext<IBindingSet> context; - - /** - * The binding set comparator. - */ - private final Comparator<IBindingSet> comparator; - - SortTask(final MemorySortOp op, - final BOpContext<IBindingSet> context) { - - this.context = context; - - this.comparator = op.getComparator(); - - } - - public Void call() throws Exception { - - final BOpStats stats = context.getStats(); - - final IBlockingBuffer<IBindingSet[]> sink = context.getSink(); - - try { - - final IBindingSet[] all = BOpUtility.toArray(context - .getSource(), stats); - - // sort. - Arrays.sort(all, comparator); - - // update counters. - stats.unitsOut.add(all.length); - stats.chunksOut.increment(); - - // write output and flush. - sink.add(all); - sink.flush(); - - // done. - return null; - - } finally { - - sink.close(); - - } - - } - - } - -} Deleted: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/Slice.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/Slice.java 2010-09-05 17:02:34 UTC (rev 3508) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/Slice.java 2010-09-05 18:16:01 UTC (rev 3509) @@ -1,129 +0,0 @@ -/* - -Copyright (C) SYSTAP, LLC 2006-2008. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -*/ -/* - * Created on Sep 24, 2008 - */ - -package com.bigdata.bop.aggregation; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.math.BigInteger; - -/** - * Default implementation. - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ - */ -public class Slice implements ISlice, Externalizable { - - /** - * - */ - private static final long serialVersionUID = 5396509164843609197L; - - private long offset; - private long limit; - private long last; - - /** - * A slice corresponding to all results (offset is zero, limit is - * {@link Long#MAX_VALUE}). - */ - public static final transient ISlice ALL = new Slice(0, Long.MAX_VALUE); - - /** - * - * @param offset - * @param limit - * - * @throws IllegalArgumentException - * if offset is negative. - * @throws IllegalArgumentException - * if limit is non-positive. - */ - public Slice(final long offset, final long limit) { - - if (offset < 0) - throw new IllegalArgumentException(); - - if (limit <= 0) - throw new IllegalArgumentException(); - - this.offset = offset; - - this.limit = limit; - - // @todo what is a cheaper way to do this? - this.last = BigInteger.valueOf(offset).add(BigInteger.valueOf(limit)) - .min(BigInteger.valueOf(Long.MAX_VALUE)).longValue(); - - } - - public long getOffset() { - - return offset; - - } - - public long getLimit() { - - return limit; - - } - - public long getLast() { - - return last; - - } - - public String toString() { - - return "Slice{offset="+offset+", limit="+limit+", last="+last+"}"; - - } - - public void readExternal(ObjectInput in) throws IOException, - ClassNotFoundException { - - offset = in.readLong(); - limit = in.readLong(); - last = in.readLong(); - - } - - public void writeExternal(ObjectOutput out) throws IOException { - - out.writeLong(offset); - out.writeLong(limit); - out.writeLong(last); - - } - -} Deleted: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/SortOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/SortOp.java 2010-09-05 17:02:34 UTC (rev 3508) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/SortOp.java 2010-09-05 18:16:01 UTC (rev 3509) @@ -1,86 +0,0 @@ -/** - -Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -*/ -/* - * Created on Sep 4, 2010 - */ - -package com.bigdata.bop.aggregation; - -import java.util.Map; - -import com.bigdata.bop.BOp; -import com.bigdata.bop.BindingSetPipelineOp; -import com.bigdata.bop.IBindingSet; -import com.bigdata.bop.PipelineOp; - -/** - * Base class for operators which sort binding sets. - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ - */ -abstract public class SortOp extends BindingSetPipelineOp { - - /** - * - */ - private static final long serialVersionUID = 1L; - - public interface Annotations extends BindingSetPipelineOp.Annotations { - - /** - * The {@link ComparatorOp} which will impose the ordering on the - * binding sets. - * - * @see ComparatorOp - */ - String COMPARATOR = MemorySortOp.class.getName() + ".comparator"; - - } - - /** - * @param op - */ - public SortOp(PipelineOp<IBindingSet> op) { - super(op); - } - - /** - * @param args - * @param annotations - */ - public SortOp(BOp[] args, Map<String, Object> annotations) { - super(args, annotations); - } - - /** - * @see Annotations#COMPARATOR - */ - public ComparatorOp getComparator() { - - return getRequiredProperty(Annotations.COMPARATOR); - - } - -} Deleted: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/SortOrder.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/SortOrder.java 2010-09-05 17:02:34 UTC (rev 3508) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/SortOrder.java 2010-09-05 18:16:01 UTC (rev 3509) @@ -1,80 +0,0 @@ -/* - -Copyright (C) SYSTAP, LLC 2006-2008. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -*/ -/* - * Created on Sep 24, 2008 - */ - -package com.bigdata.bop.aggregation; - -import com.bigdata.bop.IVariable; - -/** - * Default impl. - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ - */ -public class SortOrder<E> implements ISortOrder<E> { - - /** - * - */ - private static final long serialVersionUID = -669873421670514139L; - - private final IVariable<E> var; - private final boolean asc; - - /** - * - * @param var - * The variable. - * @param asc - * <code>true</code> for an ascending sort and - * <code>false</code> for a descending sort. - */ - public SortOrder(final IVariable<E> var, final boolean asc) { - - if (var == null) - throw new IllegalArgumentException(); - - this.var = var; - - this.asc = asc; - - } - - public IVariable<E> getVariable() { - - return var; - - } - - public boolean isAscending() { - - return asc; - - } - -} Deleted: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/SparqlBindingSetComparatorOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/SparqlBindingSetComparatorOp.java 2010-09-05 17:02:34 UTC (rev 3508) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/SparqlBindingSetComparatorOp.java 2010-09-05 18:16:01 UTC (rev 3509) @@ -1,101 +0,0 @@ -package com.bigdata.bop.aggregation; - -import java.util.Comparator; -import java.util.Map; - -import org.openrdf.query.algebra.evaluation.util.ValueComparator; - -import com.bigdata.bop.BOp; -import com.bigdata.bop.IBindingSet; -import com.bigdata.bop.IVariable; - -/** - * A comparator for SPARQL binding sets. - * - * @see http://www.w3.org/TR/rdf-sparql-query/#modOrderBy - * @see ValueComparator - * - * @todo unit tests. - */ -public class SparqlBindingSetComparatorOp extends ComparatorOp { - - /** - * - */ - private static final long serialVersionUID = 1L; - - /** - * Required deep copy constructor. - */ - public SparqlBindingSetComparatorOp(final SparqlBindingSetComparatorOp op) { - super(op); - } - - /** - * Required shallow copy constructor. - */ - public SparqlBindingSetComparatorOp(final BOp[] args, - final Map<String, Object> annotations) { - - super(args, annotations); - - } - - /** - * @see Annotations#ORDER - */ - public ISortOrder<?>[] getOrder() { - - return getRequiredProperty(Annotations.ORDER); - - } - - /** - * The sort order to be imposed. - */ - private transient ISortOrder<?>[] order; - - private transient Comparator vc; - - public int compare(final IBindingSet bs1, final IBindingSet bs2) { - - if (order == null) { - - // lazy initialization. - order = getOrder(); - - if (order == null) - throw new IllegalArgumentException(); - - if (order.length == 0) - throw new IllegalArgumentException(); - - // comparator for RDF Value objects. - vc = new ValueComparator(); - - } - - for (int i = 0; i < order.length; i++) { - - final ISortOrder<?> o = order[i]; - - final IVariable v = o.getVariable(); - - int ret = vc.compare(bs1.get(v).get(), bs2.get(v).get()); - - if (!o.isAscending()) - ret = -ret; - - if (ret != 0) { - // not equal for this variable. - return ret; - } - - } - - // equal for all variables. - return 0; - - } - -} Deleted: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/package.html =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/package.html 2010-09-05 17:02:34 UTC (rev 3508) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/package.html 2010-09-05 18:16:01 UTC (rev 3509) @@ -1,17 +0,0 @@ -<html> -<head> -<title>solution modifier operators (distinct, sort, slice, and aggregation)</title> -</head> -<body> - -<p> - - This package provides distinct, sort, and aggregation operators. All of - these are potentially high volume hash partitioned operations against a - clustered database. Both in memory and disk based versions of the each - operator should be implemented. - -</p> - -</body> -</html> \ No newline at end of file Copied: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/ComparatorOp.java (from rev 3508, branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/ComparatorOp.java) =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/ComparatorOp.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/ComparatorOp.java 2010-09-05 18:16:01 UTC (rev 3509) @@ -0,0 +1,77 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Sep 4, 2010 + */ + +package com.bigdata.bop.solutions; + +import java.util.Comparator; +import java.util.Map; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpBase; +import com.bigdata.bop.IBindingSet; + +/** + * Base class for operators which impose a sort order on binding sets. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +abstract public class ComparatorOp extends BOpBase implements + Comparator<IBindingSet> { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public interface Annotations extends BOp.Annotations { + + /** + * An {@link ISortOrder}[] specifying the variables on which the sort + * will be imposed and the order (ascending or descending) for each + * variable. + */ + String ORDER = ComparatorOp.class.getName() + ".order"; + + } + + /** + * @param op + */ + public ComparatorOp(BOpBase op) { + super(op); + } + + /** + * @param args + * @param annotations + */ + public ComparatorOp(BOp[] args, Map<String, Object> annotations) { + super(args, annotations); + } + +} Copied: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/DistinctBindingSetOp.java (from rev 3508, branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/DistinctBindingSetOp.java) =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/DistinctBindingSetOp.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/DistinctBindingSetOp.java 2010-09-05 18:16:01 UTC (rev 3509) @@ -0,0 +1,324 @@ +package com.bigdata.bop.solutions; + +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.FutureTask; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpContext; +import com.bigdata.bop.BindingSetPipelineOp; +import com.bigdata.bop.HashBindingSet; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IConstant; +import com.bigdata.bop.IVariable; +import com.bigdata.bop.engine.BOpStats; +import com.bigdata.relation.accesspath.IAsynchronousIterator; +import com.bigdata.relation.accesspath.IBlockingBuffer; + +/** + * A pipelined DISTINCT operator based on a hash table. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id: DistinctElementFilter.java 3466 2010-08-27 14:28:04Z + * thompsonbry $ + */ +public class DistinctBindingSetOp extends BindingSetPipelineOp { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public interface Annotations extends BindingSetPipelineOp.Annotations { + + /** + * The initial capacity of the {@link ConcurrentHashMap} used to impose + * the distinct constraint. + * + * @see #DEFAULT_INITIAL_CAPACITY + */ + String INITIAL_CAPACITY = DistinctBindingSetOp.class.getName()+".initialCapacity"; + + int DEFAULT_INITIAL_CAPACITY = 16; + + /** + * The load factor of the {@link ConcurrentHashMap} used to impose + * the distinct constraint. + * + * @see #DEFAULT_LOAD_FACTOR + */ + String LOAD_FACTOR = DistinctBindingSetOp.class.getName()+".loadFactor"; + + float DEFAULT_LOAD_FACTOR = .75f; + + /** + * The concurrency level of the {@link ConcurrentHashMap} used to impose + * the distinct constraint. + * + * @see #DEFAULT_CONCURRENCY_LEVEL + */ + String CONCURRENCY_LEVEL = DistinctBindingSetOp.class.getName()+".concurrencyLevel"; + + int DEFAULT_CONCURRENCY_LEVEL = 16; + + /** + * The variables on which the distinct constraint will be imposed. + * Binding sets with distinct values for the specified variables will be + * passed on. + */ + String VARIABLES = DistinctBindingSetOp.class.getName() + ".variables"; + + } + + /** + * Required deep copy constructor. + */ + public DistinctBindingSetOp(final DistinctBindingSetOp op) { + super(op); + } + + /** + * Required shallow copy constructor. + */ + public DistinctBindingSetOp(final BOp[] args, + final Map<String, Object> annotations) { + + super(args, annotations); + + } + + /** + * @see Annotations#INITIAL_CAPACITY + */ + public int getInitialCapacity() { + + return getProperty(Annotations.INITIAL_CAPACITY, + Annotations.DEFAULT_INITIAL_CAPACITY); + + } + + /** + * @see Annotations#LOAD_FACTOR + */ + public float getLoadFactor() { + + return getProperty(Annotations.LOAD_FACTOR, + Annotations.DEFAULT_LOAD_FACTOR); + + } + + /** + * @see Annotations#CONCURRENCY_LEVEL + */ + public int getConcurrencyLevel() { + + return getProperty(Annotations.CONCURRENCY_LEVEL, + Annotations.DEFAULT_CONCURRENCY_LEVEL); + + } + + /** + * @see Annotations#VARIABLES + */ + public IVariable<?>[] getVariables() { + + return getRequiredProperty(Annotations.VARIABLES); + + } + + public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { + + return new FutureTask<Void>(new DistinctTask(this, context)); + + } + + /** + * Wrapper used for the as bound solutions in the {@link ConcurrentHashMap}. + */ + private static class Solution { + private final int hash; + + private final IConstant<?>[] vals; + + public Solution(final IConstant<?>[] vals) { + this.vals = vals; + this.hash = java.util.Arrays.hashCode(vals); + } + + public int hashCode() { + return hash; + } + + public boolean equals(final Object o) { + if (this == o) + return true; + if (!(o instanceof Solution)) { + return false; + } + final Solution t = (Solution) o; + if (vals.length != t.vals.length) + return false; + for (int i = 0; i < vals.length; i++) { + // @todo verify that this allows for nulls with a unit test. + if (vals[i] == t.vals[i]) + continue; + if (vals[i] == null) + return false; + if (!vals[i].equals(t.vals[i])) + return false; + } + return true; + } + } + + /** + * Task executing on the node. + */ + static private class DistinctTask implements Callable<Void> { + + private final BOpContext<IBindingSet> context; + + /** + * A concurrent map whose keys are the bindings on the specified + * variables (the keys and the values are the same since the map + * implementation does not allow <code>null</code> values). + */ + private /*final*/ ConcurrentHashMap<Solution, Solution> map; + + /** + * The variables used to impose a distinct constraint. + */ + private final IVariable<?>[] vars; + + DistinctTask(final DistinctBindingSetOp op, + final BOpContext<IBindingSet> context) { + + this.context = context; + + this.vars = op.getVariables(); + + if (vars == null) + throw new IllegalArgumentException(); + + if (vars.length == 0) + throw new IllegalArgumentException(); + + this.map = new ConcurrentHashMap<Solution, Solution>( + op.getInitialCapacity(), op.getLoadFactor(), + op.getConcurrencyLevel()); + + } + + /** + * If the bindings are distinct for the configured variables then return + * those bindings. + * + * @param bset + * The binding set to be filtered. + * + * @return The distinct as bound values -or- <code>null</code> if the + * binding set duplicates a solution which was already accepted. + */ + private IConstant<?>[] accept(final IBindingSet bset) { + + final IConstant<?>[] r = new IConstant<?>[vars.length]; + + for (int i = 0; i < vars.length; i++) { + + /* + * Note: This allows null's. + * + * @todo write a unit test when some variables are not bound. + */ + r[i] = bset.get(vars[i]); + + } + + final Solution s = new Solution(r); + + final boolean distinct = map.putIfAbsent(s, s) == null; + + return distinct ? r : null; + + } + + public Void call() throws Exception { + + final BOpStats stats = context.getStats(); + + final IAsynchronousIterator<IBindingSet[]> itr = context + .getSource(); + + final IBlockingBuffer<IBindingSet[]> sink = context.getSink(); + + try { + + while (itr.hasNext()) { + + final IBindingSet[] a = itr.next(); + + stats.chunksIn.increment(); + stats.unitsIn.add(a.length); + + final List<IBindingSet> accepted = new LinkedList<IBindingSet>(); + + int naccepted = 0; + + for (IBindingSet bset : a) { + +// System.err.println("considering: " + bset); + + final IConstant<?>[] vals = accept(bset); + + if (vals != null) { + +// System.err.println("accepted: " +// + Arrays.toString(vals)); + + accepted.add(new HashBindingSet(vars, vals)); + + naccepted++; + + } + + } + + if (naccepted > 0) { + + final IBindingSet[] b = accepted + .toArray(new IBindingSet[naccepted]); + +// System.err.println("output: " +// + Arrays.toString(b)); + + sink.add(b); + + stats.unitsOut.add(naccepted); + stats.chunksOut.increment(); + + } + + } + + sink.flush(); + + // done. + return null; + + } finally { + + sink.close(); + + // discard the map. + map = null; + + } + + } + + } + +} Copied: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/ISortOrder.java (from rev 3508, branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/ISortOrder.java) =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/ISortOrder.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/ISortOrder.java 2010-09-05 18:16:01 UTC (rev 3509) @@ -0,0 +1,55 @@ +/* + +Copyright (C) SYSTAP, LLC 2006-2008. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +*/ +/* + * Created on Sep 24, 2008 + */ + +package com.bigdata.bop.solutions; + +import java.io.Serializable; + +import com.bigdata.bop.IVariable; + +/** + * A variable and an order that will be imposed on the values for that variable. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public interface ISortOrder<E> extends Serializable { + + /** + * The variable whose values will be sorted. + */ + IVariable<E> getVariable(); + + /** + * <code>true</code> iff the values will be placed into an ascending sort + * and <code>false</code> if the values will be placed into a descending + * sort. + */ + boolean isAscending(); + +} Copied: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/MemorySortOp.java (from rev 3508, branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/MemorySortOp.java) =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/MemorySortOp.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/MemorySortOp.java 2010-09-05 18:16:01 UTC (rev 3509) @@ -0,0 +1,112 @@ +package com.bigdata.bop.solutions; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.FutureTask; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpContext; +import com.bigdata.bop.BOpUtility; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.engine.BOpStats; +import com.bigdata.relation.accesspath.IBlockingBuffer; + +/** + * An in-memory merge sort for binding sets. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id: DistinctElementFilter.java 3466 2010-08-27 14:28:04Z + * thompsonbry $ + * + * @todo unit tests. + * @todo do an external merge sort operator. + */ +public class MemorySortOp extends SortOp { + + /** + * + */ + private static final long serialVersionUID = 1L; + + /** + * Required deep copy constructor. + */ + public MemorySortOp(final MemorySortOp op) { + super(op); + } + + /** + * Required shallow copy constructor. + */ + public MemorySortOp(final BOp[] args, + final Map<String, Object> annotations) { + + super(args, annotations); + + } + + public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { + + return new FutureTask<Void>(new SortTask(this, context)); + + } + + /** + * Task executing on the node. + */ + static private class SortTask implements Callable<Void> { + + private final BOpContext<IBindingSet> context; + + /** + * The binding set comparator. + */ + private final Comparator<IBindingSet> comparator; + + SortTask(final MemorySortOp op, + final BOpContext<IBindingSet> context) { + + this.context = context; + + this.comparator = op.getComparator(); + + } + + public Void call() throws Exception { + + final BOpStats stats = context.getStats(); + + final IBlockingBuffer<IBindingSet[]> sink = context.getSink(); + + try { + + final IBindingSet[] all = BOpUtility.toArray(context + .getSource(), stats); + + // sort. + Arrays.sort(all, comparator); + + // update counters. + stats.unitsOut.add(all.length); + stats.chunksOut.increment(); + + // write output and flush. + sink.add(all); + sink.flush(); + + // done. + return null; + + } finally { + + sink.close(); + + } + + } + + } + +} Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java 2010-09-05 18:16:01 UTC (rev 3509) @@ -0,0 +1,275 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Sep 5, 2010 + */ + +package com.bigdata.bop.solutions; + +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.FutureTask; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpContext; +import com.bigdata.bop.BindingSetPipelineOp; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.PipelineOp; +import com.bigdata.bop.engine.BOpStats; +import com.bigdata.bop.engine.RunningQuery; +import com.bigdata.relation.accesspath.IAsynchronousIterator; +import com.bigdata.relation.accesspath.IBlockingBuffer; +import com.bigdata.relation.accesspath.UnsynchronizedArrayBuffer; +import com.bigdata.service.IBigdataFederation; + +/** + * An operator which imposes an offset/limit on a binding set pipeline. + * <p> + * Note: join processing typically involves concurrent processes, hence the + * order of the results will not be stable unless the results are sorted before + * applying the slice. When a slice is applied without a sort, the same query + * may return different results each time it is evaluated. + * <p> + * Note: When running on an {@link IBigdataFederation}, this operator must be + * imposed on the query controller so it can count the solutions as they flow + * through. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + * + * @todo If this operator is invoked for each chunk output by a query onto the + * pipeline then it will over produce unless (A) it is given the same + * {@link BOpStats} each time; and (B) it is not invoked for two chunks + * concurrently. + * <p> + * A safer way to impose the slice constraint is by wrapping the query + * buffer on the query controller. Once the slice is satisfied, it can + * just cancel the query. The only drawback of this approach is that the + * wrapping a buffer is not really the same as applying a {@link BOp} to + * the pipeline so it falls outside of the standard operator evaluation + * logic. + * + * @todo If we allow complex operator trees in which "subqueries" can also use a + * slice then either then need to run as their own query with their own + * {@link RunningQuery} state or the API for cancelling a running query as + * used here needs to only cancel evaluation of the child operators. + * Otherwise we could cancel all operator evaluation for the query, + * including operators which are ancestors of the {@link SliceOp}. + */ +public class SliceOp extends BindingSetPipelineOp { + + /** + * + */ + private static final long serialVersionUID ... [truncated message content] |