From: <tho...@us...> - 2010-11-08 21:31:24
|
Revision: 3918 http://bigdata.svn.sourceforge.net/bigdata/?rev=3918&view=rev Author: thompsonbry Date: 2010-11-08 21:31:17 +0000 (Mon, 08 Nov 2010) Log Message: ----------- Added a utility class for exploring adaptive query optimization and wrote the initialization logic for the JGraph (in JoinGraph). Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestJoinGraph.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/AdaptiveQueryOptimization.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java 2010-11-08 21:30:29 UTC (rev 3917) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java 2010-11-08 21:31:17 UTC (rev 3918) @@ -28,24 +28,51 @@ package com.bigdata.bop.controller; import java.io.Serializable; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; +import org.apache.log4j.Logger; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpBase; import com.bigdata.bop.BOpContext; -import com.bigdata.bop.PipelineOp; +import com.bigdata.bop.BOpContextBase; +import com.bigdata.bop.BOpEvaluationContext; +import com.bigdata.bop.Constant; import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IElement; import com.bigdata.bop.IPredicate; import com.bigdata.bop.IVariable; import com.bigdata.bop.NV; +import com.bigdata.bop.PipelineOp; +import com.bigdata.bop.Var; +import com.bigdata.bop.ap.SampleIndex; +import com.bigdata.bop.bindingSet.HashBindingSet; +import com.bigdata.bop.engine.LocalChunkMessage; +import com.bigdata.bop.engine.QueryEngine; +import com.bigdata.bop.engine.RunningQuery; +import com.bigdata.bop.join.PipelineJoin; +import com.bigdata.bop.solutions.SliceOp; +import com.bigdata.relation.IRelation; +import com.bigdata.relation.accesspath.IAccessPath; +import com.bigdata.relation.accesspath.ThickAsynchronousIterator; import com.bigdata.relation.rule.Rule; +import com.bigdata.striterator.Dechunkerator; /** * A join graph with annotations for estimated cardinality and other details in * support of runtime query optimization. A join graph is a collection of - * relations and joins which connect those relations. + * relations and joins which connect those relations. This boils down to a + * collection of {@link IPredicate}s (selects on relations) and shared variables + * (which identify joins). * <p> * * @see http://arxiv.org/PS_cache/arxiv/pdf/0810/0810.4809v1.pdf, XQuery Join @@ -86,163 +113,548 @@ */ public class JoinGraph extends PipelineOp { + private static final transient Logger log = Logger.getLogger(JoinGraph.class); + private static final long serialVersionUID = 1L; /** * Known annotations. */ public interface Annotations extends PipelineOp.Annotations { - /** - * The default sample size (100 is a good value). - */ - String SAMPLE_SIZE = "sampleSize"; + /** + * The vertices of the join graph expressed an an {@link IPredicate}[]. + */ + String VERTICES = JoinGraph.class.getName() + ".vertices"; + + /** + * The initial sample size (default {@value #DEFAULT_SAMPLE_SIZE}). + */ + String SAMPLE_SIZE = JoinGraph.class.getName() + ".sampleSize"; + + int DEFAULT_SAMPLE_SIZE = 100; } - /** - * Vertices of the join graph. + /** + * @see Annotations#VERTICES */ - private final Vertex[] V; + public IPredicate[] getVertices() { + + return (IPredicate[]) getRequiredProperty(Annotations.VERTICES); + + } /** - * Edges of the join graph. + * @see Annotations#SAMPLE_SIZE */ - private final Edge[] E; - - /** - * A vertex of the join graph is an annotated relation (this corresponds to - * an {@link IPredicate} with additional annotations to support the adaptive - * query optimization algorithm). - */ - private static class Vertex implements Serializable { + public int getSampleSize() { + + return getProperty(Annotations.SAMPLE_SIZE, Annotations.DEFAULT_SAMPLE_SIZE); + + } + + public JoinGraph(final NV ...anns) { - /** - * - */ - private static final long serialVersionUID = 1L; + this(BOpBase.NOARGS, NV.asMap(anns)); + + } - final IPredicate<?> pred; + /** + * + * @todo We can derive the vertices from the join operators or the join + * operators from the vertices. However, if a specific kind of join + * operator is required then the question is whether we have better + * information to make that choice when the join graph is evaluated or + * before it is constructed. + * + * @todo How we will handle optional joins? Presumably they are outside of + * the code join graph as part of the tail attached to that join + * graph. + * + * @todo How can join constraints be moved around? Just attach them where + * ever a variable becomes bound? And when do we filter out variables + * which are not required downstream? Once we decide on a join path + * and execute it fully (rather than sampling that join path). + */ + public JoinGraph(final BOp[] args, final Map<String,Object> anns) { - Vertex(final IPredicate<?> pred) { - if (pred == null) - throw new IllegalArgumentException(); - this.pred = pred; + super(args,anns); + + switch (getEvaluationContext()) { + case CONTROLLER: + break; + default: + throw new UnsupportedOperationException( + Annotations.EVALUATION_CONTEXT + "=" + + getEvaluationContext()); } + } - /** - * An edge of the join graph is an annotated join operator. The edges of the - * join graph are undirected. Edges exist when the vertices share at least - * one variable. - */ - private static class Edge implements Serializable { - - /** + public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { + + return new FutureTask<Void>(new JoinGraphTask(context)); + + } + + /** + * A vertex of the join graph is an annotated relation (this corresponds to + * an {@link IPredicate} with additional annotations to support the adaptive + * query optimization algorithm). + */ + public static class Vertex implements Serializable { + + /** * */ - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 1L; - /** - * The vertices connected by that edge. - */ - final Vertex v1, v2; + final IPredicate<?> pred; - /** - * A weight representing the estimated cardinality of the join. + /** + * The limit used to produce the {@link #sample}. + */ + int limit; + + /** + * Fast range count and <code>null</code> until initialized. + */ + Long rangeCount; + + /** + * Sample (when not-null). + */ + Object[] sample; + + Vertex(final IPredicate<?> pred) { + + if (pred == null) + throw new IllegalArgumentException(); + + this.pred = pred; + + } + + public String toString() { + + return "\nVertex{pred=" + pred + ",rangeCount=" + rangeCount + + ",sampleSize=" + (sample == null ? "N/A" : sample.length) + + "}"; + + } + + public void sample(final BOpContextBase context,final int limit) { + + final IRelation r = context.getRelation(pred); + + final IAccessPath ap = context.getAccessPath(r, pred); + + if (rangeCount == null) { + + rangeCount = ap.rangeCount(false/* exact */); + + } + + if (sample == null) { // @todo new sample each time? + + final SampleIndex sampleOp = new SampleIndex(new BOp[] {}, // + NV.asMap(// + new NV(SampleIndex.Annotations.PREDICATE, pred),// + new NV(SampleIndex.Annotations.LIMIT, limit))); + + sample = sampleOp.eval(context); + + this.limit = limit; + + } + + } + + } + + /** + * An edge of the join graph is an annotated join operator. The edges of the + * join graph are undirected. Edges exist when the vertices share at least + * one variable. + */ + public static class Edge implements Serializable { + + /** + * */ - double w; + private static final long serialVersionUID = 1L; - public Edge(final Vertex v1, final Vertex v2) { - if (v1 == null) + /** + * The vertices connected by that edge. + */ + final Vertex v1, v2; + + /** + * The set of shared variables. + */ + final Set<IVariable<?>> shared; + + class EdgeSample { + + /** + * The fast range count (aka cardinality) for the source vertex of + * the edge (whichever vertex has the lower cardinality). + */ + final long inputRangeCount; + /** + * The limit used to sample the edge (this is the limit on the #of + * solutions generated by the cutoff join used when this sample was + * taken). + */ + final int limit; + /** + * The #of binding sets out of the source sample vertex sample which + * were consumed. + */ + final int inputCount; + /** + * The #of binding sets generated before the join was cutoff. + */ + final int outputCount; + /** + * The ratio of the #of input samples consumed to the #of output + * samples generated. + */ + final double f; + /** + * The estimated cardinality of the join. + */ + final long estimatedCardinality; + + /** + * @param limit + * The limit used to sample the edge (this is the limit + * on the #of solutions generated by the cutoff join used + * when this sample was taken). + * @param inputRangeCount + * The fast range count (aka cardinality) for the source + * vertex of the edge (whichever vertex has the lower + * cardinality). + * @param inputCount + * The #of binding sets out of the source sample vertex + * sample which were consumed. + * @param outputCount + * The #of binding sets generated before the join was + * cutoff. + * + * @todo If the outputCount is zero then this is a good indicator + * that there is an error in the query such that the join will + * not select anything. This is not 100%, merely indicative. + */ + EdgeSample(final long inputRangeCount, final int limit, final int inputCount, + final int outputCount) { + + this.inputRangeCount = inputRangeCount; + + this.limit = limit; + + this.inputCount = inputCount; + + this.outputCount = outputCount; + + f = outputCount == 0 ? 0 : (outputCount / (double) inputCount); + + estimatedCardinality = (long) (inputRangeCount * f); + + } + + public String toString() { + return "EdgeSample" + "{inputRangeCount=" + inputRangeCount + + ", limit=" + limit + ", inputCount=" + inputCount + + ", outputCount=" + outputCount + ", f=" + f + + ", estimatedCardinality=" + estimatedCardinality + + "}"; + } + + }; + + /** + * The last sample for this edge and <code>null</code> if the edge has + * not been sampled. + */ + EdgeSample sample = null; + + public Edge(final Vertex v1, final Vertex v2, final Set<IVariable<?>> shared) { + if (v1 == null) + throw new IllegalArgumentException(); + if (v2 == null) + throw new IllegalArgumentException(); + if (shared==null) + throw new IllegalArgumentException(); + if (shared.isEmpty()) + throw new IllegalArgumentException(); + this.v1 = v1; + this.v2 = v2; + this.shared = shared; + } + + public String toString() { + + return "\nEdge{v1=" + v1.pred.getId() + ",v2=" + v2.pred.getId() + + ",shared=" + shared.toString() + ", sample=" + sample + "}"; + + } + + /** + * Estimate the cardinality of the edge. + * + * @param context + * @throws Exception + */ + public void estimateCardinality(final QueryEngine queryEngine, + final int limit) throws Exception { + + if (limit <= 0) + throw new IllegalArgumentException(); + + /* + * Figure out which vertex has the smaller cardinality. The sample + * of that vertex is used since it is more representative than the + * sample of the other vertex. + */ + // vertex v, vprime + final Vertex v, vp; + if (v1.rangeCount < v2.rangeCount) { + v = v1; + vp = v2; + } else { + v = v2; + vp = v1; + } + + /* + * @todo This is difficult to setup because we do not have a concept + * (or class) corresponding to a fly weight relation and we do not + * have a general purpose relation, just arrays or sequences of + * IBindingSets. Also, all relations are persistent. Temporary + * relations are on a temporary store and are locatable by their + * namespace rather than being Objects. + * + * The algorithm presupposes fly weight / temporary relations this + * both to wrap the sample and to store the computed intermediate + * results. + * + * Note: The PipelineJoin does not have a means to halt after a + * limit is satisfied. In order to achieve this, we have to wrap it + * with a SliceOp. + * + * Together, this means that we are dealing with IBindingSet[]s for + * both the input and the output of the cutoff evaluation of the + * edge rather than rows of the materialized relation. + * + * @todo On subsequent iterations we would probably re-sample [v] + * and we would run against the materialized intermediate result for + * [v']. + */ + + /* + * Convert the source sample into an IBindingSet[], injecting a + * rowid column. + */ + final IVariable<Integer> ROWID = Var.var("__rowid"); + final IBindingSet[] sample = new IBindingSet[v.sample.length]; + { + for (int i = 0; i < sample.length; i++) { + final IBindingSet bset = new HashBindingSet(); + BOpContext.copyValues((IElement) v.sample[i], v.pred, bset); + bset.set(ROWID, new Constant<Integer>(Integer.valueOf(i))); + sample[i] = bset; + } + } + + /* + * @todo Any constraints on the edge (other than those implied by + * shared variables) need to be annotated on the join. Constraints + * (other than range constraints which are directly coded by the + * predicate) will not reduce the effort to compute the join, but + * they can reduce the cardinality of the join and that is what we + * are trying to estimate here. + */ + final PipelineJoin joinOp = new PipelineJoin(new BOp[] {}, // + new NV(BOp.Annotations.BOP_ID, 1),// + new NV(PipelineJoin.Annotations.PREDICATE,vp.pred.setBOpId(3)) + ); + + final SliceOp sliceOp = new SliceOp(new BOp[] { joinOp },// + NV.asMap(// + new NV(BOp.Annotations.BOP_ID, 2), // + new NV(SliceOp.Annotations.LIMIT, (long)limit), // + new NV( + BOp.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.CONTROLLER))); + + // run the cutoff sampling of the edge. + final UUID queryId = UUID.randomUUID(); + final RunningQuery runningQuery = queryEngine.eval(queryId, + sliceOp, new LocalChunkMessage<IBindingSet>(queryEngine, + queryId, joinOp.getId()/* startId */, + -1 /* partitionId */, + new ThickAsynchronousIterator<IBindingSet[]>( + new IBindingSet[][] { sample }))); + + // #of source samples consumed. + int inputCount = 0; + // #of output samples generated. + int outputCount = 0; + try { + try { + IBindingSet bset = null; + // Figure out the #of source samples consumed. + final Iterator<IBindingSet> itr = new Dechunkerator<IBindingSet>( + runningQuery.iterator()); + while (itr.hasNext()) { + bset = itr.next(); + outputCount++; + } + // #of input rows consumed. Note: +1 since origin ZERO. + inputCount = bset == null ? 0 : ((Integer) bset.get(ROWID) + .get()) + 1; + } finally { + // verify no problems. FIXME Restore test of the query. +// runningQuery.get(); + } + } finally { + runningQuery.cancel(true/* mayInterruptIfRunning */); + } + + this.sample = new EdgeSample(v.rangeCount, limit, inputCount, + outputCount); + + if (log.isInfoEnabled()) + log.info("edge=" + this + sample); + + } + + } + + /** + * A join graph (data structure and methods only). + */ + public static class JGraph { + + /** + * Vertices of the join graph. + */ + private final Vertex[] V; + + /** + * Edges of the join graph. + */ + private final Edge[] E; + + public List<Vertex> getVertices() { + return Collections.unmodifiableList(Arrays.asList(V)); + } + + public List<Edge> getEdges() { + return Collections.unmodifiableList(Arrays.asList(E)); + } + + public String toString() { + return super.toString() + "{V=" + Arrays.toString(V) + ",E=" + + Arrays.toString(E) + "}"; + } + + public JGraph(final IPredicate[] v) { + + if (v == null) throw new IllegalArgumentException(); - if (v2 == null) - throw new IllegalArgumentException(); - this.v1 = v1; - this.v2 = v2; - } - } - /** - * - * @param joinNexus - * @param v - * @param sampleSize - * The default sample size to use when sampling a vertex of the - * join graph (100). - * - * @todo We can derive the vertices from the join operators or the join - * operators from the vertices. However, if a specific kind of join - * operator is required then the question is whether we have better - * information to make that choice when the join graph is evaluated or - * before it is constructed. - */ - public JoinGraph(final IPredicate<?>[] v, final int sampleSize) { + if (v.length < 2) + throw new IllegalArgumentException(); - super(v/* args */, NV.asMap(new NV[] {// - new NV(Annotations.SAMPLE_SIZE, Integer.valueOf(sampleSize))// - })); + V = new Vertex[v.length]; - if (v == null) - throw new IllegalArgumentException(); + for (int i = 0; i < v.length; i++) { - if (sampleSize <= 0) - throw new IllegalArgumentException(); + V[i] = new Vertex(v[i]); - switch (getEvaluationContext()) { - case CONTROLLER: - break; - default: - throw new UnsupportedOperationException( - Annotations.EVALUATION_CONTEXT + "=" - + getEvaluationContext()); - } + } - V = new Vertex[v.length]; + /* + * Identify the edges by looking for shared variables among the + * predicates. + */ + { - for (int i = 0; i < v.length; i++) { + final List<Edge> tmp = new LinkedList<Edge>(); - V[i] = new Vertex(v[i]); - - } + for (int i = 0; i < v.length; i++) { - /* - * Identify the edges by looking for shared variables among the - * predicates. - */ - { + final IPredicate<?> p1 = v[i]; - final List<Edge> tmp = new LinkedList<Edge>(); + for (int j = i + 1; j < v.length; j++) { - for (int i = 0; i < v.length; i++) { + final IPredicate<?> p2 = v[j]; - final IPredicate<?> p1 = v[i]; + final Set<IVariable<?>> shared = Rule.getSharedVars(p1, + p2); - for (int j = i + 1; j < v.length; j++) { + if (shared != null && !shared.isEmpty()) { - final IPredicate<?> p2 = v[j]; + tmp.add(new Edge(V[i], V[j], shared)); - final Set<IVariable<?>> shared = Rule.getSharedVars(p1, p2); + } - if (shared != null) { + } - tmp.add(new Edge(V[i], V[j])); + } - } + E = tmp.toArray(new Edge[0]); - } + } - } - - E = tmp.toArray(new Edge[0]); - - } + } - } + /** + * Obtain a sample and estimated cardinality (fast range count) for each vertex. + * + * @param context + * @param limit + * The sample size. + */ + public void sampleVertices(final BOpContextBase context, final int limit) { - public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { + for (Vertex v : V) { - return new FutureTask<Void>(new JoinGraphTask(context)); + v.sample(context, limit); + + } + + } - } + /** + * Estimate the cardinality of each edge. + * + * @param context + * + * @throws Exception + */ + public void estimateEdgeWeights(final QueryEngine queryEngine, final int limit) throws Exception { + + for(Edge e : E) { + + if (e.v1.sample == null || e.v2.sample == null) { + + /* + * We can only estimate the cardinality of edges connecting + * vertices for which samples were obtained. + */ + continue; + + } + + e.estimateCardinality(queryEngine, limit); + + } + + } + + } // class JGraph /** * Evaluation of a {@link JoinGraph}. @@ -254,6 +666,8 @@ private final BOpContext<IBindingSet> context; + private final JGraph g; + JoinGraphTask(final BOpContext<IBindingSet> context) { if (context == null) @@ -261,6 +675,15 @@ this.context = context; + final IPredicate[] v = getVertices(); + + final int sampleSize = getSampleSize(); + + if (sampleSize <= 0) + throw new IllegalArgumentException(); + + g = new JGraph(v); + } public Void call() throws Exception { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestJoinGraph.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestJoinGraph.java 2010-11-08 21:30:29 UTC (rev 3917) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestJoinGraph.java 2010-11-08 21:31:17 UTC (rev 3918) @@ -27,8 +27,6 @@ package com.bigdata.bop.controller; -import com.bigdata.bop.controller.JoinGraph; - import junit.framework.TestCase2; /** @@ -52,8 +50,142 @@ super(name); } +// @Override +// public Properties getProperties() { +// +// final Properties p = new Properties(super.getProperties()); +// +// p.setProperty(Journal.Options.BUFFER_MODE, BufferMode.Transient +// .toString()); +// +// return p; +// +// } +// +// static private final String namespace = "ns"; +// +// Journal jnl; +// +// R rel; +// +// public void setUp() throws Exception { +// +// jnl = new Journal(getProperties()); +// +// } +// +// /** +// * Create and populate relation in the {@link #namespace}. +// * +// * @return The #of distinct entries. +// */ +// private int loadData(final int scale) { +// +// final String[] names = new String[] { "John", "Mary", "Saul", "Paul", +// "Leon", "Jane", "Mike", "Mark", "Jill", "Jake", "Alex", "Lucy" }; +// +// final Random rnd = new Random(); +// +// // #of distinct instances of each name. +// final int populationSize = Math.max(10, (int) Math.ceil(scale / 10.)); +// +// // #of trailing zeros for each name. +// final int nzeros = 1 + (int) Math.ceil(Math.log10(populationSize)); +// +//// System.out.println("scale=" + scale + ", populationSize=" +//// + populationSize + ", nzeros=" + nzeros); +// +// final NumberFormat fmt = NumberFormat.getIntegerInstance(); +// fmt.setMinimumIntegerDigits(nzeros); +// fmt.setMaximumIntegerDigits(nzeros); +// fmt.setGroupingUsed(false); +// +// // create the relation. +// final R rel = new R(jnl, namespace, ITx.UNISOLATED, new Properties()); +// rel.create(); +// +// // data to insert. +// final E[] a = new E[scale]; +// +// for (int i = 0; i < scale; i++) { +// +// final String n1 = names[rnd.nextInt(names.length)] +// + fmt.format(rnd.nextInt(populationSize)); +// +// final String n2 = names[rnd.nextInt(names.length)] +// + fmt.format(rnd.nextInt(populationSize)); +// +//// System.err.println("i=" + i + ", n1=" + n1 + ", n2=" + n2); +// +// a[i] = new E(n1, n2); +// +// } +// +// // sort before insert for efficiency. +// Arrays.sort(a,R.primaryKeyOrder.getComparator()); +// +// // insert data (the records are not pre-sorted). +// final long ninserts = rel.insert(new ChunkedArrayIterator<E>(a.length, a, null/* keyOrder */)); +// +// // Do commit since not scale-out. +// jnl.commit(); +// +// // should exist as of the last commit point. +// this.rel = (R) jnl.getResourceLocator().locate(namespace, +// ITx.READ_COMMITTED); +// +// assertNotNull(rel); +// +// return (int) ninserts; +// +// } +// +// public void tearDown() throws Exception { +// +// if (jnl != null) { +// jnl.destroy(); +// jnl = null; +// } +// +// // clear reference. +// rel = null; +// +// } + public void test_something() { - + +//// final int scale = 10000; +//// +//// final int nrecords = loadData(scale); +// +// final IVariable<?> x = Var.var("x"); +// +// final IVariable<?> y = Var.var("y"); +// +// final IPredicate<E> p1 = new Predicate<E>(new BOp[] { x, y }, +// new NV(IPredicate.Annotations.RELATION_NAME, +// new String[] { namespace }),// +// new NV(IPredicate.Annotations.TIMESTAMP, ITx.READ_COMMITTED)// +// ); +// +// final IPredicate<E> p2 = new Predicate<E>(new BOp[] { x, y }, +// new NV(IPredicate.Annotations.RELATION_NAME, +// new String[] { namespace }),// +// new NV(IPredicate.Annotations.TIMESTAMP, ITx.READ_COMMITTED)// +// ); +// +// final IPredicate<E> p3 = new Predicate<E>(new BOp[] { x, y }, +// new NV(IPredicate.Annotations.RELATION_NAME, +// new String[] { namespace }),// +// new NV(IPredicate.Annotations.TIMESTAMP, ITx.READ_COMMITTED)// +// ); +// +// new JoinGraph(// +// new NV(BOp.Annotations.BOP_ID, 1),// +// new NV(JoinGraph.Annotations.VERTICES,new IPredicate[]{}),// +// new NV(JoinGraph.Annotations.SAMPLE_SIZE, 100)// +// ); + fail("write tests"); } Added: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/AdaptiveQueryOptimization.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/AdaptiveQueryOptimization.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/AdaptiveQueryOptimization.java 2010-11-08 21:31:17 UTC (rev 3918) @@ -0,0 +1,335 @@ +package com.bigdata.rdf.sail.bench; + +import java.io.BufferedInputStream; +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.Properties; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpContextBase; +import com.bigdata.bop.Constant; +import com.bigdata.bop.IPredicate; +import com.bigdata.bop.IVariable; +import com.bigdata.bop.NV; +import com.bigdata.bop.Var; +import com.bigdata.bop.controller.JoinGraph.JGraph; +import com.bigdata.bop.engine.QueryEngine; +import com.bigdata.bop.fed.QueryEngineFactory; +import com.bigdata.journal.Journal; +import com.bigdata.rdf.model.BigdataURI; +import com.bigdata.rdf.model.BigdataValue; +import com.bigdata.rdf.model.BigdataValueFactory; +import com.bigdata.rdf.sail.BigdataSail; +import com.bigdata.rdf.spo.SPOPredicate; +import com.bigdata.rdf.store.AbstractTripleStore; + +/** + * Hard codes LUBM UQ. + * + * <pre> + * [query2] + * PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> + * PREFIX ub: <http://www.lehigh.edu/~zhp2/2004/0401/univ-bench.owl#> + * SELECT ?x ?y ?z + * WHERE{ + * ?x a ub:GraduateStudent . + * ?y a ub:University . + * ?z a ub:Department . + * ?x ub:memberOf ?z . + * ?z ub:subOrganizationOf ?y . + * ?x ub:undergraduateDegreeFrom ?y + * } + * </pre> + * + * Re-ordered joins to cluster by shared variables. This makes a nicer graph if + * you draw it. + * + * <pre> + * v2 ?z a ub:Department . + * v3 ?x ub:memberOf ?z . + * v4 ?z ub:subOrganizationOf ?y . + * v1 ?y a ub:University . + * v5 ?x ub:undergraduateDegreeFrom ?y + * v0 ?x a ub:GraduateStudent . + * </pre> + * + * <pre> + * http://www.w3.org/1999/02/22-rdf-syntax-ns#type (TermId(8U)) + * + * http://www.lehigh.edu/~zhp2/2004/0401/univ-bench.owl#UndergraduateStudent (TermId(324U)) + * </pre> + */ +public class AdaptiveQueryOptimization { + + public static void main(String[] args) throws Exception { + + final String namespace = "LUBM_U50"; + final String propertyFile = "/root/workspace/bigdata-quads-query-branch/bigdata-perf/lubm/ant-build/bin/WORMStore.properties"; + final String journalFile = "/data/lubm/U50/bigdata-lubm.WORM.jnl"; + + final Properties properties = new Properties(); + { + // Read the properties from the file. + final InputStream is = new BufferedInputStream(new FileInputStream( + propertyFile)); + try { + properties.load(is); + } finally { + is.close(); + } + if (System.getProperty(BigdataSail.Options.FILE) != null) { + // Override/set from the environment. + properties.setProperty(BigdataSail.Options.FILE, System + .getProperty(BigdataSail.Options.FILE)); + } + if (properties.getProperty(BigdataSail.Options.FILE) == null) { + properties.setProperty(BigdataSail.Options.FILE, journalFile); + } + } + + final Journal jnl = new Journal(properties); + try { + + final AbstractTripleStore database = (AbstractTripleStore) jnl + .getResourceLocator().locate(namespace, + jnl.getLastCommitTime()); + + if (database == null) + throw new RuntimeException("Not found: " + namespace); + + /* + * Resolve terms against the lexicon. + */ + final BigdataValueFactory f = database.getLexiconRelation() + .getValueFactory(); + + final BigdataURI rdfType = f + .createURI("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"); + + final BigdataURI graduateStudent = f + .createURI("http://www.lehigh.edu/~zhp2/2004/0401/univ-bench.owl#GraduateStudent"); + + final BigdataURI university = f + .createURI("http://www.lehigh.edu/~zhp2/2004/0401/univ-bench.owl#University"); + + final BigdataURI department = f + .createURI("http://www.lehigh.edu/~zhp2/2004/0401/univ-bench.owl#Department"); + + final BigdataURI memberOf = f + .createURI("http://www.lehigh.edu/~zhp2/2004/0401/univ-bench.owl#memberOf"); + + final BigdataURI subOrganizationOf = f + .createURI("http://www.lehigh.edu/~zhp2/2004/0401/univ-bench.owl#subOrganizationOf"); + + final BigdataURI undergraduateDegreeFrom = f + .createURI("http://www.lehigh.edu/~zhp2/2004/0401/univ-bench.owl#undergraduateDegreeFrom"); + + final BigdataValue[] terms = new BigdataValue[] { rdfType, + graduateStudent, university, department, memberOf, + subOrganizationOf, undergraduateDegreeFrom }; + + // resolve terms. + database.getLexiconRelation() + .addTerms(terms, terms.length, true/* readOnly */); + + { + for (BigdataValue tmp : terms) { + System.out.println(tmp + " : " + tmp.getIV()); + if (tmp.getIV() == null) + throw new RuntimeException("Not defined: " + tmp); + } + } + + final IVariable<?> x = Var.var("x"); + final IVariable<?> y = Var.var("y"); + final IVariable<?> z = Var.var("z"); + + // The name space for the SPO relation. + final String[] relation = new String[] {namespace + ".spo"}; + + final long timestamp = jnl.getLastCommitTime(); + + int nextId = 0; + + // ?x a ub:GraduateStudent . + final IPredicate p0 = new SPOPredicate(new BOp[] { x, + new Constant(rdfType.getIV()), new Constant(graduateStudent.getIV()) },// + new NV(BOp.Annotations.BOP_ID, nextId++),// + new NV(IPredicate.Annotations.TIMESTAMP,timestamp),// + new NV(IPredicate.Annotations.RELATION_NAME, relation)// + ); + + // ?y a ub:University . + final IPredicate p1 = new SPOPredicate(new BOp[] { y, + new Constant(rdfType.getIV()), new Constant(university.getIV()) },// + new NV(BOp.Annotations.BOP_ID, nextId++),// + new NV(IPredicate.Annotations.TIMESTAMP,timestamp),// + new NV(IPredicate.Annotations.RELATION_NAME, relation)// + ); + + // ?z a ub:Department . + final IPredicate p2 = new SPOPredicate(new BOp[] { z, + new Constant(rdfType.getIV()), new Constant(department.getIV()) },// + new NV(BOp.Annotations.BOP_ID, nextId++),// + new NV(IPredicate.Annotations.TIMESTAMP,timestamp),// + new NV(IPredicate.Annotations.RELATION_NAME, relation)// + ); + + // ?x ub:memberOf ?z . + final IPredicate p3 = new SPOPredicate(new BOp[] { x, + new Constant(memberOf.getIV()), z },// + new NV(BOp.Annotations.BOP_ID, nextId++),// + new NV(IPredicate.Annotations.TIMESTAMP,timestamp),// + new NV(IPredicate.Annotations.RELATION_NAME, relation)// + ); + + // ?z ub:subOrganizationOf ?y . + final IPredicate p4 = new SPOPredicate(new BOp[] { z, + new Constant(subOrganizationOf.getIV()), y },// + new NV(BOp.Annotations.BOP_ID, nextId++),// + new NV(IPredicate.Annotations.TIMESTAMP,timestamp),// + new NV(IPredicate.Annotations.RELATION_NAME, relation)// + ); + + // ?x ub:undergraduateDegreeFrom ?y + final IPredicate p5 = new SPOPredicate(new BOp[] { x, + new Constant(undergraduateDegreeFrom.getIV()), y },// + new NV(BOp.Annotations.BOP_ID, nextId++),// + new NV(IPredicate.Annotations.TIMESTAMP,timestamp),// + new NV(IPredicate.Annotations.RELATION_NAME, relation)// + ); + + // the vertices of the join graph (the predicates). + final IPredicate[] preds = new IPredicate[] { p0, p1, p2, p3, p4, + p5 }; + +// final JoinGraph op = new JoinGraph(// +// new NV(JoinGraph.Annotations.VERTICES, preds),// +// new NV(JoinGraph.Annotations.SAMPLE_SIZE, 100) // +// ); + + final JGraph g = new JGraph(preds); + + final int limit = 100; + + final QueryEngine queryEngine = QueryEngineFactory + .getQueryController(jnl/* indexManager */); + + final BOpContextBase context = new BOpContextBase(queryEngine); + + System.err.println("joinGraph=" + g.toString()); + + /* + * Sample the vertices. + * + * @todo Sampling for scale-out not yet finished. + * + * @todo Re-sampling might always produce the same sample depending + * on the sample operator impl (it should be random, but it is not). + */ + g.sampleVertices(context, limit); + + System.err.println("joinGraph=" + g.toString()); + + /* + * Estimate the cardinality and weights for each edge. + * + * @todo It would be very interesting to see the variety and/or + * distribution of the values bound when the edge is sampled. This + * can be easily done using a hash map with a counter. That could + * tell us a lot about the cardinality of the next join path + * (sampling the join path also tells us a lot, but it does not + * explain it as much as seeing the histogram of the bound values). + * I believe that there are some interesting online algorithms for + * computing the N most frequent observations and the like which + * could be used here. + */ + g.estimateEdgeWeights(queryEngine, limit); + + System.err.println("joinGraph=" + g.toString()); + + /* + * @todo choose starting vertex (most selective). see if there are + * any paths which are fully determined based on static optimization + * (doubtful). + */ + + /* + * @todo iteratively chain sample to choose best path, then execute + * that path. this is where most of the complex bits are. + * constraints must be applied to appropriate joins, variables must + * be filtered when no longer required, edges which are must be + * dropped from paths in which they have become redundant, etc., + * etc. + * + * @todo a simpler starting place is just to explore the cost of the + * query under different join orderings. e.g., Choose(N), where N is + * the #of predicates (full search). Or dynamic programming (also + * full search, just a little smarter). + */ +// g.run(); + + +// /* +// * Run the index scan without materializing anything from the +// * lexicon. +// */ +// if (true) { +// System.out.println("Running SPO only access path."); +// final long begin = System.currentTimeMillis(); +// final IAccessPath<ISPO> accessPath = database.getAccessPath( +// null/* s */, rdfType, undergraduateStudent); +// final IChunkedOrderedIterator<ISPO> itr = accessPath.iterator(); +// try { +// while (itr.hasNext()) { +// itr.next(); +// } +// } finally { +// itr.close(); +// } +// final long elapsed = System.currentTimeMillis() - begin; +// System.err.println("Materialize SPOs : elapsed=" + elapsed +// + "ms"); +// } + +// /* +// * Open the sail and run Q14. +// * +// * @todo It would be interesting to run this using a lexicon join. +// * Also, given the changes in the various defaults which were +// * recently made, it is worth while to again explore the parameter +// * space for this query. +// */ +// if (true) { +// final BigdataSail sail = new BigdataSail(database); +// sail.initialize(); +// final BigdataSailConnection conn = sail.getReadOnlyConnection(); +// try { +// System.out.println("Materializing statements."); +// final long begin = System.currentTimeMillis(); +// final CloseableIteration<? extends Statement, SailException> itr = conn +// .getStatements(null/* s */, rdfType, +// undergraduateStudent, true/* includeInferred */); +// try { +// while (itr.hasNext()) { +// itr.next(); +// } +// } finally { +// itr.close(); +// } +// final long elapsed = System.currentTimeMillis() - begin; +// System.err.println("Materialize statements: elapsed=" +// + elapsed + "ms"); +// } finally { +// conn.close(); +// } +// sail.shutDown(); +// } + + } finally { + jnl.close(); + } + + } + +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |