From: <tho...@us...> - 2013-12-23 22:45:44
|
Revision: 7691 http://bigdata.svn.sourceforge.net/bigdata/?rev=7691&view=rev Author: thompsonbry Date: 2013-12-23 22:45:36 +0000 (Mon, 23 Dec 2013) Log Message: ----------- Partial integration of the RTO for SPARQL. For the moment, I am only targeting simple join groups with filters that do not require materialization of variable bindings. Once this is working, we can look into how to handle more of SPARQL. The RTO is currently turned on through a query hint. For example: {{{ PREFIX ub: <http://www.lehigh.edu/~zhp2/2004/0401/univ-bench.owl#> SELECT ?x ?y ?z WHERE { hint:Group hint:optimizer "Runtime". ?x a ub:Student . # v0 ?y a ub:Faculty . # v1 ?z a ub:Course . # v2 ?x ub:advisor ?y . # v3 ?y ub:teacherOf ?z . # v4 ?x ub:takesCourse ?z . # v5 } limit 1 }}} Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/joinGraph/rto/JoinGraph.java branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/AST2BOpUtility.java Added Paths: ----------- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/AST2BOpRTO.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/joinGraph/rto/JoinGraph.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/joinGraph/rto/JoinGraph.java 2013-12-23 22:42:23 UTC (rev 7690) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/joinGraph/rto/JoinGraph.java 2013-12-23 22:45:36 UTC (rev 7691) @@ -228,6 +228,7 @@ } + @Override public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { return new FutureTask<Void>(new JoinGraphTask(context)); @@ -278,12 +279,17 @@ } - /** - * {@inheritDoc} - * - * - * TODO where to handle DISTINCT, ORDER BY, GROUP BY for join graph? - */ + /** + * {@inheritDoc} + * + * + * TODO where to handle DISTINCT, ORDER BY, GROUP BY for join graph? 
+ * + * FIXME When run as sub-query, we need to fix point the upstream + * solutions and then flood them into the join graph. Samples of the + * known bound variables can be pulled from those initial solutions. + */ + @Override public Void call() throws Exception { // Create the join graph. Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/AST2BOpRTO.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/AST2BOpRTO.java (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/AST2BOpRTO.java 2013-12-23 22:45:36 UTC (rev 7691) @@ -0,0 +1,259 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2011. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Dec 23, 2013 + */ +package com.bigdata.rdf.sparql.ast.eval; + +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpEvaluationContext; +import com.bigdata.bop.IConstraint; +import com.bigdata.bop.IValueExpression; +import com.bigdata.bop.IVariable; +import com.bigdata.bop.NV; +import com.bigdata.bop.PipelineOp; +import com.bigdata.bop.ap.Predicate; +import com.bigdata.bop.joinGraph.rto.JGraph; +import com.bigdata.bop.joinGraph.rto.JoinGraph; +import com.bigdata.rdf.internal.IV; +import com.bigdata.rdf.internal.constraints.INeedsMaterialization; +import com.bigdata.rdf.sparql.ast.IGroupMemberNode; +import com.bigdata.rdf.sparql.ast.JoinGroupNode; +import com.bigdata.rdf.sparql.ast.StatementPatternNode; +import com.bigdata.rdf.sparql.ast.StaticAnalysis; + +/** + * Integration with the Runtime Optimizer (RTO). + * + * TODO The initial integration aims to run only queries that are simple join + * groups with filters. Once we have this integrated so that it can be enabled + * with a query hint, then we can look into handling subgroups, materialization, + * etc. Even handling filters will be somewhat tricky due to the requirement for + * conditional materialization of variable bindings in advance of certain + * {@link IValueExpression} depending on the {@link INeedsMaterialization} + * interface. Therefore, the place to start is with simple join groups and + * filters whose {@link IValueExpression}s do not require materialization. + * + * TODO We need a way to inspect the RTO behavior. 
It will get logged, but it + * would be nice to attach it to the query plan. Likewise, it would be nice to + * surface this to the caller so the RTO can be used to guide query construction + * UIs. + * + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/64">Runtime + * Query Optimization</a> + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/258">Integrate + * RTO into SAIL</a> + * @see <a + * href="http://www-db.informatik.uni-tuebingen.de/files/research/pathfinder/publications/rox-demo.pdf"> + * ROX </a> + * @see JoinGraph + * @see JGraph + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public class AST2BOpRTO extends AST2BOpJoins { + + /** + * Inspect the remainder of the join group. If we can isolate a join graph + * and filters, then we will push them down into an RTO JoinGroup. Since the + * joins have already been ordered by the static optimizer, we can accept + * them in sequence along with any attachable filters and (if we get at + * least 3 predicates) push them down into an RTO join group. + * <p> + * Note: Two predicates in a join group is not enough for the RTO to provide + * a different join ordering. Both the static optimizer and the RTO will + * always choose the AP with the smaller cardinality to run first. If there + * are only 2 predicates, then the other predicate will run second. You need + * at least three predicates before the RTO could provide a different + * answer. + */ + static protected PipelineOp convertRTOJoinGraph(PipelineOp left, + final JoinGroupNode joinGroup, final Set<IVariable<?>> doneSet, + final AST2BOpContext ctx, final AtomicInteger start) { + + final int arity = joinGroup.arity(); + + // The predicates for the RTO join group. 
+ final Set<StatementPatternNode> sps = new LinkedHashSet<StatementPatternNode>(); + @SuppressWarnings("rawtypes") + final Set<Predicate> preds = new LinkedHashSet<Predicate>(); + final List<IConstraint> constraints = new LinkedList<IConstraint>(); + + // Examine the remaining joins, stopping at the first non-SP. + for (int i = start.get(); i < arity; i++) { + + final IGroupMemberNode child = (IGroupMemberNode) joinGroup + .get(i); + + if (child instanceof StatementPatternNode) { + // SP + final StatementPatternNode sp = (StatementPatternNode) child; + final boolean optional = sp.isOptional(); + if(optional) { + // TODO Handle optional SPs in joinGraph. + break; + } + + final List<IConstraint> attachedConstraints = getJoinConstraints(sp); + + @SuppressWarnings("rawtypes") + final Map<IConstraint, Set<IVariable<IV>>> needsMaterialization = + new LinkedHashMap<IConstraint, Set<IVariable<IV>>>(); + + getJoinConstraints(attachedConstraints, needsMaterialization); + + if (!needsMaterialization.isEmpty()) { + /* + * At least one variable requires (or might require) + * materialization. This is not currently handled by + * the RTO so we break out of the loop. + * + * TODO Handle materialization patterns within the RTO. + */ + break; + } + +// // Add constraints to the join for that predicate. +// anns.add(new NV(JoinAnnotations.CONSTRAINTS, getJoinConstraints( +// constraints, needsMaterialization))); + +// /* +// * Pull off annotations before we clear them from the predicate. +// */ +// final Scope scope = (Scope) pred.getProperty(Annotations.SCOPE); +// +// // true iff this is a quads access path. +// final boolean quads = pred.getProperty(Annotations.QUADS, +// Annotations.DEFAULT_QUADS); +// +// // pull of the Sesame dataset before we strip the annotations. +// final DatasetNode dataset = (DatasetNode) pred +// .getProperty(Annotations.DATASET); + + // Something the RTO can handle. + sps.add(sp); + /* + * TODO Assign predId? 
+ * + * FIXME Handle Triples vs Quads, Default vs Named Graph, and + * DataSet. This probably means pushing more logic down into + * the RTO from AST2BOpJoins. + */ + final Predicate<?> pred = AST2BOpUtility.toPredicate(sp, ctx); +// final int joinId = ctx.nextId(); +// +// // annotations for this join. +// final List<NV> anns = new LinkedList<NV>(); +// +// anns.add(new NV(BOp.Annotations.BOP_ID, joinId)); + preds.add(pred); + if (attachedConstraints != null) { + // RTO will figure out where to attach these constraints. + constraints.addAll(attachedConstraints); + } + + } else { + // Non-SP. + break; + } + + } + + if (sps.size() < 3) { + + /* + * There are not enough joins for the RTO. + * + * TODO For incremental query construction UIs, it would be useful + * to run just the RTO and to run it with even a single join. This + * will give us sample values as well as estimates cardinalities. If + * the UI has triple patterns that do not join (yet), then those + * should be grouped. + */ + return left; + + } + + /* + * Figure out which variables are projected out of the RTO. + * + * TODO This should only include things that are not reused later in the + * query. + */ + final Set<IVariable<?>> selectVars = new LinkedHashSet<IVariable<?>>(); + { + + for (StatementPatternNode sp : sps) { + + // Note: recursive only matters for complex nodes, not SPs. + ctx.sa.getDefinitelyProducedBindings(sp, selectVars, true/* recursive */); + + } + + } + + /* + * FIXME When running the RTO as anything other than the top-level join + * group in the query plan and for the *FIRST* joins in the query plan, + * we need to flow in any solutions that are already in the pipeline + * (unless we are going to run the RTO "bottom up") and build a hash + * index. When the hash index is ready, we can execute the join group. 
+ */ + left = new JoinGraph(leftOrEmpty(left),// + new NV(BOp.Annotations.BOP_ID, ctx.nextId()),// + new NV(BOp.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.CONTROLLER),// + new NV(BOp.Annotations.CONTROLLER, true),// TODO DROP the "CONTROLLER" annotation. The concept is not required. + // new NV(PipelineOp.Annotations.MAX_PARALLEL, 1),// + // new NV(PipelineOp.Annotations.LAST_PASS, true),// required + new NV(JoinGraph.Annotations.SELECTED, selectVars + .toArray(new IVariable[selectVars.size()])),// + new NV(JoinGraph.Annotations.VERTICES, + preds.toArray(new Predicate[preds.size()])),// + new NV(JoinGraph.Annotations.CONSTRAINTS, constraints + .toArray(new IConstraint[constraints.size()])),// + new NV(JoinGraph.Annotations.LIMIT, + JoinGraph.Annotations.DEFAULT_LIMIT),// + new NV(JoinGraph.Annotations.NEDGES, + JoinGraph.Annotations.DEFAULT_NEDGES),// + new NV(JoinGraph.Annotations.SAMPLE_TYPE, + JoinGraph.Annotations.DEFAULT_SAMPLE_TYPE)// + ); + + // These joins were consumed. + start.addAndGet(sps.size()); + + return left; + + } + +} Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/AST2BOpUtility.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/AST2BOpUtility.java 2013-12-23 22:42:23 UTC (rev 7690) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/AST2BOpUtility.java 2013-12-23 22:45:36 UTC (rev 7691) @@ -123,6 +123,7 @@ import com.bigdata.rdf.sparql.ast.ProjectionNode; import com.bigdata.rdf.sparql.ast.QueryBase; import com.bigdata.rdf.sparql.ast.QueryHints; +import com.bigdata.rdf.sparql.ast.QueryOptimizerEnum; import com.bigdata.rdf.sparql.ast.QueryRoot; import com.bigdata.rdf.sparql.ast.RangeNode; import com.bigdata.rdf.sparql.ast.SliceNode; @@ -165,7 +166,7 @@ * >Query Evaluation</a>. 
* */ -public class AST2BOpUtility extends AST2BOpJoins { +public class AST2BOpUtility extends AST2BOpRTO { private static final transient Logger log = Logger .getLogger(AST2BOpUtility.class); @@ -176,9 +177,8 @@ * <p> * <strong>NOTE:</strong> This is the entry for {@link ASTEvalHelper}. Do * NOT use this entry point directly. It will evolve when we integrate the - * RTO and/or the BindingsClause of the SPARQL 1.1 Federation extension. - * Applications should use the public entry points on {@link ASTEvalHelper} - * rather that this entry point. + * RTO. Applications should use public entry points on {@link ASTEvalHelper} + * instead. * * @param ctx * The evaluation context. @@ -191,15 +191,15 @@ * TODO We could handle the IBindingSet[] by stuffing the data into * a named solution set during the query rewrite and attaching that * named solution set to the AST. This could allow for very large - * solution sets to be passed into a query. Any such change would + * solution sets to be passed into a query. Any such change would * have to be deeply integrated with the SPARQL parser in order to * provide any benefit for the Java heap. - * - * TODO This logic is currently single-threaded. If we allow internal - * concurrency or when we integrate the RTO, we will need to ensure that - * the logic remains safely cancelable by an interrupt of the thread in - * which the query was submitted. See <a - * href="https://sourceforge.net/apps/trac/bigdata/ticket/715" > + * + * TODO This logic is currently single-threaded. If we allow + * internal concurrency or when we integrate the RTO, we will need + * to ensure that the logic remains safely cancelable by an + * interrupt of the thread in which the query was submitted. See <a + * href="https://sourceforge.net/apps/trac/bigdata/ticket/715" > * Interrupt of thread submitting a query for evaluation does not * always terminate the AbstractRunningQuery </a>. 
*/ @@ -2506,6 +2506,26 @@ left = doMergeJoin(left, joinGroup, doneSet, start, ctx); } + + if (joinGroup.getProperty(QueryHints.OPTIMIZER, + QueryOptimizerEnum.Static).equals(QueryOptimizerEnum.Runtime)) { + + /* + * Inspect the remainder of the join group. If we can isolate a join + * graph and filters, then we will push them down into an RTO + * JoinGroup. Since the joins have already been ordered by the + * static optimizer, we can accept them in sequence along with any + * attachable filters. + */ + + left = convertRTOJoinGraph(left, joinGroup, doneSet, ctx, start); + + /* + * Fall through. Anything not handled in this section will be + * handled as part of normal join group processing below. + */ + + } /* * Translate the remainder of the group. @@ -2539,12 +2559,12 @@ sp.getQueryHints()); continue; } else if (child instanceof ArbitraryLengthPathNode) { - @SuppressWarnings("unchecked") +// @SuppressWarnings("unchecked") final ArbitraryLengthPathNode alpNode = (ArbitraryLengthPathNode) child; left = convertArbitraryLengthPath(left, alpNode, doneSet, ctx); continue; } else if (child instanceof ZeroLengthPathNode) { - @SuppressWarnings("unchecked") +// @SuppressWarnings("unchecked") final ZeroLengthPathNode zlpNode = (ZeroLengthPathNode) child; left = convertZeroLengthPath(left, zlpNode, doneSet, ctx); continue; @@ -2588,7 +2608,7 @@ } continue; } else if (child instanceof UnionNode) { - @SuppressWarnings("unchecked") +// @SuppressWarnings("unchecked") final UnionNode unionNode = (UnionNode) child; left = convertUnion(left, unionNode, doneSet, ctx); continue; @@ -3955,7 +3975,7 @@ * DataSetJoin with an "inline" access path.) 
*/ @SuppressWarnings("rawtypes") - private static final Predicate toPredicate(final StatementPatternNode sp, + protected static final Predicate toPredicate(final StatementPatternNode sp, final AST2BOpContext ctx) { final QueryRoot query = ctx.astContainer.getOptimizedAST(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |