From: <tho...@us...> - 2011-06-22 19:54:27
|
Revision: 4772 http://bigdata.svn.sourceforge.net/bigdata/?rev=4772&view=rev Author: thompsonbry Date: 2011-06-22 19:54:20 +0000 (Wed, 22 Jun 2011) Log Message: ----------- Replaced the use of SliceOp, which forces all solutions to be serialized, with EndOp. SliceOp SHOULD be used where bigdata imposes a native restriction on the OFFSET and LIMIT of the solutions delivered to the application. EndOp is preferred when solutions must be materialized on the query controller in scale-out if the OFFSET and LIMIT are not constrained. See https://sourceforge.net/apps/trac/bigdata/ticket/227 Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/StartOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/joinGraph/PartitionedJoinGroup.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/Rule2BOpUtility.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sop/SOp2BOpUtility.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/EndOp.java Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/EndOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/EndOp.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/EndOp.java 2011-06-22 19:54:20 UTC (rev 4772) @@ -0,0 +1,121 @@ +package com.bigdata.bop.bset; + +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.FutureTask; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpContext; +import com.bigdata.bop.BOpEvaluationContext; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.PipelineOp; +import com.bigdata.bop.solutions.SliceOp; +import com.bigdata.relation.accesspath.IAsynchronousIterator; +import com.bigdata.relation.accesspath.IBlockingBuffer; + +/** + * A operator which may be used at the end 
of query pipelines when there is a + * requirement to marshal solutions back to the query controller but no requirement + * to {@link SliceOp slice} solutions. The primary use case for {@link EndOp} is + * when it is evaluated on the query controller so the results will be streamed + * back to the query controller in scale-out. You MUST specify + * {@link BOp.Annotations#EVALUATION_CONTEXT} as + * {@link BOpEvaluationContext#CONTROLLER} when it is to be used for this + * purpose. + * + * FIXME This is hacked to extend {@link SliceOp} instead as that appears to be + * necessary due to a persistent bug. + * + * @see https://sourceforge.net/apps/trac/bigdata/ticket/227 + */ +public class EndOp extends PipelineOp {//SliceOp {//CopyOp { + +// private static final Logger log = Logger.getLogger(EndOp.class); + + /** + * + */ + private static final long serialVersionUID = 1L; + + public EndOp(EndOp op) { + super(op); + } + + public EndOp(BOp[] args, Map<String, Object> annotations) { + +// super(args, ensureSharedState(annotations)); + super(args, annotations); + + switch (getEvaluationContext()) { + case CONTROLLER: + break; + default: + throw new UnsupportedOperationException( + Annotations.EVALUATION_CONTEXT + "=" + + getEvaluationContext()); + } + + } + +// static private Map<String, Object> ensureSharedState( +// Map<String, Object> annotations) { +// +// annotations.put(PipelineOp.Annotations.SHARED_STATE, true); +// +// return annotations; +// +// } + + public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { + + return new FutureTask<Void>(new OpTask(this, context)); + + } + + /** + * Copy each chunk of solutions from the source to the sink without + * modification. 
+ */ + static private class OpTask implements Callable<Void> { + + private final PipelineOp op; + + private final BOpContext<IBindingSet> context; + + OpTask(final PipelineOp op, final BOpContext<IBindingSet> context) { + + this.op = op; + + this.context = context; + + } + + public Void call() throws Exception { + + final IAsynchronousIterator<IBindingSet[]> source = context + .getSource(); + + final IBlockingBuffer<IBindingSet[]> sink = context.getSink(); + +// boolean didRun = false; + + while (source.hasNext()) { + + final IBindingSet[] chunk = source.next(); + + sink.add(chunk); + +// didRun = true; + + } + +// if(didRun) +// sink.flush(); + + return null; + + } + + } + +} Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/StartOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/StartOp.java 2011-06-22 18:06:38 UTC (rev 4771) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/StartOp.java 2011-06-22 19:54:20 UTC (rev 4772) @@ -3,10 +3,19 @@ import java.util.Map; import com.bigdata.bop.BOp; +import com.bigdata.bop.PipelineOp; /** * A version of {@link CopyOp} which is always evaluated on the query * controller. + * <p> + * Note: {@link CopyOp} and {@link StartOp} are the same. {@link StartOp} exists + * solely to reflect its functional role at the start of the query pipeline. + * <p> + * Note: {@link StartOp} is generally NOT required in a query plan. It is more + * of a historical artifact than something that we actually need to have in the + * query pipeline. It is perfectly possible to have the query pipeline begin + * with any of the {@link PipelineOp pipeline operators}. 
*/ public class StartOp extends CopyOp { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/joinGraph/PartitionedJoinGroup.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/joinGraph/PartitionedJoinGroup.java 2011-06-22 18:06:38 UTC (rev 4771) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/joinGraph/PartitionedJoinGroup.java 2011-06-22 19:54:20 UTC (rev 4772) @@ -21,10 +21,10 @@ import com.bigdata.bop.IVariable; import com.bigdata.bop.NV; import com.bigdata.bop.PipelineOp; +import com.bigdata.bop.bset.EndOp; import com.bigdata.bop.join.PipelineJoin; import com.bigdata.bop.joinGraph.rto.JoinGraph; import com.bigdata.bop.solutions.DistinctBindingSetOp; -import com.bigdata.bop.solutions.SliceOp; /** * Class accepts a join group and partitions it into a join graph and a tail @@ -1135,14 +1135,16 @@ * necessary? (It is causing runtime errors when not wrapped). * Is this a bopId collision which is not being detected? * + * @see https://sourceforge.net/apps/trac/bigdata/ticket/227 + * * [This should perhaps be moved into the caller.] 
*/ - lastOp = new SliceOp(new BOp[] { lastOp }, NV + lastOp = new EndOp(new BOp[] { lastOp }, NV .asMap(new NV[] { new NV(JoinGraph.Annotations.BOP_ID, idFactory.nextId()), // new NV(JoinGraph.Annotations.EVALUATION_CONTEXT, - BOpEvaluationContext.CONTROLLER),// - new NV(PipelineOp.Annotations.SHARED_STATE,true),// + BOpEvaluationContext.CONTROLLER)// +// new NV(PipelineOp.Annotations.SHARED_STATE,true),// }) // ); Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/Rule2BOpUtility.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/Rule2BOpUtility.java 2011-06-22 18:06:38 UTC (rev 4771) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/Rule2BOpUtility.java 2011-06-22 19:54:20 UTC (rev 4772) @@ -65,6 +65,7 @@ import com.bigdata.bop.ap.filter.DistinctFilter; import com.bigdata.bop.bindingSet.HashBindingSet; import com.bigdata.bop.bset.ConditionalRoutingOp; +import com.bigdata.bop.bset.EndOp; import com.bigdata.bop.bset.StartOp; import com.bigdata.bop.controller.AbstractSubqueryOp; import com.bigdata.bop.controller.Steps; @@ -87,12 +88,12 @@ import com.bigdata.rdf.internal.TermId; import com.bigdata.rdf.internal.VTE; import com.bigdata.rdf.internal.constraints.INeedsMaterialization; -import com.bigdata.rdf.internal.constraints.INeedsMaterialization.Requirement; import com.bigdata.rdf.internal.constraints.IsInlineBOp; import com.bigdata.rdf.internal.constraints.IsMaterializedBOp; import com.bigdata.rdf.internal.constraints.NeedsMaterializationBOp; import com.bigdata.rdf.internal.constraints.SPARQLConstraint; import com.bigdata.rdf.internal.constraints.TryBeforeMaterializationConstraint; +import com.bigdata.rdf.internal.constraints.INeedsMaterialization.Requirement; import com.bigdata.rdf.lexicon.LexPredicate; import com.bigdata.rdf.spo.DefaultGraphSolutionExpander; import com.bigdata.rdf.spo.ISPO; @@ -266,12 +267,12 
@@ * controller so the results will be streamed back to the query * controller in scale-out. */ - tmp = new SliceOp(new BOp[] { tmp }, NV.asMap(// + tmp = new EndOp(new BOp[] { tmp }, NV.asMap(// new NV(BOp.Annotations.BOP_ID, idFactory .incrementAndGet()), // new NV(BOp.Annotations.EVALUATION_CONTEXT, - BOpEvaluationContext.CONTROLLER),// - new NV(PipelineOp.Annotations.SHARED_STATE,true)// + BOpEvaluationContext.CONTROLLER)// +// new NV(PipelineOp.Annotations.SHARED_STATE,true)// )); } Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sop/SOp2BOpUtility.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sop/SOp2BOpUtility.java 2011-06-22 18:06:38 UTC (rev 4771) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sop/SOp2BOpUtility.java 2011-06-22 19:54:20 UTC (rev 4772) @@ -54,6 +54,8 @@ import com.bigdata.bop.PipelineOp; import com.bigdata.bop.ap.Predicate; import com.bigdata.bop.bset.ConditionalRoutingOp; +import com.bigdata.bop.bset.CopyOp; +import com.bigdata.bop.bset.EndOp; import com.bigdata.bop.bset.StartOp; import com.bigdata.bop.controller.SubqueryHashJoinOp; import com.bigdata.bop.controller.SubqueryOp; @@ -390,12 +392,12 @@ * with SliceOp which interacts with SubqueryOp to allow * incorrect termination under some circumstances. 
*/ - left = new SliceOp(new BOp[] { left }, NV.asMap(// + left = new EndOp(new BOp[] { left }, NV.asMap(// new NV(BOp.Annotations.BOP_ID, idFactory .incrementAndGet()), // new NV(BOp.Annotations.EVALUATION_CONTEXT, - BOpEvaluationContext.CONTROLLER),// - new NV(PipelineOp.Annotations.SHARED_STATE, true)// + BOpEvaluationContext.CONTROLLER)// +// new NV(PipelineOp.Annotations.SHARED_STATE, true)// )); } @@ -781,11 +783,11 @@ } - final PipelineOp slice = new SliceOp(new BOp[] { left }, NV.asMap(// + final PipelineOp slice = new EndOp(new BOp[] { left }, NV.asMap(// new NV(BOp.Annotations.BOP_ID, idFactory.incrementAndGet()), // new NV(BOp.Annotations.EVALUATION_CONTEXT, - BOpEvaluationContext.CONTROLLER),// - new NV(PipelineOp.Annotations.SHARED_STATE, true)// + BOpEvaluationContext.CONTROLLER)// +// new NV(PipelineOp.Annotations.SHARED_STATE, true)// )); return slice; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |