From: <tho...@us...> - 2010-10-24 18:18:18
Revision: 3842 http://bigdata.svn.sourceforge.net/bigdata/?rev=3842&view=rev
Author: thompsonbry
Date: 2010-10-24 18:18:10 +0000 (Sun, 24 Oct 2010)

Log Message:
-----------
Modified PipelineJoin to make the predicate an annotation. This is in keeping with a design pattern where operands (other than simple variables and constants) are evaluated in the pipeline and where annotations are interpreted. This also simplifies the RunState logging format.

Modified RunningQuery to track all Futures and permit more than one concurrent operator task per (bopId,shardId). There is now an annotation which controls how many such tasks may run concurrently.

I've also experimented with the parameter space for the BufferAnnotations and the fullyBufferedReadThreshold. These do not appear to have much influence on query performance for either LUBM U50 or BSBM 100M. LUBM query performance remains significantly better in the trunk (13s vs 17s). There is a less significant difference in BSBM performance (4234 vs 4058). Since LUBM U50 tends to be memory-based (after the first presentation of each query), this suggests that the performance difference is related more to in-memory dynamics than to disk access. The most significant difference right now between the trunk and the quads query branch is that the trunk chains the input and output buffers of operators together, while the branch runs operators over chunks of materialized inputs.

Modified Paths:
--------------
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BufferAnnotations.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IPredicate.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngineTestAnnotations.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestPipelineUtility.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java
    branches/QUADS_QUERY_BRANCH/bigdata-jini/src/test/com/bigdata/bop/fed/jini/TestJiniFederatedQueryEngine.java
    branches/QUADS_QUERY_BRANCH/bigdata-perf/lubm/src/resources/logging/log4j.properties
    branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/Rule2BOpUtility.java

Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BufferAnnotations.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BufferAnnotations.java 2010-10-22 20:08:48 UTC (rev 3841)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BufferAnnotations.java 2010-10-24 18:18:10 UTC (rev 3842)
@@ -52,7 +52,7 @@ /** * Default for {@link #CHUNK_OF_CHUNKS_CAPACITY} */ - int DEFAULT_CHUNK_OF_CHUNKS_CAPACITY = 100; + int DEFAULT_CHUNK_OF_CHUNKS_CAPACITY = 100;//trunk=1000 /** * Sets the capacity of the {@link IBuffer}[]s used to accumulate a chunk of
@@ -66,7 +66,7 @@ /** * Default for {@link #CHUNK_CAPACITY} */ - int DEFAULT_CHUNK_CAPACITY = 100; + int DEFAULT_CHUNK_CAPACITY = 100;//trunk=100 /** * The timeout in
milliseconds that the {@link BlockingBuffer} will wait for @@ -81,7 +81,7 @@ * * @todo this is probably much larger than we want. Try 10ms. */ - int DEFAULT_CHUNK_TIMEOUT = 20; + int DEFAULT_CHUNK_TIMEOUT = 20;//trunk=1000 /** * The {@link TimeUnit}s in which the {@link #CHUNK_TIMEOUT} is measured. Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IPredicate.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IPredicate.java 2010-10-22 20:08:48 UTC (rev 3841) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IPredicate.java 2010-10-24 18:18:10 UTC (rev 3842) @@ -42,6 +42,7 @@ import com.bigdata.btree.filter.Advancer; import com.bigdata.btree.filter.TupleFilter; import com.bigdata.mdi.PartitionLocator; +import com.bigdata.rawstore.Bytes; import com.bigdata.relation.IRelation; import com.bigdata.relation.accesspath.AccessPath; import com.bigdata.relation.accesspath.ElementFilter; @@ -255,7 +256,7 @@ * @todo Experiment with this. It should probably be something close to * the branching factor, e.g., 100. */ - int DEFAULT_FULLY_BUFFERED_READ_THRESHOLD = 100; + int DEFAULT_FULLY_BUFFERED_READ_THRESHOLD = 100;//trunk=20*Bytes.kilobyte32 /** * Specify the {@link IRangeQuery} flags for the {@link IAccessPath} ( Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java 2010-10-22 20:08:48 UTC (rev 3841) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java 2010-10-24 18:18:10 UTC (rev 3842) @@ -39,15 +39,6 @@ * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ - * - * @todo Add time per bop. This can not be directly aggregated into wall time - * since there are concurrent processes. However, this will be useful - * since we tend to process materialized chunks with the new - * {@link QueryEngine} such that the operator evaluation time now more or - * less directly corresponds to the time it takes to act on local data, - * producing local outputs. The {@link QueryEngine} itself now handles the - * transportation of data between the nodes so that time can be factored - * out of the local aspects of query execution. */ public class BOpStats implements Serializable { @@ -56,12 +47,12 @@ */ private static final long serialVersionUID = 1L; -// /** -// * The timestamp (milliseconds) associated with the start of execution for -// * the join dimension. This is not aggregated. It should only be used to -// * compute the elapsed time for the operator. -// */ -// private final long startTime; +// /** +// * The timestamp (nanoseconds) assigned when this {@link BOpStats} object +// * was creatred. This can not be directly aggregated into wall time since +// * concurrent processes are nearly always used during query evaluation. +// */ +// private final long startTime = System.nanoTime(); /** * #of chunks in. 
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngineTestAnnotations.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngineTestAnnotations.java 2010-10-22 20:08:48 UTC (rev 3841) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngineTestAnnotations.java 2010-10-24 18:18:10 UTC (rev 3842) @@ -69,4 +69,9 @@ boolean DEFAULT_ONE_MESSAGE_PER_CHUNK = false; + String MAX_CONCURRENT_TASKS_PER_OPERATOR_AND_SHARD = QueryEngineTestAnnotations.class.getName() + + ".maxConcurrentTasksPerOperatorAndShard"; + + int DEFAULT_MAX_CONCURRENT_TASKS_PER_OPERATOR_AND_SHARD = Integer.MAX_VALUE; + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java 2010-10-22 20:08:48 UTC (rev 3841) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java 2010-10-24 18:18:10 UTC (rev 3842) @@ -37,7 +37,6 @@ import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -46,6 +45,7 @@ import org.apache.log4j.Logger; import com.bigdata.bop.BOp; +import com.bigdata.bop.PipelineOp; import com.bigdata.relation.accesspath.IBlockingBuffer; /** @@ -756,8 +756,13 @@ final Integer id = bopIds[i]; - sb.append("\tnavail(id=" + id + ")"); + final BOp bop = bopIndex.get(id); + + if(!(bop instanceof PipelineOp)) + continue; // skip non-pipeline operators. + sb.append("\tnavail(id=" + id + ")"); + sb.append("\tnrun(id=" + id + ")"); } @@ -853,6 +858,11 @@ final Integer id = bopIds[i]; + final BOp bop = bopIndex.get(id); + + if(!(bop instanceof PipelineOp)) + continue; // skip non-pipeline operators. + final AtomicLong nrunning = runningMap.get(id); final AtomicLong navailable = availableMap.get(id); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-10-22 20:08:48 UTC (rev 3841) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-10-24 18:18:10 UTC (rev 3842) @@ -28,7 +28,6 @@ package com.bigdata.bop.engine; import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -145,12 +144,18 @@ */ final private Haltable<Void> future = new Haltable<Void>(); + /** + * The maximum number of operator tasks which may be concurrently executor + * for a given (bopId,shardId). + */ + final private int maxConcurrentTasksPerOperatorAndShard; + /** * A collection of (bopId,partitionId) keys mapped onto a collection of * operator task evaluation contexts for currently executing operators for * this query. */ - private final ConcurrentHashMap<BSBundle, ChunkFutureTask> operatorFutures; + private final ConcurrentHashMap<BSBundle, ConcurrentHashMap<ChunkFutureTask,ChunkFutureTask>> operatorFutures; /** * A map of unbounded work queues for each (bopId,partitionId). 
Empty queues @@ -450,8 +455,13 @@ this.bopIndex = BOpUtility.getIndex(query); - this.operatorFutures = new ConcurrentHashMap<BSBundle, ChunkFutureTask>(); + this.maxConcurrentTasksPerOperatorAndShard = query + .getProperty( + QueryEngineTestAnnotations.MAX_CONCURRENT_TASKS_PER_OPERATOR_AND_SHARD, + QueryEngineTestAnnotations.DEFAULT_MAX_CONCURRENT_TASKS_PER_OPERATOR_AND_SHARD); + this.operatorFutures = new ConcurrentHashMap<BSBundle, ConcurrentHashMap<ChunkFutureTask,ChunkFutureTask>>(); + this.operatorQueues = new ConcurrentHashMap<BSBundle, BlockingQueue<IChunkMessage<IBindingSet>>>(); /* @@ -520,11 +530,12 @@ } - /** - * Pre-populate a map with {@link BOpStats} objects for the query. Operators - * in subqueries are not visited since they will be assigned {@link BOpStats} - * objects when they are run as a subquery. - */ + /** + * Pre-populate a map with {@link BOpStats} objects for the query. Only the + * child operands are visited. Operators in subqueries are not visited since + * they will be assigned {@link BOpStats} objects when they are run as a + * subquery. + */ private void populateStatsMap(final BOp op) { if(!(op instanceof PipelineOp)) @@ -1139,14 +1150,26 @@ lock.lock(); try { // Make sure the query is still running. - future.halted(); - // Is there a Future for this (bopId,partitionId)? - final ChunkFutureTask cft = operatorFutures.get(bundle); - if (cft != null && !cft.isDone()) { - // already running. - return false; - } - // Remove the work queue for that (bopId,partitionId). + if(future.isDone()) + return false; + // Is there a Future for this (bopId,partitionId)? + ConcurrentHashMap<ChunkFutureTask, ChunkFutureTask> map = operatorFutures + .get(bundle); + if (map != null) { + int nrunning = 0; + for (ChunkFutureTask cft : map.keySet()) { + if (cft.isDone()) + map.remove(cft); + nrunning++; + } + if (map.isEmpty()) + operatorFutures.remove(bundle); + if (nrunning > maxConcurrentTasksPerOperatorAndShard) { + // Too many already running. + return false; + } + } + // Remove the work queue for that (bopId,partitionId). final BlockingQueue<IChunkMessage<IBindingSet>> queue = operatorQueues .remove(bundle); if (queue == null || queue.isEmpty()) { @@ -1165,16 +1188,26 @@ for (IChunkMessage<IBindingSet> msg : messages) { source.add(msg.getChunkAccessor().iterator()); } - /* - * Create task to consume that source. - */ - final ChunkFutureTask ft = new ChunkFutureTask(new ChunkTask( - bundle.bopId, bundle.shardId, nmessages, source)); - /* - * Submit task for execution (asynchronous). - */ - queryEngine.execute(ft); - return true; + /* + * Create task to consume that source. + */ + final ChunkFutureTask cft = new ChunkFutureTask(new ChunkTask( + bundle.bopId, bundle.shardId, nmessages, source)); + /* + * Save the Future for this task. Together with the logic above this + * may be used to limit the #of concurrent tasks per (bopId,shardId) + * to one for a given query. + */ + if (map == null) { + map = new ConcurrentHashMap<ChunkFutureTask, ChunkFutureTask>(); + operatorFutures.put(bundle, map); + } + map.put(cft, cft); + /* + * Submit task for execution (asynchronous). + */ + queryEngine.execute(cft); + return true; } finally { lock.unlock(); } @@ -1199,6 +1232,29 @@ } + public void run() { + + final ChunkTask t = chunkTask; + + super.run(); + + /* + * This task is done executing so remove its Future before we + * attempt to schedule another task for the same + * (bopId,partitionId). 
+ */ + final ConcurrentHashMap<ChunkFutureTask, ChunkFutureTask> map = operatorFutures + .get(new BSBundle(t.bopId, t.partitionId)); + if (map != null) { + map.remove(this, this); + } + + // Schedule another task if any messages are waiting. + RunningQuery.this.scheduleNext(new BSBundle( + t.bopId, t.partitionId)); + + } + } /** @@ -1224,16 +1280,6 @@ public void run() { - // Run the task. - runOnce(); - - // Schedule another task if any messages are waiting. - RunningQuery.this.scheduleNext(new BSBundle( - t.bopId, t.partitionId)); - } - - private void runOnce() { - final UUID serviceId = queryEngine.getServiceUUID(); try { @@ -1467,24 +1513,25 @@ + bop); } - /* - * Setup the BOpStats object. For some operators, e.g., SliceOp, - * this MUST be the same object across all invocations of that - * instance of that operator for this query. This is marked by the - * PipelineOp#isSharedState() method and is handled by a - * putIfAbsent() pattern when that method returns true. - * - * Note: RunState#haltOp() avoids adding a BOpStats object to itself - * since that would cause double counting when the same object is - * used for each invocation of the operator. - * - * Note: By using a shared stats object we have live reporting on - * all instances of the task which are being evaluated on the query - * controller (tasks running on peers always have distinct stats - * objects and those stats are aggregated when the task finishes). - */ + /* + * Setup the BOpStats object. For some operators, e.g., SliceOp, + * this MUST be the same object across all invocations of that + * instance of that operator for this query. This is marked by the + * PipelineOp#isSharedState() method and is handled by a + * putIfAbsent() pattern when that method returns true. + * + * Note: RunState#haltOp() avoids adding a BOpStats object to itself + * since that would cause double counting when the same object is + * used for each invocation of the operator. + * + * Note: It tends to be more useful to have distinct BOpStats + * objects for each operator task instance that we run as this makes + * it possible to see how much work was performed by that task + * instance. The data are aggregated in the [statsMap] across the + * entire run of the query. + */ final BOpStats stats; - if (((PipelineOp) bop).isSharedState() || statsMap != null) { + if (((PipelineOp) bop).isSharedState()) {//|| statsMap != null) { // shared stats object. 
stats = statsMap.get(bopId); } else { @@ -1947,23 +1994,19 @@ boolean cancelled = false; - final Iterator<ChunkFutureTask> fitr = operatorFutures.values().iterator(); + final Iterator<ConcurrentHashMap<ChunkFutureTask,ChunkFutureTask>> fitr = operatorFutures.values().iterator(); while (fitr.hasNext()) { - final ChunkFutureTask f = fitr.next(); - - try { - - if (f.cancel(mayInterruptIfRunning)) - cancelled = true; + final ConcurrentHashMap<ChunkFutureTask,ChunkFutureTask> set = fitr.next(); - } finally { - -// fitr.remove(); - - } + for(ChunkFutureTask f : set.keySet()) { + if (f.cancel(mayInterruptIfRunning)) + cancelled = true; + + } + } return cancelled; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2010-10-22 20:08:48 UTC (rev 3841) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2010-10-24 18:18:10 UTC (rev 3842) @@ -79,15 +79,14 @@ /** * Pipelined join operator for online (selective) queries. The pipeline join - * accepts chunks of binding sets from its left operand, combines each binding - * set in turn with the right operand to produce an "asBound" predicate, and - * then executes a nested indexed subquery against that asBound predicate, - * writing out a new binding set for each element returned by the asBound - * predicate which satisfies the join constraint. + * accepts chunks of binding sets from its operand, combines each binding set in + * turn with its {@link IPredicate} annotation to produce an "asBound" + * predicate, and then executes a nested indexed subquery against that asBound + * predicate, writing out a new binding set for each element returned by the + * asBound predicate which satisfies the join constraint. * <p> * Note: In order to support pipelining, query plans need to be arranged in a - * "left-deep" manner and there may not be intervening operators between the - * pipeline join operator and the {@link IPredicate} on which it will read. + * "left-deep" manner. * <p> * Note: In scale-out, the {@link PipelineJoin} is generally annotated as a * {@link BOpEvaluationContext#SHARDED} or {@link BOpEvaluationContext#HASHED} @@ -113,6 +112,12 @@ public interface Annotations extends PipelineOp.Annotations { + /** + * The {@link IPredicate} which is used to generate the + * {@link IAccessPath}s during the join. + */ + String PREDICATE = PipelineJoin.class.getName() + ".predicate"; + /** * An optional {@link IVariable}[] identifying the variables to be * retained in the {@link IBindingSet}s written out by the operator. @@ -249,7 +254,7 @@ * @param args * @param annotations */ - public PipelineJoin(final BOp[] args, NV[] annotations) { + public PipelineJoin(final BOp[] args, NV... annotations) { this(args, NV.asMap(annotations)); @@ -265,37 +270,17 @@ super(args, annotations); - if (arity() != 2) + if (arity() != 1) throw new IllegalArgumentException(); if (left() == null) throw new IllegalArgumentException(); - if (right() == null) - throw new IllegalArgumentException(); - } - /** - * @param left - * The left operand, which must be an {@link IBindingSet} - * pipeline operator, such as another {@link PipelineJoin}. - * @param right - * The right operand, which must be an {@link IPredicate}. 
- * - * @param annotations - */ - public PipelineJoin(final PipelineOp left, - final IPredicate<?> right, final Map<String, Object> annotations) { - - this(new BOp[] { left, right }, annotations); - - } - - /** - * The left hand operator, which is the previous join in the pipeline join - * path. - */ + /** + * The sole operand, which is the previous join in the pipeline join path. + */ public PipelineOp left() { return (PipelineOp) get(0); @@ -303,28 +288,14 @@ } /** - * The right hand operator, which is the {@link IPredicate}. + * {@inheritDoc} + * + * @see Annotations#PREDICATE */ @SuppressWarnings("unchecked") - public IPredicate<E> right() { - - return (IPredicate<E>) get(1); - - } - - // /** - // * Returns {@link BOpEvaluationContext#SHARDED} - // */ - // @Override - // final public BOpEvaluationContext getEvaluationContext() { - // - // return BOpEvaluationContext.SHARDED; - // - // } - - public IPredicate<E> getPredicate() { + public IPredicate<E> getPredicate() { - return right(); + return (IPredicate<E>) getRequiredProperty(Annotations.PREDICATE); } @@ -408,7 +379,7 @@ final private Executor service; /** - * True iff the {@link #right} operand is an optional pattern (aka if + * True iff the {@link #predicate} operand is an optional pattern (aka if * this is a SPARQL style left join). */ final private boolean optional; @@ -420,18 +391,13 @@ */ final private IVariable<?>[] variablesToKeep; -// /** -// * The source for the binding sets. -// */ -// final BindingSetPipelineOp left; - /** * The source for the elements to be joined. */ - final private IPredicate<E> right; + final private IPredicate<E> predicate; /** - * The relation associated with the {@link #right} operand. + * The relation associated with the {@link #predicate} operand. */ final private IRelation<E> relation; @@ -519,10 +485,8 @@ if (context == null) throw new IllegalArgumentException(); -// this.fed = context.getFederation(); this.joinOp = joinOp; -// this.left = joinOp.left(); - this.right = joinOp.right(); + this.predicate = joinOp.getPredicate(); this.constraints = joinOp.constraints(); this.maxParallel = joinOp.getMaxParallel(); if (maxParallel > 0) { @@ -536,7 +500,7 @@ this.optional = joinOp.isOptional(); this.variablesToKeep = joinOp.variablesToKeep(); this.context = context; - this.relation = context.getRelation(right); + this.relation = context.getRelation(predicate); this.source = context.getSource(); this.sink = context.getSink(); this.sink2 = context.getSink2(); @@ -932,7 +896,7 @@ final IBindingSet bindingSet = chunk[0]; // constrain the predicate to the given bindings. - IPredicate<E> predicate = right.asBound(bindingSet); + IPredicate<E> asBound = predicate.asBound(bindingSet); if (partitionId != -1) { @@ -947,11 +911,11 @@ * for an index partition. */ - predicate = predicate.setPartitionId(partitionId); + asBound = asBound.setPartitionId(partitionId); } - new JoinTask.AccessPathTask(predicate, Arrays.asList(chunk)) + new JoinTask.AccessPathTask(asBound, Arrays.asList(chunk)) .call(); } @@ -986,7 +950,7 @@ halted(); // constrain the predicate to the given bindings. - IPredicate<E> predicate = right.asBound(bindingSet); + IPredicate<E> asBound = predicate.asBound(bindingSet); if (partitionId != -1) { @@ -1001,12 +965,12 @@ * for an index partition. */ - predicate = predicate.setPartitionId(partitionId); + asBound = asBound.setPartitionId(partitionId); } // lookup the asBound predicate in the map. 
- Collection<IBindingSet> values = map.get(predicate); + Collection<IBindingSet> values = map.get(asBound); if (values == null) { @@ -1019,7 +983,7 @@ values = new LinkedList<IBindingSet>(); - map.put(predicate, values); + map.put(asBound, values); } else { @@ -1793,7 +1757,7 @@ bset = bset.clone(); // propagate bindings from the visited element. - if (context.bind(right, constraints, e, bset)) { + if (context.bind(predicate, constraints, e, bset)) { // optionally strip off unnecessary variables. bset = variablesToKeep == null ? bset : bset Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestPipelineUtility.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestPipelineUtility.java 2010-10-22 20:08:48 UTC (rev 3841) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestPipelineUtility.java 2010-10-24 18:18:10 UTC (rev 3842) @@ -104,17 +104,17 @@ })); @SuppressWarnings("unchecked") - final PipelineOp join1Op = new PipelineJoin(startOp, pred1Op, - NV.asMap(new NV[] { new NV(Predicate.Annotations.BOP_ID, - joinId1),// - })); + final PipelineOp join1Op = new PipelineJoin(new BOp[] { startOp }, + new NV(Predicate.Annotations.BOP_ID, joinId1),// + new NV(PipelineJoin.Annotations.PREDICATE, pred1Op) // + ); @SuppressWarnings("unchecked") - final PipelineOp join2Op = new PipelineJoin(join1Op, pred2Op, - NV.asMap(new NV[] { new NV(Predicate.Annotations.BOP_ID, - joinId2),// - })); - + final PipelineOp join2Op = new PipelineJoin(new BOp[] { join1Op }, // + new NV(Predicate.Annotations.BOP_ID, joinId2),// + new NV(PipelineJoin.Annotations.PREDICATE, pred2Op) // + ); + final PipelineOp queryPlan = join2Op; final Map<Integer,BOp> queryIndex = BOpUtility.getIndex(queryPlan); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java 2010-10-22 20:08:48 UTC (rev 3841) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java 2010-10-24 18:18:10 UTC (rev 3842) @@ -309,30 +309,29 @@ public void test_query_join1() throws Exception { final int startId = 1; - final int joinId = 2; - final int predId = 3; - final PipelineOp query = new PipelineJoin<E>( - // left - new StartOp(new BOp[] {}, NV.asMap(new NV[] {// - new NV(Predicate.Annotations.BOP_ID, startId),// - new NV(SliceOp.Annotations.EVALUATION_CONTEXT, - BOpEvaluationContext.CONTROLLER),// - })), - // right - new Predicate<E>(new IVariableOrConstant[] { - new Constant<String>("Mary"), Var.var("value") }, NV - .asMap(new NV[] {// - new NV(Predicate.Annotations.RELATION_NAME, - new String[] { namespace }),// - new NV(Predicate.Annotations.BOP_ID, predId),// - new NV(Predicate.Annotations.TIMESTAMP,ITx.READ_COMMITTED),// - })), - // join annotations - NV.asMap(new NV[] { // - new NV(Predicate.Annotations.BOP_ID, joinId),// - })// - ); + final int joinId = 2; + final int predId = 3; + final StartOp startOp = new StartOp(new BOp[] {}, NV.asMap(new NV[] {// + new NV(Predicate.Annotations.BOP_ID, startId),// + new NV(SliceOp.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.CONTROLLER),// + })); + + final Predicate<E> pred = new Predicate<E>(new IVariableOrConstant[] { + new Constant<String>("Mary"), Var.var("value") }, NV + .asMap(new NV[] {// + new 
NV(Predicate.Annotations.RELATION_NAME, + new String[] { namespace }),// + new NV(Predicate.Annotations.BOP_ID, predId),// + new NV(Predicate.Annotations.TIMESTAMP, + ITx.READ_COMMITTED),// + })); + + final PipelineOp query = new PipelineJoin<E>(new BOp[] { startOp },// + new NV(Predicate.Annotations.BOP_ID, joinId),// + new NV(PipelineJoin.Annotations.PREDICATE, pred)); + // the expected solution. final IBindingSet[] expected = new IBindingSet[] {// new ArrayBindingSet(// @@ -434,15 +433,10 @@ ITx.READ_COMMITTED),// })); - final PipelineJoin<E> joinOp = new PipelineJoin<E>( - startOp/* left */, predOp/* right */, - // join annotations - NV.asMap(new NV[] { // - new NV(Predicate.Annotations.BOP_ID, joinId),// -// new NV(PipelineOp.Annotations.CHUNK_CAPACITY, 1),// -// new NV(PipelineOp.Annotations.CHUNK_OF_CHUNKS_CAPACITY, 1),// - })// - ); + final PipelineJoin<E> joinOp = new PipelineJoin<E>( + new BOp[] { startOp },// + new NV(Predicate.Annotations.BOP_ID, joinId),// + new NV(PipelineJoin.Annotations.PREDICATE, predOp)); final SliceOp sliceOp = new SliceOp(new BOp[] { joinOp }, // slice annotations @@ -868,13 +862,10 @@ ITx.READ_COMMITTED),// })); - final PipelineJoin<E> joinOp = new PipelineJoin<E>(startOp/* left */, - predOp/* right */, - // join annotations - NV.asMap(new NV[] { // - new NV(Predicate.Annotations.BOP_ID, joinId),// - })// - ); + final PipelineJoin<E> joinOp = new PipelineJoin<E>( + new BOp[] { startOp },// + new NV(Predicate.Annotations.BOP_ID, joinId),// + new NV(PipelineJoin.Annotations.PREDICATE, predOp)); final PipelineOp query = new SliceOp(new BOp[] { joinOp }, // slice annotations @@ -1002,18 +993,16 @@ // R.primaryKeyOrder),// })); - final PipelineJoin<E> joinOp = new PipelineJoin<E>(startOp/* left */, - predOp/* right */, - // join annotations - NV.asMap(new NV[] {// - new NV(Predicate.Annotations.BOP_ID, joinId),// - // impose constraint on the join. - new NV(PipelineJoin.Annotations.CONSTRAINTS, - new IConstraint[] { new EQConstant(y, - new Constant<String>("Paul")) }),// - })// - ); - + final PipelineJoin<E> joinOp = new PipelineJoin<E>( + new BOp[] { startOp },// + new NV(Predicate.Annotations.BOP_ID, joinId),// + new NV(PipelineJoin.Annotations.PREDICATE, predOp),// + // impose constraint on the join. + new NV(PipelineJoin.Annotations.CONSTRAINTS, + new IConstraint[] { new EQConstant(y, + new Constant<String>("Paul")) })// + ); + final PipelineOp query = new SliceOp(new BOp[] { joinOp }, // slice annotations NV.asMap(new NV[] {// @@ -1170,19 +1159,17 @@ new NV(Predicate.Annotations.TIMESTAMP, ITx.READ_COMMITTED),// })); - final PipelineOp join1Op = new PipelineJoin<E>(// - startOp, pred1Op,// - NV.asMap(new NV[] {// - new NV(Predicate.Annotations.BOP_ID, joinId1),// - })); + final PipelineOp join1Op = new PipelineJoin<E>(// + new BOp[] { startOp },// + new NV(Predicate.Annotations.BOP_ID, joinId1),// + new NV(PipelineJoin.Annotations.PREDICATE, pred1Op)); - final PipelineOp join2Op = new PipelineJoin<E>(// - join1Op, pred2Op,// - NV.asMap(new NV[] {// - new NV(Predicate.Annotations.BOP_ID, joinId2),// - })); + final PipelineOp join2Op = new PipelineJoin<E>(// + new BOp[] { join1Op },// + new NV(Predicate.Annotations.BOP_ID, joinId2),// + new NV(PipelineJoin.Annotations.PREDICATE, pred2Op)); - final PipelineOp query = join2Op; + final PipelineOp query = join2Op; // start the query. 
final UUID queryId = UUID.randomUUID(); @@ -1472,24 +1459,21 @@ })); final PipelineOp join1Op = new PipelineJoin<E>(// - startOp, pred1Op,// - NV.asMap(new NV[] {// + new BOp[]{startOp},// new NV(Predicate.Annotations.BOP_ID, joinId1),// - })); + new NV(PipelineJoin.Annotations.PREDICATE,pred1Op)); - final PipelineOp join2Op = new PipelineJoin<E>(// - join1Op, pred2Op,// - NV.asMap(new NV[] {// - new NV(Predicate.Annotations.BOP_ID, joinId2),// - // constraint x == z - new NV(PipelineJoin.Annotations.CONSTRAINTS,new IConstraint[]{ - new EQ(x,z) - }), - // join is optional. - new NV(PipelineJoin.Annotations.OPTIONAL,true),// - // optional target is the same as the default target. - new NV(PipelineOp.Annotations.ALT_SINK_REF,sliceId),// - })); + final PipelineOp join2Op = new PipelineJoin<E>(// + new BOp[] { join1Op },// + new NV(Predicate.Annotations.BOP_ID, joinId2),// + new NV(PipelineJoin.Annotations.PREDICATE, pred2Op),// + // constraint x == z + new NV(PipelineJoin.Annotations.CONSTRAINTS, + new IConstraint[] { new EQ(x, z) }), + // join is optional. + new NV(PipelineJoin.Annotations.OPTIONAL, true),// + // optional target is the same as the default target. + new NV(PipelineOp.Annotations.ALT_SINK_REF, sliceId)); final PipelineOp sliceOp = new SliceOp(// new BOp[]{join2Op}, @@ -1843,18 +1827,16 @@ new NV(ConditionalRoutingOp.Annotations.CONDITION, condition), })); - final PipelineOp join1Op = new PipelineJoin<E>(// - cond, pred1Op,// - NV.asMap(new NV[] {// - new NV(Predicate.Annotations.BOP_ID, joinId1),// - })); + final PipelineOp join1Op = new PipelineJoin<E>(// + new BOp[] { cond }, // + new NV(Predicate.Annotations.BOP_ID, joinId1),// + new NV(PipelineJoin.Annotations.PREDICATE, pred1Op)); + + final PipelineOp join2Op = new PipelineJoin<E>(// + new BOp[] { join1Op },// + new NV(Predicate.Annotations.BOP_ID, joinId2),// + new NV(PipelineJoin.Annotations.PREDICATE, pred2Op)); - final PipelineOp join2Op = new PipelineJoin<E>(// - join1Op, pred2Op, // - NV.asMap(new NV[] {// - new NV(Predicate.Annotations.BOP_ID, joinId2),// - })); - final PipelineOp sliceOp = new SliceOp(// new BOp[]{join2Op}, NV.asMap(new NV[] {// Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java 2010-10-22 20:08:48 UTC (rev 3841) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java 2010-10-24 18:18:10 UTC (rev 3842) @@ -544,17 +544,14 @@ new NV( Predicate.Annotations.REMOTE_ACCESS_PATH,false), })); - final PipelineJoin<E> joinOp = new PipelineJoin<E>(startOp/* left */, - predOp/* right */, - // join annotations - NV.asMap(new NV[] {// - new NV(Predicate.Annotations.BOP_ID, joinId),// - // Note: shard-partitioned joins! - new NV( Predicate.Annotations.EVALUATION_CONTEXT, - BOpEvaluationContext.SHARDED),// - })// - ); - + final PipelineJoin<E> joinOp = new PipelineJoin<E>( + new BOp[] { startOp },// + new NV(Predicate.Annotations.BOP_ID, joinId),// + new NV(PipelineJoin.Annotations.PREDICATE, predOp),// + // Note: shard-partitioned joins! 
+ new NV(Predicate.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.SHARDED)); + final PipelineOp query = new SliceOp(new BOp[] { joinOp }, // slice annotations NV.asMap(new NV[] {// @@ -712,20 +709,17 @@ new NV( Predicate.Annotations.REMOTE_ACCESS_PATH,false), })); - final PipelineJoin<E> joinOp = new PipelineJoin<E>(startOp/* left */, - predOp/* right */, - // join annotations - NV.asMap(new NV[] {// - new NV(Predicate.Annotations.BOP_ID, joinId),// - // Note: shard-partitioned joins! - new NV( Predicate.Annotations.EVALUATION_CONTEXT, - BOpEvaluationContext.SHARDED),// - // impose constraint on the join. - new NV(PipelineJoin.Annotations.CONSTRAINTS, - new IConstraint[] { new EQConstant(y, - new Constant<String>("Paul")) }),// - })// - ); + final PipelineJoin<E> joinOp = new PipelineJoin<E>( + new BOp[] { startOp },// + new NV(Predicate.Annotations.BOP_ID, joinId),// + new NV(PipelineJoin.Annotations.PREDICATE, predOp), + // Note: shard-partitioned joins! + new NV(Predicate.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.SHARDED),// + // impose constraint on the join. + new NV(PipelineJoin.Annotations.CONSTRAINTS, + new IConstraint[] { new EQConstant(y, + new Constant<String>("Paul")) })); final PipelineOp query = new SliceOp(new BOp[] { joinOp }, // slice annotations @@ -865,17 +859,14 @@ ITx.READ_COMMITTED),// })); - final PipelineJoin<E> joinOp = new PipelineJoin<E>(startOp/* left */, - predOp/* right */, - // join annotations - NV.asMap(new NV[] {// - new NV(Predicate.Annotations.BOP_ID, joinId),// - // Note: shard-partitioned joins! - new NV( Predicate.Annotations.EVALUATION_CONTEXT, - BOpEvaluationContext.SHARDED),// - })// - ); - + final PipelineJoin<E> joinOp = new PipelineJoin<E>( + new BOp[] { startOp },// + new NV(Predicate.Annotations.BOP_ID, joinId),// + new NV(PipelineJoin.Annotations.PREDICATE, predOp),// + // Note: shard-partitioned joins! + new NV(Predicate.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.SHARDED)); + final PipelineOp query = new SliceOp(new BOp[] { joinOp }, // slice annotations NV.asMap(new NV[] {// @@ -1030,22 +1021,20 @@ })); final PipelineOp join1Op = new PipelineJoin<E>(// - startOp, pred1Op,// - NV.asMap(new NV[] {// - new NV(Predicate.Annotations.BOP_ID, joinId1),// - // Note: shard-partitioned joins! - new NV( Predicate.Annotations.EVALUATION_CONTEXT, - BOpEvaluationContext.SHARDED),// - })); + new BOp[]{startOp},// + new NV(Predicate.Annotations.BOP_ID, joinId1),// + new NV(PipelineJoin.Annotations.PREDICATE,pred1Op),// + // Note: shard-partitioned joins! + new NV( Predicate.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.SHARDED)); - final PipelineOp join2Op = new PipelineJoin<E>(// - join1Op, pred2Op,// - NV.asMap(new NV[] {// - new NV(Predicate.Annotations.BOP_ID, joinId2),// - // Note: shard-partitioned joins! - new NV( Predicate.Annotations.EVALUATION_CONTEXT, - BOpEvaluationContext.SHARDED),// - })); + final PipelineOp join2Op = new PipelineJoin<E>(// + new BOp[] { join1Op },// + new NV(Predicate.Annotations.BOP_ID, joinId2),// + new NV(PipelineJoin.Annotations.PREDICATE, pred2Op),// + // Note: shard-partitioned joins! 
+ new NV(Predicate.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.SHARDED)); final PipelineOp query = new SliceOp(new BOp[] { join2Op }, NV.asMap(new NV[] {// @@ -1228,31 +1217,28 @@ new NV(Predicate.Annotations.TIMESTAMP, ITx.READ_COMMITTED),// })); - final PipelineOp join1Op = new PipelineJoin<E>(// - startOp, pred1Op,// - NV.asMap(new NV[] {// - new NV(Predicate.Annotations.BOP_ID, joinId1),// - // Note: shard-partitioned joins! - new NV( Predicate.Annotations.EVALUATION_CONTEXT, - BOpEvaluationContext.SHARDED),// - })); + final PipelineOp join1Op = new PipelineJoin<E>(// + new BOp[] { startOp },// + new NV(Predicate.Annotations.BOP_ID, joinId1),// + new NV(PipelineJoin.Annotations.PREDICATE, pred1Op),// + // Note: shard-partitioned joins! + new NV(Predicate.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.SHARDED)); - final PipelineOp join2Op = new PipelineJoin<E>(// - join1Op, pred2Op,// - NV.asMap(new NV[] {// - new NV(Predicate.Annotations.BOP_ID, joinId2),// - // Note: shard-partitioned joins! - new NV( Predicate.Annotations.EVALUATION_CONTEXT, - BOpEvaluationContext.SHARDED),// - // constraint x == z - new NV(PipelineJoin.Annotations.CONSTRAINTS,new IConstraint[]{ - new EQ(x,z) - }), - // join is optional. - new NV(PipelineJoin.Annotations.OPTIONAL,true),// - // optional target is the same as the default target. - new NV(PipelineOp.Annotations.ALT_SINK_REF,sliceId),// - })); + final PipelineOp join2Op = new PipelineJoin<E>(// + new BOp[] { join1Op },// + new NV(Predicate.Annotations.BOP_ID, joinId2),// + new NV(PipelineJoin.Annotations.PREDICATE, pred2Op),// + // Note: shard-partitioned joins! + new NV(Predicate.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.SHARDED),// + // constraint x == z + new NV(PipelineJoin.Annotations.CONSTRAINTS, + new IConstraint[] { new EQ(x, z) }), + // join is optional. + new NV(PipelineJoin.Annotations.OPTIONAL, true),// + // optional target is the same as the default target. 
+ new NV(PipelineOp.Annotations.ALT_SINK_REF, sliceId)); final PipelineOp sliceOp = new SliceOp(// new BOp[]{join2Op}, Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java 2010-10-22 20:08:48 UTC (rev 3841) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java 2010-10-24 18:18:10 UTC (rev 3842) @@ -171,27 +171,26 @@ final int startId = 1; final int joinId = 2; final int predId = 3; - final PipelineJoin<E> query = new PipelineJoin<E>( - // left - new CopyOp(new BOp[] {}, NV.asMap(new NV[] {// - new NV(Predicate.Annotations.BOP_ID, startId),// - })), - // right - new Predicate<E>(new IVariableOrConstant[] { - new Constant<String>("Mary"), Var.var("x") }, NV - .asMap(new NV[] {// - new NV(Predicate.Annotations.RELATION_NAME, - new String[] { namespace }),// - new NV(Predicate.Annotations.BOP_ID, predId),// - new NV(Predicate.Annotations.TIMESTAMP, ITx.READ_COMMITTED),// - })), - // join annotations - NV - .asMap(new NV[] { new NV(Predicate.Annotations.BOP_ID, - joinId),// - })// - ); + + final BOp startOp = new CopyOp(new BOp[] {}, NV.asMap(new NV[] {// + new NV(Predicate.Annotations.BOP_ID, startId),// + })); + final Predicate<E> predOp = new Predicate<E>(new IVariableOrConstant[] { + new Constant<String>("Mary"), Var.var("x") }, NV + .asMap(new NV[] {// + new NV(Predicate.Annotations.RELATION_NAME, + new String[] { namespace }),// + new NV(Predicate.Annotations.BOP_ID, predId),// + new NV(Predicate.Annotations.TIMESTAMP, + ITx.READ_COMMITTED),// + })); + + final PipelineJoin<E> query = new PipelineJoin<E>( + new BOp[] { startOp },// + new NV(Predicate.Annotations.BOP_ID, joinId),// + new NV(PipelineJoin.Annotations.PREDICATE, predOp)); + // the expected solutions. 
final IBindingSet[] expected = new IBindingSet[] {// new ArrayBindingSet(// @@ -260,29 +259,28 @@ final int startId = 1; final int joinId = 2; final int predId = 3; - final PipelineJoin<E> query = new PipelineJoin<E>( - // left - new CopyOp(new BOp[] {}, NV.asMap(new NV[] {// - new NV(BOpBase.Annotations.BOP_ID, startId),// - })), - // right - new Predicate<E>( - new IVariableOrConstant[] { new Constant<String>("Mary"), y },// - NV.asMap(new NV[] {// - new NV( - Predicate.Annotations.RELATION_NAME, - new String[] { namespace }),// - new NV(Predicate.Annotations.BOP_ID, - predId),// - new NV(Predicate.Annotations.TIMESTAMP, ITx.READ_COMMITTED),// - })), - // join annotations + + final BOp startOp = new CopyOp(new BOp[] {}, NV.asMap(new NV[] {// + new NV(BOpBase.Annotations.BOP_ID, startId),// + })); + + final Predicate<E> predOp = new Predicate<E>( + new IVariableOrConstant[] { new Constant<String>("Mary"), y },// NV.asMap(new NV[] {// - new NV(BOpBase.Annotations.BOP_ID, joinId),// - new NV( PipelineJoin.Annotations.CONSTRAINTS, - new IConstraint[] { new INBinarySearch<String>( - y, set) }) })// - ); + new NV( + Predicate.Annotations.RELATION_NAME, + new String[] { namespace }),// + new NV(Predicate.Annotations.BOP_ID, + predId),// + new NV(Predicate.Annotations.TIMESTAMP, ITx.READ_COMMITTED),// + })); + + final PipelineJoin<E> query = new PipelineJoin<E>( + new BOp[] { startOp },// + new NV(BOpBase.Annotations.BOP_ID, joinId),// + new NV(PipelineJoin.Annotations.PREDICATE, predOp),// + new NV( PipelineJoin.Annotations.CONSTRAINTS, + new IConstraint[] { new INBinarySearch<String>(y, set) })); // the expected solution (just one). final IBindingSet[] expected = new IBindingSet[] {// @@ -352,32 +350,29 @@ final Var<String> y = Var.var("y"); final int startId = 1; - final int joinId = 2; - final int predId = 3; - final PipelineJoin<E> query = new PipelineJoin<E>( - // left - new CopyOp(new BOp[] {}, NV.asMap(new NV[] {// - new NV(BOpBase.Annotations.BOP_ID, startId),// - })), - // right - new Predicate<E>( - new IVariableOrConstant[] { x, y },// - NV.asMap(new NV[] {// - new NV( - Predicate.Annotations.RELATION_NAME, - new String[] { namespace }),// - new NV(Predicate.Annotations.BOP_ID, - predId),// - new NV(Predicate.Annotations.TIMESTAMP, - ITx.READ_COMMITTED),// - })), - // join annotations - NV.asMap(new NV[] {// - new NV(BOpBase.Annotations.BOP_ID, joinId),// - new NV(PipelineJoin.Annotations.SELECT,new IVariable[]{y})// - })// - ); + final int joinId = 2; + final int predId = 3; + final BOp startOp = new CopyOp(new BOp[] {}, NV.asMap(new NV[] {// + new NV(BOpBase.Annotations.BOP_ID, startId),// + })); + + final Predicate<E> predOp = new Predicate<E>(new IVariableOrConstant[] { + x, y },// + NV.asMap(new NV[] {// + new NV(Predicate.Annotations.RELATION_NAME, + new String[] { namespace }),// + new NV(Predicate.Annotations.BOP_ID, predId),// + new NV(Predicate.Annotations.TIMESTAMP, + ITx.READ_COMMITTED),// + })); + + final PipelineJoin<E> query = new PipelineJoin<E>( + new BOp[] { startOp },// + new NV(BOpBase.Annotations.BOP_ID, joinId),// + new NV(PipelineJoin.Annotations.PREDICATE, predOp),// + new NV(PipelineJoin.Annotations.SELECT, new IVariable[] { y })); + /* * The expected solutions. 
*/ @@ -458,31 +453,25 @@ final int joinId = 2; final int predId = 3; - final PipelineJoin<E> query = new PipelineJoin<E>( - // left - new CopyOp(new BOp[] {}, NV.asMap(new NV[] {// - new NV(Predicate.Annotations.BOP_ID, startId),// - })), - // right - new Predicate<E>(new IVariableOrConstant[] { - new Constant<String>("Mary"), x }, NV - .asMap(new NV[] {// - new NV(Predicate.Annotations.RELATION_NAME, - new String[] { namespace }),// - new NV(Predicate.Annotations.BOP_ID, predId),// - new NV(Predicate.Annotations.TIMESTAMP, ITx.READ_COMMITTED),// - })), - // join annotations - NV - .asMap(new NV[] { // - new NV(BOpBase.Annotations.BOP_ID, - joinId), - new NV(PipelineJoin.Annotations.OPTIONAL, - Boolean.TRUE),// -// - })// - ); + final BOp startOp = new CopyOp(new BOp[] {}, NV.asMap(new NV[] {// + new NV(Predicate.Annotations.BOP_ID, startId),// + })); + final Predicate<E> pred = new Predicate<E>(new IVariableOrConstant[] { + new Constant<String>("Mary"), x }, NV.asMap(new NV[] {// + new NV(Predicate.Annotations.RELATION_NAME, + new String[] { namespace }),// + new NV(Predicate.Annotations.BOP_ID, predId),// + new NV(Predicate.Annotations.TIMESTAMP, + ITx.READ_COMMITTED),// + })); + + final PipelineJoin<E> query = new PipelineJoin<E>( + new BOp[] { startOp }, // + new NV(BOpBase.Annotations.BOP_ID, joinId),// + new NV(PipelineJoin.Annotations.PREDICATE, pred),// + new NV(PipelineJoin.Annotations.OPTIONAL, Boolean.TRUE)); + /* * Setup the source with two initial binding sets. One has nothing bound * and will join with (Mary,x:=John) and (Mary,x:=Paul). The other has @@ -565,33 +554,27 @@ final int startId = 1; final int joinId = 2; - final int predId = 3; + final int predId = 3; - final PipelineJoin<E> query = new PipelineJoin<E>( - // left - new CopyOp(new BOp[] {}, NV.asMap(new NV[] {// - new NV(Predicate.Annotations.BOP_ID, startId),// - })), - // right - new Predicate<E>(new IVariableOrConstant[] { - new Constant<String>("Mary"), x }, NV - .asMap(new NV[] {// - new NV(Predicate.Annotations.RELATION_NAME, - new String[] { namespace }),// - new NV(Predicate.Annotations.BOP_ID, predId),// - new NV(Predicate.Annotations.TIMESTAMP, ITx.READ_COMMITTED),// - })), - // join annotations - NV - .asMap(new NV[] { // - new NV(BOpBase.Annotations.BOP_ID, - joinId), - new NV(PipelineJoin.Annotations.OPTIONAL, - Boolean.TRUE),// -// - })// - ); + final BOp startOp = new CopyOp(new BOp[] {}, NV.asMap(new NV[] {// + new NV(Predicate.Annotations.BOP_ID, startId),// + })); + final Predicate<E> pred = new Predicate<E>(new IVariableOrConstant[] { + new Constant<String>("Mary"), x }, NV.asMap(new NV[] {// + new NV(Predicate.Annotations.RELATION_NAME, + new String[] { namespace }),// + new NV(Predicate.Annotations.BOP_ID, predId),// + new NV(Predicate.Annotations.TIMESTAMP, + ITx.READ_COMMITTED),// + })); + + final PipelineJoin<E> query = new PipelineJoin<E>( + new BOp[] { startOp },// + new NV(BOpBase.Annotations.BOP_ID, joinId),// + new NV(PipelineJoin.Annotations.PREDICATE, pred),// + new NV(PipelineJoin.Annotations.OPTIONAL, Boolean.TRUE)); + /* * Setup the source with two initial binding sets. One has nothing bound * and will join with (Mary,x:=John) and (Mary,x:=Paul). 
The other has Modified: branches/QUADS_QUERY_BRANCH/bigdata-jini/src/test/com/bigdata/bop/fed/jini/TestJiniFederatedQueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-jini/src/test/com/bigdata/bop/fed/jini/TestJiniFederatedQueryEngine.java 2010-10-22 20:08:48 UTC (rev 3841) +++ branches/QUADS_QUERY_BRANCH/bigdata-jini/src/test/com/bigdata/bop/fed/jini/TestJiniFederatedQueryEngine.java 2010-10-24 18:18:10 UTC (rev 3842) @@ -453,41 +453,45 @@ final int joinId = 2; final int predId = 3; final int sliceId = 4; - final PipelineOp query = - new SliceOp(new BOp[]{new PipelineJoin<E>( - // left - new StartOp(new BOp[] {}, NV.asMap(new NV[] {// - new NV(Predicate.Annotations.BOP_ID, startId),// - new NV(SliceOp.Annotations.EVALUATION_CONTEXT, - BOpEvaluationContext.CONTROLLER),// - })), - // right - new Predicate<E>(new IVariableOrConstant[] { - new Constant<String>("Mary"), Var.var("value") }, NV - .asMap(new NV[] {// - new NV(Predicate.Annotations.RELATION_NAME, - new String[] { namespace }),// - // Note: local access path! - new NV( Predicate.Annotations.REMOTE_ACCESS_PATH,false), - new NV(Predicate.Annotations.BOP_ID, predId),// - new NV... [truncated message content] |
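
For readers who stop at the truncated diff above: the net effect of the PipelineJoin API change is that the predicate is no longer the join's right-hand operand but an annotation, so a join now takes a single (pipeline) operand. A minimal sketch of the new construction pattern follows; the names startOp, predOp and joinId are placeholders borrowed from the modified tests, not new API.

    // Sketch only: the predicate is supplied via the new PREDICATE
    // annotation rather than as a second operand.
    final PipelineJoin<E> joinOp = new PipelineJoin<E>(
            new BOp[] { startOp },                               // sole operand: the upstream pipeline operator
            new NV(Predicate.Annotations.BOP_ID, joinId),        // operator id
            new NV(PipelineJoin.Annotations.PREDICATE, predOp)); // predicate moved into an annotation

The per-(bopId,shardId) concurrency cap mentioned in the log message is likewise an annotation (QueryEngineTestAnnotations.MAX_CONCURRENT_TASKS_PER_OPERATOR_AND_SHARD, default Integer.MAX_VALUE) which RunningQuery reads from the query when scheduling operator tasks.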