From: <tho...@us...> - 2010-11-22 22:05:18
Revision: 3974 http://bigdata.svn.sourceforge.net/bigdata/?rev=3974&view=rev Author: thompsonbry Date: 2010-11-22 21:08:56 +0000 (Mon, 22 Nov 2010) Log Message: ----------- Added a sumRangeCounts counter to the join stats. Modified the runtime optimizer to use the sum of the range counts considered by the cutoff join when the cardinality estimate is recognized as a lower bound. Added test case for "bar" data set for the runtime optimizer. This query and data set ran into the lower bound estimate problem. The change in this commit fixed the query. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryLog.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/joinGraph/TestJoinGraphOnLubm.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/joinGraph/TestJoinGraphOnBarData.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java 2010-11-22 19:22:05 UTC (rev 3973) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java 2010-11-22 21:08:56 UTC (rev 3974) @@ -142,6 +142,15 @@ * within the round. This would imply that we keep per join path limits. * The vertex and edge samples are already aware of the limit at which * they were last sampled so this should not cause any problems there. + * <p> + * A related option would be to deepen the samples only when we are in + * danger of cardinality estimation underflow. E.g., a per-path limit. + * Resampling vertices may only make sense when we increase the limit + * since otherwise we may find a different correlation with the new sample + * but the comparison of paths using one sample base with paths using a + * different sample base in a different round does not carry forward the + * cardinality estimates from the prior round (unless we do something like + * a weighted moving average). * * @todo When comparing choices among join paths having fully bound tails where * the estimated cardinality has also gone to zero, we should prefer to @@ -152,7 +161,7 @@ * those which reach the 1-var vertex. [In order to support this, we would * need a means to indicate that a fully bound access path should use an * index specified by the query optimizer rather than the primary index - * for the relation. In addition, this suggests that we should keep bloom + * for the relation. In addition, this suggests that we should keep bloom * filters for more than just the SPO(C) index in scale-out.] * * @todo Examine behavior when we do not have perfect covering indices. This @@ -187,6 +196,15 @@ String LIMIT = JoinGraph.class.getName() + ".limit"; int DEFAULT_LIMIT = 100; + + /** + * The <i>nedges</i> edges of the join graph having the lowest + * cardinality will be used to generate the initial join paths (default + * {@value #DEFAULT_NEDGES}). This must be a positive integer. 
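+ * <p> + * For example, with the default of 2, the two lowest-cardinality edges + * each seed one initial join path, starting from the lower-cardinality + * vertex of that edge.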
+ */ + String NEDGES = JoinGraph.class.getName() + ".nedges"; + + int DEFAULT_NEDGES = 2; } /** @@ -207,6 +225,15 @@ } + /** + * @see Annotations#NEDGES + */ + public int getNEdges() { + + return getProperty(Annotations.NEDGES, Annotations.DEFAULT_NEDGES); + + } + public JoinGraph(final NV... anns) { this(BOpBase.NOARGS, NV.asMap(anns)); @@ -542,7 +569,7 @@ * there is an error in the query such that the join will not select * anything. This is not 100%, merely indicative. */ - public final int outputCount; + public final long outputCount; /** * The ratio of the #of input samples consumed to the #of output samples @@ -592,7 +619,9 @@ final int sourceSampleLimit,// final int limit,// final int inputCount, // - final int outputCount,// + final long outputCount,// + final double f, + final long estimatedCardinality, final IBindingSet[] sample) { if (sample == null) @@ -609,10 +638,10 @@ this.outputCount = outputCount; - f = outputCount == 0 ? 0 : (outputCount / (double) inputCount); - - estimatedCardinality = (long) (rangeCount * f); - + this.f = f; + + this.estimatedCardinality = estimatedCardinality; + if (sourceSampleExact && outputCount < limit) { /* * Note: If the entire source vertex is being fed into the @@ -1037,20 +1066,55 @@ if (log.isTraceEnabled()) log.trace(joinStats.toString()); - + /* * TODO Improve comments here. See if it is possible to isolate a * common base class which would simplify the setup of the cutoff * join and the computation of the sample stats. */ + // #of solutions in. + final int nin = (int) joinStats.inputSolutions.get(); + + // #of solutions out. + long nout = joinStats.outputSolutions.get(); + + // cumulative range count of the sampled access paths. + final long sumRangeCount = joinStats.accessPathRangeCount.get(); + + if (nin == 1 && nout == limit) { + /* + * We are getting [limit] solutions out for one solution in. In + * this case, (nout/nin) is a lower bound for the estimated + * cardinality of the edge. In fact, this condition suggests + * that the upper bound is a much better estimate of the + * cardinality of this join. Therefore, we replace [nout] with + * the sum of the range counts for the as-bound predicates + * considered by the cutoff join. + * + * For example, consider a join feeding a rangeCount of 16 into + * a rangeCount of 175000. With a limit of 100, we estimated the + * cardinality at 1600L (lower bound). In fact, the cardinality + * is 16*175000. This falsely low estimate can cause solutions + * which are really better to be dropped. + */ + nout = sumRangeCount; + + } + + final double f = nout == 0 ? 0 : (nout / (double) nin); + + final long estimatedCardinality = (long) (sourceSampleRangeCount * f); + final EdgeSample edgeSample = new EdgeSample( sourceSampleRangeCount, // sourceSampleExact, // @todo redundant with sourceSampleLimit sourceSampleLimit, // limit, // - (int) joinStats.inputSolutions.get(),// - (int) joinStats.outputSolutions.get(), // + nin,// + nout, // + f, // + estimatedCardinality, // result.toArray(new IBindingSet[result.size()])); if (log.isDebugEnabled()) @@ -1719,19 +1783,25 @@ * @param limit * The limit for sampling a vertex and the initial limit for * cutoff join evaluation. + * @param nedges + * The edges in the join graph are sorted in order of + * increasing cardinality and up to <i>nedges</i> of the + * edges having the lowest cardinality are used to form the + * initial set of join paths.
For each edge selected to form + a join path, the starting vertex will be the vertex of + that edge having the lower cardinality. * * @throws Exception */ public Path runtimeOptimizer(final QueryEngine queryEngine, - final int limit) throws Exception { + final int limit, final int nedges) throws Exception { // Setup the join graph. - Path[] paths = round0(queryEngine, limit, 2/* nedges */); + Path[] paths = round0(queryEngine, limit, nedges); /* - * The input paths for the first round have two vertices (one edge - * is two vertices). Each round adds one more vertex, so we have - * three vertices by the end of round 1. We are done once we have + * The initial paths all have one edge, and hence two vertices. Each + * round adds one more vertex to each path. We are done once we have * generated paths which include all vertices. * * This occurs at round := nvertices - 1 @@ -1796,6 +1866,11 @@ * The maximum #of edges to choose. Those having the smallest * expected cardinality will be chosen. * + * @return An initial set of paths starting from at most <i>nedges</i> + * edges. For each of the <i>nedges</i> lowest cardinality edges, the + * starting vertex will be the vertex with the lowest + * cardinality for that edge. + * * @throws Exception */ public Path[] round0(final QueryEngine queryEngine, final int limit, @@ -2489,6 +2564,8 @@ private final JGraph g; private int limit; + + private int nedges; JoinGraphTask(final BOpContext<IBindingSet> context) { @@ -2499,9 +2576,14 @@ limit = getLimit(); + nedges = getNEdges(); + if (limit <= 0) throw new IllegalArgumentException(); + if (nedges <= 0) + throw new IllegalArgumentException(); + final IPredicate[] v = getVertices(); g = new JGraph(v); @@ -2515,7 +2597,7 @@ // Find the best join path. final Path p = g.runtimeOptimizer(context.getRunningQuery() - .getQueryEngine(), limit); + .getQueryEngine(), limit, nedges); // Factory avoids reuse of bopIds assigned to the predicates. final BOpIdFactory idFactory = new BOpIdFactory(); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryLog.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryLog.java 2010-11-22 19:22:05 UTC (rev 3973) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryLog.java 2010-11-22 21:08:56 UTC (rev 3974) @@ -158,6 +158,7 @@ sb.append("\tjoinRatio"); // expansion rate multiplier in the solution count. sb.append("\taccessPathDups"); sb.append("\taccessPathCount"); + sb.append("\taccessPathRangeCount"); sb.append("\taccessPathChunksIn"); sb.append("\taccessPathUnitsIn"); // dynamics based on elapsed wall clock time.
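The estimator change in JoinGraph.java above can be read in isolation. The following minimal sketch restates the added logic under illustrative names (the real code reads nin, nout and sumRangeCount from the PipelineJoinStats of the cutoff join; this is not the class's actual method):

    /**
     * Sketch of the cutoff-join cardinality estimator added above. When a
     * single input solution produces exactly [limit] output solutions, the
     * join was cut off and (nout/nin) is only a lower bound, so the sum of
     * the range counts of the as-bound access paths is used instead.
     */
    static long estimateCardinality(final int nin, long nout, final int limit,
            final long sumRangeCount, final long sourceSampleRangeCount) {
        if (nin == 1 && nout == limit) {
            // Cardinality estimate underflow: fall back to the cumulative
            // range count of the sampled as-bound access paths.
            nout = sumRangeCount;
        }
        // Expansion ratio of the join (outputs per input).
        final double f = nout == 0 ? 0 : (nout / (double) nin);
        return (long) (sourceSampleRangeCount * f);
    }

For the worked example in the comment (a rangeCount of 16 feeding a rangeCount of 175000 at limit = 100), the uncorrected estimate would be 16 * (100/1) = 1600, while the corrected estimate is driven by the sampled range counts toward 16 * 175000.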
@@ -337,6 +338,8 @@ sb.append('\t'); sb.append(stats.accessPathCount.get()); sb.append('\t'); + sb.append(stats.accessPathRangeCount.get()); + sb.append('\t'); sb.append(stats.accessPathChunksIn.get()); sb.append('\t'); sb.append(stats.accessPathUnitsIn.get()); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java 2010-11-22 19:22:05 UTC (rev 3973) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java 2010-11-22 21:08:56 UTC (rev 3974) @@ -778,6 +778,7 @@ sb.append("\tunitsOut"); sb.append("\taccessPathDups"); sb.append("\taccessPathCount"); + sb.append("\taccessPathRangeCount"); sb.append("\taccessPathChunksIn"); sb.append("\taccessPathUnitsIn"); //{chunksIn=1,unitsIn=100,chunksOut=4,unitsOut=313,accessPathDups=0,accessPathCount=100,chunkCount=100,elementCount=313} @@ -929,6 +930,8 @@ sb.append('\t'); sb.append(t.accessPathCount.get()); sb.append('\t'); + sb.append(t.accessPathRangeCount.get()); + sb.append('\t'); sb.append(t.accessPathChunksIn.get()); sb.append('\t'); sb.append(t.accessPathUnitsIn.get()); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-11-22 19:22:05 UTC (rev 3973) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-11-22 21:08:56 UTC (rev 3974) @@ -41,6 +41,7 @@ import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -114,7 +115,7 @@ final private AtomicLong deadline = new AtomicLong(Long.MAX_VALUE); /** - * The timestamp(ms) when the query begins to execute. + * The timestamp (ms) when the query begins to execute. */ final private AtomicLong startTime = new AtomicLong(System .currentTimeMillis()); @@ -171,10 +172,91 @@ } /** - * The maximum number of operator tasks which may be concurrently executor + * The maximum number of operator tasks which may be concurrently executed * for a given (bopId,shardId). + * + * @see QueryEngineTestAnnotations#MAX_CONCURRENT_TASKS_PER_OPERATOR_AND_SHARD */ final private int maxConcurrentTasksPerOperatorAndShard; + +// /** +// * The maximum #of concurrent tasks for this query across all operators and +// * shards. +// * +// * Note: This is not a safe option and MUST be removed. It is possible for +// * N-1 tasks to back up with the Nth task not running due to concurrent +// * execution of some of the N-1 tasks. +// */ +// final private int maxConcurrentTasks = 10; + + /* + * FIXME Explore the use of this semaphore to limit the maximum #of messages + * further. (Note that placing a limit on messages would allow us to buffer + * potentially many chunks. That could be solved by making LocalChunkMessage + * transparent in terms of the #of chunks or _binding_sets_ which it is + * carrying, but let's take this one step at a time). + * + * The first issue is ensuring that the query continues to make progress when + * a semaphore with a limited #of permits is introduced.
This is because the + * ChunkFutureTask only attempts to schedule the next task for a given + * (bopId,shardId) but we could have failed to accept outstanding work for + * any of a number of operator/shard combinations. Likewise, the QueryEngine + * tells the RunningQuery to schedule work each time a message is dropped + * onto the QueryEngine, but the signal to execute more work is lost if the + * permits were not available immediately. + * + * One possibility would be to have a delayed retry. Another would be to + * have ChunkFutureTask try to run *any* messages, not just messages for the + * same (bopId,shardId). + * + * Also, when scheduling work, there needs to be some bias towards the + * downstream operators in the query plan in order to ensure that they get a + * chance to clear work from upstream operators. This suggests that we might + * carry an order[] and use it to scan the work queue -- or make the work + * queue a priority heap using the order[] to place a primary sort over the + * bopIds in terms of the evaluation order and letting the shardIds fall in + * increasing shard order so we have a total order for the priority heap (a + * total order may also require a tie breaker, but I think that the priority + * heap allows ties). + * + * This concept of memory overhead and permits would be associated with the + * workload waiting on a given node for processing. (In scale-out, we do not + * care how much data is moving in the cluster, only how much data is + * challenging an individual machine). + * + * This emphasizes again why we need to get the data off of the Java heap. + * + * The same concept should apply for chained buffers. Maybe one way to do + * this is to allocate a fixed budget to each query for the Java heap and + * the C heap and then the query blocks or goes to disk. + */ +// /** +// * The maximum number of binding sets which may be outstanding before a task +// * which is producing binding sets will block. This value may be used to +// * limit the memory demand of a query in which some operators produce +// * binding sets faster than other operators can consume them. +// * +// * @todo This could be generalized to consider the Java heap separately from +// * the native heap as we get into the use of native ByteBuffers to +// * buffer intermediate results. +// * +// * @todo This is expressed in terms of messages and not {@link IBindingSet}s +// * because the {@link LocalChunkMessage} does not self-report the #of +// * {@link IBindingSet}s (or chunks). +// */ +// final private int maxOutstandingMessageCount = 100; +// +// /** +// * A counting semaphore used to limit the #of outstanding binding set chunks +// * which may be buffered before a producer will block when trying to emit +// * another chunk. +// * +// * @see HandleChunkBuffer#outputChunk(IBindingSet[]) +// * @see #scheduleNext(BSBundle) +// * +// * @see #maxOutstandingMessageCount +// */ +// final private Semaphore outstandingMessageSemaphore = new Semaphore(maxOutstandingMessageCount); /** * A collection of (bopId,partitionId) keys mapped onto a collection of @@ -471,6 +553,8 @@ this.bopIndex = BOpUtility.getIndex(query); + +// this.maxConcurrentTasksPerOperatorAndShard = 300; this.maxConcurrentTasksPerOperatorAndShard = query .getProperty( QueryEngineTestAnnotations.MAX_CONCURRENT_TASKS_PER_OPERATOR_AND_SHARD, @@ -1203,6 +1287,33 @@ return false; } } +// if (runState.getTotalRunningCount() > maxConcurrentTasks) { +// // Too many already running.
+// return false; +// } +// { +// /* +// * Verify that we can acquire sufficient permits to do some +// * work. +// */ +// final BlockingQueue<IChunkMessage<IBindingSet>> queue = operatorQueues +// .get(bundle); +// if (queue == null || queue.isEmpty()) { +// // No work. +// return false; +// } +// // The queue could be increased, but this will be its minimum size. +// final int minQueueSize = queue.size(); +// if(!outstandingMessageSemaphore.tryAcquire(minQueueSize)) { +// // Not enough permits. +// System.err.println("Permits: required=" + minQueueSize +// + ", available=" +// + outstandingMessageSemaphore.availablePermits() +// + ", bundle=" + bundle); +// return false; +// } +// +// } // Remove the work queue for that (bopId,partitionId). final BlockingQueue<IChunkMessage<IBindingSet>> queue = operatorQueues .remove(bundle); @@ -1210,7 +1321,7 @@ // no work return false; } - // Drain the work queue. + // Drain the work queue for that (bopId,partitionId). final List<IChunkMessage<IBindingSet>> messages = new LinkedList<IChunkMessage<IBindingSet>>(); queue.drainTo(messages); final int nmessages = messages.size(); @@ -1218,9 +1329,11 @@ * Combine the messages into a single source to be consumed by a * task. */ + int nchunks = 1; final IMultiSourceAsynchronousIterator<IBindingSet[]> source = new MultiSourceSequentialAsynchronousIterator<IBindingSet[]>(messages.remove(0).getChunkAccessor().iterator()); for (IChunkMessage<IBindingSet> msg : messages) { source.add(msg.getChunkAccessor().iterator()); + nchunks++; } /* * Create task to consume that source. @@ -1852,13 +1965,23 @@ */ private void outputChunk(final IBindingSet[] e) { - stats.unitsOut.add(((Object[]) e).length); + final int chunkSize = e.length; + + stats.unitsOut.add(chunkSize); stats.chunksOut.increment(); - sinkMessagesOut.addAndGet(q.getChunkHandler().handleChunk(q, bopId, - sinkId, e)); + final int messagesOut = q.getChunkHandler().handleChunk(q, bopId, + sinkId, e); + sinkMessagesOut.addAndGet(messagesOut); + +// try { +// q.outstandingMessageSemaphore.acquire(); +// } catch (InterruptedException e1) { +// throw new RuntimeException(e1); +// } + } /** Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2010-11-22 19:22:05 UTC (rev 3973) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2010-11-22 21:08:56 UTC (rev 3974) @@ -218,6 +218,12 @@ public final CAT accessPathCount = new CAT(); /** + * The running sum of the range counts of the accepted as-bound access + * paths. + */ + public final CAT accessPathRangeCount = new CAT(); + + /** * The #of input solutions consumed (not just accepted). 
* <p> * This counter is highly correlated with {@link BOpStats#unitsIn} but @@ -333,6 +339,8 @@ accessPathCount.add(t.accessPathCount.get()); + accessPathRangeCount.add(t.accessPathRangeCount.get()); + accessPathChunksIn.add(t.accessPathChunksIn.get()); accessPathUnitsIn.add(t.accessPathUnitsIn.get()); @@ -358,6 +366,7 @@ protected void toString(final StringBuilder sb) { sb.append(",accessPathDups=" + accessPathDups.get()); sb.append(",accessPathCount=" + accessPathCount.get()); + sb.append(",accessPathRangeCount=" + accessPathRangeCount.get()); sb.append(",accessPathChunksIn=" + accessPathChunksIn.get()); sb.append(",accessPathUnitsIn=" + accessPathUnitsIn.get()); sb.append(",inputSolutions=" + inputSolutions.get()); @@ -1562,6 +1571,10 @@ stats.accessPathCount.increment(); + // the range count of the as-bound access path (should be cached). + stats.accessPathRangeCount.add(accessPath + .rangeCount(false/* exact */)); + if (accessPath.getPredicate() instanceof IStarJoin<?>) { handleStarJoin(); Added: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/joinGraph/TestJoinGraphOnBarData.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/joinGraph/TestJoinGraphOnBarData.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/joinGraph/TestJoinGraphOnBarData.java 2010-11-22 21:08:56 UTC (rev 3974) @@ -0,0 +1,608 @@ +package com.bigdata.bop.rdf.joinGraph; + +import java.io.File; +import java.util.Arrays; +import java.util.Properties; + +import junit.framework.TestCase2; + +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.openrdf.rio.RDFFormat; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpContextBase; +import com.bigdata.bop.BOpIdFactory; +import com.bigdata.bop.Constant; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IPredicate; +import com.bigdata.bop.IVariable; +import com.bigdata.bop.NV; +import com.bigdata.bop.PipelineOp; +import com.bigdata.bop.Var; +import com.bigdata.bop.IPredicate.Annotations; +import com.bigdata.bop.controller.JoinGraph; +import com.bigdata.bop.controller.JoinGraph.JGraph; +import com.bigdata.bop.controller.JoinGraph.Path; +import com.bigdata.bop.engine.BOpStats; +import com.bigdata.bop.engine.QueryEngine; +import com.bigdata.bop.engine.QueryLog; +import com.bigdata.bop.engine.RunningQuery; +import com.bigdata.bop.fed.QueryEngineFactory; +import com.bigdata.journal.ITx; +import com.bigdata.journal.Journal; +import com.bigdata.rdf.model.BigdataLiteral; +import com.bigdata.rdf.model.BigdataURI; +import com.bigdata.rdf.model.BigdataValue; +import com.bigdata.rdf.model.BigdataValueFactory; +import com.bigdata.rdf.spo.SPOPredicate; +import com.bigdata.rdf.store.AbstractTripleStore; +import com.bigdata.rdf.store.DataLoader; +import com.bigdata.rdf.store.LocalTripleStore; +import com.bigdata.rdf.store.DataLoader.ClosureEnum; +import com.bigdata.relation.accesspath.IAsynchronousIterator; +import com.bigdata.relation.rule.IRule; +import com.bigdata.relation.rule.Rule; +import com.bigdata.relation.rule.eval.DefaultEvaluationPlan2; +import com.bigdata.relation.rule.eval.IRangeCountFactory; + +/** + * Unit tests for runtime query optimization using {@link JoinGraph} and the + * "bar data" test set. + * <p> + * Note: When running large queries, be sure to provide a sufficient heap, set + * the -server flag, etc. 
* + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id: TestJoinGraph.java 3918 2010-11-08 21:31:17Z thompsonbry $ + */ +public class TestJoinGraphOnBarData extends TestCase2 { + + /** + * + */ + public TestJoinGraphOnBarData() { + } + + /** + * @param name + */ + public TestJoinGraphOnBarData(String name) { + super(name); + } + + @Override + public Properties getProperties() { + + final Properties p = new Properties(super.getProperties()); + +// p.setProperty(Journal.Options.BUFFER_MODE, BufferMode.Transient +// .toString()); + + p.setProperty(AbstractTripleStore.Options.QUADS_MODE, "true"); + + /* + * Don't compute closure in the data loader since it does TM, not + * database at once closure. + */ + p.setProperty(DataLoader.Options.CLOSURE, ClosureEnum.None.toString()); + + return p; + + } + + private Journal jnl; + + private AbstractTripleStore database; + + /** The initial sampling limit. */ + private final int limit = 100; + + /** The #of edges considered for the initial paths. */ + private final int nedges = 2; + + private QueryEngine queryEngine; + + private String namespace; + + /** + * When true, do a warm up run of the plan generated by the static query + * optimizer. + */ + private final boolean warmUp = false; + + /** + * The #of times to run each query. Use N GT ONE (1) if you want to converge + * onto the hot query performance. + */ + private final int ntrials = 1; + + /** + * When <code>true</code> runs the dynamic query optimizer and then evaluates + * the generated query plan. + */ + private final boolean runRuntimeQueryOptimizer = true; + + /** + * When <code>true</code> runs the static query optimizer and then evaluates + * the generated query plan. + */ + private final boolean runStaticQueryOptimizer = true; + + /** + * Loads the bar data set into a triple store. + */ + protected void setUp() throws Exception { + +// QueryLog.logTableHeader(); + + super.setUp(); + +// System.err.println(UUID.randomUUID().toString()); +// System.exit(0); + + final Properties properties = getProperties(); + + final File file; + { + /* + * Use a specific file generated by some external process. + */ + file = new File("/data/bardata/bigdata-bardata.WORM.jnl"); + namespace = "bardata"; + } + + properties.setProperty(Journal.Options.FILE, file.toString()); + +// properties.setProperty(Journal.Options.BUFFER_MODE,BufferMode.DiskRW.toString()); + +// file.delete(); + + if (!file.exists()) { + + jnl = new Journal(properties); + + final AbstractTripleStore tripleStore = new LocalTripleStore(jnl, + namespace, ITx.UNISOLATED, properties); + + // Create the KB instance. + tripleStore.create(); + + tripleStore.getDataLoader().loadFiles( + new File("/root/Desktop/Downloads/barData/barData.trig"), + null/* baseURI */, RDFFormat.TRIG, null/* defaultGraph */, + null/* filter */); + + // Truncate the journal (trim its size). + jnl.truncate(); + + // Commit the journal. + jnl.commit(); + + // Close the journal. + jnl.close(); + + } + + // Open the test resource.
+ jnl = new Journal(properties); + + queryEngine = QueryEngineFactory + .getQueryController(jnl/* indexManager */); + + database = (AbstractTripleStore) jnl.getResourceLocator().locate( + namespace, jnl.getLastCommitTime()); + + if (database == null) + throw new RuntimeException("Not found: " + namespace); + + } + + protected void tearDown() throws Exception { + + if (database != null) { + database = null; + } + + if (queryEngine != null) { + queryEngine.shutdownNow(); + queryEngine = null; + } + + if(jnl != null) { + jnl.close(); + jnl = null; + } + + super.tearDown(); + + } + + /** + * Sample query for the synthetic data set. The query is arranged in a known + * good order. + * <p> + * Note: The runtime optimizer estimate of the cardinality of the edge [5 4] + * in this query is a lower bound, which makes this an interesting test + * case. The runtime optimizer detects this lower bound and replaces [nout] + * with the sum of the range count of the as-bound predicates for the join, + * which leads to an efficient query plan. + * + * <pre> + * SELECT ?f (COUNT(?d) AS ?total) WHERE { + * ?a <http://test/bar#beverageType> "Beer" . + * ?value <http://test/bar#orderItems> ?a. + * ?value <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://test/bar#Order> . + * ?a <http://test/bar#beverageType> ?d. + * ?value <http://test/bar#employee> ?b. + * ?b <http://test/bar#employeeNum> ?f. + * } GROUP BY ?f + * </pre> + * + * Note: Mike suggests that it is easier to read the query like this: + * + * <pre> + * SELECT ?employeeNum (COUNT(?type) AS ?total) + * WHERE { + * ?order <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> + * <http://test/bar#Order> . + * ?order <http://test/bar#orderItems> ?item . + * ?item <http://test/bar#beverageType> "Beer" . + * ?item <http://test/bar#beverageType> ?type . + * + * ?order <http://test/bar#employee> ?employee . + * + * ?employee <http://test/bar#employeeNum> ?employeeNum . + * } GROUP BY ?employeeNum + * </pre> + * + * @throws Exception + */ + public void test_query() throws Exception { + + /* + * Resolve terms against the lexicon. + */ + final BigdataValueFactory valueFactory = database.getLexiconRelation() + .getValueFactory(); + + final BigdataURI rdfType = valueFactory + .createURI("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"); + + final BigdataLiteral beer = valueFactory.createLiteral("Beer"); + + final BigdataURI beverageType = valueFactory + .createURI("http://test/bar#beverageType"); + + final BigdataURI orderItems = valueFactory + .createURI("http://test/bar#orderItems"); + + final BigdataURI Order = valueFactory + .createURI("http://test/bar#Order"); + + final BigdataURI employee = valueFactory + .createURI("http://test/bar#employee"); + + final BigdataURI employeeNum = valueFactory + .createURI("http://test/bar#employeeNum"); + + final BigdataValue[] terms = new BigdataValue[] { rdfType, beer, + beverageType, orderItems, Order, employee, employeeNum }; + + // resolve terms. 
+ database.getLexiconRelation() + .addTerms(terms, terms.length, true/* readOnly */); + + { + for (BigdataValue tmp : terms) { + System.out.println(tmp + " : " + tmp.getIV()); + if (tmp.getIV() == null) + throw new RuntimeException("Not defined: " + tmp); + } + } + + final IPredicate[] preds; + final IPredicate p0, p1, p2, p3, p4, p5; + { +// a, value, d, b, f + final IVariable<?> a = Var.var("a"); + final IVariable<?> value = Var.var("value"); + final IVariable<?> d = Var.var("d"); + final IVariable<?> b = Var.var("b"); + final IVariable<?> f = Var.var("f"); + + final IVariable<?> g0 = Var.var("g0"); + final IVariable<?> g1 = Var.var("g1"); + final IVariable<?> g2 = Var.var("g2"); + final IVariable<?> g3 = Var.var("g3"); + final IVariable<?> g4 = Var.var("g4"); + final IVariable<?> g5 = Var.var("g5"); + + + // The name space for the SPO relation. + final String[] spoRelation = new String[] { namespace + ".spo" }; + + // The name space for the Lexicon relation. + final String[] lexRelation = new String[] { namespace + ".lex" }; + + final long timestamp = jnl.getLastCommitTime(); + + int nextId = 0; + +// ?a <http://test/bar#beverageType> "Beer" . + p0 = new SPOPredicate(new BOp[] { a, + new Constant(beverageType.getIV()), + new Constant(beer.getIV()), g0 },// + new NV(BOp.Annotations.BOP_ID, nextId++),// + new NV(Annotations.TIMESTAMP, timestamp),// + new NV(IPredicate.Annotations.RELATION_NAME, spoRelation)// + ); + + // ?value <http://test/bar#orderItems> ?a. + p1 = new SPOPredicate(new BOp[] { value, + new Constant(orderItems.getIV()), a, g1 },// + new NV(BOp.Annotations.BOP_ID, nextId++),// + new NV(Annotations.TIMESTAMP, timestamp),// + new NV(IPredicate.Annotations.RELATION_NAME, spoRelation)// + ); + +// ?value <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://test/bar#Order> . + p2 = new SPOPredicate(new BOp[] { value, + new Constant(rdfType.getIV()), + new Constant(Order.getIV()), g2 },// + new NV(BOp.Annotations.BOP_ID, nextId++),// + new NV(Annotations.TIMESTAMP, timestamp),// + new NV(IPredicate.Annotations.RELATION_NAME, spoRelation)// + ); + +// ?a <http://test/bar#beverageType> ?d. + p3 = new SPOPredicate(new BOp[] { a, + new Constant(beverageType.getIV()), d, g3 },// + new NV(BOp.Annotations.BOP_ID, nextId++),// + new NV(Annotations.TIMESTAMP, timestamp),// + new NV(IPredicate.Annotations.RELATION_NAME, spoRelation)// + ); + +// ?value <http://test/bar#employee> ?b. + p4 = new SPOPredicate(new BOp[] { value, + new Constant(employee.getIV()), b, g4 },// + new NV(BOp.Annotations.BOP_ID, nextId++),// + new NV(Annotations.TIMESTAMP, timestamp),// + new NV(IPredicate.Annotations.RELATION_NAME, spoRelation)// + ); + +// ?b <http://test/bar#employeeNum> ?f. + p5 = new SPOPredicate(new BOp[] { b, + new Constant(employeeNum.getIV()), f, g5 },// + new NV(BOp.Annotations.BOP_ID, nextId++),// + new NV(Annotations.TIMESTAMP, timestamp),// + new NV(IPredicate.Annotations.RELATION_NAME, spoRelation)// + ); + + // the vertices of the join graph (the predicates). + preds = new IPredicate[] { p0, p1, p2, p3, p4, p5 }; + + } + + doTest(preds); + + } // test_query + + /** + * + * @param preds + * @throws Exception + * + * @todo To actually test anything this needs to compare the results (or at + * least the #of results). We could also test for known good join + * orders as generated by the runtime optimizer, but that requires a + * known data set (e.g., U1 or U50) and non-random sampling. + * + * @todo This is currently providing a "hot run" comparison by a series of
This means that the IO costs are effectively being wiped + * away, assuming that the file system cache is larger than the data + * set. The other way to compare performance is a cold cache / cold + * JVM run using the known solutions produced by the runtime versus + * static query optimizers. + */ + private void doTest(final IPredicate[] preds) throws Exception { + + if (warmUp) + runQuery("Warmup", queryEngine, runStaticQueryOptimizer(preds)); + + /* + * Run the runtime query optimizer once (its cost is not counted + * thereafter). + */ + final IPredicate[] runtimePredOrder = runRuntimeQueryOptimizer(preds); + + long totalRuntimeTime = 0; + long totalStaticTime = 0; + + for (int i = 0; i < ntrials; i++) { + + final String RUNTIME = getName() + " : runtime["+i+"] :"; + + final String STATIC = getName() + " : static ["+i+"] :"; + + final String GIVEN = getName() + " : given ["+i+"] :"; + + if (true/* originalOrder */) { + + runQuery(GIVEN, queryEngine, preds); + + } + + if (runStaticQueryOptimizer) { + + totalStaticTime += runQuery(STATIC, queryEngine, + runStaticQueryOptimizer(preds)); + + } + + if (runRuntimeQueryOptimizer) { + + /* + * Run the runtime query optimizer each time (its overhead is + * factored into the running comparison of the two query + * optimizers). + */ +// final IPredicate[] runtimePredOrder = runRuntimeQueryOptimizer(new JGraph( +// preds)); + + // Evaluate the query using the selected join order. + totalRuntimeTime += runQuery(RUNTIME, queryEngine, + runtimePredOrder); + + } + + } + + if(runStaticQueryOptimizer&&runRuntimeQueryOptimizer) { + System.err.println(getName() + " : Total times" + // + ": static=" + totalStaticTime + // + ", runtime=" + totalRuntimeTime + // + ", delta(static-runtime)=" + (totalStaticTime - totalRuntimeTime)); + } + + } + + /** + * Apply the runtime query optimizer. + * <p> + * Note: This temporarily raises the {@link QueryLog} log level during + * sampling to make the log files cleaner (this can not be done for a + * deployed system since the logger level is global and there are concurrent + * query mixes). + * + * @return The predicates in order as recommended by the runtime query + * optimizer. + * + * @throws Exception + */ + private IPredicate[] runRuntimeQueryOptimizer(final IPredicate[] preds) throws Exception { + + final Logger tmp = Logger.getLogger(QueryLog.class); + final Level oldLevel = tmp.getEffectiveLevel(); + tmp.setLevel(Level.WARN); + + try { + + final JGraph g = new JGraph(preds); + + final Path p = g.runtimeOptimizer(queryEngine, limit, nedges); + +// System.err.println(getName() + " : runtime optimizer join order " +// + Arrays.toString(Path.getVertexIds(p.edges))); + + return p.getPredicates(); + + } finally { + + tmp.setLevel(oldLevel); + + } + + } + + /** + * Apply the static query optimizer. + * + * @return The predicates in order as recommended by the static query + * optimizer. + */ + private IPredicate[] runStaticQueryOptimizer(final IPredicate[] preds) { + + final BOpContextBase context = new BOpContextBase(queryEngine); + + final IRule rule = new Rule("tmp", null/* head */, preds, null/* constraints */); + + final DefaultEvaluationPlan2 plan = new DefaultEvaluationPlan2( + new IRangeCountFactory() { + + public long rangeCount(final IPredicate pred) { + return context.getRelation(pred).getAccessPath(pred) + .rangeCount(false); + } + + }, rule); + + // evaluation plan order. 
+ final int[] order = plan.getOrder(); + + final int[] ids = new int[order.length]; + + final IPredicate[] out = new IPredicate[order.length]; + + for (int i = 0; i < order.length; i++) { + + out[i] = preds[order[i]]; + + ids[i] = out[i].getId(); + + } + +// System.err.println(getName() + " : static optimizer join order " +// + Arrays.toString(ids)); + + return out; + + } + + /** + * Run a query joining a set of {@link IPredicate}s in the given join order. + * + * @return The elapsed query time (ms). + */ + private static long runQuery(final String msg, + final QueryEngine queryEngine, final IPredicate[] predOrder) + throws Exception { + + final BOpIdFactory idFactory = new BOpIdFactory(); + + final int[] ids = new int[predOrder.length]; + + for(int i=0; i<ids.length; i++) { + + final IPredicate<?> p = predOrder[i]; + + idFactory.reserve(p.getId()); + + ids[i] = p.getId(); + + } + + final PipelineOp queryOp = JoinGraph.getQuery(idFactory, predOrder); + + // submit the query to the query engine. + final RunningQuery q = queryEngine.eval(queryOp); + + // drain the query results. + long nout = 0; + long nchunks = 0; + final IAsynchronousIterator<IBindingSet[]> itr = q.iterator(); + try { + while (itr.hasNext()) { + final IBindingSet[] chunk = itr.next(); + nout += chunk.length; + nchunks++; + } + } finally { + itr.close(); + } + + // check the Future for the query. + q.get(); + + // show the results. + final BOpStats stats = q.getStats().get(queryOp.getId()); + + System.err.println(msg + " : ids=" + Arrays.toString(ids) + + ", elapsed=" + q.getElapsed() + ", nout=" + nout + + ", nchunks=" + nchunks + ", stats=" + stats); + + return q.getElapsed(); + + } + +} Modified: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/joinGraph/TestJoinGraphOnLubm.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/joinGraph/TestJoinGraphOnLubm.java 2010-11-22 19:22:05 UTC (rev 3973) +++ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/joinGraph/TestJoinGraphOnLubm.java 2010-11-22 21:08:56 UTC (rev 3974) @@ -75,7 +75,12 @@ * FIXME There is now an option to converge onto the hot query * performance. Add an option to drop the file system cache and to * reopen the journal in order to converge on the cold query - * performance for the selected join orderings. + * performance for the selected join orderings. (Or, either devise a + * benchmark which can be used to assess the relative performance with + * disk IO or use the LUBM benchmark at a data scale which would force + * queries to touch the disk; this actually requires a very high data + * scale for LUBM since the complex queries are not parameterized and + * tend to fully cache the relevant data on their first presentation.) * * FIXME Looks like U1000 Q2 runs into GC OH problems with both the * static and runtime query optimizers. Track down why. Note that Q2 @@ -167,6 +172,9 @@ /** The initial sampling limit. */ private final int limit = 100; + /** The #of edges considered for the initial paths. */ + private final int nedges = 2; + private QueryEngine queryEngine; private String namespace; @@ -184,10 +192,16 @@ private static final UUID resourceId = UUID.fromString("bb93d970-0cc4-48ca-ba9b-123412683b3d"); /** + * When true, do a warm up run of the plan generated by the static query + * optimizer. + */ + private final boolean warmUp = false; + + /** * The #of times to run each query.
Use N GT ONE (1) if you want to converge * onto the hot query performance. */ - private final int ntrials = 5; + private final int ntrials = 1; /** * When <code>true</code> runs the dynamic query optimizer and then evaluates @@ -206,6 +220,8 @@ */ protected void setUp() throws Exception { + +// QueryLog.logTableHeader(); + super.setUp(); // System.err.println(UUID.randomUUID().toString()); @@ -228,7 +244,7 @@ /* * Use a specific file generated by some external process. */ - final int nuniv = 50; + final int nuniv = 1000; file = new File("/data/lubm/U" + nuniv + "/bigdata-lubm.WORM.jnl"); namespace = "LUBM_U" + nuniv; } @@ -803,14 +819,14 @@ */ private void doTest(final IPredicate[] preds) throws Exception { - runQuery("Warmup", queryEngine, runStaticQueryOptimizer(preds)); + if (warmUp) + runQuery("Warmup", queryEngine, runStaticQueryOptimizer(preds)); /* * Run the runtime query optimizer once (its cost is not counted * thereafter). */ - final IPredicate[] runtimePredOrder = runRuntimeQueryOptimizer(new JGraph( - preds)); + final IPredicate[] runtimePredOrder = runRuntimeQueryOptimizer(preds); long totalRuntimeTime = 0; long totalStaticTime = 0; @@ -868,7 +884,7 @@ * * @throws Exception */ - private IPredicate[] runRuntimeQueryOptimizer(final JGraph g) throws Exception { + private IPredicate[] runRuntimeQueryOptimizer(final IPredicate[] preds) throws Exception { final Logger tmp = Logger.getLogger(QueryLog.class); final Level oldLevel = tmp.getEffectiveLevel(); @@ -876,7 +892,9 @@ try { - final Path p = g.runtimeOptimizer(queryEngine, limit); + final JGraph g = new JGraph(preds); + + final Path p = g.runtimeOptimizer(queryEngine, limit, nedges); // System.err.println(getName() + " : runtime optimizer join order " // + Arrays.toString(Path.getVertexIds(p.edges)));
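Taken together, the new knob threads through the API as exercised by the tests in this commit. A condensed usage sketch (variable names are illustrative; the calls mirror runRuntimeQueryOptimizer above):

    // preds = the vertices of the join graph (IPredicate[]).
    final JGraph g = new JGraph(preds);
    // limit = initial sampling limit; nedges = #of lowest-cardinality
    // edges used to seed the initial join paths.
    final Path p = g.runtimeOptimizer(queryEngine, 100/* limit */, 2/* nedges */);
    // The join order selected by the runtime optimizer.
    final IPredicate[] joinOrder = p.getPredicates();

When run as a JoinGraph operator rather than directly against a JGraph, the same knob is exposed via the new Annotations.NEDGES (default DEFAULT_NEDGES = 2) and read by getNEdges().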