|
From: <tho...@us...> - 2010-09-20 20:14:36
|
Revision: 3599
http://bigdata.svn.sourceforge.net/bigdata/?rev=3599&view=rev
Author: thompsonbry
Date: 2010-09-20 20:14:28 +0000 (Mon, 20 Sep 2010)
Log Message:
-----------
Added a Tee and a DataSetJoin operator for use in named and default graph queries.
Modified Paths:
--------------
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BindingSetPipelineOp.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyBindingSetOp.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestAll.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bset/TestCopyBindingSets.java
branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestBOps.java
Added Paths:
-----------
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Tee.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/notes.txt
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/rdf/
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/rdf/join/
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/rdf/join/DataSetJoin.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/rdf/
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/rdf/TestAll.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/rdf/join/
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/rdf/join/TestAll.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/rdf/join/TestDataSetJoin.java
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java 2010-09-20 19:43:44 UTC (rev 3598)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java 2010-09-20 20:14:28 UTC (rev 3599)
@@ -195,7 +195,7 @@
* identifier for the {@link BOp} within the context of its owning
* query.
*/
- String BOP_ID = "bopId";
+ String BOP_ID = BOp.class.getName()+".bopId";
/**
* The timeout for the operator evaluation (milliseconds).
@@ -210,7 +210,7 @@
* be interpreted with respect to the time when the query began to
* execute.
*/
- String TIMEOUT = "timeout";
+ String TIMEOUT = BOp.class.getName()+".timeout";
/**
* The default timeout for operator evaluation.
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BindingSetPipelineOp.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BindingSetPipelineOp.java 2010-09-20 19:43:44 UTC (rev 3598)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BindingSetPipelineOp.java 2010-09-20 20:14:28 UTC (rev 3599)
@@ -48,9 +48,16 @@
/**
* The value of the annotation is the {@link BOp.Annotations#BOP_ID} of
- * the ancestor in the operator tree which serves as an alternative sink
- * for binding sets.
+ * the ancestor in the operator tree which serves as the default sink
+ * for binding sets (default is the parent).
*/
+ String SINK_REF = BindingSetPipelineOp.class.getName() + ".sinkRef";
+
+ /**
+ * The value of the annotation is the {@link BOp.Annotations#BOP_ID} of
+ * the ancestor in the operator tree which serves as the alternative
+ * sink for binding sets (default is no alternative sink).
+ */
String ALT_SINK_REF = BindingSetPipelineOp.class.getName()
+ ".altSinkRef";
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyBindingSetOp.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyBindingSetOp.java 2010-09-20 19:43:44 UTC (rev 3598)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyBindingSetOp.java 2010-09-20 20:14:28 UTC (rev 3599)
@@ -41,14 +41,13 @@
import com.bigdata.relation.accesspath.IBlockingBuffer;
/**
- * This operator copies its source to its sink. It is used to feed the first
- * join in the pipeline. The operator should have no children but may be
- * decorated with annotations as necessary.
+ * This operator copies its source to its sink. Specializations exist which are
+ * used to feed the initial set of intermediate results into a pipeline (
+ * {@link StartOp}) and which are used to replicate intermediate results to more
+ * than one sink ({@link Tee}).
*
* @author <a href="mailto:tho...@us...">Bryan Thompson</a>
* @version $Id$
- *
- * @todo unit tests.
*/
public class CopyBindingSetOp extends BindingSetPipelineOp {
@@ -99,8 +98,10 @@
}
public Void call() throws Exception {
- final IAsynchronousIterator<IBindingSet[]> source = context.getSource();
+ final IAsynchronousIterator<IBindingSet[]> source = context
+ .getSource();
final IBlockingBuffer<IBindingSet[]> sink = context.getSink();
+ final IBlockingBuffer<IBindingSet[]> sink2 = context.getSink2();
try {
final BOpStats stats = context.getStats();
while (source.hasNext()) {
@@ -108,11 +109,17 @@
stats.chunksIn.increment();
stats.unitsIn.add(chunk.length);
sink.add(chunk);
+ if (sink2 != null)
+ sink2.add(chunk);
}
sink.flush();
+ if (sink2 != null)
+ sink2.flush();
return null;
} finally {
sink.close();
+ if (sink2 != null)
+ sink2.close();
source.close();
}
}
Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Tee.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Tee.java (rev 0)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Tee.java 2010-09-20 20:14:28 UTC (rev 3599)
@@ -0,0 +1,118 @@
+/**
+
+Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved.
+
+Contact:
+ SYSTAP, LLC
+ 4501 Tower Road
+ Greensboro, NC 27410
+ lic...@bi...
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+/*
+ * Created on Sep 20, 2010
+ */
+
+package com.bigdata.bop.bset;
+
+import java.util.Map;
+
+import com.bigdata.bop.BOp;
+import com.bigdata.bop.BOpEvaluationContext;
+import com.bigdata.bop.BindingSetPipelineOp;
+import com.bigdata.bop.join.PipelineJoin;
+import com.bigdata.rdf.rules.TMUtility;
+import com.bigdata.relation.RelationFusedView;
+import com.bigdata.relation.rule.Slice;
+
+/**
+ * TEE(op):[sinkRef=X; altSinkRef=Y]
+ * <p>
+ * Pipeline operator copies its source to both sink and altSink. The sink and
+ * the altSink must both be ancestors of the operator. The sinkRef MAY be
+ * omitted when one of the targets is the immediate parent of the TEE.
+ * Evaluation scope: {@link BOpEvaluationContext#ANY}.
+ * <p>
+ * <h2>Example - Truth Maintenance</h2>
+ * <p>
+ * In truth maintenance we establish a focus store which is brought to a fixed
+ * point by applying some rules and a transitive closure operator. Once the
+ * fixed point is reached, the assertions in the focus store are either inserted
+ * onto the database or (for retraction) removed from database unless a proof
+ * can be found that an assertion is still entailed.
+ * <p>
+ * The {@link Tee} operator can be used in truth maintenance to read on the
+ * UNION of the focus store and the database - see {@link TMUtility}. This is
+ * handled as the "union" of two JOINs using a {@link Tee} as follows:
+ *
+ * <pre>
+ * slice := SLICE( join2 )[bopId=3]
+ * join2 := JOIN( join1, bar.spo(A,loves,B))[bopId=2]
+ * join1 := JOIN( tee, foo.spo(A,loves,B))[bopId=1; sinkRef=3]
+ * tee := TEE( ... )[altSinkRef=2],
+ * </pre>
+ *
+ * The {@link Tee} copies its inputs to both the default sink (its parent, which
+ * is join1) and the alternate sink (join2). join1 routes its outputs around
+ * join2, sending them directly to their lowest common ancestor. This has the
+ * effect of creating a union of their outputs at the receiver. In this example,
+ * a {@link Slice} is used as the target for both of the join operators. Since
+ * this is a pipeline construction, the joins will be evaluated in parallel as
+ * intermediate results arrive for those operators. Normally the {@link Tee}
+ * will be fed by a {@link StartOp} or another {@link PipelineJoin}.
+ *
+ * @todo The union of access paths was historically handled by
+ * {@link RelationFusedView}. That class should be removed once queries
+ * are rewritten to use the union of joins.
+ *
+ * @todo The {@link TMUtility} will have to be updated to use this operator
+ * rather than specifying multiple source "names" for the relation of the
+ * predicate.
+ *
+ * @todo The FastClosureRuleTask will also need to be updated to use a
+ * {@link Union} over the joins rather than a {@link RelationFusedView}.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ * @version $Id$
+ */
+public class Tee extends CopyBindingSetOp {
+
+ /**
+ * Serialization version identifier (Java serialVersionUID convention).
+ */
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Deep copy constructor; delegates to the superclass copy constructor.
+ * @param op The {@link Tee} operator to be copied.
+ */
+ public Tee(final Tee op) {
+ super(op);
+ }
+
+ /**
+ * Shallow copy constructor; also looks up the required altSinkRef annotation.
+ * @param args The operands, passed through to the superclass constructor.
+ * @param annotations The annotations, passed through to the superclass constructor.
+ */
+ public Tee(BOp[] args, Map<String, Object> annotations) {
+
+ super(args, annotations);
+ // Result is discarded: presumably this fails fast when altSinkRef is absent (confirm in getRequiredProperty).
+ getRequiredProperty(BindingSetPipelineOp.Annotations.ALT_SINK_REF);
+
+ }
+
+}
Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Tee.java
___________________________________________________________________
Added: svn:keywords
+ Id Date Revision Author HeadURL
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java 2010-09-20 19:43:44 UTC (rev 3598)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java 2010-09-20 20:14:28 UTC (rev 3599)
@@ -31,6 +31,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
+import com.bigdata.bop.BOp;
import com.bigdata.bop.BOpContext;
import com.bigdata.bop.BindingSetPipelineOp;
import com.bigdata.bop.IBindingSet;
@@ -40,42 +41,29 @@
import com.bigdata.util.concurrent.Haltable;
/**
- * The union of two or more {@link BindingSetPipelineOp} operators.
+ * UNION(ops)[maxParallel(default all)]
+ * <p>
+ * Executes each of the operands in the union as a subquery. Each subquery is
+ * run as a separate query but is linked to the parent query in which the UNION
+ * is being evaluated. The subqueries do not receive bindings from the parent
+ * and may be executed independently. By default, the subqueries are run with
+ * unlimited parallelism.
+ * <p>
+ * UNION is useful when independent queries are evaluated and their outputs are
+ * merged. Outputs from the UNION operator flow to the parent operator and will
+ * be mapped across shards or nodes as appropriate for the parent. UNION runs on
+ * the query controller. In order to avoid routing intermediate results through
+ * the controller, the {@link BindingSetPipelineOp.Annotations#SINK_REF} of each
+ * child operand should be overridden to specify the parent of the UNION
+ * operator.
+ * <p>
+ * UNION can not be used when the intermediate results must be routed into the
+ * subqueries. However, a {@link Tee} pattern may help in such cases. For
+ * example, a {@link Tee} may be used to create a union of pipeline joins for
+ * two access paths during truth maintenance.
*
* @author <a href="mailto:tho...@us...">Bryan Thompson</a>
* @version $Id$
- *
- * @todo I have some basic questions about the ability to use a UNION of two
- * predicates in scale-out. I think that this might be more accurately
- * modeled as the UNION of two joins. That is, rather than:
- *
- * <pre>
- * JOIN( ...,
- * UNION( foo.spo(A,loves,B),
- * bar.spo(A,loves,B) )
- * )
- * </pre>
- *
- * using
- *
- * <pre>
- * UNION( JOIN( ..., foo.spo(A,loves,B) ),
- * JOIN( ..., bar.spo(A,loves,B) )
- * )
- * </pre>
- *
- * which would be a binding set union rather than an element union.
- *
- * @todo The union of access paths was historically handled by
- * {@link RelationFusedView}. That class should be removed once queries
- * are rewritten to use the union of joins.
- *
- * @todo The {@link TMUtility} will have to be updated to use this operator
- * rather than specifying multiple source "names" for the relation of the
- * predicate.
- *
- * @todo The FastClosureRuleTask will also need to be updated to use a
- * {@link Union} over the joins rather than a {@link RelationFusedView}.
*/
public class Union extends BindingSetPipelineOp {
@@ -101,35 +89,35 @@
public FutureTask<Void> eval(final BOpContext<IBindingSet> context) {
- return new FutureTask<Void>(new UnionTask(this, context));
-
+// return new FutureTask<Void>(new UnionTask(this, context));
+ throw new UnsupportedOperationException();
}
- /**
- * Pipeline union impl.
- *
- * FIXME All this does is copy its inputs to its outputs. Since we only run
- * one chunk of input at a time, it seems that the easiest way to implement
- * a union is to have the operators in the union just target the same sink.
- */
- private static class UnionTask extends Haltable<Void> implements Callable<Void> {
-
- public UnionTask(//
- final Union op,//
- final BOpContext<IBindingSet> context
- ) {
-
- if (op == null)
- throw new IllegalArgumentException();
- if (context == null)
- throw new IllegalArgumentException();
- }
-
- public Void call() throws Exception {
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException();
- }
-
- }
+// /**
+// * Pipeline union impl.
+// *
+// * FIXME All this does is copy its inputs to its outputs. Since we only run
+// * one chunk of input at a time, it seems that the easiest way to implement
+// * a union is to have the operators in the union just target the same sink.
+// */
+// private static class UnionTask extends Haltable<Void> implements Callable<Void> {
+//
+// public UnionTask(//
+// final Union op,//
+// final BOpContext<IBindingSet> context
+// ) {
+//
+// if (op == null)
+// throw new IllegalArgumentException();
+// if (context == null)
+// throw new IllegalArgumentException();
+// }
+//
+// public Void call() throws Exception {
+// // TODO Auto-generated method stub
+// throw new UnsupportedOperationException();
+// }
+//
+// }
}
Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/notes.txt
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/notes.txt (rev 0)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/notes.txt 2010-09-20 20:14:28 UTC (rev 3599)
@@ -0,0 +1,160 @@
+RunningQuery:
+
+ * FIXME Raise this into an annotation that we can tweak from the unit
+ * tests and then debug the problem.
+ *
+ * FIXME Add an annotation or method to mark operators which must be
+ * evaluated using operator-at-a-time evaluation. SORT is the main
+ * example here (it must be operator at a time of necessity) but other
+ * operators may be implemented with operator at a time assumptions. This
+ * might be on PipelineOp and could be trinary {Chunked,Blocked,All}.
+
+Note: Many of the maxParallel annotations related to thread consumption will go
+away with Java7 and async file IO. Other annotations, such as the #of 1M buffers
+to allocate to an operator, need to be introduced to handle high volume queries.
+
+Note: UNION, STEPS, and STAR(transitive closure) are all evaluated on the query
+controller.
+
+---
+UNION(ops)[maxParallel(default all)]
+
+Executes each of the operands in the union as subqueries. Each subquery is run
+as a separate RunningQuery but is linked to the parent query in which the UNION
+is being evaluated. The subqueries do not receive bindings from the parent and
+may be executed independently.
+
+---
+STEPS(ops)[maxParallel(default 1)]
+
+The operands are executed as independent subqueries. Unlike UNION, STEPS does
+not copy its source binding sets.
+
+---
+
+STAR(op) [maxItr(default all)]
+
+Evaluate the operand until its mutation count remains unchanged from one round
+to the next. The operand must write on a resource. The fixed point is determined
+by examining BOPStats.mutationCount.
+
+Do with INSERT/REMOVE since all involve mutation.
+
+---
+DataSetJoin([left,var])[graphs={graphIds}; maxParallel=50]
+
+SPARQL specific join binds var to each of the given graphIds values for each
+source binding set. This join operator is useful when the multiplicity of the
+graphIds set is modest (between 2 and ~5000). This differs from a pipeline join
+by joining against inline data and by being more specialized (it lacks a pred).
+An alternative would be to develop an inline access path and then specify a std
+predicate which references the data in its annotation. That could then generalize
+to a predicate which references persistent data, query or tx local data, or inline
+data. However, the DataSetJoin is still far simpler since it just binds the var
+and sends out the asBound binding set and does not need to worry about internal
+parallelism, alternative sinks, or chunking.
+
+Note: SPARQL default graph queries require us to apply a
+distinct {s,p,o} filter to each default graph access path. For scale-out, that
+is a distributed distinct access path filter. A DHT is used when the scale is
+moderate. A distributed external merge sort SORT is used when the scale is very
+large.
+
+Special cases exist for:
+
+ - Whenever C is a constant, we are guaranteed that the SPO will be distinct and
+ do not need to apply a distributed distinct filter.
+
+ - The SPOC access path can be optimized because we know that C is strictly
+ ascending. We can note the last observed {s,p,o} and skip to the next possible
+ o in the index (o:=o+1) using an advancer pattern (this could also just scan
+ until o changes). These are the possibly distinct {s,p,o} triples, which can
+ then be sent to the DHT unless we have a guarantee that S never crosses a
+ shard boundary (this is trivially true for standalone and this constraint can
+ be imposed on scale-out, but can cause problems if some subjects are very
+ highly referenced).
+
+ - ?
+
+---
+INSERT(op,pred) : insert elements into an index.
+DELETE(op,pred) : remove elements from an index.
+
+The access path mutation operators construct elements from the source binding
+sets and the asBound predicates. For each element so constructed, they insert/
+remove the corresponding element into/from the access path. These operators
+update a mutation counter IFF the access path was modified for the constructed
+element. STAR relies on the mutation operator to detect a fixed point.
+
+The mutation access paths need to use the appropriate concurrency control to
+ensure the constraint on the mutable B+Tree is respected. This is either
+the UnisolatedReadWriteIndex or the LockManager/ConcurrencyManager.
+
+The basic mutation operators write on an access path and may be combined using
+STEPS in order to update all of the indices associated with a relation.
+
+ - For incremental TM, we also need to construct an element for the just index
+ from the rule and assert it onto that index.
+
+ - For the lexicon, we also need to write on the full text index.
+
+ - For SIDs mode, we also need to capture the logic to ground the statements by
+ binding the SIDs.
+
+ - triggers could be integrated here. perhaps events backed by a queue which
+ could be either restart safe or query local?
+
+----
+Parallel distributed closure : TBD. Review notes posted on trak.
+
+----
+done. TEE(op):[sinkRef=X; altSinkRef=Y]
+
+Pipeline operator copies its source to both sink and altSink. The sink and the
+altSink must both be ancestors of the operator. The sinkRef MAY be omitted when
+one of the targets is the immediate parent of the TEE. Evaluation scope: ANY.
+
+TM rules. JOIN of AP UNION is the same as the UNION of JOINs of the APs. This
+gets translated into a pattern of routing in the pipeline such that the two JOINs
+appear one after the other and the first join has its default sink _overridden_
+to reference the same target as the second join. This has the effect of creating
+a union of their outputs at the receiver and the benefit that the JOINs run in
+parallel.
+
+- We MUST also satisfy the requirement that the source binding sets are seen by
+both joins. This can be done using an operator which copies its source binding
+sets to both its default and alternative sinks. That would be an ANY scope op.
+
+This is basically an OR "pattern".
+
+----
+Lexicon joins -
+
+====
+Features:
+
+ - operator-at-once evaluation. The operator is triggered once its possible
+ triggers are done. This is just an application of the same utility method
+ which we use to decide when a query is done.
+
+ - subquery evaluation (linked parent to child). a subquery may be cancelled
+ by a slice without cancelling the parent. cancelling the parent terminates
+ all subqueries. whenever a query or subquery is terminated, we need to go
+ through its operator and query life cycle tear down methods (unit tests).
+
+ - default graph access path using DHT. See DataSetJoin, which has some notes.
+
+ - query and connection local resources: creating, destroying and using resources.
+ references to query local resources permit reuse of intermediate results across
+ different.
+
+ CREATE FOO AS TEMP GRAPH ON LOCAL TEMP STORE SPO ONLY SHARED LEXICON
+
+ - "thick" resources which can be sent along with the query or access either by
+ RMI or copied to the node where the query is running on demand. (This could
+ be just alternative access path instantiations which are selected by the query
+ optimizer or defaulted based on the amount of data to be moved to/from the
+ node if not specified.)
+
+ - The predicate could have fromRevision/toRevision annotations which would be
+ used for fast computation of the delta between two historical commit points.
Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/notes.txt
___________________________________________________________________
Added: svn:keywords
+ Id Date Revision Author HeadURL
Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/rdf/join/DataSetJoin.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/rdf/join/DataSetJoin.java (rev 0)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/rdf/join/DataSetJoin.java 2010-09-20 20:14:28 UTC (rev 3599)
@@ -0,0 +1,254 @@
+/**
+
+Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved.
+
+Contact:
+ SYSTAP, LLC
+ 4501 Tower Road
+ Greensboro, NC 27410
+ lic...@bi...
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+/*
+ * Created on Sep 20, 2010
+ */
+
+package com.bigdata.bop.rdf.join;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.FutureTask;
+
+import com.bigdata.bop.BOp;
+import com.bigdata.bop.BOpContext;
+import com.bigdata.bop.BOpEvaluationContext;
+import com.bigdata.bop.BindingSetPipelineOp;
+import com.bigdata.bop.Constant;
+import com.bigdata.bop.IBindingSet;
+import com.bigdata.bop.IVariable;
+import com.bigdata.bop.constraint.INConstraint;
+import com.bigdata.bop.engine.BOpStats;
+import com.bigdata.bop.engine.IChunkAccessor;
+import com.bigdata.bop.join.PipelineJoin;
+import com.bigdata.rdf.internal.IV;
+import com.bigdata.relation.accesspath.IAsynchronousIterator;
+import com.bigdata.relation.accesspath.IBlockingBuffer;
+
+/**
+ * DataSetJoin(left,var)[graphs={graphIds}; maxParallel=50]
+ * <p>
+ * SPARQL specific join binds <i>var</i> to each of the given graphIds values
+ * for each source binding set. This join operator is useful when the
+ * multiplicity of the graphs is small to moderate. If there are a very large
+ * number of graphs, then the operator tree is too cumbersome and you would be
+ * better off joining against an index (whether temporary or permanent)
+ * containing the graphs.
+ * <p>
+ * The evaluation context is {@link BOpEvaluationContext#ANY}.
+ *
+ * @todo An alternative would be to develop an inline access path and then
+ * specify a standard predicate which references the data in its
+ * annotation. That could then generalize to a predicate which references
+ * persistent data, query or tx local data, or inline data. However, the
+ * DataSetJoin is still far simpler since it just binds the var and sends
+ * out the asBound binding set and does not need to worry about internal
+ * parallelism, alternative sinks, or chunking.
+ *
+ * @todo SPARQL default graph queries require us to apply a distinct {s,p,o}
+ * filter to each default graph access path. For scale-out, that is a
+ * distributed distinct access path filter. A DHT is used when the scale
+ * is moderate. A distributed external merge sort SORT is used when the
+ * scale is very large.
+ * <p>
+ * Special cases exist for:
+ * <ul>
+ *
+ * <li>Whenever C is a constant, we are guaranteed that the SPO will be
+ * distinct and do not need to apply a distributed distinct filter.</li>
+ * <li>
+ * The SPOC access path can be optimized because we know that C is
+ * strictly ascending. We can note the last observed {s,p,o} and skip to
+ * the next possible o in the index (o:=o+1) using an advancer pattern
+ * (this could also just scan until o changes). These are the possibly
+ * distinct {s,p,o} triples, which can then be sent to the DHT unless we
+ * have a guarantee that S never crosses a shard boundary (this is
+ * trivially true for standalone and this constraint can be imposed on
+ * scale-out, but can cause problems if some subjects are very highly
+ * referenced).</li>
+ * <li>
+ * There will be some cases where we do better by doing a
+ * {@link PipelineJoin} and filtering using an {@link INConstraint}.
+ * However, this is probably only true for very small temporary graphs and
+ * in high volume scale-out joins where a cost analysis shows that it will
+ * be more efficient to read all the shards with the IN filter.</li>
+ * </ul>
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ * @version $Id$
+ */
+public class DataSetJoin extends BindingSetPipelineOp {
+
+ /**
+ * Serialization version identifier (Java serialVersionUID convention).
+ */
+ private static final long serialVersionUID = 1L;
+
+ public interface Annotations extends BindingSetPipelineOp.Annotations {
+
+ /**
+ * The variable to be bound.
+ */
+ String VAR = DataSetJoin.class.getName() + ".var";
+
+ /**
+ * The {@link IV}s to be bound. This is logically a set and SHOULD NOT
+ * include duplicates. The elements in this array SHOULD be ordered for
+ * improved efficiency.
+ */
+ String GRAPHS = DataSetJoin.class.getName() + ".graphs";
+
+ }
+
+ /**
+ * Deep copy constructor.
+ *
+ * @param op The {@link DataSetJoin} operator to be copied.
+ */
+ public DataSetJoin(DataSetJoin op) {
+ super(op);
+ }
+
+ /**
+ * Shallow copy constructor; also looks up the VAR and GRAPHS annotations.
+ * @param args The operands, passed through to the superclass constructor.
+ * @param annotations The annotations, passed through to the superclass constructor.
+ */
+ public DataSetJoin(BOp[] args, Map<String, Object> annotations) {
+ super(args, annotations);
+ getVar(); // result unused: presumably a fail-fast check that VAR is present (confirm)
+ getGraphs(); // result unused: presumably a fail-fast check that GRAPHS is present (confirm)
+ }
+ /** Returns the required {@link Annotations#VAR} annotation, cast to an {@link IVariable}. */
+ public IVariable<?> getVar() {
+ return (IVariable<?>)getRequiredProperty(Annotations.VAR);
+ }
+ /** Returns the required {@link Annotations#GRAPHS} annotation, cast to an {@link IV} array. */
+ public IV[] getGraphs() {
+ return (IV[]) getRequiredProperty(Annotations.GRAPHS);
+ }
+ /** Wraps a new {@link DataSetJoinTask} for this operator and context in a {@link FutureTask}; the task is not started here. */
+ public FutureTask<Void> eval(final BOpContext<IBindingSet> context) {
+
+ return new FutureTask<Void>(new DataSetJoinTask(this,context));
+
+ }
+
+ /**
+ * Task which, for each source chunk, writes the cross product of that chunk and the graphs onto the sink.
+ *
+ * @todo Optimize this. When using an {@link IChunkAccessor} we should be
+ * able to directly output the same chunk.
+ */
+ static private class DataSetJoinTask implements Callable<Void> {
+
+ private final DataSetJoin op;
+
+ private final BOpContext<IBindingSet> context;
+
+ private final IVariable<?> var;
+ private final IV[] graphs;
+
+ DataSetJoinTask(final DataSetJoin op,
+ final BOpContext<IBindingSet> context) {
+
+ this.op = op;
+
+ this.context = context;
+
+ var = op.getVar();
+
+ graphs = op.getGraphs();
+
+ }
+
+ /**
+ * Drains the source, passing each chunk to handleChunk_, then flushes the sink; sink and source are closed in the finally block. FIXME unit tests.
+ */
+ public Void call() throws Exception {
+ final IAsynchronousIterator<IBindingSet[]> source = context
+ .getSource();
+ final IBlockingBuffer<IBindingSet[]> sink = context.getSink();
+ try {
+ final BOpStats stats = context.getStats();
+ while (source.hasNext()) {
+ final IBindingSet[] chunk = source.next();
+ stats.chunksIn.increment();
+ stats.unitsIn.add(chunk.length);
+ handleChunk_(chunk, sink);
+ }
+ sink.flush();
+ return null;
+ } finally {
+ sink.close();
+ source.close();
+ }
+ }
+
+ /**
+ * Cross product join. For each source binding set and each graph,
+ * output one binding set in which the variable is bound to that graph.
+ *
+ * @param chunk
+ * A chunk of {@link IBindingSet}s from the source.
+ * @param sink
+ * Where to write the data.
+ *
+ * @todo Should we choose the nesting order of the loops based on the
+ * multiplicity of the source chunk size and the #of graphs to be
+ * bound? That way the inner loop decides the chunk size of the
+ * output.
+ * <p>
+ * Should we always emit an asBound source chunk for a given
+ * graphId? That will cluster better when the target predicate is
+ * mapped over CSPO.
+ */
+ private void handleChunk_(final IBindingSet[] chunk,
+ final IBlockingBuffer<IBindingSet[]> sink) {
+ // One output slot per (source binding set, graph) pair.
+ final IBindingSet[] chunkOut = new IBindingSet[chunk.length
+ * graphs.length];
+
+ int n = 0;
+
+ for (IBindingSet bset : chunk) {
+
+ for (IV c : graphs) {
+ // Reassigns the loop variable: after the first graph each clone is taken from the previous clone; var is then re-set below.
+ bset = bset.clone();
+
+ bset.set(var, new Constant<IV>(c));
+
+ chunkOut[n++] = bset;
+
+ }
+
+ }
+
+ sink.add(chunkOut);
+
+ }
+
+ }
+
+}
Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/rdf/join/DataSetJoin.java
___________________________________________________________________
Added: svn:keywords
+ Id Date Revision Author HeadURL
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestAll.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestAll.java 2010-09-20 19:43:44 UTC (rev 3598)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestAll.java 2010-09-20 20:14:28 UTC (rev 3599)
@@ -92,6 +92,9 @@
// join operators.
suite.addTest(com.bigdata.bop.join.TestAll.suite());
+ // Specialized RDF join operators : @todo move to bigdata-rdf.
+ suite.addTest(com.bigdata.bop.rdf.join.TestAll.suite());
+
// aggregation operators.
suite.addTest(com.bigdata.bop.solutions.TestAll.suite());
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bset/TestCopyBindingSets.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bset/TestCopyBindingSets.java 2010-09-20 19:43:44 UTC (rev 3598)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bset/TestCopyBindingSets.java 2010-09-20 20:14:28 UTC (rev 3599)
@@ -176,5 +176,14 @@
assertEquals(1L, stats.chunksOut.get());
}
+
+ /**
+ * TODO: {@link Tee} is just a specialized {@link CopyBindingSetOp} which
+ * requires that the alternate sink is also specified. Until a unit test of
+ * those semantics is written, this placeholder fails unconditionally.
+ */
+ public void test_copyToSinkAndAltSink() {
+ fail("write test");
+ }
}
Added: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/rdf/TestAll.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/rdf/TestAll.java (rev 0)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/rdf/TestAll.java 2010-09-20 20:14:28 UTC (rev 3599)
@@ -0,0 +1,70 @@
+/**
+
+Copyright (C) SYSTAP, LLC 2006-2007. All rights reserved.
+
+Contact:
+ SYSTAP, LLC
+ 4501 Tower Road
+ Greensboro, NC 27410
+ lic...@bi...
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+package com.bigdata.bop.rdf;
+
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+/**
+ * Aggregates test suites into increasing dependency order.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ * @version $Id$
+ */
+public class TestAll extends TestCase {
+
+ /**
+ * Creates the aggregate test case without a name.
+ */
+ public TestAll() {
+ super();
+ }
+
+ /**
+ * Creates the aggregate test case with the given name.
+ *
+ * @param arg0 The name, passed through to {@link TestCase}.
+ */
+ public TestAll(String arg0) {
+ super(arg0);
+ }
+
+ /**
+ * Builds the "RDF operators" suite, which currently aggregates only the
+ * RDF join operator test suite.
+ *
+ * @return The aggregated suite.
+ */
+ public static Test suite() {
+
+ final TestSuite rdfOperators = new TestSuite("RDF operators");
+
+ rdfOperators.addTest(com.bigdata.bop.rdf.join.TestAll.suite());
+
+ return rdfOperators;
+
+ }
+}
Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/rdf/TestAll.java
___________________________________________________________________
Added: svn:keywords
+ Id Date Revision Author HeadURL
Added: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/rdf/join/TestAll.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/rdf/join/TestAll.java (rev 0)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/rdf/join/TestAll.java 2010-09-20 20:14:28 UTC (rev 3599)
@@ -0,0 +1,70 @@
+/**
+
+Copyright (C) SYSTAP, LLC 2006-2007. All rights reserved.
+
+Contact:
+ SYSTAP, LLC
+ 4501 Tower Road
+ Greensboro, NC 27410
+ lic...@bi...
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+package com.bigdata.bop.rdf.join;
+
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+/**
+ * Aggregates test suites into increasing dependency order.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ * @version $Id$
+ */
+public class TestAll extends TestCase {
+
+ /**
+ * Creates the aggregate test case without a name.
+ */
+ public TestAll() {
+ super();
+ }
+
+ /**
+ * Creates the aggregate test case with the given name.
+ *
+ * @param arg0 The name, passed through to {@link TestCase}.
+ */
+ public TestAll(String arg0) {
+ super(arg0);
+ }
+
+ /**
+ * Builds the "RDF join operators" suite, which currently consists of the
+ * tests declared by {@link TestDataSetJoin}.
+ *
+ * @return The aggregated suite.
+ */
+ public static Test suite() {
+
+ final TestSuite rdfJoinOperators = new TestSuite("RDF join operators");
+
+ rdfJoinOperators.addTestSuite(TestDataSetJoin.class);
+
+ return rdfJoinOperators;
+
+ }
+}
Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/rdf/join/TestAll.java
___________________________________________________________________
Added: svn:keywords
+ Id Date Revision Author HeadURL
Added: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/rdf/join/TestDataSetJoin.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/rdf/join/TestDataSetJoin.java (rev 0)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/rdf/join/TestDataSetJoin.java 2010-09-20 20:14:28 UTC (rev 3599)
@@ -0,0 +1,56 @@
+/**
+
+Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved.
+
+Contact:
+ SYSTAP, LLC
+ 4501 Tower Road
+ Greensboro, NC 27410
+ lic...@bi...
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+/*
+ * Created on Sep 20, 2010
+ */
+
+package com.bigdata.bop.rdf.join;
+
+import junit.framework.TestCase2;
+
+/**
+ * Test {@link DataSetJoin}
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ * @version $Id$
+ */
+public class TestDataSetJoin extends TestCase2 {
+
+ /** Creates the test case without a name. */
+ public TestDataSetJoin() {
+ super();
+ }
+
+ /**
+ * @param name The test name, passed through to {@link TestCase2}.
+ */
+ public TestDataSetJoin(String name) {
+ super(name);
+ }
+
+ /** Placeholder which fails until real tests are written. */
+ public void test_something() {
+ fail("write tests");
+ }
+
+}
Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/rdf/join/TestDataSetJoin.java
___________________________________________________________________
Added: svn:keywords
+ Id Date Revision Author HeadURL
Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestBOps.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestBOps.java 2010-09-20 19:43:44 UTC (rev 3598)
+++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestBOps.java 2010-09-20 20:14:28 UTC (rev 3599)
@@ -121,6 +121,7 @@
* that go around the sail.
*/
cxn.flush();//commit();
+ cxn.commit();//
if (log.isInfoEnabled()) {
log.info("\n" + sail.getDatabase().dumpStore());
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|