|
From: <tho...@us...> - 2010-09-05 18:16:10
|
Revision: 3509
http://bigdata.svn.sourceforge.net/bigdata/?rev=3509&view=rev
Author: thompsonbry
Date: 2010-09-05 18:16:01 +0000 (Sun, 05 Sep 2010)
Log Message:
-----------
Added a "slice" operator and a basic unit test for that operator.
Modified Paths:
--------------
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/IQueryOptions.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/QueryOptions.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/NestedSubqueryWithJoinThreadsTask.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/RuleStats.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/pipeline/JoinMasterTask.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestAll.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bset/TestConditionalRoutingOp.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/RunningQuery.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/relation/rule/TestSlice.java
branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/rules/TestSlice.java
branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl.java
Added Paths:
-----------
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/ComparatorOp.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/DistinctBindingSetOp.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/ISortOrder.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/MemorySortOp.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SortOp.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SortOrder.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SparqlBindingSetComparatorOp.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/package.html
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/ISlice.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/Slice.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestAll.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestDistinctBindingSets.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestMemorySortOp.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestSPARQLBindingSetComparatorOp.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestSliceOp.java
Removed Paths:
-------------
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/ComparatorOp.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/DistinctBindingSetOp.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/ISlice.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/ISortOrder.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/MemorySortOp.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/Slice.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/SortOp.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/SortOrder.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/SparqlBindingSetComparatorOp.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/package.html
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/package.html
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/aggregation/TestAll.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/aggregation/TestDistinctBindingSets.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/aggregation/TestMemorySortOp.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/aggregation/TestSPARQLBindingSetComparatorOp.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestAll.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestDistinctBindingSets.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestSortBindingSets.java
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-09-05 17:02:34 UTC (rev 3508)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-09-05 18:16:01 UTC (rev 3509)
@@ -32,6 +32,8 @@
import org.apache.log4j.Logger;
import com.bigdata.bop.engine.BOpStats;
+import com.bigdata.bop.engine.QueryEngine;
+import com.bigdata.bop.solutions.SliceOp;
import com.bigdata.btree.IIndex;
import com.bigdata.btree.ILocalBTreeView;
import com.bigdata.btree.IRangeQuery;
@@ -652,6 +654,24 @@
}
+ /**
+ * Cancel the running query (normal termination).
+ * <p>
+ * Note: This method provides a means for an operator to indicate that the
+ * query should halt immediately. It is used by {@link SliceOp}, which
+ * needs to terminate the entire query once the slice has been satisfied.
+ * (If {@link SliceOp} just jumped out of its own evaluation loop then the
+ * query would not produce more results, but it would continue to run and
+ * the over produced results would just be thrown away.)
+ * <p>
+ * Note: When an individual {@link BOp} evaluation throws an exception, the
+ * {@link QueryEngine} will catch that exception and halt query evaluation
+ * with that thrown cause.
+ */
+ public void halt() {
+
+ }
+
/*
* I've replaced this with AbstractSplitter for the moment.
*/
Deleted: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/ComparatorOp.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/ComparatorOp.java 2010-09-05 17:02:34 UTC (rev 3508)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/ComparatorOp.java 2010-09-05 18:16:01 UTC (rev 3509)
@@ -1,77 +0,0 @@
-/**
-
-Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved.
-
-Contact:
- SYSTAP, LLC
- 4501 Tower Road
- Greensboro, NC 27410
- lic...@bi...
-
-This program is free software; you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation; version 2 of the License.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program; if not, write to the Free Software
-Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-*/
-/*
- * Created on Sep 4, 2010
- */
-
-package com.bigdata.bop.aggregation;
-
-import java.util.Comparator;
-import java.util.Map;
-
-import com.bigdata.bop.BOp;
-import com.bigdata.bop.BOpBase;
-import com.bigdata.bop.IBindingSet;
-
-/**
- * Base class for operators which impose a sort order on binding sets.
- *
- * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
- * @version $Id$
- */
-abstract public class ComparatorOp extends BOpBase implements
- Comparator<IBindingSet> {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- public interface Annotations extends BOp.Annotations {
-
- /**
- * An {@link ISortOrder}[] specifying the variables on which the sort
- * will be imposed and the order (ascending or descending) for each
- * variable.
- */
- String ORDER = ComparatorOp.class.getName() + ".order";
-
- }
-
- /**
- * @param op
- */
- public ComparatorOp(BOpBase op) {
- super(op);
- }
-
- /**
- * @param args
- * @param annotations
- */
- public ComparatorOp(BOp[] args, Map<String, Object> annotations) {
- super(args, annotations);
- }
-
-}
Deleted: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/DistinctBindingSetOp.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/DistinctBindingSetOp.java 2010-09-05 17:02:34 UTC (rev 3508)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/DistinctBindingSetOp.java 2010-09-05 18:16:01 UTC (rev 3509)
@@ -1,324 +0,0 @@
-package com.bigdata.bop.aggregation;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.FutureTask;
-
-import com.bigdata.bop.BOp;
-import com.bigdata.bop.BOpContext;
-import com.bigdata.bop.BindingSetPipelineOp;
-import com.bigdata.bop.HashBindingSet;
-import com.bigdata.bop.IBindingSet;
-import com.bigdata.bop.IConstant;
-import com.bigdata.bop.IVariable;
-import com.bigdata.bop.engine.BOpStats;
-import com.bigdata.relation.accesspath.IAsynchronousIterator;
-import com.bigdata.relation.accesspath.IBlockingBuffer;
-
-/**
- * A pipelined DISTINCT operator based on a hash table.
- *
- * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
- * @version $Id: DistinctElementFilter.java 3466 2010-08-27 14:28:04Z
- * thompsonbry $
- */
-public class DistinctBindingSetOp extends BindingSetPipelineOp {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- public interface Annotations extends BindingSetPipelineOp.Annotations {
-
- /**
- * The initial capacity of the {@link ConcurrentHashMap} used to impose
- * the distinct constraint.
- *
- * @see #DEFAULT_INITIAL_CAPACITY
- */
- String INITIAL_CAPACITY = DistinctBindingSetOp.class.getName()+".initialCapacity";
-
- int DEFAULT_INITIAL_CAPACITY = 16;
-
- /**
- * The load factor of the {@link ConcurrentHashMap} used to impose
- * the distinct constraint.
- *
- * @see #DEFAULT_LOAD_FACTOR
- */
- String LOAD_FACTOR = DistinctBindingSetOp.class.getName()+".loadFactor";
-
- float DEFAULT_LOAD_FACTOR = .75f;
-
- /**
- * The concurrency level of the {@link ConcurrentHashMap} used to impose
- * the distinct constraint.
- *
- * @see #DEFAULT_CONCURRENCY_LEVEL
- */
- String CONCURRENCY_LEVEL = DistinctBindingSetOp.class.getName()+".concurrencyLevel";
-
- int DEFAULT_CONCURRENCY_LEVEL = 16;
-
- /**
- * The variables on which the distinct constraint will be imposed.
- * Binding sets with distinct values for the specified variables will be
- * passed on.
- */
- String VARIABLES = DistinctBindingSetOp.class.getName() + ".variables";
-
- }
-
- /**
- * Required deep copy constructor.
- */
- public DistinctBindingSetOp(final DistinctBindingSetOp op) {
- super(op);
- }
-
- /**
- * Required shallow copy constructor.
- */
- public DistinctBindingSetOp(final BOp[] args,
- final Map<String, Object> annotations) {
-
- super(args, annotations);
-
- }
-
- /**
- * @see Annotations#INITIAL_CAPACITY
- */
- public int getInitialCapacity() {
-
- return getProperty(Annotations.INITIAL_CAPACITY,
- Annotations.DEFAULT_INITIAL_CAPACITY);
-
- }
-
- /**
- * @see Annotations#LOAD_FACTOR
- */
- public float getLoadFactor() {
-
- return getProperty(Annotations.LOAD_FACTOR,
- Annotations.DEFAULT_LOAD_FACTOR);
-
- }
-
- /**
- * @see Annotations#CONCURRENCY_LEVEL
- */
- public int getConcurrencyLevel() {
-
- return getProperty(Annotations.CONCURRENCY_LEVEL,
- Annotations.DEFAULT_CONCURRENCY_LEVEL);
-
- }
-
- /**
- * @see Annotations#VARIABLES
- */
- public IVariable<?>[] getVariables() {
-
- return getRequiredProperty(Annotations.VARIABLES);
-
- }
-
- public FutureTask<Void> eval(final BOpContext<IBindingSet> context) {
-
- return new FutureTask<Void>(new DistinctTask(this, context));
-
- }
-
- /**
- * Wrapper used for the as bound solutions in the {@link ConcurrentHashMap}.
- */
- private static class Solution {
- private final int hash;
-
- private final IConstant<?>[] vals;
-
- public Solution(final IConstant<?>[] vals) {
- this.vals = vals;
- this.hash = java.util.Arrays.hashCode(vals);
- }
-
- public int hashCode() {
- return hash;
- }
-
- public boolean equals(final Object o) {
- if (this == o)
- return true;
- if (!(o instanceof Solution)) {
- return false;
- }
- final Solution t = (Solution) o;
- if (vals.length != t.vals.length)
- return false;
- for (int i = 0; i < vals.length; i++) {
- // @todo verify that this allows for nulls with a unit test.
- if (vals[i] == t.vals[i])
- continue;
- if (vals[i] == null)
- return false;
- if (!vals[i].equals(t.vals[i]))
- return false;
- }
- return true;
- }
- }
-
- /**
- * Task executing on the node.
- */
- static private class DistinctTask implements Callable<Void> {
-
- private final BOpContext<IBindingSet> context;
-
- /**
- * A concurrent map whose keys are the bindings on the specified
- * variables (the keys and the values are the same since the map
- * implementation does not allow <code>null</code> values).
- */
- private /*final*/ ConcurrentHashMap<Solution, Solution> map;
-
- /**
- * The variables used to impose a distinct constraint.
- */
- private final IVariable<?>[] vars;
-
- DistinctTask(final DistinctBindingSetOp op,
- final BOpContext<IBindingSet> context) {
-
- this.context = context;
-
- this.vars = op.getVariables();
-
- if (vars == null)
- throw new IllegalArgumentException();
-
- if (vars.length == 0)
- throw new IllegalArgumentException();
-
- this.map = new ConcurrentHashMap<Solution, Solution>(
- op.getInitialCapacity(), op.getLoadFactor(),
- op.getConcurrencyLevel());
-
- }
-
- /**
- * If the bindings are distinct for the configured variables then return
- * those bindings.
- *
- * @param bset
- * The binding set to be filtered.
- *
- * @return The distinct as bound values -or- <code>null</code> if the
- * binding set duplicates a solution which was already accepted.
- */
- private IConstant<?>[] accept(final IBindingSet bset) {
-
- final IConstant<?>[] r = new IConstant<?>[vars.length];
-
- for (int i = 0; i < vars.length; i++) {
-
- /*
- * Note: This allows null's.
- *
- * @todo write a unit test when some variables are not bound.
- */
- r[i] = bset.get(vars[i]);
-
- }
-
- final Solution s = new Solution(r);
-
- final boolean distinct = map.putIfAbsent(s, s) == null;
-
- return distinct ? r : null;
-
- }
-
- public Void call() throws Exception {
-
- final BOpStats stats = context.getStats();
-
- final IAsynchronousIterator<IBindingSet[]> itr = context
- .getSource();
-
- final IBlockingBuffer<IBindingSet[]> sink = context.getSink();
-
- try {
-
- while (itr.hasNext()) {
-
- final IBindingSet[] a = itr.next();
-
- stats.chunksIn.increment();
- stats.unitsIn.add(a.length);
-
- final List<IBindingSet> accepted = new LinkedList<IBindingSet>();
-
- int naccepted = 0;
-
- for (IBindingSet bset : a) {
-
-// System.err.println("considering: " + bset);
-
- final IConstant<?>[] vals = accept(bset);
-
- if (vals != null) {
-
-// System.err.println("accepted: "
-// + Arrays.toString(vals));
-
- accepted.add(new HashBindingSet(vars, vals));
-
- naccepted++;
-
- }
-
- }
-
- if (naccepted > 0) {
-
- final IBindingSet[] b = accepted
- .toArray(new IBindingSet[naccepted]);
-
-// System.err.println("output: "
-// + Arrays.toString(b));
-
- sink.add(b);
-
- stats.unitsOut.add(naccepted);
- stats.chunksOut.increment();
-
- }
-
- }
-
- sink.flush();
-
- // done.
- return null;
-
- } finally {
-
- sink.close();
-
- // discard the map.
- map = null;
-
- }
-
- }
-
- }
-
-}
Deleted: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/ISlice.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/ISlice.java 2010-09-05 17:02:34 UTC (rev 3508)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/ISlice.java 2010-09-05 18:16:01 UTC (rev 3509)
@@ -1,69 +0,0 @@
-/*
-
-Copyright (C) SYSTAP, LLC 2006-2008. All rights reserved.
-
-Contact:
- SYSTAP, LLC
- 4501 Tower Road
- Greensboro, NC 27410
- lic...@bi...
-
-This program is free software; you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation; version 2 of the License.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program; if not, write to the Free Software
-Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-
-*/
-/*
- * Created on Sep 24, 2008
- */
-
-package com.bigdata.bop.aggregation;
-
-import java.io.Serializable;
-
-import com.bigdata.relation.accesspath.IAccessPath;
-
-/**
- * Indicates the first solution to be returned to the caller (offset) and the
- * #of solutions to be returned (limit).
- *
- * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
- * @version $Id$
- */
-public interface ISlice extends Serializable {
-
- /**
- * The first solution to be returned to the caller. A value of ZERO (0)
- * indicates that all solutions should be returned.
- */
- public long getOffset();
-
- /**
- * The maximum #of solutions to be returned to the caller. A value of
- * {@link Long#MAX_VALUE} indicates that there is no limit.
- *
- * @todo modify to be consistent with
- * {@link IAccessPath#iterator(long, long, int)} where a limit of ZERO
- * (0L) is interpreted as NO limit and a limit of
- * {@link Long#MAX_VALUE} is interpreted as ZERO (0L) (that is, also
- * no limit).
- */
- public long getLimit();
-
- /**
- * The index of the last solution that we will generate (OFFSET + LIMIT). If
- * OFFSET + LIMIT would be greater than {@link Long#MAX_VALUE}, then use
- * {@link Long#MAX_VALUE} instead.
- */
- public long getLast();
-
-}
Deleted: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/ISortOrder.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/ISortOrder.java 2010-09-05 17:02:34 UTC (rev 3508)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/ISortOrder.java 2010-09-05 18:16:01 UTC (rev 3509)
@@ -1,55 +0,0 @@
-/*
-
-Copyright (C) SYSTAP, LLC 2006-2008. All rights reserved.
-
-Contact:
- SYSTAP, LLC
- 4501 Tower Road
- Greensboro, NC 27410
- lic...@bi...
-
-This program is free software; you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation; version 2 of the License.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program; if not, write to the Free Software
-Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-
-*/
-/*
- * Created on Sep 24, 2008
- */
-
-package com.bigdata.bop.aggregation;
-
-import java.io.Serializable;
-
-import com.bigdata.bop.IVariable;
-
-/**
- * A variable and an order that will be imposed on the values for that variable.
- *
- * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
- * @version $Id$
- */
-public interface ISortOrder<E> extends Serializable {
-
- /**
- * The variable whose values will be sorted.
- */
- IVariable<E> getVariable();
-
- /**
- * <code>true</code> iff the values will be placed into an ascending sort
- * and <code>false</code> if the values will be placed into a descending
- * sort.
- */
- boolean isAscending();
-
-}
Deleted: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/MemorySortOp.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/MemorySortOp.java 2010-09-05 17:02:34 UTC (rev 3508)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/MemorySortOp.java 2010-09-05 18:16:01 UTC (rev 3509)
@@ -1,112 +0,0 @@
-package com.bigdata.bop.aggregation;
-
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.FutureTask;
-
-import com.bigdata.bop.BOp;
-import com.bigdata.bop.BOpContext;
-import com.bigdata.bop.BOpUtility;
-import com.bigdata.bop.IBindingSet;
-import com.bigdata.bop.engine.BOpStats;
-import com.bigdata.relation.accesspath.IBlockingBuffer;
-
-/**
- * An in-memory merge sort for binding sets.
- *
- * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
- * @version $Id: DistinctElementFilter.java 3466 2010-08-27 14:28:04Z
- * thompsonbry $
- *
- * @todo unit tests.
- * @todo do an external merge sort operator.
- */
-public class MemorySortOp extends SortOp {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- /**
- * Required deep copy constructor.
- */
- public MemorySortOp(final MemorySortOp op) {
- super(op);
- }
-
- /**
- * Required shallow copy constructor.
- */
- public MemorySortOp(final BOp[] args,
- final Map<String, Object> annotations) {
-
- super(args, annotations);
-
- }
-
- public FutureTask<Void> eval(final BOpContext<IBindingSet> context) {
-
- return new FutureTask<Void>(new SortTask(this, context));
-
- }
-
- /**
- * Task executing on the node.
- */
- static private class SortTask implements Callable<Void> {
-
- private final BOpContext<IBindingSet> context;
-
- /**
- * The binding set comparator.
- */
- private final Comparator<IBindingSet> comparator;
-
- SortTask(final MemorySortOp op,
- final BOpContext<IBindingSet> context) {
-
- this.context = context;
-
- this.comparator = op.getComparator();
-
- }
-
- public Void call() throws Exception {
-
- final BOpStats stats = context.getStats();
-
- final IBlockingBuffer<IBindingSet[]> sink = context.getSink();
-
- try {
-
- final IBindingSet[] all = BOpUtility.toArray(context
- .getSource(), stats);
-
- // sort.
- Arrays.sort(all, comparator);
-
- // update counters.
- stats.unitsOut.add(all.length);
- stats.chunksOut.increment();
-
- // write output and flush.
- sink.add(all);
- sink.flush();
-
- // done.
- return null;
-
- } finally {
-
- sink.close();
-
- }
-
- }
-
- }
-
-}
Deleted: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/Slice.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/Slice.java 2010-09-05 17:02:34 UTC (rev 3508)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/Slice.java 2010-09-05 18:16:01 UTC (rev 3509)
@@ -1,129 +0,0 @@
-/*
-
-Copyright (C) SYSTAP, LLC 2006-2008. All rights reserved.
-
-Contact:
- SYSTAP, LLC
- 4501 Tower Road
- Greensboro, NC 27410
- lic...@bi...
-
-This program is free software; you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation; version 2 of the License.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program; if not, write to the Free Software
-Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-
-*/
-/*
- * Created on Sep 24, 2008
- */
-
-package com.bigdata.bop.aggregation;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.math.BigInteger;
-
-/**
- * Default implementation.
- *
- * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
- * @version $Id$
- */
-public class Slice implements ISlice, Externalizable {
-
- /**
- *
- */
- private static final long serialVersionUID = 5396509164843609197L;
-
- private long offset;
- private long limit;
- private long last;
-
- /**
- * A slice corresponding to all results (offset is zero, limit is
- * {@link Long#MAX_VALUE}).
- */
- public static final transient ISlice ALL = new Slice(0, Long.MAX_VALUE);
-
- /**
- *
- * @param offset
- * @param limit
- *
- * @throws IllegalArgumentException
- * if offset is negative.
- * @throws IllegalArgumentException
- * if limit is non-positive.
- */
- public Slice(final long offset, final long limit) {
-
- if (offset < 0)
- throw new IllegalArgumentException();
-
- if (limit <= 0)
- throw new IllegalArgumentException();
-
- this.offset = offset;
-
- this.limit = limit;
-
- // @todo what is a cheaper way to do this?
- this.last = BigInteger.valueOf(offset).add(BigInteger.valueOf(limit))
- .min(BigInteger.valueOf(Long.MAX_VALUE)).longValue();
-
- }
-
- public long getOffset() {
-
- return offset;
-
- }
-
- public long getLimit() {
-
- return limit;
-
- }
-
- public long getLast() {
-
- return last;
-
- }
-
- public String toString() {
-
- return "Slice{offset="+offset+", limit="+limit+", last="+last+"}";
-
- }
-
- public void readExternal(ObjectInput in) throws IOException,
- ClassNotFoundException {
-
- offset = in.readLong();
- limit = in.readLong();
- last = in.readLong();
-
- }
-
- public void writeExternal(ObjectOutput out) throws IOException {
-
- out.writeLong(offset);
- out.writeLong(limit);
- out.writeLong(last);
-
- }
-
-}
Deleted: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/SortOp.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/SortOp.java 2010-09-05 17:02:34 UTC (rev 3508)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/SortOp.java 2010-09-05 18:16:01 UTC (rev 3509)
@@ -1,86 +0,0 @@
-/**
-
-Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved.
-
-Contact:
- SYSTAP, LLC
- 4501 Tower Road
- Greensboro, NC 27410
- lic...@bi...
-
-This program is free software; you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation; version 2 of the License.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program; if not, write to the Free Software
-Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-*/
-/*
- * Created on Sep 4, 2010
- */
-
-package com.bigdata.bop.aggregation;
-
-import java.util.Map;
-
-import com.bigdata.bop.BOp;
-import com.bigdata.bop.BindingSetPipelineOp;
-import com.bigdata.bop.IBindingSet;
-import com.bigdata.bop.PipelineOp;
-
-/**
- * Base class for operators which sort binding sets.
- *
- * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
- * @version $Id$
- */
-abstract public class SortOp extends BindingSetPipelineOp {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- public interface Annotations extends BindingSetPipelineOp.Annotations {
-
- /**
- * The {@link ComparatorOp} which will impose the ordering on the
- * binding sets.
- *
- * @see ComparatorOp
- */
- String COMPARATOR = MemorySortOp.class.getName() + ".comparator";
-
- }
-
- /**
- * @param op
- */
- public SortOp(PipelineOp<IBindingSet> op) {
- super(op);
- }
-
- /**
- * @param args
- * @param annotations
- */
- public SortOp(BOp[] args, Map<String, Object> annotations) {
- super(args, annotations);
- }
-
- /**
- * @see Annotations#COMPARATOR
- */
- public ComparatorOp getComparator() {
-
- return getRequiredProperty(Annotations.COMPARATOR);
-
- }
-
-}
Deleted: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/SortOrder.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/SortOrder.java 2010-09-05 17:02:34 UTC (rev 3508)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/SortOrder.java 2010-09-05 18:16:01 UTC (rev 3509)
@@ -1,80 +0,0 @@
-/*
-
-Copyright (C) SYSTAP, LLC 2006-2008. All rights reserved.
-
-Contact:
- SYSTAP, LLC
- 4501 Tower Road
- Greensboro, NC 27410
- lic...@bi...
-
-This program is free software; you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation; version 2 of the License.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program; if not, write to the Free Software
-Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-
-*/
-/*
- * Created on Sep 24, 2008
- */
-
-package com.bigdata.bop.aggregation;
-
-import com.bigdata.bop.IVariable;
-
-/**
- * Default impl.
- *
- * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
- * @version $Id$
- */
-public class SortOrder<E> implements ISortOrder<E> {
-
- /**
- *
- */
- private static final long serialVersionUID = -669873421670514139L;
-
- private final IVariable<E> var;
- private final boolean asc;
-
- /**
- *
- * @param var
- * The variable.
- * @param asc
- * <code>true</code> for an ascending sort and
- * <code>false</code> for a descending sort.
- */
- public SortOrder(final IVariable<E> var, final boolean asc) {
-
- if (var == null)
- throw new IllegalArgumentException();
-
- this.var = var;
-
- this.asc = asc;
-
- }
-
- public IVariable<E> getVariable() {
-
- return var;
-
- }
-
- public boolean isAscending() {
-
- return asc;
-
- }
-
-}
Deleted: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/SparqlBindingSetComparatorOp.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/SparqlBindingSetComparatorOp.java 2010-09-05 17:02:34 UTC (rev 3508)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/SparqlBindingSetComparatorOp.java 2010-09-05 18:16:01 UTC (rev 3509)
@@ -1,101 +0,0 @@
-package com.bigdata.bop.aggregation;
-
-import java.util.Comparator;
-import java.util.Map;
-
-import org.openrdf.query.algebra.evaluation.util.ValueComparator;
-
-import com.bigdata.bop.BOp;
-import com.bigdata.bop.IBindingSet;
-import com.bigdata.bop.IVariable;
-
-/**
- * A comparator for SPARQL binding sets.
- *
- * @see http://www.w3.org/TR/rdf-sparql-query/#modOrderBy
- * @see ValueComparator
- *
- * @todo unit tests.
- */
-public class SparqlBindingSetComparatorOp extends ComparatorOp {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- /**
- * Required deep copy constructor.
- */
- public SparqlBindingSetComparatorOp(final SparqlBindingSetComparatorOp op) {
- super(op);
- }
-
- /**
- * Required shallow copy constructor.
- */
- public SparqlBindingSetComparatorOp(final BOp[] args,
- final Map<String, Object> annotations) {
-
- super(args, annotations);
-
- }
-
- /**
- * @see Annotations#ORDER
- */
- public ISortOrder<?>[] getOrder() {
-
- return getRequiredProperty(Annotations.ORDER);
-
- }
-
- /**
- * The sort order to be imposed.
- */
- private transient ISortOrder<?>[] order;
-
- private transient Comparator vc;
-
- public int compare(final IBindingSet bs1, final IBindingSet bs2) {
-
- if (order == null) {
-
- // lazy initialization.
- order = getOrder();
-
- if (order == null)
- throw new IllegalArgumentException();
-
- if (order.length == 0)
- throw new IllegalArgumentException();
-
- // comparator for RDF Value objects.
- vc = new ValueComparator();
-
- }
-
- for (int i = 0; i < order.length; i++) {
-
- final ISortOrder<?> o = order[i];
-
- final IVariable v = o.getVariable();
-
- int ret = vc.compare(bs1.get(v).get(), bs2.get(v).get());
-
- if (!o.isAscending())
- ret = -ret;
-
- if (ret != 0) {
- // not equal for this variable.
- return ret;
- }
-
- }
-
- // equal for all variables.
- return 0;
-
- }
-
-}
Deleted: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/package.html
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/package.html 2010-09-05 17:02:34 UTC (rev 3508)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/package.html 2010-09-05 18:16:01 UTC (rev 3509)
@@ -1,17 +0,0 @@
-<html>
-<head>
-<title>solution modifier operators (distinct, sort, slice, and aggregation)</title>
-</head>
-<body>
-
-<p>
-
- This package provides distinct, sort, and aggregation operators. All of
- these are potentially high volume hash partitioned operations against a
- clustered database. Both in memory and disk based versions of the each
- operator should be implemented.
-
-</p>
-
-</body>
-</html>
\ No newline at end of file
Copied: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/ComparatorOp.java (from rev 3508, branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/ComparatorOp.java)
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/ComparatorOp.java (rev 0)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/ComparatorOp.java 2010-09-05 18:16:01 UTC (rev 3509)
@@ -0,0 +1,77 @@
+/**
+
+Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved.
+
+Contact:
+ SYSTAP, LLC
+ 4501 Tower Road
+ Greensboro, NC 27410
+ lic...@bi...
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+/*
+ * Created on Sep 4, 2010
+ */
+
+package com.bigdata.bop.solutions;
+
+import java.util.Comparator;
+import java.util.Map;
+
+import com.bigdata.bop.BOp;
+import com.bigdata.bop.BOpBase;
+import com.bigdata.bop.IBindingSet;
+
+/**
+ * Base class for operators which impose a sort order on binding sets.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ * @version $Id$
+ */
+abstract public class ComparatorOp extends BOpBase implements
+ Comparator<IBindingSet> {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public interface Annotations extends BOp.Annotations {
+
+ /**
+ * An {@link ISortOrder}[] specifying the variables on which the sort
+ * will be imposed and the order (ascending or descending) for each
+ * variable.
+ */
+ String ORDER = ComparatorOp.class.getName() + ".order";
+
+ }
+
+ /**
+ * @param op
+ */
+ public ComparatorOp(BOpBase op) {
+ super(op);
+ }
+
+ /**
+ * @param args
+ * @param annotations
+ */
+ public ComparatorOp(BOp[] args, Map<String, Object> annotations) {
+ super(args, annotations);
+ }
+
+}
Copied: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/DistinctBindingSetOp.java (from rev 3508, branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/DistinctBindingSetOp.java)
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/DistinctBindingSetOp.java (rev 0)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/DistinctBindingSetOp.java 2010-09-05 18:16:01 UTC (rev 3509)
@@ -0,0 +1,324 @@
+package com.bigdata.bop.solutions;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.FutureTask;
+
+import com.bigdata.bop.BOp;
+import com.bigdata.bop.BOpContext;
+import com.bigdata.bop.BindingSetPipelineOp;
+import com.bigdata.bop.HashBindingSet;
+import com.bigdata.bop.IBindingSet;
+import com.bigdata.bop.IConstant;
+import com.bigdata.bop.IVariable;
+import com.bigdata.bop.engine.BOpStats;
+import com.bigdata.relation.accesspath.IAsynchronousIterator;
+import com.bigdata.relation.accesspath.IBlockingBuffer;
+
+/**
+ * A pipelined DISTINCT operator based on a hash table.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ * @version $Id: DistinctElementFilter.java 3466 2010-08-27 14:28:04Z
+ * thompsonbry $
+ */
+public class DistinctBindingSetOp extends BindingSetPipelineOp {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public interface Annotations extends BindingSetPipelineOp.Annotations {
+
+ /**
+ * The initial capacity of the {@link ConcurrentHashMap} used to impose
+ * the distinct constraint.
+ *
+ * @see #DEFAULT_INITIAL_CAPACITY
+ */
+ String INITIAL_CAPACITY = DistinctBindingSetOp.class.getName()+".initialCapacity";
+
+ int DEFAULT_INITIAL_CAPACITY = 16;
+
+ /**
+ * The load factor of the {@link ConcurrentHashMap} used to impose
+ * the distinct constraint.
+ *
+ * @see #DEFAULT_LOAD_FACTOR
+ */
+ String LOAD_FACTOR = DistinctBindingSetOp.class.getName()+".loadFactor";
+
+ float DEFAULT_LOAD_FACTOR = .75f;
+
+ /**
+ * The concurrency level of the {@link ConcurrentHashMap} used to impose
+ * the distinct constraint.
+ *
+ * @see #DEFAULT_CONCURRENCY_LEVEL
+ */
+ String CONCURRENCY_LEVEL = DistinctBindingSetOp.class.getName()+".concurrencyLevel";
+
+ int DEFAULT_CONCURRENCY_LEVEL = 16;
+
+ /**
+ * The variables on which the distinct constraint will be imposed.
+ * Binding sets with distinct values for the specified variables will be
+ * passed on.
+ */
+ String VARIABLES = DistinctBindingSetOp.class.getName() + ".variables";
+
+ }
+
+ /**
+ * Required deep copy constructor.
+ */
+ public DistinctBindingSetOp(final DistinctBindingSetOp op) {
+ super(op);
+ }
+
+ /**
+ * Required shallow copy constructor.
+ */
+ public DistinctBindingSetOp(final BOp[] args,
+ final Map<String, Object> annotations) {
+
+ super(args, annotations);
+
+ }
+
+ /**
+ * @see Annotations#INITIAL_CAPACITY
+ */
+ public int getInitialCapacity() {
+
+ return getProperty(Annotations.INITIAL_CAPACITY,
+ Annotations.DEFAULT_INITIAL_CAPACITY);
+
+ }
+
+ /**
+ * @see Annotations#LOAD_FACTOR
+ */
+ public float getLoadFactor() {
+
+ return getProperty(Annotations.LOAD_FACTOR,
+ Annotations.DEFAULT_LOAD_FACTOR);
+
+ }
+
+ /**
+ * @see Annotations#CONCURRENCY_LEVEL
+ */
+ public int getConcurrencyLevel() {
+
+ return getProperty(Annotations.CONCURRENCY_LEVEL,
+ Annotations.DEFAULT_CONCURRENCY_LEVEL);
+
+ }
+
+ /**
+ * @see Annotations#VARIABLES
+ */
+ public IVariable<?>[] getVariables() {
+
+ return getRequiredProperty(Annotations.VARIABLES);
+
+ }
+
+ public FutureTask<Void> eval(final BOpContext<IBindingSet> context) {
+
+ return new FutureTask<Void>(new DistinctTask(this, context));
+
+ }
+
+ /**
+ * Wrapper used for the as bound solutions in the {@link ConcurrentHashMap}.
+ */
+ private static class Solution {
+ private final int hash;
+
+ private final IConstant<?>[] vals;
+
+ public Solution(final IConstant<?>[] vals) {
+ this.vals = vals;
+ this.hash = java.util.Arrays.hashCode(vals);
+ }
+
+ public int hashCode() {
+ return hash;
+ }
+
+ public boolean equals(final Object o) {
+ if (this == o)
+ return true;
+ if (!(o instanceof Solution)) {
+ return false;
+ }
+ final Solution t = (Solution) o;
+ if (vals.length != t.vals.length)
+ return false;
+ for (int i = 0; i < vals.length; i++) {
+ // @todo verify that this allows for nulls with a unit test.
+ if (vals[i] == t.vals[i])
+ continue;
+ if (vals[i] == null)
+ return false;
+ if (!vals[i].equals(t.vals[i]))
+ return false;
+ }
+ return true;
+ }
+ }
+
+ /**
+ * Task executing on the node.
+ */
+ static private class DistinctTask implements Callable<Void> {
+
+ private final BOpContext<IBindingSet> context;
+
+ /**
+ * A concurrent map whose keys are the bindings on the specified
+ * variables (the keys and the values are the same since the map
+ * implementation does not allow <code>null</code> values).
+ */
+ private /*final*/ ConcurrentHashMap<Solution, Solution> map;
+
+ /**
+ * The variables used to impose a distinct constraint.
+ */
+ private final IVariable<?>[] vars;
+
+ DistinctTask(final DistinctBindingSetOp op,
+ final BOpContext<IBindingSet> context) {
+
+ this.context = context;
+
+ this.vars = op.getVariables();
+
+ if (vars == null)
+ throw new IllegalArgumentException();
+
+ if (vars.length == 0)
+ throw new IllegalArgumentException();
+
+ this.map = new ConcurrentHashMap<Solution, Solution>(
+ op.getInitialCapacity(), op.getLoadFactor(),
+ op.getConcurrencyLevel());
+
+ }
+
+ /**
+ * If the bindings are distinct for the configured variables then return
+ * those bindings.
+ *
+ * @param bset
+ * The binding set to be filtered.
+ *
+ * @return The distinct as bound values -or- <code>null</code> if the
+ * binding set duplicates a solution which was already accepted.
+ */
+ private IConstant<?>[] accept(final IBindingSet bset) {
+
+ final IConstant<?>[] r = new IConstant<?>[vars.length];
+
+ for (int i = 0; i < vars.length; i++) {
+
+ /*
+ * Note: This allows null's.
+ *
+ * @todo write a unit test when some variables are not bound.
+ */
+ r[i] = bset.get(vars[i]);
+
+ }
+
+ final Solution s = new Solution(r);
+
+ final boolean distinct = map.putIfAbsent(s, s) == null;
+
+ return distinct ? r : null;
+
+ }
+
+ public Void call() throws Exception {
+
+ final BOpStats stats = context.getStats();
+
+ final IAsynchronousIterator<IBindingSet[]> itr = context
+ .getSource();
+
+ final IBlockingBuffer<IBindingSet[]> sink = context.getSink();
+
+ try {
+
+ while (itr.hasNext()) {
+
+ final IBindingSet[] a = itr.next();
+
+ stats.chunksIn.increment();
+ stats.unitsIn.add(a.length);
+
+ final List<IBindingSet> accepted = new LinkedList<IBindingSet>();
+
+ int naccepted = 0;
+
+ for (IBindingSet bset : a) {
+
+// System.err.println("considering: " + bset);
+
+ final IConstant<?>[] vals = accept(bset);
+
+ if (vals != null) {
+
+// System.err.println("accepted: "
+// + Arrays.toString(vals));
+
+ accepted.add(new HashBindingSet(vars, vals));
+
+ naccepted++;
+
+ }
+
+ }
+
+ if (naccepted > 0) {
+
+ final IBindingSet[] b = accepted
+ .toArray(new IBindingSet[naccepted]);
+
+// System.err.println("output: "
+// + Arrays.toString(b));
+
+ sink.add(b);
+
+ stats.unitsOut.add(naccepted);
+ stats.chunksOut.increment();
+
+ }
+
+ }
+
+ sink.flush();
+
+ // done.
+ return null;
+
+ } finally {
+
+ sink.close();
+
+ // discard the map.
+ map = null;
+
+ }
+
+ }
+
+ }
+
+}
Copied: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/ISortOrder.java (from rev 3508, branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/ISortOrder.java)
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/ISortOrder.java (rev 0)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/ISortOrder.java 2010-09-05 18:16:01 UTC (rev 3509)
@@ -0,0 +1,55 @@
+/*
+
+Copyright (C) SYSTAP, LLC 2006-2008. All rights reserved.
+
+Contact:
+ SYSTAP, LLC
+ 4501 Tower Road
+ Greensboro, NC 27410
+ lic...@bi...
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+*/
+/*
+ * Created on Sep 24, 2008
+ */
+
+package com.bigdata.bop.solutions;
+
+import java.io.Serializable;
+
+import com.bigdata.bop.IVariable;
+
+/**
+ * A variable and an order that will be imposed on the values for that variable.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ * @version $Id$
+ */
+public interface ISortOrder<E> extends Serializable {
+
+ /**
+ * The variable whose values will be sorted.
+ */
+ IVariable<E> getVariable();
+
+ /**
+ * <code>true</code> iff the values will be placed into an ascending sort
+ * and <code>false</code> if the values will be placed into a descending
+ * sort.
+ */
+ boolean isAscending();
+
+}
Copied: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/MemorySortOp.java (from rev 3508, branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/MemorySortOp.java)
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/MemorySortOp.java (rev 0)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/MemorySortOp.java 2010-09-05 18:16:01 UTC (rev 3509)
@@ -0,0 +1,112 @@
+package com.bigdata.bop.solutions;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.FutureTask;
+
+import com.bigdata.bop.BOp;
+import com.bigdata.bop.BOpContext;
+import com.bigdata.bop.BOpUtility;
+import com.bigdata.bop.IBindingSet;
+import com.bigdata.bop.engine.BOpStats;
+import com.bigdata.relation.accesspath.IBlockingBuffer;
+
+/**
+ * An in-memory merge sort for binding sets.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ * @version $Id: DistinctElementFilter.java 3466 2010-08-27 14:28:04Z
+ * thompsonbry $
+ *
+ * @todo unit tests.
+ * @todo do an external merge sort operator.
+ */
+public class MemorySortOp extends SortOp {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Required deep copy constructor.
+ */
+ public MemorySortOp(final MemorySortOp op) {
+ super(op);
+ }
+
+ /**
+ * Required shallow copy constructor.
+ */
+ public MemorySortOp(final BOp[] args,
+ final Map<String, Object> annotations) {
+
+ super(args, annotations);
+
+ }
+
+ public FutureTask<Void> eval(final BOpContext<IBindingSet> context) {
+
+ return new FutureTask<Void>(new SortTask(this, context));
+
+ }
+
+ /**
+ * Task executing on the node.
+ */
+ static private class SortTask implements Callable<Void> {
+
+ private final BOpContext<IBindingSet> context;
+
+ /**
+ * The binding set comparator.
+ */
+ private final Comparator<IBindingSet> comparator;
+
+ SortTask(final MemorySortOp op,
+ final BOpContext<IBindingSet> context) {
+
+ this.context = context;
+
+ this.comparator = op.getComparator();
+
+ }
+
+ public Void call() throws Exception {
+
+ final BOpStats stats = context.getStats();
+
+ final IBlockingBuffer<IBindingSet[]> sink = context.getSink();
+
+ try {
+
+ final IBindingSet[] all = BOpUtility.toArray(context
+ .getSource(), stats);
+
+ // sort.
+ Arrays.sort(all, comparator);
+
+ // update counters.
+ stats.unitsOut.add(all.length);
+ stats.chunksOut.increment();
+
+ // write output and flush.
+ sink.add(all);
+ sink.flush();
+
+ // done.
+ return null;
+
+ } finally {
+
+ sink.close();
+
+ }
+
+ }
+
+ }
+
+}
Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java (rev 0)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java 2010-09-05 18:16:01 UTC (rev 3509)
@@ -0,0 +1,275 @@
+/**
+
+Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved.
+
+Contact:
+ SYSTAP, LLC
+ 4501 Tower Road
+ Greensboro, NC 27410
+ lic...@bi...
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+/*
+ * Created on Sep 5, 2010
+ */
+
+package com.bigdata.bop.solutions;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.FutureTask;
+
+import com.bigdata.bop.BOp;
+import com.bigdata.bop.BOpContext;
+import com.bigdata.bop.BindingSetPipelineOp;
+import com.bigdata.bop.IBindingSet;
+import com.bigdata.bop.PipelineOp;
+import com.bigdata.bop.engine.BOpStats;
+import com.bigdata.bop.engine.RunningQuery;
+import com.bigdata.relation.accesspath.IAsynchronousIterator;
+import com.bigdata.relation.accesspath.IBlockingBuffer;
+import com.bigdata.relation.accesspath.UnsynchronizedArrayBuffer;
+import com.bigdata.service.IBigdataFederation;
+
+/**
+ * An operator which imposes an offset/limit on a binding set pipeline.
+ * <p>
+ * Note: join processing typically involves concurrent processes, hence the
+ * order of the results will not be stable unless the results are sorted before
+ * applying the slice. When a slice is applied without a sort, the same query
+ * may return different results each time it is evaluated.
+ * <p>
+ * Note: When running on an {@link IBigdataFederation}, this operator must be
+ * imposed on the query controller so it can count the solutions as they flow
+ * through.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ * @version $Id$
+ *
+ * @todo If this operator is invoked for each chunk output by a query onto the
+ * pipeline then it will over produce unless (A) it is given the same
+ * {@link BOpStats} each time; and (B) it is not invoked for two chunks
+ * concurrently.
+ * <p>
+ * A safer way to impose the slice constraint is by wrapping the query
+ * buffer on the query controller. Once the slice is satisfied, it can
+ * just cancel the query. The only drawback of this approach is that
+ * wrapping a buffer is not really the same as applying a {@link BOp} to
+ * the pipeline so it falls outside of the standard operator evaluation
+ * logic.
+ *
+ * @todo If we allow complex operator trees in which "subqueries" can also use a
+ * slice then either they need to run as their own query with their own
+ * {@link RunningQuery} state or the API for cancelling a running query as
+ * used here needs to only cancel evaluation of the child operators.
+ * Otherwise we could cancel all operator evaluation for the query,
+ * including operators which are ancestors of the {@link SliceOp}.
+ */
+public class SliceOp extends BindingSetPipelineOp {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID ...
[truncated message content] |