From: <tho...@us...> - 2010-11-17 02:14:18
Revision: 3950
          http://bigdata.svn.sourceforge.net/bigdata/?rev=3950&view=rev
Author:   thompsonbry
Date:     2010-11-17 02:14:08 +0000 (Wed, 17 Nov 2010)

Log Message:
-----------
More work on the runtime query optimizer. It is generating useful plans for LUBM Q2, Q8 and Q9. The runtime cost of the generated plans is close to the runtime cost of the plans produced by the static query optimizer. LUBM data are pretty regular, so the runtime query optimizer is not able to exploit unexpected correlations in the joins.

The runtime query optimizer tends to have cardinality estimation underflow for Q2, which suggests that we need to deepen the search on paths with low estimated cardinality. This bears further investigation. When we have estimation underflow in the runtime query optimizer, plans extending that point are picked at random. This is similar to, but not the same as, the problem encountered by the static query optimizer, which is unable to estimate the "as bound" cardinality after making some initial decision about the join ordering. There may very well be a role for a hybrid of the static and runtime query optimizers which plays to their different strengths.

Modified Paths:
--------------
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IPredicate.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryLog.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/journal/Journal.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/ap/TestPredicateAccessPath.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/ap/TestSampleIndex.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestAll.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestRemoteAccessPath.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/relation/rule/eval/TestDefaultEvaluationPlan.java
    branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/joinGraph/TestJoinGraphOnLubm.java
    branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/rules/TestContextAdvancer.java
    branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl.java

Added Paths:
-----------
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpIdFactory.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IdFactory.java

Removed Paths:
-------------
    branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/AdaptiveQueryOptimization.java
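To make the underflow point concrete: the optimizer estimates the cardinality of a join by running a cutoff sample through a pipeline join and scaling the observed join hit ratio out to the estimated cardinality of the input. The sketch below is illustrative only (CutoffEstimateSketch is not a class in this commit), but the parameter names mirror the EdgeSample and PipelineJoinStats fields which appear in the diff.

public class CutoffEstimateSketch {

    /**
     * Estimate the output cardinality of a join from a cutoff sample.
     *
     * @param sourceEstCard
     *            The estimated cardinality of the join's input (the range
     *            count of the vertex, or the estimated cardinality of the
     *            prior join path segment).
     * @param inputCount
     *            The #of input solutions consumed before the cutoff.
     * @param outputCount
     *            The #of output solutions produced before the cutoff.
     *
     * @return The estimated cardinality, which is ZERO (0) on estimate
     *         underflow.
     */
    static long estimateCardinality(final long sourceEstCard,
            final long inputCount, final long outputCount) {

        if (outputCount == 0) {

            /*
             * Cardinality estimate underflow. The sample produced no
             * solutions before the cutoff, so the observed join hit ratio
             * carries no information for ranking the join paths which
             * extend this edge: they are effectively picked at random.
             * Deepening the sample (raising the limit) on such paths is
             * the remedy suggested in the log message.
             */

            return 0;

        }

        // The join hit ratio observed in the cutoff sample.
        final double joinRatio = (double) outputCount / (double) inputCount;

        // Scale the ratio out to the estimated cardinality of the input.
        return (long) (sourceEstCard * joinRatio);

    }

    public static void main(final String[] args) {

        // A healthy sample: 100 solutions in, 250 out, input estimate 10000.
        System.out.println(estimateCardinality(10000, 100, 250)); // 25000

        // Underflow: the sample produced nothing before the cutoff.
        System.out.println(estimateCardinality(10000, 100, 0)); // 0

    }

}

When outputCount is zero the ratio estimator collapses, and any comparison among the join paths extended through that edge is meaningless, which is the "picked at random" behavior described above.

Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java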
=================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java 2010-11-15 18:13:06 UTC (rev 3949) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java 2010-11-17 02:14:08 UTC (rev 3950) @@ -31,6 +31,8 @@ import java.util.List; import java.util.Map; +import com.bigdata.bop.IPredicate.Annotations; + import cutthecrap.utils.striterators.IPropertySet; /** @@ -180,25 +182,29 @@ */ BOpEvaluationContext getEvaluationContext(); - /** - * Return <code>true</code> iff this operator is an access path which writes - * on the database. - * - * @see Annotations#MUTATION - */ - boolean isMutation(); +// /** +// * Return <code>true</code> iff this operator is an access path which writes +// * on the database. +// * +// * @see com.bigdata.bop.IPredicate.Annotations#MUTATION +// * +// * @todo Move to {@link IPredicate}? +// */ +// boolean isMutation(); +// +// /** +// * The timestamp or transaction identifier on which the operator will read +// * or write. +// * +// * @see Annotations#TIMESTAMP +// * +// * @throws IllegalStateException +// * if {@link Annotations#TIMESTAMP} was not specified. +// * +// * @todo move to {@link IPredicate}? +// */ +// long getTimestamp(); - /** - * The timestamp or transaction identifier on which the operator will read - * or write. - * - * @see Annotations#TIMESTAMP - * - * @throws IllegalStateException - * if {@link Annotations#TIMESTAMP} was not specified. - */ - long getTimestamp(); - // /** // * Compare this {@link BOp} with another {@link BOp}. // * @@ -240,37 +246,6 @@ long DEFAULT_TIMEOUT = Long.MAX_VALUE; /** - * Boolean property whose value is <code>true</code> iff this operator - * writes on a database. - * <p> - * Most operators operate solely on streams of elements or binding sets. - * Some operators read or write on the database using an access path, - * which is typically described by an {@link IPredicate}. This property - * MUST be <code>true</code> when access path is used to write on the - * database. - * <p> - * Operators which read or write on the database must declare the - * {@link Annotations#TIMESTAMP} associated with that operation. - * - * @see #TIMESTAMP - * - * @todo Move to {@link IPredicate}? - */ - String MUTATION = BOp.class.getName() + ".mutation"; - - boolean DEFAULT_MUTATION = false; - - /** - * The timestamp (or transaction identifier) used by this operator if it - * reads or writes on the database (no default). - * - * @see #MUTATION - * - * @todo Move to {@link IPredicate}? - */ - String TIMESTAMP = BOp.class.getName() + ".timestamp"; - - /** * This annotation determines where an operator will be evaluated * (default {@value #DEFAULT_EVALUATION_CONTEXT}). */ Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2010-11-15 18:13:06 UTC (rev 3949) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2010-11-17 02:14:08 UTC (rev 3950) @@ -582,18 +582,6 @@ } - public final boolean isMutation() { - - return getProperty(Annotations.MUTATION, Annotations.DEFAULT_MUTATION); - - } - - public final long getTimestamp() { - - return (Long) getRequiredProperty(Annotations.TIMESTAMP); - - } - /* * Note: I've played around with a few hash functions and senses of * equality. 
Predicate (before the bops were introduced) used to have a Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java 2010-11-15 18:13:06 UTC (rev 3949) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java 2010-11-17 02:14:08 UTC (rev 3950) @@ -165,8 +165,7 @@ final IIndexManager tmp = getFederation() == null ? getIndexManager() : getFederation(); - final long timestamp = (Long) pred - .getRequiredProperty(BOp.Annotations.TIMESTAMP); + final long timestamp = pred.getTimestamp(); return (IRelation<E>) tmp.getResourceLocator().locate( pred.getOnlyRelationName(), timestamp); Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpIdFactory.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpIdFactory.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpIdFactory.java 2010-11-17 02:14:08 UTC (rev 3950) @@ -0,0 +1,29 @@ +package com.bigdata.bop; + +import java.util.LinkedHashSet; + +/** + * A factory which may be used when some identifiers need to be reserved. + */ +public class BOpIdFactory implements IdFactory { + + private final LinkedHashSet<Integer> ids = new LinkedHashSet<Integer>(); + + private int nextId = 0; + + public void reserve(int id) { + ids.add(id); + } + + public int nextId() { + + while (ids.contains(nextId)) { + + nextId++; + + } + + return nextId++; + } + +} \ No newline at end of file Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java 2010-11-15 18:13:06 UTC (rev 3949) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java 2010-11-17 02:14:08 UTC (rev 3950) @@ -27,7 +27,6 @@ package com.bigdata.bop; -import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.LinkedHashMap; @@ -699,7 +698,7 @@ for (BOp arg : bop.args()) { - if (arg.arity() > 0) { + if (!(arg instanceof IVariableOrConstant<?>)) { toString(arg, sb, indent+1); @@ -798,29 +797,33 @@ return true; } - - /** - * Copy binding sets from the source to the sink(s). - * - * @param source - * The source. - * @param sink - * The sink (required). - * @param sink2 - * Another sink (optional). - * @param constraints - * Binding sets which fail these constraints will NOT be copied - * (optional). - * @param stats - * The {@link BOpStats#chunksIn} and {@link BOpStats#unitsIn} - * will be updated during the copy (optional). - */ - static public void copy( + + /** + * Copy binding sets from the source to the sink(s). + * + * @param source + * The source. + * @param sink + * The sink (required). + * @param sink2 + * Another sink (optional). + * @param constraints + * Binding sets which fail these constraints will NOT be copied + * (optional). + * @param stats + * The {@link BOpStats#chunksIn} and {@link BOpStats#unitsIn} + * will be updated during the copy (optional). + * + * @return The #of binding sets copied. 
+ */ + static public long copy( final IAsynchronousIterator<IBindingSet[]> source, final IBlockingBuffer<IBindingSet[]> sink, final IBlockingBuffer<IBindingSet[]> sink2, final IConstraint[] constraints, final BOpStats stats) { + long nout = 0; + while (source.hasNext()) { final IBindingSet[] chunk = source.next(); @@ -841,13 +844,19 @@ // copy accepted binding sets to the default sink. sink.add(tmp); + nout += chunk.length; + if (sink2 != null) { - // copy accepted binding sets to the alt sink. + + // copy accepted binding sets to the alt sink. sink2.add(tmp); + } } + return nout; + } /** @@ -946,5 +955,5 @@ return out; } - + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IPredicate.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IPredicate.java 2010-11-15 18:13:06 UTC (rev 3949) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IPredicate.java 2010-11-17 02:14:08 UTC (rev 3950) @@ -42,13 +42,12 @@ import com.bigdata.btree.filter.Advancer; import com.bigdata.btree.filter.TupleFilter; import com.bigdata.mdi.PartitionLocator; -import com.bigdata.rawstore.Bytes; import com.bigdata.relation.IRelation; import com.bigdata.relation.accesspath.AccessPath; import com.bigdata.relation.accesspath.ElementFilter; import com.bigdata.relation.accesspath.IAccessPath; +import com.bigdata.relation.rule.IAccessPathExpander; import com.bigdata.relation.rule.IRule; -import com.bigdata.relation.rule.IAccessPathExpander; import com.bigdata.relation.rule.eval.IEvaluationPlan; import com.bigdata.relation.rule.eval.pipeline.JoinMasterTask; import com.bigdata.service.ndx.IClientIndex; @@ -69,9 +68,12 @@ */ public interface IPredicate<E> extends BOp, Cloneable, Serializable { - /** - * Interface declaring well known annotations. - */ + /** + * Interface declaring well known annotations. + * + * FIXME All of these annotations should be in the {@link IPredicate} + * namespace. + */ public interface Annotations extends BOp.Annotations, BufferAnnotations { /** @@ -289,6 +291,35 @@ // | IRangeQuery.PARALLEL ; + /** + * Boolean property whose value is <code>true</code> iff this operator + * writes on a database. + * <p> + * Most operators operate solely on streams of elements or binding sets. + * Some operators read or write on the database using an access path, + * which is typically described by an {@link IPredicate}. This property + * MUST be <code>true</code> when access path is used to write on the + * database. + * <p> + * Operators which read or write on the database must declare the + * {@link Annotations#TIMESTAMP} associated with that operation. + * + * @see Annotations#TIMESTAMP + */ + String MUTATION = BOp.class.getName() + ".mutation"; + + boolean DEFAULT_MUTATION = false; + + /** + * The timestamp (or transaction identifier) used by this operator if it + * reads or writes on the database (no default). + * + * @see com.bigdata.bop.IPredicate.Annotations#MUTATION + * + * @todo Move to {@link IPredicate}? + */ + String TIMESTAMP = BOp.class.getName() + ".timestamp"; + } /** @@ -637,4 +668,23 @@ */ public IPredicate<E> setBOpId(int bopId); + /** + * Return <code>true</code> iff this operator is an access path which writes + * on the database. + * + * @see Annotations#MUTATION + */ + boolean isMutation(); + + /** + * The timestamp or transaction identifier on which the operator will read + * or write. 
+ * + * @see Annotations#TIMESTAMP + * + * @throws IllegalStateException + * if {@link Annotations#TIMESTAMP} was not specified. + */ + long getTimestamp(); + } Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IdFactory.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IdFactory.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IdFactory.java 2010-11-17 02:14:08 UTC (rev 3950) @@ -0,0 +1,10 @@ +package com.bigdata.bop; + +/** + * An interface for a bop identifier factory. + */ +public interface IdFactory { + + public int nextId(); + +} Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java 2010-11-15 18:13:06 UTC (rev 3949) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java 2010-11-17 02:14:08 UTC (rev 3950) @@ -636,4 +636,16 @@ } + public final boolean isMutation() { + + return getProperty(IPredicate.Annotations.MUTATION, IPredicate.Annotations.DEFAULT_MUTATION); + + } + + public final long getTimestamp() { + + return (Long) getRequiredProperty(IPredicate.Annotations.TIMESTAMP); + + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java 2010-11-15 18:13:06 UTC (rev 3949) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java 2010-11-17 02:14:08 UTC (rev 3950) @@ -34,6 +34,7 @@ import java.util.Comparator; import java.util.Formatter; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; @@ -50,6 +51,8 @@ import com.bigdata.bop.BOpContext; import com.bigdata.bop.BOpContextBase; import com.bigdata.bop.BOpEvaluationContext; +import com.bigdata.bop.BOpIdFactory; +import com.bigdata.bop.BOpUtility; import com.bigdata.bop.IBindingSet; import com.bigdata.bop.IElement; import com.bigdata.bop.IPredicate; @@ -64,8 +67,11 @@ import com.bigdata.bop.join.PipelineJoin; import com.bigdata.bop.join.PipelineJoin.PipelineJoinStats; import com.bigdata.bop.rdf.join.DataSetJoin; +import com.bigdata.bop.solutions.SliceOp; import com.bigdata.relation.IRelation; +import com.bigdata.relation.accesspath.BufferClosedException; import com.bigdata.relation.accesspath.IAccessPath; +import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.ThickAsynchronousIterator; import com.bigdata.relation.rule.Rule; import com.bigdata.striterator.Dechunkerator; @@ -176,13 +182,6 @@ } - /** - * - * TODO How can join constraints be moved around? Just attach them where - * ever a variable becomes bound? And when do we filter out variables which - * are not required downstream? Once we decide on a join path and execute it - * fully (rather than sampling that join path). - */ public JoinGraph(final BOp[] args, final Map<String, Object> anns) { super(args, anns); @@ -204,11 +203,6 @@ } -// /** -// * Used to assign row identifiers. -// */ -// static private final IVariable<Integer> ROWID = Var.var("__rowid"); - /** * A sample of a {@link Vertex} (an access path). */ @@ -334,9 +328,10 @@ } /** - * Take a sample of the vertex. 
If the sample is already exact, then - * this is a NOP. If the vertex was already sampled to that limit, then - * this is a NOP (you have to raise the limit to re-sample the vertex). + * Take a sample of the vertex, updating {@link #sample} as a + * side-effect. If the sample is already exact, then this is a NOP. If + * the vertex was already sampled to that limit, then this is a NOP (you + * have to raise the limit to re-sample the vertex). * * @param limit * The sample cutoff. @@ -677,6 +672,11 @@ /** * The last sample for this edge and <code>null</code> if the edge has * not been sampled. + * <p> + * Note: This sample is only the one-step cutoff evaluation of the edge + * given a sample of its vertex having the lesser cardinality. It is NOT + * the cutoff sample of a join path having this edge except for the + * degenerate case where the edge is the first edge in the join path. */ public EdgeSample sample = null; @@ -696,14 +696,23 @@ } /** + * The edge label is formed from the {@link BOp.Annotations#BOP_ID} of + * its ordered vertices (v1,v2). + */ + public String getLabel() { + + return "(" + v1.pred.getId() + "," + v2.pred.getId() + ")"; + + } + + /** * Note: The vertices of the edge are labeled using the * {@link BOp.Annotations#BOP_ID} associated with the {@link IPredicate} * for each vertex. */ public String toString() { - return "Edge{ (V" + v1.pred.getId() + ",V" + v2.pred.getId() - + "), estCard=" + return "Edge{ "+getLabel()+", estCard=" + (sample == null ? "N/A" : sample.estimatedCardinality) + ", shared=" + shared.toString() + ", sample=" + sample + "}"; @@ -790,27 +799,48 @@ } /** - * Estimate the cardinality of the edge. + * Estimate the cardinality of the edge, updating {@link #sample} as a + * side-effect. This is a NOP if the edge has already been sampled at + * that <i>limit</i>. This is a NOP if the edge sample is exact. * * @param context * - * @return The estimated cardinality of the edge. + * @return The new {@link EdgeSample} (this is also updated on + * {@link #sample} as a side-effect). * * @throws Exception */ - public long estimateCardinality(final QueryEngine queryEngine, + public EdgeSample estimateCardinality(final QueryEngine queryEngine, final int limit) throws Exception { if (limit <= 0) throw new IllegalArgumentException(); - /* - * Note: There is never a need to "re-sample" the edge. Unlike ROX, - * we always can sample a vertex. This means that we can sample the - * edges exactly once, during the initialization of the join graph. - */ - if (sample != null) - throw new RuntimeException(); +// /* +// * Note: There is never a need to "re-sample" the edge. Unlike ROX, +// * we always can sample a vertex. This means that we can sample the +// * edges exactly once, during the initialization of the join graph. +// */ +// if (sample != null) +// throw new RuntimeException(); + + if (sample != null) { + + if (sample.limit >= limit) { + + // Already sampled at that limit. + return sample; + + } + + if (sample.estimateEnum == EstimateEnum.Exact) { + + // Sample is exact (fully materialized result). + return sample; + + } + + } /* * Figure out which vertex has the smaller cardinality. The sample @@ -832,27 +862,6 @@ } /* - * TODO This is awkward to setup because we do not have a concept - * (or class) corresponding to a fly weight relation and we do not - * have a general purpose relation, just arrays or sequences of - * IBindingSets. Also, all relations are persistent. 
Temporary - * relations are on a temporary store and are locatable by their - * namespace rather than being Objects. - * - * The algorithm presupposes fly weight / temporary relations this - * both to wrap the sample and to store the computed intermediate - * results. - * - * Note: The PipelineJoin does not have a means to halt after a - * limit is satisfied. In order to achieve this, we have to wrap it - * with a SliceOp. - * - * Together, this means that we are dealing with IBindingSet[]s for - * both the input and the output of the cutoff evaluation of the - * edge rather than rows of the materialized relation. - */ - - /* * Convert the source sample into an IBindingSet[]. * * TODO We might as well do this when we sample the vertex. @@ -872,12 +881,16 @@ v.sample.rangeCount, v.sample.exact, v.sample.limit, sourceSample); - return sample.estimatedCardinality; + return sample; } /** - * Estimate the cardinality of the edge. + * Estimate the cardinality of the edge given a sample of either a + * vertex or a join path leading up to that edge. + * <p> + * Note: The caller is responsible for protecting against needless + * re-sampling. * * @param queryEngine * @param limit @@ -908,10 +921,6 @@ if (limit <= 0) throw new IllegalArgumentException(); -// // Inject a rowId column. -// sourceSample = BOpUtility.injectRowIdColumn(ROWID, 1/* start */, -// sourceSample); - /* * Note: This sets up a cutoff pipeline join operator which makes an * accurate estimate of the #of input solutions consumed and the #of @@ -928,6 +937,12 @@ * predicate) will not reduce the effort to compute the join, but * they can reduce the cardinality of the join and that is what we * are trying to estimate here. + * + * TODO How can join constraints be moved around? Just attach them + * where ever a variable becomes bound? And when do we filter out + * variables which are not required downstream? Once we decide on a + * join path and execute it fully (rather than sampling that join + * path). */ final int joinId = 1; final PipelineJoin joinOp = new PipelineJoin(new BOp[] {}, // @@ -953,22 +968,8 @@ */ new NV(PipelineJoin.Annotations.SHARED_STATE,true), new NV(PipelineJoin.Annotations.EVALUATION_CONTEXT,BOpEvaluationContext.CONTROLLER) -// // make sure the chunks are large enough to hold the result. -// new NV(PipelineJoin.Annotations.CHUNK_CAPACITY,limit), -// // no chunk timeout -// new NV(PipelineJoin.Annotations.CHUNK_TIMEOUT,Long.MAX_VALUE) ); -// BOpContext context = new BOpContext(runningQuery, partitionId, stats, source, sink, sink2); -// joinOp.eval(context); - -// final SliceOp sliceOp = new SliceOp(new BOp[] { joinOp },// -// NV.asMap(// -// new NV(BOp.Annotations.BOP_ID, 2), // -// new NV(SliceOp.Annotations.LIMIT, (long) limit), // -// new NV(BOp.Annotations.EVALUATION_CONTEXT, -// BOpEvaluationContext.CONTROLLER))); - final PipelineOp queryOp = joinOp; // run the cutoff sampling of the edge. @@ -980,10 +981,6 @@ new ThickAsynchronousIterator<IBindingSet[]>( new IBindingSet[][] { sourceSample }))); -// // #of source samples consumed. -// int inputCount; -// // #of output samples generated. -// int outputCount = 0; final List<IBindingSet> result = new LinkedList<IBindingSet>(); try { try { @@ -993,15 +990,8 @@ runningQuery.iterator()); while (itr.hasNext()) { bset = itr.next(); -// final int rowid = (Integer) bset.get(ROWID).get(); -// if (rowid > inputCount) -// inputCount = rowid; result.add(bset); -// outputCount++; } -// // #of input rows consumed. -// inputCount = bset == null ? 
0 : ((Integer) bset.get(ROWID) -// .get()); } finally { // verify no problems. runningQuery.get(); @@ -1014,8 +1004,8 @@ final PipelineJoinStats joinStats = (PipelineJoinStats) runningQuery .getStats().get(joinId); - if (log.isDebugEnabled()) - log.debug(joinStats.toString()); + if (log.isTraceEnabled()) + log.trace(joinStats.toString()); /* * TODO Improve comments here. See if it is possible to isolate a @@ -1032,8 +1022,8 @@ (int) joinStats.outputSolutions.get(), // result.toArray(new IBindingSet[result.size()])); - if (log.isTraceEnabled()) - log.trace("edge=" + this + ", sample=" + edgeSample); + if (log.isDebugEnabled()) + log.debug(getLabel() + " : newSample=" + edgeSample); return edgeSample; @@ -1081,8 +1071,7 @@ for (Edge e : edges) { if (!first) sb.append(","); - sb.append("(" + e.v1.pred.getId() + "," + e.v2.pred.getId() - + ")"); + sb.append(e.getLabel()); first = false; } sb.append(",cumEstCard=" + cumulativeEstimatedCardinality @@ -1176,68 +1165,6 @@ return false; } -// /** -// * Return <code>true</code> if this path is an unordered super set of -// * the given path. In the case where both paths have the same vertices -// * this will also return <code>true</code>. -// * -// * @param p -// * Another path. -// * -// * @return <code>true</code> if this path is an unordered super set of -// * the given path. -// */ -// public boolean isUnorderedSuperSet(final Path p) { -// -// if (p == null) -// throw new IllegalArgumentException(); -// -// if (edges.size() < p.edges.size()) { -// /* -// * Fast rejection. This assumes that each edge after the first -// * adds one distinct vertex to the path. That assumption is -// * enforced by #addEdge(). -// */ -// return false; -// } -// -// final Vertex[] v1 = getVertices(); -// final Vertex[] v2 = p.getVertices(); -// -// if (v1.length < v2.length) { -// // Proven false since the other set is larger. -// return false; -// } -// -// /* -// * Scan the vertices of the caller's path. If any of those vertices -// * are NOT found in this path then the caller's path can not be a -// * subset of this path. -// */ -// for (int i = 0; i < v2.length; i++) { -// -// final Vertex tmp = v2[i]; -// -// boolean found = false; -// for (int j = 0; j < v1.length; j++) { -// -// if (v1[j] == tmp) { -// found = true; -// break; -// } -// -// } -// -// if (!found) { -// return false; -// } -// -// } -// -// return true; -// -// } - /** * Return <code>true</code> if this path is an unordered variant of the * given path (same vertices in any order). @@ -1302,21 +1229,100 @@ } /** - * Return the vertices in this path (in path order). + * Return the vertices in this path (in path order). For the first edge, + * the minimum cardinality vertex is always reported first (this is + * critical for producing the correct join plan). For the remaining + * edges in the path, the unvisited is reported. * * @return The vertices (in path order). * * TODO This could be rewritten without the toArray() using a * method which visits the vertices of a path in any order. + * + * @todo unit test for the first vertex to be reported. */ public Vertex[] getVertices() { + final Set<Vertex> tmp = new LinkedHashSet<Vertex>(); + for (Edge e : edges) { + + if (tmp.isEmpty()) { + /* + * The first edge is handled specially in order to report + * the minimum cardinality vertex first. 
+ */ + tmp.add(e.getMinimumCardinalityVertex()); + tmp.add(e.getMaximumCardinalityVertex()); + + } else { + + tmp.add(e.v1); + + tmp.add(e.v2); + + } + + } + + final Vertex[] a = tmp.toArray(new Vertex[tmp.size()]); + + return a; + + } + + /** + * Return the {@link IPredicate}s associated with the vertices of the + * join path in path order. + * + * @see #getVertices() + */ + public IPredicate[] getPredicates() { + + // The vertices in the selected evaluation order. + final Vertex[] vertices = getVertices(); + + // The predicates in the same order as the vertices. + final IPredicate[] preds = new IPredicate[vertices.length]; + + for (int i = 0; i < vertices.length; i++) { + + preds[i] = vertices[i].pred; + + } + + return preds; + + } + + /** + * Return the {@link BOp} identifiers of the predicates associated with + * each vertex in path order. + */ + static public int[] getVertexIds(final List<Edge> edges) { + + final Set<Vertex> tmp = new LinkedHashSet<Vertex>(); + + for (Edge e : edges) { + tmp.add(e.v1); + tmp.add(e.v2); + } + final Vertex[] a = tmp.toArray(new Vertex[tmp.size()]); - return a; + + final int[] b = new int[a.length]; + + for (int i = 0; i < a.length; i++) { + + b[i] = a[i].pred.getId(); + + } + + return b; + } /** @@ -1350,14 +1356,18 @@ /** * Add an edge to a path, computing the estimated cardinality of the new - * path, and returning the new path. + * path, and returning the new path. The cutoff join is performed using + * the {@link #sample} of <i>this</i> join path and the actual access + * path for the target vertex. * * @param queryEngine * @param limit * @param e * The edge. * - * @return The new path. + * @return The new path. The materialized sample for the new path is the + * sample obtained by the cutoff join for the edge added to the + * path. * * @throws Exception */ @@ -1432,63 +1442,12 @@ final Path tmp = new Path(edges, cumulativeEstimatedCardinality, edgeSample); - // tmp.stopVertex = e.getMaximumCardinalityVertex(); - return tmp; } } - // /** - // * Equality is defined by comparison of the unordered set of edges. - // */ - // public boolean equals(final Object o) { - // if (this == o) - // return true; - // if (!(o instanceof Path)) - // return false; - // final Path t = (Path) o; - // if (edges.length != t.edges.length) - // return false; - // for (Edge e : edges) { - // boolean found = false; - // for (Edge x : t.edges) { - // if (x.equals(e)) { - // found = true; - // break; - // } - // } - // if (!found) - // return false; - // } - // return true; - // } - // - // /** - // * The hash code of path is defined as the bit-wise XOR of the hash - // * codes of the edges in that path. - // */ - // public int hashCode() { - // - // if (hash == 0) { - // - // int result = 0; - // - // for(Edge e : edges) { - // - // result ^= e.hashCode(); - // - // } - // - // hash = result; - // - // } - // return hash; - // - // } - // private int hash; - } /** @@ -1582,31 +1541,60 @@ } /** - * A join graph (data structure and methods only). + * A runtime optimizer for a join graph. The {@link JoinGraph} bears some + * similarity to ROX (Runtime Optimizer for XQuery), but has several + * significant differences: + * <ol> + * <li> + * 1. ROX starts from the minimum cardinality edge of the minimum + * cardinality vertex. The {@link JoinGraph} starts with one or more low + * cardinality vertices.</li> + * <li> + * 2. ROX always extends the last vertex added to a given join path. 
The + * {@link JoinGraph} extends all vertices having unexplored edges in each + * breadth first expansion.</li> + * <li> + * 3. ROX is designed to interleave operator-at-once evaluation of join path + * segments which dominate other join path segments. The {@link JoinGraph} + * is designed to prune all join paths which are known to be dominated by + * other join paths for the same set of vertices in each round and iterates + * until a join path is identified which uses all vertices and has the + * minimum expected cumulative estimated cardinality. Join paths which + * survive pruning are re-sampled as necessary in order to obtain better + * information about edges in join paths which have a low estimated + * cardinality in order to address a problem with underflow of the + * cardinality estimates.</li> + * </ol> * - * Note: ROX was stated in terms of materialization of intermediate results. - * Bigdata was originally designed to support pipelined join evaluation in - * which the zero investment property is true (there exists an index for the - * join). While support is being developed for operator-at-once joins (e.g., - * hash joins), that support is aimed at more efficient evaluation of high - * cardinality joins using multi-block IO. Therefore, unlike ROX, the - * runtime query optimizer does not materialize the intermediate results - * when chain sampling. Instead, it feeds a sample into a cutoff pipeline - * evaluation for the join path. Since some join paths can eliminate a lot - * of intermediate solutions and hence take a long time to satisfy the - * cutoff, we also specify a timeout for the cutoff evaluation of a join - * path. Given the zero investment property (an index exists for the join), - * if the cutoff is not satisfied within the timeout, then the join has a - * low correlation. If no solutions are generated within the timeout, then - * the estimate of the correlation "underflows". + * TODO For join graphs with a large number of vertices we may need to + * constrain the #of vertices which are explored in parallel. This could be + * done by only branching the N lowest cardinality vertices from the already + * connected edges. Since fewer vertices are being explored in parallel, + * paths are more likely to converge onto the same set of vertices at which + * point we can prune the dominated paths. * - * Note: timeouts are a bit tricky when you are not running on a real-time - * platform. In particular, heavy swapping or heavy GC workloads could both - * cause a timeout to expire because no work was done on sampling the join - * path rather than because there was a lot of work to be done. Therefore, - * the timeout should be used to protect against join paths which take a - * long time to materialize <i>cutoff</i> solutions rather than to fine tune - * the running time of the query optimizer. + * TODO Compare the cumulative expected cardinality of a join path with the + * expected cost of a join path. The latter allows us to also explore + * alternative join strategies, such as the parallel subquery versus scan + * and filter decision for named graph and default graph SPARQL queries. + * + * TODO Coalescing duplicate access paths can dramatically reduce the work + * performed by a pipelined nested index subquery. (A hash join eliminates + * all duplicate access paths using a scan and filter approach.) If we will + * run a pipeline nested index subquery join, then should the runtime query + * optimizer prefer paths with duplicate access paths? 
+ * + * TODO How can we handle things like lexicon joins. A lexicon join is is + * only evaluated when the dynamic type of a variable binding indicates that + * the RDF Value must be materialized by a join against the ID2T index. + * Binding sets having inlined values can simply be routed around the join + * against the ID2T index. Routing around saves network IO in scale-out + * where otherwise we would route binding sets having identifiers which do + * not need to be materialized to the ID2T shards. + * + * @see <a + * href="http://www-db.informatik.uni-tuebingen.de/files/research/pathfinder/publications/rox-demo.pdf"> + * ROX </a> */ public static class JGraph { @@ -1641,10 +1629,6 @@ } sb.append("\n]}"); return sb.toString(); - - // return super.toString() + "{V=" + Arrays.toString(V) + ",E=" - // + Arrays.toString(E) + - // ", executedVertices="+executedVertices+"}"; } public JGraph(final IPredicate[] v) { @@ -1707,7 +1691,7 @@ * * @throws Exception */ - public void runtimeOptimizer(final QueryEngine queryEngine, + public Path runtimeOptimizer(final QueryEngine queryEngine, final int limit) throws Exception { // Setup the join graph. @@ -1732,26 +1716,11 @@ } - /* - * FIXME Choose the best join path and execute it (or return the - * evaluation order to the caller). - * - * FIXME This must either recognize each time a join path is known - * to dominate all other join paths and then execute it or iterator - * until the total join path is decided and then execute the - * original query using that join path. - * - * @todo When executing the query, it is actually being executed as - * a subquery. Therefore we have to take appropriate care to ensure - * that the results are copied out of the subquery and into the - * parent query. See SubqueryTask for how this is done. - * - * @todo When we execute the query, we should clear the references - * to the sample (unless they are exact, in which case they can be - * used as is) in order to release memory associated with those - * samples if the query is long running. - */ - + // Should be one winner. + assert paths.length == 1; + + return paths[0]; + } /** @@ -1831,14 +1800,14 @@ */ estimateEdgeWeights(queryEngine, limit); - if (log.isInfoEnabled()) { + if (log.isDebugEnabled()) { final StringBuilder sb = new StringBuilder(); sb.append("Edges:\n"); for (Edge e : E) { sb.append(e.toString()); sb.append("\n"); } - log.info(sb.toString()); + log.debug(sb.toString()); } /* @@ -1887,52 +1856,215 @@ throw new IllegalArgumentException(); // increment the limit by itself in each round. - final int limit = round * limitIn; - - final List<Path> tmp = new LinkedList<Path>(); + final int limit = (round + 1) * limitIn; - // First, copy all existing paths. + if (log.isDebugEnabled()) + log.debug("round=" + round + ", limit=" + limit + + ", #paths(in)=" + a.length); + +// final List<Path> tmp = new LinkedList<Path>(); +// +// // First, copy all existing paths. +// for (Path x : a) { +// tmp.add(x); +// } + + /* + * Re-sample all vertices which are part of any of the existing + * paths. + * + * Note: A request to re-sample a vertex is a NOP unless the limit + * has been increased since the last time the vertex was sampled. It + * is also a NOP if the vertex has been fully materialized. + * + * TODO We only really need to deepen those paths where we have a + * low estimated join hit ratio. Paths with a higher join hit ratio + * already have a decent estimate of the cardinality and a decent + * sample size and can be explored without resampling. 
+ */ + if (log.isDebugEnabled()) + log.debug("Re-sampling in-use vertices: limit=" + limit); + for (Path x : a) { - tmp.add(x); + + for(Edge e : x.edges) { + + e.v1.sample(queryEngine, limit); + e.v2.sample(queryEngine, limit); + + } + } - // Vertices are inserted into this collection when they are resampled. - final Set<Vertex> resampled = new LinkedHashSet<Vertex>(); + /* + * Re-sample the cutoff join for each edge in each of the existing + * paths using the newly re-sampled vertices. + * + * Note: The only way to increase the accuracy of our estimates for + * edges as we extend the join paths is to re-sample each edge in + * the join path in path order. + * + * Note: An edge must be sampled for each distinct join path prefix + * in which it appears within each round. However, it is common for + * surviving paths to share a join path prefix, so do not re-sample + * a given path prefix more than once per round. Also, do not + * re-sample paths which are from rounds before the immediately + * previous round as those paths will not be extended in this round. + */ + if (log.isDebugEnabled()) + log.debug("Re-sampling in-use path segments: limit=" + limit); - // Then expand each path. + final Map<int[], EdgeSample> edgePaths = new LinkedHashMap<int[], EdgeSample>(); + for (Path x : a) { - final int nedges = x.edges.size(); + // The edges which we have visited in this path. + final List<Edge> edges = new LinkedList<Edge>(); + + // The vertices which we have visited in this path. + final Set<Vertex> vertices = new LinkedHashSet<Vertex>(); + + EdgeSample priorEdgeSample = null; + + for(Edge e : x.edges) { + + // Add edge to the visited set for this join path. + edges.add(e); - if (nedges < round) { + // Generate unique key for this join path segment. + final int[] ids = Path.getVertexIds(edges); - // Path is from a previous round. - continue; - - } + if (priorEdgeSample == null) { - /* - * The only way to increase the accuracy of our estimates for - * edges as we extend the join paths is to re-sample each edge - * in the join path in path order. - * - * Note: An edge must be sampled for each distinct join path - * prefix in which it appears within each round. However, it is - * common for surviving paths to share a join path prefix, so do - * not re-sample a given path prefix more than once per round. - * Also, do not re-sample paths which are from rounds before the - * immediately previous round as those paths will not be - * extended in this round. - * - * FIXME Find all vertices in use by all paths which survived - * into this round. Re-sample those vertices to the new limit - * (resampling a vertex is a NOP if it has been resampled to the - * desired limit so we can do this incrementally rather than up - * front). For each edge of each path in path order, re-sample - * the edge. Shared prefix samples should be reused, but samples - * of the same edge with a different prefix must not be shared. - */ + /* + * This is the first edge in the path. + * + * Test our local table of join path segment estimates + * to see if we have already re-sampled that edge. If + * not, then re-sample it now. + */ + + // Test sample cache. + EdgeSample edgeSample = edgePaths.get(ids); + + if (edgeSample == null) { + if (e.sample != null && e.sample.limit >= limit) { + + // The existing sample for that edge is fine. + edgeSample = e.sample; + + } else { + + /* + * Re-sample the edge, updating the sample on + * the edge as a side-effect. 
The cutoff sample + * is based on the vertex sample for the minimum + * cardinality vertex. + */ + + edgeSample = e.estimateCardinality(queryEngine, + limit); + + } + + // Cache the sample. + if (edgePaths.put(ids, edgeSample) != null) + throw new AssertionError(); + + } + + // Add both vertices to the visited set. + vertices.add(e.v1); + vertices.add(e.v2); + + // Save sample. It will be used to re-sample the next edge. + priorEdgeSample = edgeSample; + + continue; + + } + + final boolean v1Found = vertices.contains(e.v1); + + // The source vertex for the new edge. + final Vertex sVertex = v1Found ? e.v1 : e.v2; + + // The target vertex for the new edge. + final Vertex tVertex = v1Found ? e.v2 : e.v1; + + // Look for sample for this path in our cache. + EdgeSample edgeSample = edgePaths.get(ids); + + if (edgeSample == null) { + + /* + * This is some N-step edge in the path, where N is + * greater than ONE (1). The source vertex is the vertex + * which already appears in the prior edges of this join + * path. The target vertex is the next vertex which is + * visited by the join path. The sample pass in is the + * prior edge sample - that is, the sample from the path + * segment less the target vertex. This is the sample + * that we just updated when we visited the prior edge + * of the path. + */ + + edgeSample = e + .estimateCardinality( + queryEngine, + limit, + sVertex, + tVertex,// + priorEdgeSample.estimatedCardinality,// + priorEdgeSample.estimateEnum == EstimateEnum.Exact, + priorEdgeSample.limit,// + priorEdgeSample.sample// + ); + + if (log.isDebugEnabled()) + log.debug("Resampled: " + Arrays.toString(ids) + + " : " + edgeSample); + + if (edgePaths.put(ids, edgeSample) != null) + throw new AssertionError(); + + } + + // Save sample. It will be used to re-sample the next edge. + priorEdgeSample = edgeSample; + + // Add target vertex to the visited set. + vertices.add(tVertex); + + } // next Edge [e] in Path [x] + + // Save the result on the path. + x.sample = priorEdgeSample; + } // next Path [x]. + + /* + * Expand each path one step from each vertex which branches to an + * unused vertex. + */ + + if (log.isDebugEnabled()) + log.debug("Expanding paths: limit=" + limit + ", #paths(in)=" + + a.length); + + final List<Path> tmp = new LinkedList<Path>(); + + for (Path x : a) { + +// final int nedges = x.edges.size(); +// +// if (nedges < round) { +// +// // Path is from a previous round. +// continue; +// +// } + // The set of vertices used to expand this path in this round. final Set<Vertex> used = new LinkedHashSet<Vertex>(); @@ -1969,13 +2101,8 @@ // add the new vertex to the set of used vertices. used.add(tVertex); - if (resampled.add(tVertex)) { - /* - * (Re-)sample this vertex before we sample a new edge - * which targets this vertex. - */ - tVertex.sample(queryEngine, limit); - } + // (Re-)sample vertex before we sample a new edge + tVertex.sample(queryEngine, limit); // Extend the path to the new vertex. final Path p = x.addEdge(queryEngine, limit, edgeInGraph); @@ -2355,30 +2482,26 @@ // Create the join graph. final JGraph g = new JGraph(getVertices()); - // Run it. - g.runtimeOptimizer(context.getRunningQuery().getQueryEngine(), limit); + // Find the best join path. + final Path p = g.runtimeOptimizer(context.getRunningQuery() + .getQueryEngine(), limit); + // Factory avoids reuse of bopIds assigned to the predicates. + final BOpIdFactory idFactory = new BOpIdFactory(); + + // Generate the query from the join path. 
+ final PipelineOp queryOp = JoinGraph.getQuery(idFactory, p + .getPredicates()); + + // Run the query, blocking until it is done. + JoinGraph.runSubquery(context, queryOp); + return null; } - } + } // class JoinGraphTask -// @todo Could be used to appropriately ignore false precision in cardinality estimates. -// private static double roundToSignificantFigures(final double num, -// final int n) { -// if (num == 0) { -// return 0; -// } -// -// final double d = Math.ceil(Math.log10(num < 0 ? -num : num)); -// final int power = n - (int) d; -// -// final double magnitude = Math.pow(10, power); -// final long shifted = Math.round(num * magnitude); -// return shifted / magnitude; -// } - /** * Places vertices into order by the {@link BOp#getId()} associated with * their {@link IPredicate}. @@ -2436,4 +2559,191 @@ } + /* + * Static methods: + * + * @todo Keep with JGraph or move to utility class. However, the precise + * manner in which the query plan is generated is still up in the air since + * we are not yet handling anything except standard joins in the runtime + * optimizer. + */ + + /** + * Generate a query plan from an ordered collection of predicates. + * + * @param p + * The join path. + * + * @return The query plan. + */ + static public PipelineOp getQuery(final BOpIdFactory idFactory, + final IPredicate[] preds) { + + final PipelineJoin[] joins = new PipelineJoin[preds.length]; + +// final PipelineOp startOp = new StartOp(new BOp[] {}, +// NV.asMap(new NV[] {// +// new NV(Predicate.Annotations.BOP_ID, idFactory +// .nextId()),// +// new NV(SliceOp.Annotations.EVALUATION_CONTEXT, +// BOpEvaluationContext.CONTROLLER),// +// })); +// +// PipelineOp lastOp = startOp; + PipelineOp lastOp = null; + +// final Set<IVariable> vars = new LinkedHashSet<IVariable>(); +// for(IPredicate p : preds) { +// for(BOp arg : p.args()) { +// if(arg instanceof IVariable) { +// vars.add((IVariable)arg); +// } +// } +// } + + for (int i = 0; i < preds.length; i++) { + + // The next vertex in the selected join order. + final IPredicate p = preds[i]; + + final List<NV> anns = new LinkedList<NV>(); + + anns.add(new NV(PipelineJoin.Annotations.PREDICATE, p)); + + anns.add(new NV(PipelineJoin.Annotations.BOP_ID, idFactory + .nextId())); + +// anns.add(new NV(PipelineJoin.Annotations.EVALUATION_CONTEXT, BOpEvaluationContext.ANY)); +// +// anns.add(new NV(PipelineJoin.Annotations.SELECT, vars.toArray(new IVariable[vars.size()]))); + + final PipelineJoin joinOp = new PipelineJoin( + lastOp == null ? new BOp[0] : new BOp[] { lastOp }, + anns.toArray(new NV[anns.size()])); + + joins[i] = joinOp; + + lastOp = joinOp; + + } + +// final PipelineOp queryOp = lastOp; + + /* + * FIXME Why does wrapping with this slice appear to be + * necessary? (It is causing runtime errors when not wrapped). + * Is this a bopId collision which is not being detected? + */ + final PipelineOp queryOp = new SliceOp(new BOp[] { lastOp }, NV + .asMap(new NV[] { + new NV(JoinGraph.Annotations.BOP_ID, idFactory.nextId()), // + new NV(JoinGraph.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.CONTROLLER) }) // + ); + + return queryOp; + + } + + /** + * Execute the selected join path. + * <p> + * Note: When executing the query, it is actually being executed as a + * subquery. Therefore we have to take appropriate care to ensure that the + * results are copied out of the subquery and into the parent query. See + * {@link AbstractSubqueryOp} for how this is done. 
+ * + * @todo When we execute the query, we should clear the references to the + * samples (unless they are exact, in which case they can be used as + * is) in order to release memory associated with those samples if the + * query is long running. Samples must be held until we have + * identified the final join path since each vertex will be used by + * each maximum length join path and we use the samples from the + * vertices to re-sample the surviving join paths in each round. + * + * @todo If there is a slice on the outer query, then the query result may + * well be materialized by now. + * + * @todo If there are source binding sets then they need to be applied above + * (when we are sampling) and below (when we evaluate the selected + * join path). + * + * FIXME runQuery() is not working correctly. The query is being + * halted by a {@link BufferClosedException} which appears before it + * has materialized the necessary results. + */ + static public void runSubquery(final BOpContext<IBindingSet> parentContext, + final PipelineOp queryOp) { + + IAsynchronousIterator<IBindingSet[]> subquerySolutionItr = null; + + try { + + if (log.isInfoEnabled()) + log.info("Running: " + BOpUtility.toString(queryOp)); + + final PipelineOp startOp = (PipelineOp) BOpUtility + .getPipelineStart(queryOp); + + if (log.isInfoEnabled()) + log.info("StartOp: " + BOpUtility.toString(startOp)); + + // Run the query. + final UUID queryId = UUID.randomUUID(); + + final QueryEngine queryEngine = parentContext.getRunningQuery() + .getQueryEngine(); + + final RunningQuery runningQuery = queryEngine + .eval( + queryId, + queryOp, + new LocalChunkMessage<IBindingSet>( + queryEngine, + queryId, + startOp.getId()/* startId */, + -1 /* partitionId */, + /* + * @todo pass in the source binding sets + * here and also when sampling the + * vertices. + */ + new ThickAsynchronousIterator<IBindingSet[]>( + new IBindingSet[][] { new IBindingSet[] { new HashBindingSet() } }))); + + // Iterator visiting the subquery solutions. + subquerySolutionItr = runningQuery.iterator(); + + // Copy solutions from the subquery to the query. + final long nout = BOpUtility + .copy(subquerySolutionItr, parentContext.getSink(), + null/* sink2 */, null/* constraints */, null/* stats */); + + System.out.println("nout=" + nout); + + // verify no problems. + runningQuery.get(); + + System.out.println("Future Ok"); + + } catch (Throwable t) { + + log.error(t,t); + + /* + * If a subquery fails, then propagate the error to the parent + * and rethrow the first cause error out of the subquery. 
+ */ + throw new RuntimeException(parentContext.getRunningQuery() + .halt(t)); + + } finally { + + if (subquerySolutionItr != null) + subquerySolutionItr.close(); + + } + + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryLog.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryLog.java 2010-11-15 18:13:06 UTC (rev 3949) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryLog.java 2010-11-17 02:14:08 UTC (rev 3950) @@ -72,15 +72,33 @@ if (log.isInfoEnabled()) { - final Integer[] order = BOpUtility.getEvaluationOrder(q.getQuery()); + try { - log.info(getTableRow(q, -1/* orderIndex */, q.getQuery().getId(), - true/* summary */)); +// if (log.isDebugEnabled()) { - int orderIndex = 0; - for (Integer bopId : order) { - log.info(getTableRow(q, orderIndex, bopId, false/* summary */)); - orderIndex++; + /* + * Detail row for each operator in the query. + */ + final Integer[] order = BOpUtility.getEvaluationOrder(q + .getQuery()); + + int orderIndex = 0; + for (Integer bopId : order) { + log + .info(getTableRow(q, orderIndex, bopId, false/* summary */)); + orderIndex++; + } + +// } + + // summary row. + log.info(getTableRow(q, -1/* orderIndex */, q.getQuery().getId(), + true/* summary */)); + + } catch (RuntimeException t) { + + log.error(t,t); + } } @@ -107,6 +125,7 @@ */ sb.append("\tevalOrder"); // [0..n-1] sb.append("\tbopId"); + sb.append("\tpredId"); sb.append("\tevalContext"); sb.append("\tcontroller"); // metadata considered by the static optimizer. @@ -120,7 +139,7 @@ sb.append("\tunitsIn"); sb.append("\tchunksOut"); sb.append("\tunitsOut"); - sb.append("\tmultipler"); // expansion rate multipler in the solution count. + sb.append("\tjoinRatio"); // expansion rate multipler in the solution count. sb.append("\taccessPathDups"); sb.append("\taccessPathCount"); sb.append("\taccessPathChunksIn"); @@ -146,7 +165,8 @@ * @param summary <code>true</code> iff the summary for the query should be written. * @return The row of the table. */ - static private String getTableRow(final IRunningQuery q, final int evalOrder, final Integer bopId, final boolean summary) { + static private String getTableRow(final IRunningQuery q, + final int evalOrder, final Integer bopId, final boolean summary) { final StringBuilder sb = new StringBuilder(); @@ -190,16 +210,32 @@ * keep this from breaking the table format. */ sb.append(BOpUtility.toString(q.getQuery()).replace('\n', ' ')); + sb.append('\t'); + sb.append("total"); // summary line. } else { - // Otherwise how just this bop. + // Otherwise show just this bop. sb.append(bopIndex.get(bopId).toString()); + sb.append('\t'); + sb.append(evalOrder); // eval order for this bop. } sb.append('\t'); - sb.append(evalOrder); - sb.append('\t'); sb.append(Integer.toString(bopId)); sb.append('\t'); + { + /* + * Show the predicate identifier if this is a Join operator. + * + * @todo handle other kinds of join operators when added using a + * shared interface. 
+ */ final IPredicate<?> pred = (IPredicate<?>) bop + .getProperty(PipelineJoin.Annotations.PREDICATE); + if (pred != null) { + sb.append(Integer.toString(pred.getId())); + } + } + sb.append('\t'); sb.append(bop.getEvaluationContext()); sb.append('\t'); sb.append(bop.getProperty(BOp.Annotations.CONTROLLER, Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-11-15 18:13:06 UTC (rev 3949) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-11-17 02:14:08 UTC (rev 3950) @@ -491,8 +491,16 @@ pop... [truncated message content]
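One easy-to-miss addition in this revision is BOpIdFactory (full source earlier in the diff). getQuery() uses it to assign bop identifiers to the generated PipelineJoins and the wrapping SliceOp without reusing the bopIds already carried by the predicates of the selected join path. The usage sketch below is illustrative (the example class and the reserved id values are made up), but reserve() and nextId() are exactly the two methods added by this commit.

import com.bigdata.bop.BOpIdFactory;

public class BOpIdFactoryExample {

    public static void main(final String[] args) {

        final BOpIdFactory idFactory = new BOpIdFactory();

        /*
         * Reserve the identifiers already assigned to the predicates in
         * the selected join path so that the operators generated for the
         * query plan do not collide with them.
         */
        idFactory.reserve(0);
        idFactory.reserve(1);
        idFactory.reserve(3);

        // nextId() skips over every reserved identifier.
        System.out.println(idFactory.nextId()); // 2
        System.out.println(idFactory.nextId()); // 4
        System.out.println(idFactory.nextId()); // 5

    }

}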