From: <tho...@us...> - 2010-09-03 00:27:56
|
Revision: 3500 http://bigdata.svn.sourceforge.net/bigdata/?rev=3500&view=rev Author: thompsonbry Date: 2010-09-03 00:27:45 +0000 (Fri, 03 Sep 2010) Log Message: ----------- Reorganized the operators and some utility classes a bit. Added an a conditional routing operator and a test suite for it. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/QuoteOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/aggregation/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/RunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestPipelineUtility.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/util/concurrent/TestAll.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/ConditionalRoutingOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyBindingSetOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/ThreadLocalBufferFactory.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/UnsyncLocalOutputBuffer.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/concurrent/Haltable.java 
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bset/ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bset/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bset/TestConditionalRoutingOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bset/TestCopyBindingSets.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bset/TestUnionBindingSets.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/util/concurrent/TestHaltable.java Removed Paths: ------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineStartOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/Union.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bop-notes.txt branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/ThreadLocalBufferFactory.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/UnsyncLocalOutputBuffer.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/aggregation/TestUnionBindingSets.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/Haltable.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestHaltable.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2010-09-02 22:24:22 UTC (rev 3499) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2010-09-03 00:27:45 UTC (rev 3500) @@ -162,7 +162,7 @@ * @throws NullPointerException * if the argument is <code>null</code>. */ - protected BOpBase(final BOpBase op) { + public BOpBase(final BOpBase op) { // deep copy the arguments. args = deepCopy(op.args); // deep copy the annotations. @@ -173,7 +173,7 @@ * @param args * The arguments to the operator. 
*/ - protected BOpBase(final BOp[] args) { + public BOpBase(final BOp[] args) { this(args, null/* annotations */); @@ -185,7 +185,7 @@ * @param annotations * The annotations for the operator (optional). */ - protected BOpBase(final BOp[] args, + public BOpBase(final BOp[] args, final Map<String, Object> annotations) { if (args == null) Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2010-09-02 22:24:22 UTC (rev 3499) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2010-09-03 00:27:45 UTC (rev 3500) @@ -41,9 +41,14 @@ * * @param <E> * The generic type of the objects processed by the operator. - * + * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ + * + * @todo It is too confusion to have an interface hierarchy which is separate + * from the class hierarchy for the operators. Therefore roll this + * interface into {@link AbstractPipelineOp} and then rename that class to + * {@link PipelineOp} */ public interface PipelineOp<E> extends BOp { Deleted: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineStartOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineStartOp.java 2010-09-02 22:24:22 UTC (rev 3499) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineStartOp.java 2010-09-03 00:27:45 UTC (rev 3500) @@ -1,117 +0,0 @@ -/** - -Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. 
- -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -*/ -/* - * Created on Aug 25, 2010 - */ - -package com.bigdata.bop; - -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.FutureTask; - -import com.bigdata.bop.engine.BOpStats; -import com.bigdata.relation.accesspath.IAsynchronousIterator; -import com.bigdata.relation.accesspath.IBlockingBuffer; - -/** - * This operator copies its source to its sink. It is used to feed the first - * join in the pipeline. The operator should have no children but may be - * decorated with annotations as necessary. - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ - */ -public class PipelineStartOp extends BindingSetPipelineOp { - - /** - * - */ - private static final long serialVersionUID = 1L; - - /** - * Deep copy constructor. - * - * @param op - */ - public PipelineStartOp(PipelineStartOp op) { - super(op); - } - - /** - * Shallow copy constructor. - * - * @param args - * @param annotations - */ - public PipelineStartOp(BOp[] args, Map<String, Object> annotations) { - super(args, annotations); - } - - public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { - - return new FutureTask<Void>(new CopyTask(context)); - - } - - /** - * Copy the source to the sink. 
- */ - static private class CopyTask implements Callable<Void> { - - private final BOpStats stats; - - private final IAsynchronousIterator<IBindingSet[]> source; - - private final IBlockingBuffer<IBindingSet[]> sink; - - CopyTask(final BOpContext<IBindingSet> context) { - - stats = context.getStats(); - - this.source = context.getSource(); - - this.sink = context.getSink(); - - } - - public Void call() throws Exception { - try { - while (source.hasNext()) { - final IBindingSet[] chunk = source.next(); - stats.chunksIn.increment(); - stats.unitsIn.add(chunk.length); - sink.add(chunk); - stats.chunksOut.increment(); - stats.unitsOut.add(chunk.length); - } - return null; - } finally { - sink.close(); - } - } - - } - -} Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/QuoteOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/QuoteOp.java 2010-09-02 22:24:22 UTC (rev 3499) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/QuoteOp.java 2010-09-03 00:27:45 UTC (rev 3500) @@ -38,10 +38,10 @@ * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ * - * @todo If we do nothing then {@link QuoteOp} will already prevent the - * evaluation of its child operand by the expediency of not defining its - * own evaluation semantics. Alternatively, we could add - * <code>eval():Op</code> using an appropriate evaluation interface. + * @todo I think that we can avoid quoting operators by using annotations (for + * some cases) and through explicit interaction between operators for + * others (such as between a join and a predicate). If that proves to be + * true then this class will be dropped. 
*/ public class QuoteOp extends BOpBase { Deleted: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/Union.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/Union.java 2010-09-02 22:24:22 UTC (rev 3499) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/Union.java 2010-09-03 00:27:45 UTC (rev 3500) @@ -1,135 +0,0 @@ -/** - -Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -*/ -/* - * Created on Aug 18, 2010 - */ - -package com.bigdata.bop.aggregation; - -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.FutureTask; - -import com.bigdata.bop.BOpContext; -import com.bigdata.bop.BindingSetPipelineOp; -import com.bigdata.bop.IBindingSet; -import com.bigdata.bop.engine.Haltable; -import com.bigdata.bop.join.PipelineJoin; -import com.bigdata.rdf.rules.TMUtility; -import com.bigdata.relation.RelationFusedView; - -/** - * The union of two or more {@link BindingSetPipelineOp} operators. 
- * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ - * - * @todo I have some basic questions about the ability to use a UNION of two - * predicates in scale-out. I think that this might be more accurately - * modeled as the UNION of two joins. That is, rather than: - * - * <pre> - * JOIN( ..., - * UNION( foo.spo(A,loves,B), - * bar.spo(A,loves,B) ) - * ) - * </pre> - * - * using - * - * <pre> - * UNION( JOIN( ..., foo.spo(A,loves,B) ), - * JOIN( ..., bar.spo(A,loves,B) ) - * ) - * </pre> - * - * which would be a binding set union rather than an element union. - * - * @todo The union of access paths was historically handled by - * {@link RelationFusedView}. That class should be removed once queries - * are rewritten to use the union of joins. - * - * @todo The {@link TMUtility} will have to be updated to use this operator - * rather than specifying multiple source "names" for the relation of the - * predicate. - * - * @todo The FastClosureRuleTask will also need to be updated to use a - * {@link Union} over the joins rather than a {@link RelationFusedView}. - */ -public class Union extends BindingSetPipelineOp { - - /** - * - */ - private static final long serialVersionUID = 1L; - - /** - * @param args - * Two or more operators whose union is desired. - * @param annotations - */ - public Union(final BindingSetPipelineOp[] args, - final Map<String, Object> annotations) { - - super(args, annotations); - - if (args.length < 2) - throw new IllegalArgumentException(); - - } - - public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { - - return new FutureTask<Void>(new UnionTask(this, context)); - - } - - /** - * Pipeline union impl. - * - * FIXME All this does is copy its inputs to its outputs. Since we only run - * one chunk of input at a time, it seems that the easiest way to implement - * a union is to have the operators in the union just target the same sink. 
- */ - private static class UnionTask extends Haltable<Void> implements Callable<Void> { - - public UnionTask(// - final Union op,// - final BOpContext<IBindingSet> context - ) { - - if (op == null) - throw new IllegalArgumentException(); - if (context == null) - throw new IllegalArgumentException(); - } - - public Void call() throws Exception { - // TODO Auto-generated method stub - throw new UnsupportedOperationException(); - } - - } - -} Deleted: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bop-notes.txt =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bop-notes.txt 2010-09-02 22:24:22 UTC (rev 3499) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bop-notes.txt 2010-09-03 00:27:45 UTC (rev 3500) @@ -1,540 +0,0 @@ -- Add IElement interface with Object:get(int index) to pull off the - fields from the element by index position. Use this to make - RDFJoinNexus#bind() and copyValues() generic. We can just do a cast - to IElement, but we could also change the generic type constraint on - IRelation from <E> to IRelation<E extends IElement>. But we can - just force the cast for now and not have to update all of those - generic constraints. - -- Review annotation names and defaults. Make sure that the annotation - names are all in appropriate namespaces. The namespaces should - probably be the interface or class of the operator which defines - that annotation. - -- RejectAnythingSameAsSelf appears to be assuming termIds rather than - IVs. - -- Get rid of the concept of a relation view (two or more relations - named by a given predicate) in favor of the UNION of the predicates, - which is basically a UNION of their access paths. - -- Expanders will eventually need to be replaced by either shard-wise - expanders (where possible) or query time materialization of various - inferences, e.g., using magic sets or other query rewrite - techniques. 
- -- IRelation#getAccessPath(IIndexManager,IPredicate). Raise this onto - onto IRelation. It is the shard aware version. There is also a - version getAccessPath(IPredicate) without the IIndexManager - parameter which is used for local indices and for RMI based access - to a scale-out index. - -- IRelation: DDL Support - - Iterator<IKeyOrder<E>> getKeyOrders(); - - IKeyOrder<E> getKeyOrder(IPredicate<E> p); - - IKeyOrder<E> getPrimaryKeyOrder(); - - IIndex getIndex(IKeyOrder) - -- IKeyOrder: DDL Support (reconcile with ITupleSerializer) - - byte[] getFromKey(IKeyBuilder keyBuilder, IPredicate<ISPO> predicate); - - byte[] getToKey(IKeyBuilder keyBuilder, IPredicate<ISPO> predicate); - - byte[] encodeKey(IKeyBuilder keyBuilder, E e); - - E decodeKey(byte[] key); - -- Elements: Add self-description of data to "elements". Harmonize with - binding sets. - - Iterator<Column> getFields(); - -- Column: - - getIndex(); - - getName(); - - getDatatype(); - - getForeignKeys(); - ... - -- Implement lexicon joins. - -- Implement spatial joins. - -- Nested optionals (conditional routing of joins). - -- Make the free text index a "resource" rather than a relation? - -- Use blocking queues with poison objects and chunking or modify - BlockingBuffer to raise up the LinkedBlockingDeque into our code - base. - -- Support tree rewrites using a pattern language. - - - http://functionaljava.org/, Functional Java. - - - tuprolog: http://alice.unibo.it/xwiki/bin/view/Tuprolog/, LGPL, Java. - - - http://scala.sygneca.com/code/mini-kanren, Example of constraint - based programming in Scala. http://scala.sygneca.com/. BSD - license. http://scala.sygneca.com/code/compressed-executable-jar. - - - PrologCafe: http://kaminari.istc.kobe-u.ac.jp/PrologCafe/, Java. - License is not specified. Authors are in Japan. Appears to be - two people. - - - YAP : http://www.dcc.fc.up.pt/~vsc/Yap/, Perl Artistic license, C. - - - XSB : http://xsb.sourceforge.net/, LGPL, C. 
- -BOp - - Serializable - - Cloneable - - Unmodifiable - - arity():int - - args():List<BOp> - - annotations():Map<String,Object> -.BOpList: Used to represent lists of operators. E.g., IN(var,{graphIds}). - - values():List<BOp> -.IConstantOrVariable - - isConstant() - - isVariable() - - get() // iff isConstant() - - getName() // iff isVariable() -..IConstant -..IVariable -.IOpN -..IPredicate(name,arg...)[shardId:int;optional:boolean;constraint:IConstraint[],expander] - -- Distributed query execution pattern: - - The historical pipeline join propagated evaluation from left to - right. This needs to be revisited in now that we are dealing with - operator trees. Operator trees lend themselves naturally to top-down - evaluation. While I think that we can do top-down evaluation of the - operator tree for scaleup, the distributed query execution logic is - more complicated and top-down evaluation is not compatible with - distributed evaluation because joins must run for different shards - based on the partition identifier associated with each bindingSet[] - they receive. - - What we been doing is pushing binding sets along propagating joins - at specific shards onto nodes together with those binding sets. - This was a left-to-right evaluation strategy when the IRule was just - an ordered array of tails on which we needed to read. However, now - that we have an operator tree it would correspond to a bottom up - evaluation of a left-deep tree where the right operands were always - access path reads. That makes my head hurt just a bit when I - consider that the bottom up evaluation would also be "partial" as - binding sets appear. - - For a given incoming IBindingSet[] chunk we will continue to do - exactly what we have been doing, but the surrounding logic needs to - be replaced. Starting a "join" (at least for piplined scale-out) - needs to merely register a join task factory that will handle - binding sets as they arrive. 
- - I am thinking that the way to handle this is to send the query - identifier, join operation identifier, and partition identifier - along with the client's proxy object, the senders operator - identifier, and the senders's service identifier. If the node (a - data service) which receives that RMI message has not seen the query - yet it uses RMI against the client's proxy to fetch the query and - then "starts" the query on that node. Starting a query would merely - allow execution of the operators described in the query plan on the - node once they had sufficient data to run. The current pipeline - join is a chunk-wise nested index join. It runs one pass each time - it has a chunk of binding sets for some shard. Query termination - would be controlled by the client. It would instruct all nodes - known to be running the query to halt execution for that query. - Even if a node misses that message, it will find out when it tries - to send intermediate results to another node that the query was - cancelled. - - Per shard locks, tasks, and threads. - - When running within a single Journal, the query plan is executed by - one task which holds all of the necessary locks. Those locks are - acquired up front by an inspection of the query plan to determine - which indices are needed [actually, it may just be using the - unisolated read/write index and otherwise historical views w/o - locks.] - - There are issues relating to execution of the joins under the - concurrency manager, both in order to have access to the correct - IIndexManager and in order to managing reads and writes against the - unisolated indices by acquiring the appropriate locks. The way - things work right now the upstream join tests a cache for the - downstream join task for a given shard. 
If there is a cache miss, - it sends a factory task which uses a singleton pattern to start a - join task executing with the appropriate locks under the concurrency - manager and then returns the proxy for that join task to the caller. - This guarantees that each join task has the correct locks, but it - does so at the expense of running one thread per join task. It will - be difficult to get around this one task per shard per join - constraint without going directly to the lock manager with the shard - lock requests. [Or just submitting a per binding set chunk task to - the ConcurrencyManager, which might not be that bad if the chunks - are chunky.] - - ==> Given this, maybe it would be easiest to adapt the current join - execution to allow optional gotos by paying close attention to the - termination condition for the query? We could then refactor to - support BOPs within the same general control logic. A DISTINCT - filter could be yet another custom RMI thing layered directly into - the join logic. - - Rather than defining an eval() method for each operator, we have - standoff interpretation of the pipeline operators (whether for - binding sets, elements, or solutions). The query plan could - encapsulate the local versus distributed execution with annotations - on the operators rather than interposing operators and those - annotations would be used to wrap the sink with one which marshells - the outputs onto NIO buffers. - - - Pipelined chunk-wise nested index join. This is the existing - join algorithm. For each binding set chunk received on a node to - be joined with a given shard, we execute that chunk wise join and - emit the intermediate results. [When the join is optional, we - have an optional target and we send the binding sets which do not - join to that optional target.] - - - multi-block io pipelined join. 
This is a version of the pipelined - chunk-wise nested index join which accumulates much larger chunks - (mega chunks) of binding sets (potentially all intermediate - results) and then computes the join of that using the - operator-at-a-time approach for that mega chunk. The tradeoff - between this join and the pure operator at a time join is that we - can keep the intermediate results off the disk using this - approach but we may have to read the shard multiple times. - - - operator-at-a-time shard wise multi-block-io join. This join - proceeds an operator at a time. Once the producer is done, it - computes the entire join using the intermediate results from the - prior join and a single multi-block IO pass over the shard view. - - A tuple read from the shard joins if there exists a binding set - which is consistent with that tuple. For example, given: - - :- ..., POS(A loves B), SPO(B loves C). - - and the following intermediate results from the POS shard: - - B0:[A=John, B=Mary, ...] - B1:[A=Mary, B=Paul, ...] - B2:[A=Paul, B=Leon, ...] - B3:[A=Leon, B=Paul, ...] - - and the following tuples read from the SPO shard: - - T0:(John loves Mary) - T1:(Mary loves Paul) - T2:(Paul loves Leon) - T3:(Leon loves Paul) - - then we have the following joins: - - (T2, B3) // T2:(Paul loves Leon) with B3:[A=Leon, B=Paul, ...]. - (T3, B2) // T3:(Leon loves Paul) with B2:[A=Paul, B=Leon, ...]. - - There are several ways to manipulate the intermediate results to - setup the join: - - Merge join: Merge sort the binding sets based on the relevant - bound values (A,B) and do an ordered scan of the binding sets and - the shard, emitting results which join. 
- - Hash join: Create a temporary hash table (backed by disk) whose - keys are the relevant bound values for (A,B) and whose values are - either: (a) the associated binding sets (there can be more than - one per (A,B) and all must be preserved; or (b) the #of offsets - and the offsets into an array or a persistent store of the - associated binding sets. - - Based on our need for an N-way merge sort, I would think that the - merge join will offer the most benefit initially and is likely to - be better organized for execution in memory (it has locality at - more memory tiers). - -- Scaleup query execution pattern: - - Queries which are not distributed can use top-down evaluation of the - left deep pipeline joins. - - -- BOp execution. - - (***) Implement all BOps for which there is existing functionality. - - PipelineJoin: Join binding set producer with local access path - using asBound Predicate. This should be **TRIVIAL** and can - be unit tested against mock objects for the left and right - hand operands. (The vast majority of the logic in JoinTask - dealt with coordinating the pipeline operations. It should - even be trivial to put in "gotos" for failed optional joins. - Defer the logic for mapShards until the core joins are - running.) - - Predicate: read on as bound access path (IChunked(Ordered)Iterator). - - Work through evaluation for BOps, perhaps using BOpUtility, - and definately using unit tests. We should be able to unit - test correct termination, abnormal termination, etc. against - the EDS [move to test suite.] - - How are the rule execution statistics going to be passed - around? How will we get visibility into the running rules and - their current execution statistics (especially for long - running rules)? How can an operator cancel long running - rules? - - Implement optional / conditional binding set routing to - ancestors in the BOp tree (parent and eventual parent). - - Implement map shards with RMI then convert to buffers. 
Figure - out how to label BOps as senders/receivers. - - Implement map nodes. The ordered list of nodes should be - reused for each MapNodes operator. - - All pipeline operators can specify the pipeline annotations - (chunkSize, chunksOfChunks, etc). - - (***) Harmonize IElementFilter, IConstraint, BOp, IChunkConverter, - PipelineEval. - - Work through how to moving the binding sets and related stuff - around, including when running it into a native heap buffer - and when just draining a queue, blocking buffer's iterator, a - bulk filter (DISTINCT, EXISTS, !EXISTS), bulk completion, etc. - - Asynchronous production of binding sets for vectored pipeline - evaluation. Evaluation should also sets future on buffer as - side effect or collects Future's of the operator tree. - - - Future<Void> eval(IJoinNexus,buffer<BindingSet[]>); - - Evaluation of a predicate, obtaining an access path. The - caller can then wrap the result and do range counts, high - level iterators with filters, low level tuple iterators, - sample the index / view, etc. - - IPredicate<E>: - - - Future<IAccessPath<E>> eval(IJoinNexus,IBindingSet); - - IConstant<T>: - - - T eval(); // return the constant's value. - - IVariable<T>: - - - T eval(IBindingSet); // return the bound value (vs variable name?) - - Striterator patterns return the wrapped iterator. The wrapper - iterator is then applied an (eventual) parent bop. This - pattern can be used with the CTC Striterator, - IChunked(Ordered)Iterator, and probably ITupleIterator (with - the FilterConstructor, which could just be some bops). - - - wrapperItr eval(itr) - - - IElementFilter is element-at-a-time filtering of materialized tuples. - - - IConstraint is IBindingSet-at-a-time filtering. - - - BOp.PipelineEval is IBindingSet at a time evaluation, but it is - designed for chunked pipelineing of the binding sets. - - - IChunkConverter is chunk at a time evaluation and has the sense of - returning a chunk for each chunk consumed. 
That could be used to - return the bit map from the DISTINCT operator, which is something - that is otherwise missing from the BOp.PipelineEval interface. - - - We need a "batch-oriented" constraint for IN (due to compilation - of the set into an efficient representation prior to execution) - and DISTINCT (it has to batch binding sets in order to amortize - the cost of generating the sort keys and/or chunking up the - results). - - - Reconcile Distinct, DistinctSPOIterator, etc. - - - The IN filters should probably be turned into JOINs against an in - memory IBindingSet[] source except in those cases where we will - have locality in the index for the variable in the IN constraint. - At present, IN is only used by the MatchRule. However, it could - also be used for named graphs and default graphs. There are - specialized filters for those purposes SPARQL data sets right now - {InGraphBinarySearchFilter, InGraphHashSetFilter}, but I think - that this is all pretty general purpose stuff. - -IElementFilter: element at a time filtering (does not scale-out). -.ElementFilter: Unused (discard). -.SameVariableConstraint: precompiles some private state (ok iff immutable). -.SolutionFilter: applies filter to the visited elements; related to rule's head... -.SPOFilter: returns false if not an ISPO. - - canAccept(Object):boolean -..DoNotAddFilter -..ExplicitSPOFilter -..InferredSPOFilter -..InGraphBinarySearchFilter: duplicates IN filter? -..InGraphHashSetFilter: duplicates IN filter? - -IChunkConverter<E,F>: bulk conversion in/out (scales-out since RMI can be chunky). -.BulkCompleteConverter -.BulkFilterConverter -.DistinctFilter: Reconcile with Distinct, ISortKeyBuilder, IN, InGraphHashSetFilter, etc. -.HitConverter - -- Evaluation types: - - IRelation(aka namespace) - IDatabase(aka namespace / AbstractTripleStore) - IIndex, ILocalBTreeView, BTree, IndexSegment, FusedView, - IMap? (aka hash map, DHT) - ITable?, IBat? 
- File (flat file in/out), - - Bloomfilter - - E[], BlockingBuffer<E[]>.iterator() - - IBindingSet[], - IChunkedIterator<IBindingSet[]>, - BlockingBuffer<IBindingSet[]>.iterator(), - - ISolution[], etc. - - - Life cycle management of resources local to the operator - execution. In some cases, resources must be eventually released, - much like "finally{}". This has to be managed in a distributed - environment where there is no "stack" to be unwound. - - - Explicit management of query priority, buffers, timeout, etc. - - - Visibility into long running queries. - - - Left-deep operator trees for pipelined execution. - - - newInstance(IJoinNexus) : Future<T>. Tasks will run on - ForkJoinPools vs Executors depending on whether they support light - weight asynchronous operations all the way down or need to use a - thread per disk IO. However, if the disk IO thread pool is global - to a DataService and we use queues for disk IO requests, then we - can always use the ForkJoinPool for operators (queuing a request - is consistent with fork/join, right?). - -- Add IOp library. Some operators will be specific to standalone (a - ConcurrentHashMap based distinct) or scale-out (e.g., a DHT based - distinct). - - - RuntimeQueryOptimization(JoinGraph) - Execute the join graph using - interleaved query optimization and query execution. - - Evaluate for both selective and unselective joins. Note that - sampling can result in all relevant tuples being materialized, at - which point chain sampling will perform the actual join and - materialize the real intermediate result set. - - - Predicate. This corresponds to an access path with the current - bindings. - - - SCAN(fromKey,toKey) (local is trivial) - - - SCAN(partitionId,fromKey,toKey) (distributed requires the - partitionId and the local IIndexManager must be set on the - DataService where the operation will execute). - - - Sample(Predicate,limit):(E[]). Sample tuples from a relation which - satisfy the predicate. 
Returns the sampled elements. - - - SampleOp(IOp,limit):(E[],est). Sample output from an operation, - halting if the limit is satisfied. Returns the sample and the - estimated cardinality of the operation. - - - MapNodes(f,BS[],DS[]). Hash partition mapping of some binding sets - using a function across a set of nodes (typically data service - nodes, but that is not required). - - - MapShards(BS[],tail[i+1]). Maps binding sets across shards on - which we need to read for the next join predicate. The operator - is associated with a logical port and a maximum buffer allocation. - - @todo If we want to assign indices to variables for binding sets - then that needs to be done with reference to a total order over - the "rule". With the generalization to an operator tree, the fact - that we strip out variables from the binding set when they are no - longer in use, and the possibility of permutations over the as yet - unevaluated parts of the operator, that mapping needs to be quite - explicit. Perhaps it could be part of the {@link IJoinNexus} or - raised into the root node of the operator tree. - - - Receive()[queryId, port]. Receive binding sets on a logical port for - a given query. The receiver is given a maximum buffer allocation. - If the buffers are full, it uses flow control to halt the sender - (NACKs the request to send a buffer to the receiver). Can be used - with MapNodes or MapShards. [Can be tested within a single JVM.] - - - IN(var,E[]). Pipeline join operator binds the variable to each of - the elements in the array in turn for each binding set presented - to the operator. [This is useful for RDF dataset constructs.] - - - JOIN(IPredicate). Pipeline join operator. It accepts binding - sets on one side and joins them against the (local) access path - for the specified predicate. - - - StarJoin(...). - - - ? HashJoin? Join two arguments, each of which evaluates to a - collection of binding sets. 
This probably can't be pipelined - unless one of the arguments is relatively small and hence can be - fully materialized. Look at how to handle cases where both - arguments have large result sets. - - - ? Execute a correlated subquery and join of the results from that - subquery against each binding set presented to the - subquery? - - - DISTINCT (local, concurrent hash map). - - - DISTINCT (distributed hash table). - - - SORT (local, Arrays.sort(), radix sort). - - - SORT (distributed, N-way merge sort). - - - CONSTRUCT (create an element from a binding set). - - - INSERT. Insert elements into a relation (local, sharded uses - buffers to move the constructed elements; behavior can use either - unisolated writes, eventually consistent unisolated writes, or - full transaction isolation). - - - REMOVE. Removes elements from a relation (sharded uses buffers to - move the constructed elements). - - Note: Handle removal of elements matching a predicate by - first executing the predicate to select the elements and then - removing the elements using the same kinds of mechanisms which are - used for insert. - -============================================================ - -Much of the complexity of the current approach owes itself to having to run a separate task for each join for each shard in order to have the appropriate lock when running against the unisolated shard view. This also means that the join task is running inside of the concurrency manager and hence has the local view of the shard. - -The main, and perhaps the only, reason why we run unisolated rules is during closure, when we query against the unisolated indices and then write the entailments back on the unisolated indices. - -Supporting closure has always been complicated. 
This complexity is mostly handled by ProgramTask#executeMutation() and AbstractTripleStore#newJoinNexusFactory() which play games with the timestamps used to read and write on the database, with commit points designed to create visibility for tuples written by a mutation rule, and with the automated advance of the read timestamp for the query in each closure pass in order to make newly committed tuples visible to subsequent rounds of closure. For scale-out, we do shard-wise auto commits so we always have a commit point which makes each write visible and the read timestamp is actually a read-only transaction which prevents the historical data we need during a closure round from being released as we are driving updates onto the federation. For the RWStore, we are having a similar problem (in the HA branch since that is where we are working on the RWStore) where historically allocated records were being released as writes drove updates on the indices. Again, we "solved" the problem for the RWStore using a commit point followed by a read-only transaction reading on that commit point to hold onto the view on which the next closure round needs to read (this uncovered a problem with the RWStore and transaction service interaction which Martyn is currently working to resolve through a combination of shadow allocators and deferred deletes which are processed once the release time is advanced by the transaction service). - -The WORM does not have some of these problems with closure because we never delete history, so we do not need to create a commit point and a read-behind transaction. However, the WORM would have problems with concurrent access to the unisolated indices except that we hack that problem through the transparent use of the UnisolatedReadWriteIndex, which allows multiple threads to access the same unisolated index view using a read/write lock pattern (concurrent readers are allowed, but there is only one writer and it has exclusive access when it is running). 
This works out because we never run closure operations against the WORM through the concurrency manager. If we did, we would have to create a commit point after each mutation and use a read-behind transaction to prevent concurrent access to the unisolated index. - -The main advantage that I can see of the current complexity is that it allows us to do load+closure as a single operation on the WORM, resulting in a single commit point. This makes that operation ACID without having to use full read/write transactions. This is how we gain the ACID contract for the standalone Journal in the SAIL for the WORM. Of course, the SAIL does not have that contract for the RWStore because we have to do the commit and read-behind transaction in order to have visibility and avoid concurrent access to the unisolated index (by reading behind on the last commit point). - -I think that the reality is even one step more complicated. When doing truth maintenance (incremental closure), we bring the temporary graph to a fixed point (the rules write on the temp store) and then apply the delta in a single write to the database. That suggests that incremental truth maintenance would continue to be ACID, but that database-at-once-closure would be round-wise ACID. - -So, I would like to suggest that we break ACID for database-at-once-closure and always follow the pattern of (1) do a commit before each round of closure; and (2) create a read-behind transaction to prevent the release of that commit point as we drive writes onto the indices. If we follow this pattern then we can write on the unisolated indices without conflict and read on the historical views without conflict. Since there will be a commit point before each mutation rule runs (which corresponds to a closure round), database-at-once-closure will be atomic within a round, but will not be a single atomic operation. 
Per above, I think that we would retain the ACID property for incremental truth maintenance against a WORM or RW mode Journal. - ---- - -The advantage of this proposal (commit before each mutation rule and run query against a read-behind transaction) is that this could enormously simplify how we execute joins. - -Right now, we use a factory pattern to create a join task on each node for each shard for which that node receives binding sets for a query. The main reason for doing this is to gain the appropriate lock for the unisolated index. If we never run a query against the unisolated index then we can go around the concurrency manager and run a single "query manager" task for all joins for all shards for all queries. This has some great benefits which I will go into below. - -That "query manager" task would be responsible for accepting buffers containing elements or binding sets from other nodes and scheduling consumption of those data based on various criteria (order of arrival, priority, buffer resource requirements, timeout, etc.). This manager task could use a fork join pool to execute light weight operations (NIO, formulation of access paths from binding sets, mapping of binding sets onto shards, joining a chunk already read from an access path against a binding set, etc). Operations which touch the disk need to run in their own thread (until we get Java 7 async file IO, which is already available in a preview library). We could handle that by queuing those operations against a fixed size thread pool for reads. - -This is a radical change in how we handle distributed query execution, but I think that it could have a huge payoff by reducing the complexity of the join logic, making it significantly easier to execute different kinds of join operations, reducing the overhead for acquiring locks for the unisolated index views, reducing the #of threads consumed by joins (from one per shard per join per query to a fixed pool of N threads for reads), etc. 
It would centralize the management of resources on each node and make it possible for us to handle things like join termination by simply purging data from the query manager task for the terminated join. Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/ConditionalRoutingOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/ConditionalRoutingOp.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/ConditionalRoutingOp.java 2010-09-03 00:27:45 UTC (rev 3500) @@ -0,0 +1,217 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Aug 25, 2010 + */ + +package com.bigdata.bop.bset; + +import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.FutureTask; + +import com.bigdata.bop.AbstractPipelineOp; +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpContext; +import com.bigdata.bop.BindingSetPipelineOp; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IConstraint; +import com.bigdata.bop.engine.BOpStats; +import com.bigdata.relation.accesspath.IAsynchronousIterator; +import com.bigdata.relation.accesspath.IBlockingBuffer; + +/** + * An operator for conditional routing of binding sets in a pipeline. The + * operator will copy binding sets either to the default sink (if a condition is + * satisfied) and to the alternate sink otherwise. + * <p> + * Conditional routing can be useful where a different data flow is required + * based on the type of an object (for example a term identifier versus an + * inline term in the RDF database) or where there is a need to jump around a + * join group based on some condition. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class ConditionalRoutingOp extends BindingSetPipelineOp { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public interface Annotations extends AbstractPipelineOp.Annotations { + + /** + * An {@link IConstraint} which specifies the condition. When the + * condition is satisfied the binding set is routed to the default sink. + * When the condition is not satisfied, the binding set is routed to the + * alternative sink. + */ + String CONDITION = ConditionalRoutingOp.class.getName() + ".condition"; + + } + + /** + * Deep copy constructor. 
+ * + * @param op + */ + public ConditionalRoutingOp(ConditionalRoutingOp op) { + super(op); + } + + /** + * Shallow copy constructor. + * + * @param args + * @param annotations + */ + public ConditionalRoutingOp(BOp[] args, Map<String, Object> annotations) { + super(args, annotations); + } + + /** + * @see Annotations#CONDITION + */ + public IConstraint getCondition() { + + return (IConstraint) getProperty(Annotations.CONDITION); + + } + + public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { + + return new FutureTask<Void>(new ConditionalRouteTask(this, context)); + + } + + /** + * Copy the source to the sink or the alternative sink depending on the + * condition. + */ + static private class ConditionalRouteTask implements Callable<Void> { + + private final BOpStats stats; + + private final IConstraint condition; + + private final IAsynchronousIterator<IBindingSet[]> source; + + private final IBlockingBuffer<IBindingSet[]> sink; + + private final IBlockingBuffer<IBindingSet[]> sink2; + + ConditionalRouteTask(final ConditionalRoutingOp op, + final BOpContext<IBindingSet> context) { + + this.stats = context.getStats(); + + this.condition = op.getCondition(); + + if (condition == null) + throw new IllegalArgumentException(); + + this.source = context.getSource(); + + this.sink = context.getSink(); + + this.sink2 = context.getSink2(); + + if (sink2 == null) + throw new IllegalArgumentException(); + + if (sink == sink2) + throw new IllegalArgumentException(); + + } + + public Void call() throws Exception { + try { + while (source.hasNext()) { + + final IBindingSet[] chunk = source.next(); + + stats.chunksIn.increment(); + stats.unitsIn.add(chunk.length); + + final IBindingSet[] def = new IBindingSet[chunk.length]; + final IBindingSet[] alt = new IBindingSet[chunk.length]; + + int ndef = 0, nalt = 0; + + for(int i=0; i<chunk.length; i++) { + + final IBindingSet bset = chunk[i]; + + if (condition.accept(bset)) { + + def[ndef++] = bset; + + } else { + + 
alt[nalt++] = bset; + + } + + } + + if (ndef > 0) { + if (ndef == def.length) + sink.add(def); + else + sink.add(Arrays.copyOf(def, ndef)); + stats.chunksOut.increment(); + stats.unitsOut.add(ndef); + } + + if (nalt > 0) { + if (nalt == alt.length) + sink2.add(alt); + else + sink2.add(Arrays.copyOf(alt, nalt)); + stats.chunksOut.increment(); + stats.unitsOut.add(nalt); + } + + } + + sink.flush(); + sink2.flush(); + + return null; + + } finally { + + sink.close(); + sink2.close(); + + } + + } + + } + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/ConditionalRoutingOp.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Copied: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyBindingSetOp.java (from rev 3495, branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineStartOp.java) =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyBindingSetOp.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyBindingSetOp.java 2010-09-03 00:27:45 UTC (rev 3500) @@ -0,0 +1,124 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Aug 25, 2010 + */ + +package com.bigdata.bop.bset; + +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.FutureTask; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpContext; +import com.bigdata.bop.BindingSetPipelineOp; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.engine.BOpStats; +import com.bigdata.relation.accesspath.IAsynchronousIterator; +import com.bigdata.relation.accesspath.IBlockingBuffer; + +/** + * This operator copies its source to its sink. It is used to feed the first + * join in the pipeline. The operator should have no children but may be + * decorated with annotations as necessary. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + * + * @todo unit tests. + */ +public class CopyBindingSetOp extends BindingSetPipelineOp { + + /** + * + */ + private static final long serialVersionUID = 1L; + + /** + * Deep copy constructor. + * + * @param op + */ + public CopyBindingSetOp(CopyBindingSetOp op) { + super(op); + } + + /** + * Shallow copy constructor. + * + * @param args + * @param annotations + */ + public CopyBindingSetOp(BOp[] args, Map<String, Object> annotations) { + super(args, annotations); + } + + public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { + + return new FutureTask<Void>(new CopyTask(context)); + + } + + /** + * Copy the source to the sink. 
+ */ + static private class CopyTask implements Callable<Void> { + + private final BOpStats stats; + + private final IAsynchronousIterator<IBindingSet[]> source; + + private final IBlockingBuffer<IBindingSet[]> sink; + + CopyTask(final BOpContext<IBindingSet> context) { + + stats = context.getStats(); + + this.source = context.getSource(); + + this.sink = context.getSink(); + + } + + public Void call() throws Exception { + try { + while (source.hasNext()) { + final IBindingSet[] chunk = source.next(); + stats.chunksIn.increment(); + stats.unitsIn.add(chunk.length); + sink.add(chunk); + stats.chunksOut.increment(); + stats.unitsOut.add(chunk.length); + } + sink.flush(); + return null; + } finally { + sink.close(); + } + } + + } + +} Copied: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java (from rev 3495, branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/Union.java) =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java 2010-09-03 00:27:45 UTC (rev 3500) @@ -0,0 +1,135 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Aug 18, 2010 + */ + +package com.bigdata.bop.bset; + +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.FutureTask; + +import com.bigdata.bop.BOpContext; +import com.bigdata.bop.BindingSetPipelineOp; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.join.PipelineJoin; +import com.bigdata.rdf.rules.TMUtility; +import com.bigdata.relation.RelationFusedView; +import com.bigdata.util.concurrent.Haltable; + +/** + * The union of two or more {@link BindingSetPipelineOp} operators. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + * + * @todo I have some basic questions about the ability to use a UNION of two + * predicates in scale-out. I think that this might be more accurately + * modeled as the UNION of two joins. That is, rather than: + * + * <pre> + * JOIN( ..., + * UNION( foo.spo(A,loves,B), + * bar.spo(A,loves,B) ) + * ) + * </pre> + * + * using + * + * <pre> + * UNION( JOIN( ..., foo.spo(A,loves,B) ), + * JOIN( ..., bar.spo(A,loves,B) ) + * ) + * </pre> + * + * which would be a binding set union rather than an element union. + * + * @todo The union of access paths was historically handled by + * {@link RelationFusedView}. That class should be removed once queries + * are rewritten to use the union of joins. + * + * @todo The {@link TMUtility} will have to be updated to use this operator + * rather than specifying multiple source "names" for the relation of the + * predicate. + * + * @todo The FastClosureRuleTask will also need to be updated to use a + * {@link Union} over the joins rather than a {@link RelationFusedView}. 
+ */ +public class Union extends BindingSetPipelineOp { + + /** + * + */ + private static final long serialVersionUID = 1L; + + /** + * @param args + * Two or more operators whose union is desired. + * @param annotations + */ + public Union(final BindingSetPipelineOp[] args, + final Map<String, Object> annotations) { + + super(args, annotations); + + if (args.length < 2) + throw new IllegalArgumentException(); + + } + + public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { + + return new FutureTask<Void>(new UnionTask(this, context)); + + } + + /** + * Pipeline union impl. + * + * FIXME All this does is copy its inputs to its outputs. Since we only run + * one chunk of input at a time, it seems that the easiest way to implement + * a union is to have the operators in the union just target the same sink. + */ + private static class UnionTask extends Haltable<Void> implements Callable<Void> { + + public UnionTask(// + final Union op,// + final BOpContext<IBindingSet> context + ) { + + if (op == null) + throw new IllegalArgumentException(); + if (context == null) + throw new IllegalArgumentException(); + } + + public Void call() throws Exception { + // TODO Auto-generated method stub + throw new UnsupportedOperationException(); + } + + } + +} Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2010-09-02 22:24:22 UTC (rev 3499) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 20... [truncated message content] |