From: <tho...@us...> - 2014-01-13 16:27:53
Revision: 7783
          http://bigdata.svn.sourceforge.net/bigdata/?rev=7783&view=rev
Author:   thompsonbry
Date:     2014-01-13 16:27:44 +0000 (Mon, 13 Jan 2014)

Log Message:
-----------
Bug fix for non-conditional materialization of SOMETIMES or ALWAYS variables when evaluating constraints for cutoff joins with the RTO. The RTO now relies on non-conditional chunked materialization.

The RTO can still fail due to reordering of solutions if the check for reordering is enabled. I am not sure what the root cause is; at this point, it is possible that it is the query engine rather than the query plan.

See #64 (RTO).
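To make the reordering issue concrete before the diff: conditional routing sends some solutions through an alternate path (the altSink) and merges the streams afterwards, which loses the input order, while a single chunked materialization operator keeps every solution on one path. A toy sketch in plain Java — invented class and variable names, not the bigdata operators:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of the two materialization strategies discussed above.
public class ReorderingSketch {

    public static void main(String[] args) {

        final List<Integer> in = Arrays.asList(1, 2, 3, 4, 5, 6);

        // Conditional routing: solutions needing materialization (here, the
        // even row-ids) detour through an "altSink" and are merged back later.
        final List<Integer> defaultSink = new ArrayList<>();
        final List<Integer> altSink = new ArrayList<>();
        for (int rowId : in) {
            if (rowId % 2 == 0) {
                altSink.add(rowId); // needs materialization: routed aside.
            } else {
                defaultSink.add(rowId); // passes through directly.
            }
        }
        final List<Integer> merged = new ArrayList<>(defaultSink);
        merged.addAll(altSink); // real interleaving depends on scheduling;
                                // either way the input order is lost.
        System.out.println("conditional routing: " + merged); // [1, 3, 5, 2, 4, 6]

        // Chunked materialization: one operator, one path, order preserved.
        final List<Integer> out = new ArrayList<>();
        for (int rowId : in) {
            out.add(rowId); // materialize in place; no alternate path.
        }
        System.out.println("single path:         " + out); // [1, 2, 3, 4, 5, 6]
    }
}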
Modified Paths:
--------------
    branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/AST2BOpFilters.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/AST2BOpJoins.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/AST2BOpRTO.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/AST2BOpUtility.java

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/AST2BOpFilters.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/AST2BOpFilters.java	2014-01-13 15:54:15 UTC (rev 7782)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/AST2BOpFilters.java	2014-01-13 16:27:44 UTC (rev 7783)
@@ -207,7 +207,7 @@
             PipelineOp left,//
             final int rightId, //
             final IValueExpression<IV> ve,//
-            final Collection<IVariable<IV>> vars, //
+            final Set<IVariable<IV>> vars, //
             final Properties queryHints,
             final AST2BOpContext ctx) {
 
@@ -269,17 +269,17 @@
      * @param ctx
      *            The evaluation context.
      *
-     * @return The final bop added to the pipeline by this method
+     * @return The final bop added to the pipeline by this method. If there are
+     *         no variables that require materialization, then this just
+     *         returns <i>left</i>.
      *
      * @see TryBeforeMaterializationConstraint
-     *
-     *      TODO make [vars] a Set.
      */
     @SuppressWarnings("rawtypes")
     protected static PipelineOp addMaterializationSteps2(//
             PipelineOp left,//
             final int rightId, //
-            final Collection<IVariable<IV>> vars,//
+            final Set<IVariable<IV>> vars,//
             final Properties queryHints, //
             final AST2BOpContext ctx) {
 
@@ -288,32 +288,38 @@
         if (nvars == 0)
             return left;
 
-        final long timestamp = ctx.getLexiconReadTimestamp();
+        if (nvars >= 1) {
 
-        final String ns = ctx.getLexiconNamespace();
-
-        if (nvars >= 1) {
             /*
-             * Use a pipeline operator which uses the chunked materialization
-             * pattern for solution sets. This is similar to the pattern which
-             * is used when the IVs in the final solutions are materialized as
-             * RDF Values.
+             * Materializes multiple variables at once.
              *
-             * TODO We should drop the more complicated materialization pipeline
-             * logic unless a performance advantage can be demonstrated either
-             * on a Journal or a cluster.
+             * Note: This code path does not reorder the solutions (no
+             * conditional routing).
              */
-            return (PipelineOp) applyQueryHints(new ChunkedMaterializationOp(leftOrEmpty(left),
-                    new NV(ChunkedMaterializationOp.Annotations.VARS, vars.toArray(new IVariable[nvars])),//
-                    new NV(ChunkedMaterializationOp.Annotations.RELATION_NAME, new String[] { ns }), //
-                    new NV(ChunkedMaterializationOp.Annotations.TIMESTAMP, timestamp), //
-                    new NV(PipelineOp.Annotations.SHARED_STATE, !ctx.isCluster()),// live stats, but not on the cluster.
-                    new NV(BOp.Annotations.BOP_ID, ctx.nextId())//
-                    ), queryHints, ctx);
-//                    vars.toArray(new IVariable[nvars]), ns, timestamp)
-//                    .setProperty(BOp.Annotations.BOP_ID, ctx.nextId());
+
+            return addChunkedMaterializationStep(
+                    left,
+                    vars,
+                    ChunkedMaterializationOp.Annotations.DEFAULT_MATERIALIZE_INLINE_IVS,
+                    null, // cutoffLimit
+                    queryHints, ctx);
+
+        }
+
+        /*
+         * Materialize a single variable.
+         *
+         * Note: This code path can reorder the solutions (it uses conditional
+         * routing).
+         *
+         * TODO We should drop the more complicated materialization pipeline
+         * logic unless a performance advantage can be demonstrated either on a
+         * Journal or a cluster.
+         */
+        final long timestamp = ctx.getLexiconReadTimestamp();
+
+        final String ns = ctx.getLexiconNamespace();
+
         final Iterator<IVariable<IV>> it = vars.iterator();
 
         int firstId = ctx.nextId();
@@ -462,6 +468,74 @@
 
     }
 
+    /**
+     * Use a pipeline operator which uses the chunked materialization pattern
+     * for solution sets. This is similar to the pattern which is used when the
+     * IVs in the final solutions are materialized as RDF Values.
+     * <p>
+     * Note: The RTO uses this method since it does not use conditional routing
+     * and does not reorder the solutions.
+     *
+     * @param left
+     *            The left (upstream) operator that immediately precedes the
+     *            materialization steps.
+     * @param vars
+     *            The terms to materialize.
+     * @param materializeInlineIvs
+     *            When <code>true</code>, inline IVs are also materialized.
+     * @param queryHints
+     *            The query hints from the dominating AST node.
+     * @param ctx
+     *            The evaluation context.
+     *
+     * @return The final bop added to the pipeline by this method. If there are
+     *         no variables that require materialization, then this just
+     *         returns <i>left</i>.
+     *
+     * @see ChunkedMaterializationOp
+     */
+    protected static PipelineOp addChunkedMaterializationStep(//
+            PipelineOp left,//
+            final Set<IVariable<IV>> vars,//
+            final boolean materializeInlineIvs,//
+            final Long cutoffLimit,//
+            final Properties queryHints,//
+            final AST2BOpContext ctx//
+    ) {
+
+        final int nvars = vars.size();
+
+        if (nvars == 0)
+            return left;
+
+        final long timestamp = ctx.getLexiconReadTimestamp();
+
+        final String ns = ctx.getLexiconNamespace();
+
+        /*
+         * If we are doing cutoff join evaluation, then limit the parallelism of
+         * this operator to prevent reordering of solutions.
+         *
+         * TODO If query hints are allowed to override MAX_PARALLEL and this is
+         * being invoked for cutoff join evaluation, then that will break the
+         * "no reordering" guarantee.
+         */
+
+        final int maxParallel = cutoffLimit != null ? 1
+                : PipelineOp.Annotations.DEFAULT_MAX_PARALLEL;
+
+        return (PipelineOp) applyQueryHints(new ChunkedMaterializationOp(leftOrEmpty(left),
+                new NV(ChunkedMaterializationOp.Annotations.VARS, vars.toArray(new IVariable[nvars])),//
+                new NV(ChunkedMaterializationOp.Annotations.RELATION_NAME, new String[] { ns }), //
+                new NV(ChunkedMaterializationOp.Annotations.TIMESTAMP, timestamp), //
+                new NV(ChunkedMaterializationOp.Annotations.MATERIALIZE_INLINE_IVS, materializeInlineIvs), //
+                new NV(PipelineOp.Annotations.SHARED_STATE, !ctx.isCluster()),// live stats, but not on the cluster.
+                new NV(PipelineOp.Annotations.MAX_PARALLEL, maxParallel),//
+                new NV(BOp.Annotations.BOP_ID, ctx.nextId())//
+                ), queryHints, ctx);
+
+    }
+
 //    /**
 //     * Wrapper for handling the {@link AST2BOpContext} / {@link BOpContextBase}
 //     * API mismatch.
@@ -483,7 +557,7 @@
 //                ctx.queryEngine), queryHints);
 //
 //    }
-    
+
     /**
      * For each filter which requires materialization steps, add the
      * materialization steps to the pipeline and then add the filter to the
      * pipeline.
      *
      * @param left
      * @param doneSet
-     *            The set of variables already known to be materialized.
+     *            The set of variables already known to be materialized. This is
+     *            populated as a side-effect with any variables that will be
+     *            materialized by the materialization steps added by this
+     *            method.
      * @param needsMaterialization
      *            A map of constraints and their variable materialization
      *            requirements.
@@ -499,6 +576,16 @@
      *            Query hints from the dominating AST node.
      * @param ctx
      *            The evaluation context.
+     *
+     *            TODO This treats each filter in turn rather than handling all
+     *            variable materializations for all filters at once. Is this
+     *            deliberate? If so, maybe we should pay attention to the order
+     *            of the filters. I.e., those constraints should be ordered
+     *            based on an expectation that they can reduce the total work by
+     *            first eliminating solutions with less materialization effort
+     *            (run constraints without materialization requirements before
+     *            those with materialization requirements, run constraints that
+     *            are more selective before others, etc.).
      */
     @SuppressWarnings("rawtypes")
     protected static PipelineOp addMaterializationSteps3(//
@@ -509,72 +596,148 @@
             final AST2BOpContext ctx//
             ) {
 
-        if (!needsMaterialization.isEmpty()) {
+        if (needsMaterialization.isEmpty()) {
+            // Nothing to do.
+            return left;
+        }
 
-            final Set<IVariable<?>> alreadyMaterialized = doneSet;
+        final Set<IVariable<?>> alreadyMaterialized = doneSet;
 
-            for (Map.Entry<IConstraint, Set<IVariable<IV>>> e :
-                needsMaterialization.entrySet()) {
+        for (Map.Entry<IConstraint, Set<IVariable<IV>>> e :
+                needsMaterialization.entrySet()) {
 
-                final IConstraint c = e.getKey();
+            // The constraint.
+            final IConstraint c = e.getKey();
+
+            // The set of variables associated with that constraint.
+            final Set<IVariable<IV>> terms = e.getValue();
+
+            // remove any terms already materialized
+            terms.removeAll(alreadyMaterialized);
+
+            if (c instanceof INeedsMaterialization
+                    && ((INeedsMaterialization) c).getRequirement() == Requirement.ALWAYS) {
+
+                // add any new terms to the list of already materialized
+                alreadyMaterialized.addAll(terms);
 
-                final Set<IVariable<IV>> terms = e.getValue();
+            }
 
-                // remove any terms already materialized
-                terms.removeAll(alreadyMaterialized);
+            final int condId = ctx.nextId();
 
-                if (c instanceof INeedsMaterialization
-                        && ((INeedsMaterialization) c).getRequirement() == Requirement.ALWAYS) {
+            // we might have already materialized everything we need
+            if (!terms.isEmpty()) {
 
-                    // add any new terms to the list of already materialized
-                    alreadyMaterialized.addAll(terms);
-
-                }
+                // Add materialization steps for remaining variables.
 
-                final int condId = ctx.nextId();
+                @SuppressWarnings("unchecked")
+                final IValueExpression<IV> ve = (IValueExpression) c.get(0);
 
-                // we might have already materialized everything we need
-                if (!terms.isEmpty()) {
+                left = addMaterializationSteps1(//
+                        left, //
+                        condId, // right
+                        ve, // value expression
+                        terms,// varsToMaterialize,
+                        queryHints,//
+                        ctx);
+
+            }
 
-                    // Add materialization steps for remaining variables.
+            left = applyQueryHints(//
+                    new ConditionalRoutingOp(leftOrEmpty(left),//
+                            new NV(BOp.Annotations.BOP_ID, condId),//
+                            new NV(ConditionalRoutingOp.Annotations.CONDITION, c)//
+                    ), queryHints, ctx);
-                    @SuppressWarnings("unchecked")
-                    final IValueExpression<IV> ve = (IValueExpression) c.get(0);
+        }
-                    left = addMaterializationSteps1(//
-                            left, //
-                            condId, // right
-                            ve, // value expression
-                            terms,// varsToMaterialize,
-                            queryHints,//
-                            ctx);
-
-//                    left = addMaterializationSteps(//
-//                            ctx,//
-//                            left,//
-//                            condId,// rightId
-//                            c, // eval c.get(0)
-//                            terms, // varsToMaterialize
-//                            // idFactory,
-//                            queryHints//
-//                            );
+        return left;
-                }
+    }
-                left = applyQueryHints(//
-                        new ConditionalRoutingOp(leftOrEmpty(left),//
-                                new NV(BOp.Annotations.BOP_ID, condId),//
-                                new NV(ConditionalRoutingOp.Annotations.CONDITION, c)//
-                        ), queryHints, ctx);
+    /**
+     * The RTO requires that we do not reorder solutions. This means that it
+     * must use an unconditional approach to variable materialization for
+     * constraints with SOMETIMES materialization requirements. This has two
+     * practical impacts:
+     * <p>
+     * 1. We can not attach a filter with SOMETIMES requirements to a JOIN and
+     * wrap it with a {@link TryBeforeMaterializationConstraint} since this
+     * requires a {@link ConditionalRoutingOp} with an altSink and that will
+     * reorder solutions.
+     * <p>
+     * 2. We can not use a pattern which involves an {@link InlineMaterializeOp}
+     * followed by a {@link ConditionalRoutingOp} with an altSink followed by a
+     * {@link PipelineJoin} against the lexicon. This also reorders the
+     * solutions (primarily because of the {@link ConditionalRoutingOp} since we
+     * can force the {@link PipelineJoin} to not reorder solutions).
+     * <p>
+     * The code below uses the {@link ChunkedMaterializationOp}. This does not
+     * reorder the solutions. It can also materialize inline IVs, giving us a
+     * single operator that prepares the solutions for filter evaluation.
+     */
+    @SuppressWarnings("rawtypes")
+    protected static PipelineOp addNonConditionalMaterializationSteps(//
+            PipelineOp left,//
+            final Set<IVariable<?>> doneSet,//
+            final Map<IConstraint, Set<IVariable<IV>>> needsMaterialization,
+            final Long cutoffLimit,//
+            final Properties queryHints,//
+            final AST2BOpContext ctx//
+    ) {
+
+        if (needsMaterialization.isEmpty()) {
+
+            // No filters.
+            return left;
+
+        }
-    }
+        // Collect variables that require materialization.
+        final Set<IVariable<IV>> matvars = new LinkedHashSet<IVariable<IV>>();
+        for (Map.Entry<IConstraint, Set<IVariable<IV>>> e : needsMaterialization
+                .entrySet()) {
+
+            matvars.addAll(e.getValue());
+        }
+        if (!matvars.isEmpty()) {
+
+            // Materialize those variables.
+            left = addChunkedMaterializationStep(left, matvars,
+                    true/* materializeInlineIVs */, cutoffLimit, queryHints,
+                    ctx);
+
+            // Add them to the doneSet.
+            doneSet.addAll(matvars);
+
+        }
+
+        // Attach all constraints.
+        for (IConstraint c : needsMaterialization.keySet()) {
+
+            /*
+             * Note: While this is using a ConditionalRoutingOp, it is NOT
+             * using the altSink. All solutions flow through the default sink.
+             * This use does not cause the solutions to be reordered.
+             * Parallel evaluation is also disabled.
+             */
+
+            left = applyQueryHints(//
+                    new ConditionalRoutingOp(leftOrEmpty(left),//
+                            new NV(BOp.Annotations.BOP_ID, ctx.nextId()),//
+                            new NV(PipelineOp.Annotations.MAX_PARALLEL, 1),//
+                            new NV(ConditionalRoutingOp.Annotations.CONDITION, c)//
+                    ), queryHints, ctx);
+
+        }
+        return left;
     }
-    
+
     /**
      * Partition the constraints for a join into those which can (or might) be
      * able to run attached to that join and those which must (or might) need to
@@ -601,10 +764,70 @@
      *         join -or- <code>null</code> iff there are no constraints that can
      *         be attached to the join.
      */
+    @SuppressWarnings("rawtypes")
     static protected IConstraint[] getJoinConstraints(
             final Collection<IConstraint> constraints,
             final Map<IConstraint, Set<IVariable<IV>>> needsMaterialization) {
 
+        return getJoinConstraints2(constraints, needsMaterialization, true/* conditionalRouting */);
+
+    }
+
+    /**
+     * Partition the constraints for a join into those which can (or might) be
+     * able to run attached to that join and those which must (or might) need to
+     * materialize some variables before they can be evaluated. Constraints
+     * which might be able to run attached to a join actually wind up both
+     * attached to the join (in the return value), where they are wrapped by a
+     * {@link TryBeforeMaterializationConstraint} which will ignore errors
+     * caused by unmaterialized values, and in the <i>needsMaterialization</i>
+     * map. This allows such constraints to run attached to the join iff that is
+     * possible and otherwise to be evaluated as soon as the materialization
+     * pipeline can satisfy their materialization requirements.
+     * <p>
+     * Note: The RTO requires that solutions are not reordered during cutoff
+     * JOIN evaluation in order to obtain accurate estimates of the cardinality
+     * of a join based on a known number of solutions in producing a known
+     * number of solutions out. Conditional materialization can cause solutions
+     * to be reordered since some solutions may pass the constraint attached to
+     * the join and be routed along one path in the query plan while other
+     * solutions may pass the join but fail the constraint due to a
+     * materialization requirement and are routed along another path. Thus it is
+     * necessary to disable conditional routing for cutoff JOIN evaluation.
+     *
+     * @param constraints
+     *            The constraints (if any) for the join (optional). This
+     *            collection is NOT modified.
+     * @param needsMaterialization
+     *            A map providing for each constraint the set of variables which
+     *            either might or must be materialized before that constraint
+     *            can be evaluated. This map is populated as a side-effect. It
+     *            will be empty iff there are no constraints that might or must
+     *            require variable materialization.
+     * @param conditionalRouting
+     *            When <code>true</code>, constraints that
+     *            {@link Requirement#SOMETIMES} are able to run attached are
+     *            wrapped by a {@link TryBeforeMaterializationConstraint} and
+     *            appear both attached to the JOIN and will also run after the
+     *            JOIN using a {@link ConditionalRoutingOp} to route solutions
+     *            that fail the attached constraint through a materialization
+     *            pipeline and then into a 2nd copy of the constraint once the
+     *            variable(s) are known to be materialized.<br/>
+     *            When <code>false</code>, only constraints that
+     *            {@link Requirement#NEVER} require materialization will be
+     *            attached to the JOIN. All other constraints will use
+     *            non-conditional materialization.
+     *
+     * @return Constraints which can (or might) be able to run attached to that
+     *         join -or- <code>null</code> iff there are no constraints that can
+     *         be attached to the join.
+     */
+    @SuppressWarnings("rawtypes")
+    static protected IConstraint[] getJoinConstraints2(
+            final Collection<IConstraint> constraints,
+            final Map<IConstraint, Set<IVariable<IV>>> needsMaterialization,
+            final boolean conditionalRouting) {
+
         if (constraints == null || constraints.isEmpty()) {
 
             // No constraints for this join.
@@ -630,7 +853,6 @@
              * the join and run it as a ConditionalRoutingOp later.
              */
 
-            @SuppressWarnings("rawtypes")
             final Set<IVariable<IV>> terms = new LinkedHashSet<IVariable<IV>>();
 
             final Requirement req = StaticAnalysis.gatherVarsToMaterialize(c,
@@ -640,7 +862,7 @@
 
                 it.remove();
 
-                if (req == Requirement.SOMETIMES) {
+                if (req == Requirement.SOMETIMES && conditionalRouting) {
 
                     tryBeforeMaterialization.add(c);
 
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/AST2BOpJoins.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/AST2BOpJoins.java	2014-01-13 15:54:15 UTC (rev 7782)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/AST2BOpJoins.java	2014-01-13 16:27:44 UTC (rev 7783)
@@ -140,9 +140,20 @@
         final Map<IConstraint, Set<IVariable<IV>>> needsMaterialization = new LinkedHashMap<IConstraint, Set<IVariable<IV>>>();
 
-        // Add constraints to the join for that predicate.
-        anns.add(new NV(JoinAnnotations.CONSTRAINTS, getJoinConstraints(
-                constraints, needsMaterialization)));
+        /*
+         * Add constraints to the join for that predicate.
+         *
+         * Note: If we are performing cutoff evaluation of a JOIN
+         * [cutoffLimit!=null], then this disables the conditional routing logic
+         * for constraints with SOMETIMES materialization requirements. This is
+         * necessary in order to preserve the order of evaluation. Conditional
+         * routing of solutions causes them to be reordered and that breaks the
+         * ability to accurately estimate the cardinality of the JOIN using
+         * cutoff evaluation.
+         */
+        anns.add(new NV(JoinAnnotations.CONSTRAINTS,
+                getJoinConstraints2(constraints, needsMaterialization,
+                        cutoffLimit == null/* conditionalRouting */)));
 
         // true iff there are no constraints that require materialization.
         anns.add(new NV(Annotations.SIMPLE_JOIN, needsMaterialization.isEmpty()));
@@ -198,15 +209,39 @@
 
         }
 
+        if (needsMaterialization.isEmpty()) {
+
+            // No filters.
+            return left;
+
+        }
+
         /*
-         * For each filter which requires materialization steps, add the
-         * materialization steps to the pipeline and then add the filter to the
-         * pipeline.
+         * Add operators to materialize variables (as necessary) and
+         * evaluate filters.
          */
+        if (cutoffLimit != null) {
 
-        left = addMaterializationSteps3(left, doneSet, needsMaterialization,
-                queryHints, ctx);
+            left = addNonConditionalMaterializationSteps(left, doneSet,
+                    needsMaterialization, cutoffLimit, queryHints, ctx);
 
+        } else {
+
+            /*
+             * For each filter which requires materialization steps, add the
+             * materialization steps to the pipeline and then add the
+             * filter to the pipeline.
+             *
+             * Note: This is the old code path. This code path does not support
+             * cutoff evaluation of joins because it can reorder the
+             * solutions.
+             */
+
+            left = addMaterializationSteps3(left, doneSet,
+                    needsMaterialization, queryHints, ctx);
+
+        }
+        return left;
 
     }
 
@@ -1002,7 +1037,7 @@
                 QueryHints.DEFAULT_HASH_JOIN);
 
         if (cutoffLimit != null) {
-            
+
             /*
              * Cutoff join (RTO).
              */
@@ -1026,6 +1061,22 @@
         map.put(PipelineJoin.Annotations.COALESCE_DUPLICATE_ACCESS_PATHS,
                 Boolean.FALSE);
 
+        /*
+         * Disable access path reordering.
+         *
+         * Note: Reordering must be disabled for complex joins since we will
+         * correlate the input solutions and output solutions using a row
+         * identifier. If the solutions are reordered as they flow through
+         * the pipeline, then it will break this correlation and we will no
+         * longer have accurate information about the #of input solutions
+         * required to produce a given number of output solutions. [Simple
+         * joins might not have this requirement since the PipelineJoin is
+         * internally doing the accounting for the #of solutions in and out
+         * of the join.]
+         */
+        map.put(PipelineJoin.Annotations.REORDER_ACCESS_PATHS,
+                Boolean.FALSE);
+
         if (simpleJoin) {
 
 //            // disable access path coalescing
@@ -1092,20 +1143,6 @@
              */
             map.put(PipelineJoin.Annotations.SHARED_STATE, Boolean.TRUE);//
 
-            /*
-             * Disable access path reordering.
-             *
-             * Note: Reordering must be disabled for complex joins since we
-             * will correlate the input solutions and output solutions using
-             * a row identifier. If the solutions are reordered as they flow
-             * through the pipeline, then it will break this correlation and
-             * we will no longer have accurate information about the #of
-             * input solutions required to produce a given number of output
-             * solutions.
-             */
-            map.put(PipelineJoin.Annotations.REORDER_ACCESS_PATHS,
-                    Boolean.FALSE);
-
         }
     } // cutoffJoin
 
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/AST2BOpRTO.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/AST2BOpRTO.java	2014-01-13 15:54:15 UTC (rev 7782)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/AST2BOpRTO.java	2014-01-13 16:27:44 UTC (rev 7783)
@@ -232,6 +232,13 @@
     static final private boolean failOutOfOrderEvaluation = false;
 
     /**
+     * When <code>true</code>, the generated query plans for cutoff evaluation
+     * will be checked to verify that the query plans do not permit reordering
+     * of solutions.
+     */
+    static final private boolean checkQueryPlans = false;
+
+    /**
      * Inspect the remainder of the join group. If we can isolate a join graph
      * and filters, then we will push them down into an RTO JoinGroup. Since the
     * joins have already been ordered by the static optimizer, we can accept
@@ -445,6 +452,13 @@
      *            based on deep sampling of the join graph.
      *
      * @return The query plan to fully execute that join graph.
+     *
+     *         FIXME RTO: Modify this to use AST2BOpUtility#convertJoinGroup().
+     *         We would have to reorder the joins in the
+     *         {@link JoinGraph.Annotations#JOIN_GROUP} into the ordering
+     *         specified in the {@link Path} and make sure that
+     *         convertJoinGroup() did not attempt to recursively reapply the
+     *         RTO.
      */
     public static PipelineOp compileJoinGraph(final QueryEngine queryEngine,
             final JoinGraph joinGraph, final Path path) {
 
@@ -464,7 +478,8 @@
 
         final IConstraint[] constraints = joinGraph.getConstraints();
 
-        final Set<IVariable<?>> doneSet = joinGraph.getDoneSet();
+        final Set<IVariable<?>> doneSet = new LinkedHashSet<IVariable<?>>(
+                joinGraph.getDoneSet());
 
         /*
          * The AST JoinGroupNode for the joins and filters that we are running
@@ -673,6 +688,7 @@
      *
      * @return The result of sampling that edge.
      */
+    //synchronized// FIXME REMOVE synchronized. This forces single threading for debugging purposes when chasing out-of-order evaluation exceptions.
     static public EdgeSample cutoffJoin(//
             final QueryEngine queryEngine,//
             final JoinGraph joinGraph,//
@@ -701,38 +717,55 @@
         if (sourceSample.getSample() == null)
             throw new IllegalArgumentException();
 
-        /*
-         * Generate the query plan for cutoff evaluation of that JOIN. The
-         * complexity of the query plan depends on whether or not there are
-         * FILTERs "attached" to the join that have variable materialization
-         * requirements.
-         */
-        final PipelineOp query = getCutoffJoinQuery(queryEngine, joinGraph,
-                limit, predicates, constraints, pathIsComplete, sourceSample);
+        PipelineOp query = null;
+        try {
 
-        if (!runAllJoinsAsComplexJoins && (query instanceof PipelineJoin)
-                && query.arity() == 0) {
-            /*
-             * Simple JOIN.
-             *
-             * Old logic for query plan generation. This is deprecated and will
-             * disappear soon. We will still generate simple query plans and
-             * execute them in the simple manner, but all of the code will go
-             * through AST2BOPJoins#join().
+            /*
+             * Generate the query plan for cutoff evaluation of that JOIN. The
+             * complexity of the query plan depends on whether or not there are
+             * FILTERs "attached" to the join that have variable materialization
+             * requirements.
              */
-            return runSimpleJoin(queryEngine, sourceSample, limit,
-                    (PipelineJoin<?>) query);
+            query = getCutoffJoinQuery(queryEngine, joinGraph,
+                    limit, predicates, constraints, pathIsComplete, sourceSample);
 
-        } else {
+            if (!runAllJoinsAsComplexJoins && (query instanceof PipelineJoin)
+                    && query.arity() == 0) {
+                /*
+                 * Simple JOIN.
+                 *
+                 * Old logic for query execution. Relies on the ability of
+                 * the PipelineJoin to not reorder the solutions.
+                 */
+                return runSimpleJoin(queryEngine, sourceSample, limit,
+                        (PipelineJoin<?>) query);
+
+            } else {
+
+                /*
+                 * Complex JOIN involving variable materialization, conditional
+                 * routing operators, filters, and a SLICE to limit the output.
+                 */
+
+                return runComplexJoin(queryEngine, sourceSample, limit, query);
+
+            }
+        } catch (Throwable ex) {
+
             /*
-             * Complex JOIN involving variable materialization, conditional
-             * routing operators, filters, and a SLICE to limit the output.
+             * Add some more information. This gives us the specific predicate.
+             * However, it does not tell us the constraints that were attached
+             * to that predicate. That information is only available when we are
+             * compiling the query plan. At this point, the constraints have
+             * been turned into query plan operators.
             */
-
-            return runComplexJoin(queryEngine, sourceSample, limit, query);
+            throw new RuntimeException("cause=" + ex + "\npred="
+                    + BOpUtility.toString(pred) + "\nconstraints="
+                    + Arrays.toString(constraints) + (query == null ? "" : "\nquery="
+                    + BOpUtility.toString(query)), ex);
+
+        }
 
     }
 
@@ -856,7 +889,7 @@
          * [left] is now the last operator in the query plan.
          */
 
-        if (!(left instanceof PipelineJoin) && left.arity() == 0) {
+        if (!((left instanceof PipelineJoin) && left.arity() == 0)) {
 
             /*
              * The query plan contains multiple operators.
@@ -883,12 +916,109 @@
             log.debug("RTO cutoff join query::\n"
                     + BOpUtility.toString(left) + "\npred::" + pred);
 
-        return left;
+        if (checkQueryPlans) {
+
+            checkQueryPlan(left);
+
+        }
+
+        return left;
 
     }
 
+    /**
+     * Debug code that checks the query plan for patterns that could cause
+     * {@link OutOfOrderEvaluationException}s.
+     * <p>
+     * <ul>
+     * <li>This looks for query plans that use the alternate sink. Operators
+     * that route some solutions to the default sink and some solutions to the
+     * alternate sink can cause out of order evaluation. Out of order evaluation
+     * is not compatible with cutoff JOIN evaluation.</li>
+     * <li>This looks for operators that do not restrict
+     * {@link PipelineOp.Annotations#MAX_PARALLEL} to ONE (1).</li>
+     * </ul>
+     *
+     * TODO We probably need an interface to determine whether an operator (as
+     * configured) guarantees in-order evaluation. Some operators can not be
+     * configured for in-order evaluation. Others, including a hash join using a
+     * linked hash map for the buckets, can preserve the order of the source
+     * solutions in their output. This would also be useful for creating query
+     * plans that are order preserving when an index order corresponds to the
+     * desired output order.
+     *
+     * TODO Extend this to check the RTO plan for ordered evaluation and then
+     * use this in the test suite. The plan must be provably ordered. We could
+     * also use this for order preserving query plans for other purposes.
+     */
+    private static void checkQueryPlan(final PipelineOp left) {
+
+        final Iterator<PipelineOp> itr = BOpUtility.visitAll(left,
+                PipelineOp.class);
+
+        while (itr.hasNext()) {
+
+            final PipelineOp tmp = itr.next();
+
+            if (tmp.getProperty(PipelineOp.Annotations.ALT_SINK_REF) != null) {
+
+                // alternative sink is disallowed.
+                throw new RuntimeException("Query plan uses altSink: op="
+                        + tmp.toShortString());
+
+            }
+
+            if (tmp.getMaxParallel() != 1) {
+
+                // parallel execution of an operator is disallowed.
+                throw new RuntimeException("RTO "
+                        + PipelineOp.Annotations.MAX_PARALLEL
+                        + ": expected=1, actual=" + tmp.getMaxParallel()
+                        + ", op=" + tmp.toShortString());
+
+            }
+
+            if (tmp instanceof PipelineJoin) {
+
+                final PipelineJoin<?> t = (PipelineJoin<?>) tmp;
+
+                final int maxParallelChunks = t.getMaxParallelChunks();
+
+                if (maxParallelChunks != 0) {
+
+                    throw new RuntimeException("PipelineJoin: "
+                            + PipelineJoin.Annotations.MAX_PARALLEL_CHUNKS
+                            + "=" + maxParallelChunks
+                            + " but must be ZERO (0):: op=" + t.toShortString());
+
+                }
+
+                final boolean coalesceDuplicateAccessPaths = t
+                        .getProperty(
+                                PipelineJoin.Annotations.COALESCE_DUPLICATE_ACCESS_PATHS,
+                                PipelineJoin.Annotations.DEFAULT_COALESCE_DUPLICATE_ACCESS_PATHS);
+
+                if (coalesceDuplicateAccessPaths) {
+
+                    throw new RuntimeException(
+                            "PipelineJoin: "
+                                    + PipelineJoin.Annotations.COALESCE_DUPLICATE_ACCESS_PATHS
+                                    + "=" + coalesceDuplicateAccessPaths
+                                    + " but must be false:: op="
+                                    + t.toShortString());
+
+                }
+
+            }
+
+        }
+
+    }
+
     /**
-     * Run a simple cutoff join on the {@link QueryEngine}.
+     * Run a simple cutoff join on the {@link QueryEngine}. This method relies
+     * on the ability to control the {@link PipelineJoin} such that it does not
+     * reorder the solutions.
      *
      * @param queryEngine
      *            The {@link QueryEngine}.
@@ -1050,7 +1180,7 @@
         if (log.isInfoEnabled())
             log.info("limit=" + limit + ", sourceSample=" + sourceSample
                     + ", query=" + query.toShortString());
-        
+
         // Anonymous variable used for the injected column.
         final IVariable<?> rtoVar = Var.var();
 
@@ -1102,9 +1232,9 @@
                 int lastRowId = 0;
                 while (itr.hasNext()) {
                     bset = itr.next();
-//System.err.println(bset.toString());
                     final int rowid = ((Integer) bset.get(rtoVar).get())
                             .intValue();
+//log.warn("rowId="+rowid+",lastRowId="+lastRowId+",bset="+bset);
                     if (rowid < lastRowId && failOutOfOrderEvaluation) {
                         /*
                          * Out of order evaluation makes it impossible to
                          * estimate the cardinality of the join
                          * without knowing the #of solutions in required to
                          * produce a given #of solutions out.
                          */
-                        throw new OutOfOrderEvaluationException(
-                                BOpUtility.toString(query));
+                        throw new OutOfOrderEvaluationException();
                     }
                     lastRowId = rowid;
                     bset.clear(rtoVar); // drop injected variable.
 
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/AST2BOpUtility.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/AST2BOpUtility.java	2014-01-13 15:54:15 UTC (rev 7782)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/AST2BOpUtility.java	2014-01-13 16:27:44 UTC (rev 7783)
@@ -1,7 +1,6 @@
 package com.bigdata.rdf.sparql.ast.eval;
 
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
@@ -997,7 +996,7 @@
 
                 // Add the materialization step.
                 left = addMaterializationSteps2(left, rightId,
-                        (Collection) vars, queryHints, ctx);
+                        (Set<IVariable<IV>>) (Set) vars, queryHints, ctx);
 
                 // These variables have now been materialized.
                 doneSet.addAll(vars);
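For reference, the out-of-order guard in runComplexJoin above reduces to a monotonicity check over a row identifier injected into each input solution. A minimal, self-contained sketch of that check in plain Java (invented class and method names; not the bigdata implementation):

import java.util.Arrays;
import java.util.Iterator;

// Sketch of the row-identifier monotonicity check used to detect reordering.
public class OutOfOrderCheckSketch {

    static final class OutOfOrderEvaluationException extends RuntimeException {
    }

    // Throws if the row-id sequence ever decreases, i.e. if the pipeline
    // delivered some solution ahead of one that entered before it.
    static void verifyOrder(final Iterator<Integer> rowIds) {
        int lastRowId = 0;
        while (rowIds.hasNext()) {
            final int rowId = rowIds.next();
            if (rowId < lastRowId) {
                // Reordered output breaks the correlation between the #of
                // solutions in and the #of solutions out, so the cutoff
                // join's cardinality estimate would be unsound.
                throw new OutOfOrderEvaluationException();
            }
            lastRowId = rowId;
        }
    }

    public static void main(String[] args) {
        verifyOrder(Arrays.asList(1, 1, 2, 3, 3).iterator()); // ok: non-decreasing
        try {
            verifyOrder(Arrays.asList(1, 3, 2).iterator()); // 2 arrives after 3
        } catch (OutOfOrderEvaluationException expected) {
            System.out.println("detected out-of-order evaluation");
        }
    }
}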