From: <tho...@us...> - 2010-09-20 20:14:36
|
Revision: 3599 http://bigdata.svn.sourceforge.net/bigdata/?rev=3599&view=rev Author: thompsonbry Date: 2010-09-20 20:14:28 +0000 (Mon, 20 Sep 2010) Log Message: ----------- Added a Tee and a DataSetJoin operator for use in named and default graph queries. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BindingSetPipelineOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyBindingSetOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bset/TestCopyBindingSets.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestBOps.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Tee.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/notes.txt branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/rdf/ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/rdf/join/ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/rdf/join/DataSetJoin.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/rdf/ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/rdf/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/rdf/join/ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/rdf/join/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/rdf/join/TestDataSetJoin.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java 2010-09-20 19:43:44 UTC (rev 3598) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java 2010-09-20 
20:14:28 UTC (rev 3599) @@ -195,7 +195,7 @@ * identifier for the {@link BOp} within the context of its owning * query. */ - String BOP_ID = "bopId"; + String BOP_ID = BOp.class.getName()+".bopId"; /** * The timeout for the operator evaluation (milliseconds). @@ -210,7 +210,7 @@ * be interpreted with respect to the time when the query began to * execute. */ - String TIMEOUT = "timeout"; + String TIMEOUT = BOp.class.getName()+".timeout"; /** * The default timeout for operator evaluation. Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BindingSetPipelineOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BindingSetPipelineOp.java 2010-09-20 19:43:44 UTC (rev 3598) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BindingSetPipelineOp.java 2010-09-20 20:14:28 UTC (rev 3599) @@ -48,9 +48,16 @@ /** * The value of the annotation is the {@link BOp.Annotations#BOP_ID} of - * the ancestor in the operator tree which serves as an alternative sink - * for binding sets. + * the ancestor in the operator tree which serves as the default sink + * for binding sets (default is the parent). */ + String SINK_REF = BindingSetPipelineOp.class.getName() + ".sinkRef"; + + /** + * The value of the annotation is the {@link BOp.Annotations#BOP_ID} of + * the ancestor in the operator tree which serves as the alternative + * sink for binding sets (default is no alternative sink). 
+ */ String ALT_SINK_REF = BindingSetPipelineOp.class.getName() + ".altSinkRef"; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyBindingSetOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyBindingSetOp.java 2010-09-20 19:43:44 UTC (rev 3598) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyBindingSetOp.java 2010-09-20 20:14:28 UTC (rev 3599) @@ -41,14 +41,13 @@ import com.bigdata.relation.accesspath.IBlockingBuffer; /** - * This operator copies its source to its sink. It is used to feed the first - * join in the pipeline. The operator should have no children but may be - * decorated with annotations as necessary. + * This operator copies its source to its sink. Specializations exist which are + * used to feed the the initial set of intermediate results into a pipeline ( + * {@link StartOp}) and which are used to replicate intermediate results to more + * than one sink ({@link Tee}). * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ - * - * @todo unit tests. 
*/ public class CopyBindingSetOp extends BindingSetPipelineOp { @@ -99,8 +98,10 @@ } public Void call() throws Exception { - final IAsynchronousIterator<IBindingSet[]> source = context.getSource(); + final IAsynchronousIterator<IBindingSet[]> source = context + .getSource(); final IBlockingBuffer<IBindingSet[]> sink = context.getSink(); + final IBlockingBuffer<IBindingSet[]> sink2 = context.getSink2(); try { final BOpStats stats = context.getStats(); while (source.hasNext()) { @@ -108,11 +109,17 @@ stats.chunksIn.increment(); stats.unitsIn.add(chunk.length); sink.add(chunk); + if (sink2 != null) + sink2.add(chunk); } sink.flush(); + if (sink2 != null) + sink2.flush(); return null; } finally { sink.close(); + if (sink2 != null) + sink2.close(); source.close(); } } Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Tee.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Tee.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Tee.java 2010-09-20 20:14:28 UTC (rev 3599) @@ -0,0 +1,118 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Sep 20, 2010 + */ + +package com.bigdata.bop.bset; + +import java.util.Map; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpEvaluationContext; +import com.bigdata.bop.BindingSetPipelineOp; +import com.bigdata.bop.join.PipelineJoin; +import com.bigdata.rdf.rules.TMUtility; +import com.bigdata.relation.RelationFusedView; +import com.bigdata.relation.rule.Slice; + +/** + * TEE(op):[sinkRef=X; altSinkRef=Y] + * <p> + * Pipeline operator copies its source to both sink and altSink. The sink and + * the altSink must both be ancestors of the operator. The sinkRef MAY be + * omitted when one of the targets is the immediate parent of the TEE. + * Evaluation scope: {@link BOpEvaluationContext#ANY}. + * <p> + * <h2>Example - Truth Maintenance</h2> + * <p> + * In truth maintenance we establish a focus store which is brought to a fixed + * point by applying some rules and a transitive closure operator. Once the + * fixed point is reached, the assertions in the focus store are either inserted + * onto the database or (for retraction) removed from database unless a proof + * can be found that an assertion is still entailed. + * <p> + * The {@link Tee} operator can be used in truth maintenance to read on the + * UNION of the focus store and the database - see {@link TMUtility}. This is + * handled as the "union" of two JOINs using a {@link Tee} as follows: + * + * <pre> + * slice := SLICE( join2 )[bopId=3] + * join2 := JOIN( join1, bar.spo(A,loves,B))[bopId=2] + * join1 := JOIN( tee, foo.spo(A,loves,B))[bopId=1; sinkRef=3] + * tee := TEE( ... )[altSinkRef=2], + * </pre> + * + * The {@link Tee} copies its inputs to both the default sink (its parent, which + * is join1) and the alternate sink (join2). 
join1 routes its outputs around + * join2, sending them directly to their lowest common ancestor. This has the + * effect of creating a union of their outputs at the receiver. In this example, + * a {@link Slice} is used as the target for both of the join operators. Since + * this is a pipeline construction, the joins will be evaluated in parallel as + * intermediate results arrive for those operators. Normally the {@link Tee} + * will be fed by a {@link StartOp} or another {@link PipelineJoin}. + * + * @todo The union of access paths was historically handled by + * {@link RelationFusedView}. That class should be removed once queries + * are rewritten to use the union of joins. + * + * @todo The {@link TMUtility} will have to be updated to use this operator + * rather than specifying multiple source "names" for the relation of the + * predicate. + * + * @todo The FastClosureRuleTask will also need to be updated to use a + * {@link Union} over the joins rather than a {@link RelationFusedView}. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class Tee extends CopyBindingSetOp { + + /** + * + */ + private static final long serialVersionUID = 1L; + + /** + * Deep copy constructor. + * @param op + */ + public Tee(final Tee op) { + super(op); + } + + /** + * Shallow copy constructor. 
+ * @param args + * @param annotations + */ + public Tee(BOp[] args, Map<String, Object> annotations) { + + super(args, annotations); + + getRequiredProperty(BindingSetPipelineOp.Annotations.ALT_SINK_REF); + + } + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Tee.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java 2010-09-20 19:43:44 UTC (rev 3598) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java 2010-09-20 20:14:28 UTC (rev 3599) @@ -31,6 +31,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; +import com.bigdata.bop.BOp; import com.bigdata.bop.BOpContext; import com.bigdata.bop.BindingSetPipelineOp; import com.bigdata.bop.IBindingSet; @@ -40,42 +41,29 @@ import com.bigdata.util.concurrent.Haltable; /** - * The union of two or more {@link BindingSetPipelineOp} operators. + * UNION(ops)[maxParallel(default all)] + * <p> + * Executes each of the operands in the union as a subqueries. Each subquery is + * run as a separate query but is linked to the parent query in which the UNION + * is being evaluated. The subqueries do not receive bindings from the parent + * and may be executed independently. By default, the subqueries are run with + * unlimited parallelism. + * <p> + * UNION is useful when independent queries are evaluated and their outputs are + * merged. Outputs from the UNION operator flow to the parent operator and will + * be mapped across shards or nodes as appropriate for the parent. UNION runs on + * the query controller. 
In order to avoid routing intermediate results through + * the controller, the {@link BindingSetPipelineOp.Annotations#SINK_REF} of each + * child operand should be overriden to specify the parent of the UNION + * operator. + * <p> + * UNION can not be used when the intermediate results must be routed into the + * subqueries. However, a {@link Tee} pattern may help in such cases. For + * example, a {@link Tee} may be used to create a union of pipeline joins for + * two access paths during truth maintenance. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ - * - * @todo I have some basic questions about the ability to use a UNION of two - * predicates in scale-out. I think that this might be more accurately - * modeled as the UNION of two joins. That is, rather than: - * - * <pre> - * JOIN( ..., - * UNION( foo.spo(A,loves,B), - * bar.spo(A,loves,B) ) - * ) - * </pre> - * - * using - * - * <pre> - * UNION( JOIN( ..., foo.spo(A,loves,B) ), - * JOIN( ..., bar.spo(A,loves,B) ) - * ) - * </pre> - * - * which would be a binding set union rather than an element union. - * - * @todo The union of access paths was historically handled by - * {@link RelationFusedView}. That class should be removed once queries - * are rewritten to use the union of joins. - * - * @todo The {@link TMUtility} will have to be updated to use this operator - * rather than specifying multiple source "names" for the relation of the - * predicate. - * - * @todo The FastClosureRuleTask will also need to be updated to use a - * {@link Union} over the joins rather than a {@link RelationFusedView}. */ public class Union extends BindingSetPipelineOp { @@ -101,35 +89,35 @@ public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { - return new FutureTask<Void>(new UnionTask(this, context)); - +// return new FutureTask<Void>(new UnionTask(this, context)); + throw new UnsupportedOperationException(); } - /** - * Pipeline union impl. 
- * - * FIXME All this does is copy its inputs to its outputs. Since we only run - * one chunk of input at a time, it seems that the easiest way to implement - * a union is to have the operators in the union just target the same sink. - */ - private static class UnionTask extends Haltable<Void> implements Callable<Void> { - - public UnionTask(// - final Union op,// - final BOpContext<IBindingSet> context - ) { - - if (op == null) - throw new IllegalArgumentException(); - if (context == null) - throw new IllegalArgumentException(); - } - - public Void call() throws Exception { - // TODO Auto-generated method stub - throw new UnsupportedOperationException(); - } - - } +// /** +// * Pipeline union impl. +// * +// * FIXME All this does is copy its inputs to its outputs. Since we only run +// * one chunk of input at a time, it seems that the easiest way to implement +// * a union is to have the operators in the union just target the same sink. +// */ +// private static class UnionTask extends Haltable<Void> implements Callable<Void> { +// +// public UnionTask(// +// final Union op,// +// final BOpContext<IBindingSet> context +// ) { +// +// if (op == null) +// throw new IllegalArgumentException(); +// if (context == null) +// throw new IllegalArgumentException(); +// } +// +// public Void call() throws Exception { +// // TODO Auto-generated method stub +// throw new UnsupportedOperationException(); +// } +// +// } } Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/notes.txt =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/notes.txt (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/notes.txt 2010-09-20 20:14:28 UTC (rev 3599) @@ -0,0 +1,160 @@ +RunningQuery: + + * FIXME Raise this into an annotation that we can tweak from the unit + * tests and then debug the problem. 
+ * + * FIXME Add an annotation or method to mark operators which must be + * evaluated using operator-at-a-time evaluation. SORT is the main + * example here (it must be operator at a time of necessity) but other + * operators may be implemented with operator at a time assumptions. This + * might be on PipelineOp and could be trinary {Chunked,Blocked,All}. + +Note: Many of the maxParallel annotations related to thread consumption will go +away with Java7 and async file IO. Other annotations, such as the #of 1M buffers +to allocate to an operator, need to be introduced to handle high volume queries. + +Note: UNION, STEPS, and STAR(transitive closure) are all evaluated on the query +controller. + +--- +UNION(ops)[maxParallel(default all)] + +Executes each of the operands in the union as subqueries. Each subquery is run +as a separate RunningQuery but is linked to the parent query in which the UNION +is being evaluated. The subqueries do not receive bindings from the parent and +may be executed independently. + +--- +STEPS(ops)[maxParallel(default 1)] + +The operands are executed as independent subqueries. Unlike UNION, STEPS does +not copy its source binding sets. + +--- + +STAR(op) [maxItr(default all)] + +Evaluate the operand until its mutation count remains unchanged from one round +to the next. The operand must write on a resource. The fixed point is determined +by examining BOPStats.mutationCount. + +Do with INSERT/REMOVE since all involve mutation. + +--- +DataSetJoin([left,var])[graphs={graphIds}; maxParallel=50] + +SPARQL specific join binds var to each of the given graphIds values for each +source binding set. This join operator is useful when the multiplicity of the +graphIds set is modest (between 2 and ~5000). This differs from a pipeline join +by joining against inline data and by being more specialized (it lacks a pred). +An alternative would be to develop an inline access path and then specify a std +predicate which references the data in its annotation.
That could then generalize +to a predicate which references persistent data, query or tx local data, or inline +data. However, the DataSetJoin is still far simpler since it just binds the var +and sends out the asBound binding set and does not need to worry about internal +parallelism, alternative sinks, or chunking. + +Note: SPARQL default graph queries require us to apply a +distinct {s,p,o} filter to each default graph access path. For scale-out, that +is a distributed distinct access path filter. A DHT is used when the scale is +moderate. A distributed external merge sort SORT is used when the scale is very +large. + +Special cases exist for: + + - Whenever C is a constant, we are guaranteed that the SPO will be distinct and + do not need to apply a distributed distinct filter. + + - The SPOC access path can be optimized because we know that C is strictly + ascending. We can note the last observed {s,p,o} and skip to the next possible + o in the index (o:=o+1) using an advancer pattern (this could also just scan + until o changes). These are the possibly distinct {s,p,o} triples, which can + then be sent to the DHT unless we have a guarantee that S never crosses a + shard boundary (this is trivially true for standalone and this constraint can + be imposed on scale-out, but can cause problems if some subjects are very + highly referenced). + + - ? + +--- +INSERT(op,pred) : insert elements into an index. +DELETE(op,pred) : remove elements from an index. + +The access path mutation operators construct elements from the source binding +sets and the asBound predicates. For each element so constructed, they insert/ +remove the corresponding element into/from the access path. These operators +update a mutation counter IFF the access path was modified for the constructed +element. STAR relies on the mutation operator to detect a fixed point.
+ +The mutation access paths need to use the appropriate concurrency control to +ensure the constraint on the mutable B+Tree is respected. This is either +the UnisolatedReadWriteIndex or the LockManager/ConcurrencyManager. + +The basic mutation operators write on an access path and may be combined using +STEPS in order to update all of the indices associated with a relation. + + - For incremental TM, we also need to construct an element for the just index + from the rule and assert it onto that index. + + - For the lexicon, we also need to write on the full text index. + + - For SIDs mode, we also need to capture the logic to ground the statements by + binding the SIDs. + + - triggers could be integrated here. perhaps events backed by a queue which + could be either restart safe or query local? + +---- +Parallel distributed closure : TBD. Review notes posted on trak. + +---- +done. TEE(op):[sinkRef=X; altSinkRef=Y] + +Pipeline operator copies its source to both sink and altSink. The sink and the +altSink must both be ancestors of the operator. The sinkRef MAY be omitted when +one of the targets is the immediate parent of the TEE. Evaluation scope: ANY. + +TM rules. JOIN of AP UNION is the same as the UNION of JOINs of the APs. This +gets translated into a pattern of routing in the pipeline such that the two JOINs +appear one after the other and the first join has its default sink _overridden_ +to reference the same target as the second join. This has the effect of creating +a union of their outputs at the receiver and the benefit that the JOINs run in +parallel. + +- We MUST also satisfy the requirement that the source binding sets are seen by +both joins. This can be done using an operator which copies its source binding +sets to both its default and alternative sinks. That would be an ANY scope op. + +This is basically an OR "pattern". + +---- +Lexicon joins - + +==== +Features: + + - operator-at-once evaluation. 
The operator is triggered once its possible + triggers are done. This is just an application of the same utility method + which we use to decide when a query is done. + + - subquery evaluation (linked parent to child). a subquery may be cancelled + by a slice without cancelling the parent. cancelling the parent terminates + all subqueries. whenever a query or subquery is terminated, we need to go + through its operator and query life cycle tear down methods (unit tests). + + - default graph access path using DHT. See DataSetJoin, which has some notes. + + - query and connection local resources: creating, destroying and using resources. + references to query local resources permit reuse of intermediate results across + different queries. + + CREATE FOO AS TEMP GRAPH ON LOCAL TEMP STORE SPO ONLY SHARED LEXICON + + - "thick" resources which can be sent along with the query or access either by + RMI or copied to the node where the query is running on demand. (This could + be just alternative access path instantiations which are selected by the query + optimizer or defaulted based on the amount of data to be moved to/from the + node if not specified.) + + - The predicate could have fromRevision/toRevision annotations which would be + used for fast computation of the delta between two historical commit points. Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/notes.txt ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/rdf/join/DataSetJoin.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/rdf/join/DataSetJoin.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/rdf/join/DataSetJoin.java 2010-09-20 20:14:28 UTC (rev 3599) @@ -0,0 +1,254 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010.
All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Sep 20, 2010 + */ + +package com.bigdata.bop.rdf.join; + +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.FutureTask; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpContext; +import com.bigdata.bop.BOpEvaluationContext; +import com.bigdata.bop.BindingSetPipelineOp; +import com.bigdata.bop.Constant; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IVariable; +import com.bigdata.bop.constraint.INConstraint; +import com.bigdata.bop.engine.BOpStats; +import com.bigdata.bop.engine.IChunkAccessor; +import com.bigdata.bop.join.PipelineJoin; +import com.bigdata.rdf.internal.IV; +import com.bigdata.relation.accesspath.IAsynchronousIterator; +import com.bigdata.relation.accesspath.IBlockingBuffer; + +/** + * DataSetJoin(left,var)[graphs={graphIds}; maxParallel=50] + * <p> + * SPARQL specific join binds <i>var</i> to each of the given graphIds values + * for each source binding set. This join operator is useful when the + * multiplicity of the graphs is small to moderate. 
If there are a very large + * number of graphs, then the operator tree is too cumbersome and you would be + * better off joining against an index (whether temporary or permanent) + * containing the graphs. + * <p> + * The evaluation context is {@link BOpEvaluationContext#ANY}. + * + * @todo An alternative would be to develop an inline access path and then + * specify a standard predicate which references the data in its + * annotation. That could then generalize to a predicate which references + * persistent data, query or tx local data, or inline data. However, the + * DataSetJoin is still far simpler since it just binds the var and sends + * out the asBound binding set and does not need to worry about internal + * parallelism, alternative sinks, or chunking. + * + * @todo SPARQL default graph queries require us to apply a distinct {s,p,o} + * filter to each default graph access path. For scale-out, that is a + * distributed distinct access path filter. A DHT is used when the scale + * is moderate. A distributed external merge sort SORT is used when the + * scale is very large. + * <p> + * Special cases exist for: + * <ul> + * + * <li>Whenever C is a constant, we are guaranteed that the SPO will be + * distinct and do not need to apply a distributed distinct filter.</li> + * <li> + * The SPOC access path can be optimized because we know that C is + * strictly ascending. We can note the last observed {s,p,o} and skip to + * the next possible o in the index (o:=o+1) using an advancer pattern + * (this could also just scan until o changes).
These are the possibly + * distinct {s,p,o} triples, which can then be sent to the DHT unless we + * have a guarantee that S never crosses a shard boundary (this is + * trivially true for standalone can this constraint can be imposed on + * scale-out, but can cause problems if some subjects are very highly + * referenced).</li> + * <li> + * There will be some cases where we do better by doing a + * {@link PipelineJoin} and filtering using an {@link INConstraint}. + * However, this is probably only true for very small temporary graphs and + * in high volume scale-out joins where a cost analysis shows that it will + * be more efficient to read all the shards with the IN filter.</li> + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class DataSetJoin extends BindingSetPipelineOp { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public interface Annotations extends BindingSetPipelineOp.Annotations { + + /** + * The variable to be bound. + */ + String VAR = DataSetJoin.class.getName() + ".var"; + + /** + * The {@link IV}s to be bound. This is logically a set and SHOULD NOT + * include duplicates. The elements in this array SHOULD be ordered for + * improved efficiency. + */ + String GRAPHS = DataSetJoin.class.getName() + ".graphs"; + + } + + /** + * Deep copy constructor. + * + * @param op + */ + public DataSetJoin(DataSetJoin op) { + super(op); + } + + /** + * Shallow copy constructor. 
+ * @param args + * @param annotations + */ + public DataSetJoin(BOp[] args, Map<String, Object> annotations) { + super(args, annotations); + getVar(); + getGraphs(); + } + + public IVariable<?> getVar() { + return (IVariable<?>)getRequiredProperty(Annotations.VAR); + } + + public IV[] getGraphs() { + return (IV[]) getRequiredProperty(Annotations.GRAPHS); + } + + public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { + + return new FutureTask<Void>(new DataSetJoinTask(this,context)); + + } + + /** + * Copy the source to the sink. + * + * @todo Optimize this. When using an {@link IChunkAccessor} we should be + * able to directly output the same chunk. + */ + static private class DataSetJoinTask implements Callable<Void> { + + private final DataSetJoin op; + + private final BOpContext<IBindingSet> context; + + private final IVariable<?> var; + private final IV[] graphs; + + DataSetJoinTask(final DataSetJoin op, + final BOpContext<IBindingSet> context) { + + this.op = op; + + this.context = context; + + var = op.getVar(); + + graphs = op.getGraphs(); + + } + + /** + * FIXME unit tests. + */ + public Void call() throws Exception { + final IAsynchronousIterator<IBindingSet[]> source = context + .getSource(); + final IBlockingBuffer<IBindingSet[]> sink = context.getSink(); + try { + final BOpStats stats = context.getStats(); + while (source.hasNext()) { + final IBindingSet[] chunk = source.next(); + stats.chunksIn.increment(); + stats.unitsIn.add(chunk.length); + handleChunk_(chunk, sink); + } + sink.flush(); + return null; + } finally { + sink.close(); + source.close(); + } + } + + /** + * Cross product join. For each source binding set and each graph, + * output one binding set in which the variable is bound to that graph. + * + * @param chunk + * A chunk of {@link IBindingSet}s from the source. + * @param sink + * Where to write the data. 
+ * + * @todo Should we choose the nesting order of the loops based on the + * multiplicity of the source chunk size and the #of graphs to be + * bound? That way the inner loop decides the chunk size of the + * output. + * <p> + * Should we always emit an asBound source chunk for a given + * graphId? That will cluster better when the target predicate is + * mapped over CSPO. + */ + private void handleChunk_(final IBindingSet[] chunk, + final IBlockingBuffer<IBindingSet[]> sink) { + + final IBindingSet[] chunkOut = new IBindingSet[chunk.length + * graphs.length]; + + int n = 0; + + for (IBindingSet bset : chunk) { + + for (IV c : graphs) { + + bset = bset.clone(); + + bset.set(var, new Constant<IV>(c)); + + chunkOut[n++] = bset; + + } + + } + + sink.add(chunkOut); + + } + + } + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/rdf/join/DataSetJoin.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestAll.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestAll.java 2010-09-20 19:43:44 UTC (rev 3598) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestAll.java 2010-09-20 20:14:28 UTC (rev 3599) @@ -92,6 +92,9 @@ // join operators. suite.addTest(com.bigdata.bop.join.TestAll.suite()); + // Specialized RDF join operators : @todo move to bigdata-rdf. + suite.addTest(com.bigdata.bop.rdf.join.TestAll.suite()); + // aggregation operators. 
suite.addTest(com.bigdata.bop.solutions.TestAll.suite()); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bset/TestCopyBindingSets.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bset/TestCopyBindingSets.java 2010-09-20 19:43:44 UTC (rev 3598) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bset/TestCopyBindingSets.java 2010-09-20 20:14:28 UTC (rev 3599) @@ -176,5 +176,14 @@ assertEquals(1L, stats.chunksOut.get()); } + + /** + * {@link Tee} is just a specialized {@link CopyBindingSetOp} which requires + * that the alternate sink is also specified. Write a unit test of those + * semantics for {@link CopyBindingSetOp}. + */ + public void test_copyToSinkAndAltSink() { + fail("write test"); + } } Added: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/rdf/TestAll.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/rdf/TestAll.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/rdf/TestAll.java 2010-09-20 20:14:28 UTC (rev 3599) @@ -0,0 +1,70 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2007. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +package com.bigdata.bop.rdf; + + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +/** + * Aggregates test suites into increasing dependency order. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class TestAll extends TestCase { + + /** + * + */ + public TestAll() { + + } + + /** + * @param arg0 + */ + public TestAll(String arg0) { + + super(arg0); + + } + + /** + * Returns a test that will run each of the implementation specific test + * suites in turn. + */ + public static Test suite() + { + + final TestSuite suite = new TestSuite("RDF operators"); + + suite.addTest(com.bigdata.bop.rdf.join.TestAll.suite()); + + return suite; + + } + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/rdf/TestAll.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Added: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/rdf/join/TestAll.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/rdf/join/TestAll.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/rdf/join/TestAll.java 2010-09-20 20:14:28 UTC (rev 3599) @@ -0,0 +1,70 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2007. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. 
+ +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +package com.bigdata.bop.rdf.join; + + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +/** + * Aggregates test suites into increasing dependency order. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class TestAll extends TestCase { + + /** + * + */ + public TestAll() { + + } + + /** + * @param arg0 + */ + public TestAll(String arg0) { + + super(arg0); + + } + + /** + * Returns a test that will run each of the implementation specific test + * suites in turn. + */ + public static Test suite() + { + + final TestSuite suite = new TestSuite("RDF join operators"); + + suite.addTestSuite(TestDataSetJoin.class); + + return suite; + + } + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/rdf/join/TestAll.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Added: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/rdf/join/TestDataSetJoin.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/rdf/join/TestDataSetJoin.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/rdf/join/TestDataSetJoin.java 2010-09-20 20:14:28 UTC (rev 3599) @@ -0,0 +1,56 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... 
+ +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Sep 20, 2010 + */ + +package com.bigdata.bop.rdf.join; + +import junit.framework.TestCase2; + +/** + * Test {@link DataSetJoin} + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class TestDataSetJoin extends TestCase2 { + + /** + * + */ + public TestDataSetJoin() { + } + + /** + * @param name + */ + public TestDataSetJoin(String name) { + super(name); + } + + public void test_something() { + fail("write tests"); + } + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/rdf/join/TestDataSetJoin.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestBOps.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestBOps.java 2010-09-20 19:43:44 UTC (rev 3598) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestBOps.java 2010-09-20 20:14:28 UTC (rev 3599) @@ -121,6 +121,7 @@ * that go around the sail. 
*/ cxn.flush();//commit(); + cxn.commit();// if (log.isInfoEnabled()) { log.info("\n" + sail.getDatabase().dumpStore()); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |