From: <tho...@us...> - 2014-01-05 19:23:14
|
Revision: 7729 http://bigdata.svn.sourceforge.net/bigdata/?rev=7729&view=rev Author: thompsonbry Date: 2014-01-05 19:23:06 +0000 (Sun, 05 Jan 2014) Log Message: ----------- Increased parallelism in the RTO. The cutoff joins for the initial edges in round zero are now executed in parallel. See #64. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/joinGraph/rto/JGraph.java branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/joinGraph/rto/Path.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/joinGraph/rto/JGraph.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/joinGraph/rto/JGraph.java 2014-01-05 17:35:43 UTC (rev 7728) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/joinGraph/rto/JGraph.java 2014-01-05 19:23:06 UTC (rev 7729) @@ -720,30 +720,30 @@ } - /** - * Resample the initial vertices for the specified join paths and then - * resample the cutoff join for each given join path in path order. - * - * @param queryEngine - * The query engine. - * @param limitIn - * The original limit. - * @param round - * The round number in [1:n]. - * @param a - * The set of paths from the previous round. For the first round, - * this is formed from the initial set of edges to consider. - * @param edgeSamples - * A map used to associate join path segments (expressed as an - * ordered array of bopIds) with {@link EdgeSample}s to avoid - * redundant effort. - * - * @return The number of join paths which are experiencing cardinality - * estimate underflow. - * - * @throws Exception - */ - public int resamplePaths(final QueryEngine queryEngine, int limitIn, + /** + * Resample the initial vertices for the specified join paths and then + * resample the cutoff join for each given join path in path order. + * + * @param queryEngine + * The query engine. + * @param limitIn + * The original limit. 
+ * @param round + * The round number in [1:n]. + * @param a + * The set of paths from the previous round. For the first round, + * this is formed from the initial set of edges to consider. + * @param edgeSamples + * A map used to associate join path segments (expressed as an + * ordered array of bopIds) with {@link EdgeSample}s to avoid + * redundant effort. + * + * @return The number of join paths which are experiencing cardinality + * estimate underflow. + * + * @throws Exception + */ + protected int resamplePaths(final QueryEngine queryEngine, int limitIn, final int round, final Path[] a, final Map<PathIds, EdgeSample> edgeSamples) throws Exception { @@ -799,43 +799,90 @@ } - // re-sample vertices. + // re-sample vertices (this is done in parallel). sampleVertices(queryEngine, vertexLimit); - -// for (Map.Entry<Vertex, AtomicInteger> e : vertexLimit.entrySet()) { -// -//// final Vertex v = x.vertices[0]; -//// final int limit = vertexLimit.get(v).intValue(); -// -// final Vertex v = e.getKey(); -// -// final int limit = e.getValue().get(); -// -// v.sample(queryEngine, limit, sampleType); -// -// } } /* - * Re-sample the cutoff join for each edge in each of the existing - * paths using the newly re-sampled vertices. + * Re-sample the cutoff join for each edge in each of the existing paths + * using the newly re-sampled vertices. * * Note: The only way to increase the accuracy of our estimates for - * edges as we extend the join paths is to re-sample each edge in - * the join path in path order. + * edges as we extend the join paths is to re-sample each edge in the + * join path in path order. * - * Note: An edge must be sampled for each distinct join path prefix - * in which it appears within each round. However, it is common for - * surviving paths to share a join path prefix, so do not re-sample - * a given path prefix more than once per round. + * Note: An edge must be sampled for each distinct join path prefix in + * which it appears within each round. 
However, it is common for + * surviving paths to share a join path prefix, so do not re-sample a + * given path prefix more than once per round. + * + * FIXME PARALLELIZE: Parallelize the re-sampling for the active paths. + * This step is responsible for deepening the samples on the non-pruned + * paths. There is a data race that can occur since the [edgeSamples] + * map can contain samples for the same sequence of edges in different + * paths. This is because two paths can share a common prefix sequence + * of edges, e.g., [2, 4, 6, 7] and [2, 4, 6, 9] share the path prefix + * [2, 4, 6]. Therefore both inspection and update of the [edgeSamples] + * map MUST be synchronized. This code is single threaded since that + * synchronization mechanism has not yet been put into place. The + * obvious way to handle this is to use a memoization pattern for the + * [ids] key for the [edgeSamples] map. This will ensure that the + * threads that need to resample a given edge will coordinate with the + * first such thread doing the resampling and the other thread(s) + * blocking until the resampled edge is available. */ if (log.isDebugEnabled()) log.debug("Re-sampling in-use path segments."); + // #of paths with cardinality estimate underflow. int nunderflow = 0; + final List<Callable<Boolean>> tasks = new LinkedList<Callable<Boolean>>(); for (Path x : a) { + tasks.add(new ResamplePathTask(queryEngine, x, limitIn, edgeSamples)); + + } // next Path [x]. + + // Execute in the caller's thread. + for (Callable<Boolean> task : tasks) { + + if (task.call()) { + + nunderflow++; + + } + + } + + return nunderflow; + + } + + /** + * Resample the edges along a join path. Edges are resampled based on the + * desired cutoff limit and only as necessary. 
+ * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + */ + private class ResamplePathTask implements Callable<Boolean> { + + private final QueryEngine queryEngine; + private final Path x; + private final int limitIn; + private final Map<PathIds, EdgeSample> edgeSamples; + + public ResamplePathTask(final QueryEngine queryEngine, final Path x, + final int limitIn, final Map<PathIds, EdgeSample> edgeSamples) { + this.queryEngine = queryEngine; + this.x = x; + this.limitIn = limitIn; + this.edgeSamples = edgeSamples; + } + + @Override + public Boolean call() throws Exception { /* * Get the new sample limit for the path. * @@ -845,9 +892,9 @@ * round of expansion, which means that we are reading more data * than we really need to read. */ - final int limit = x.getNewLimit(limitIn); + final int limit = x.getNewLimit(limitIn); - // The cutoff join sample of the one step shorter path segment. + // The cutoff join sample of the one step shorter path segment. EdgeSample priorEdgeSample = null; for (int segmentLength = 2; segmentLength <= x.vertices.length; segmentLength++) { @@ -893,7 +940,7 @@ queryEngine, limit,// x.getPathSegment(2),// 1st edge. C,// constraints - V.length == 2,// pathIsComplete + V.length == 2,// pathIsComplete x.vertices[0].sample// source sample. ); @@ -955,19 +1002,19 @@ // Save the result on the path. x.edgeSample = priorEdgeSample; - - if (x.edgeSample.estimateEnum == EstimateEnum.Underflow) { - if (log.isDebugEnabled()) - log.debug("Cardinality underflow: " + x); - nunderflow++; + + final boolean underflow = x.edgeSample.estimateEnum == EstimateEnum.Underflow; + if (underflow) { + if (log.isDebugEnabled()) + log.debug("Cardinality underflow: " + x); } - } // next Path [x]. - - return nunderflow; - + // Done. + return underflow; + } + } - + /** * Do one breadth first expansion. 
In each breadth first expansion we extend * each of the active join paths by one vertex for each remaining vertex @@ -1269,30 +1316,36 @@ return null; } - /** - * Obtain a sample and estimated cardinality (fast range count) for each - * vertex. - * - * @param queryEngine - * The query engine. - * @param limit - * The sample size. - * - * TODO Only sample vertices with an index. - * - * TODO Consider other cases where we can avoid sampling a vertex - * or an initial edge. - * <p> - * Be careful about rejecting high cardinality vertices here as - * they can lead to good solutions (see the "bar" data set - * example). - * <p> - * BSBM Q5 provides a counter example where (unless we translate - * it into a key-range constraint on an index) some vertices do - * not share a variable directly and hence will materialize the - * full cross product before filtering which is *really* - * expensive. - */ + /** + * Obtain a sample and estimated cardinality (fast range count) for each + * vertex. + * + * @param queryEngine + * The query engine. + * @param limit + * The sample size. + * + * TODO Only sample vertices with an index. + * + * TODO Consider other cases where we can avoid sampling a vertex + * or an initial edge. + * <p> + * Be careful about rejecting high cardinality vertices here as + * they can lead to good solutions (see the "bar" data set + * example). + * <p> + * BSBM Q5 provides a counter example where (unless we translate + * it into a key-range constraint on an index) some vertices do + * not share a variable directly and hence will materialize the + * full cross product before filtering which is *really* + * expensive. + * + * FIXME We need to attach any access path filters that are required + * for named graphs or scale-out for the RTO to function in those + * environments. We DO NOT need to attach SPARQL FILTERs here - + * those get applied when we evaluate the cutoff joins from one + * vertex to another. 
+ */ public void sampleAllVertices(final QueryEngine queryEngine, final int limit) { final Map<Vertex, AtomicInteger> vertexLimit = new LinkedHashMap<Vertex, AtomicInteger>(); @@ -1436,7 +1489,7 @@ private Path[] estimateInitialEdgeWeights(final QueryEngine queryEngine, final int limit) throws Exception { - final List<Path> paths = new LinkedList<Path>(); + final List<Callable<Path>> tasks = new LinkedList<Callable<Path>>(); /* * Examine all unordered vertex pairs (v1,v2) once. If any vertex has @@ -1519,30 +1572,86 @@ } - // The path segment - final IPredicate<?>[] preds = new IPredicate[] { v.pred, vp.pred }; + tasks.add(new CutoffJoinTask(queryEngine, limit, v, vp, + pathIsComplete)); - // cutoff join of the edge (v,vp) - final EdgeSample edgeSample = Path.cutoffJoin(queryEngine,// - limit, // sample limit - preds, // ordered path segment. - C, // constraints - pathIsComplete,// - v.sample // sourceSample - ); + } // next other vertex. + + } // next vertex - final Path p = new Path(v, vp, edgeSample); + /* + * Now sample those paths in parallel. + */ - paths.add(p); + final List<Path> paths = new LinkedList<Path>(); - } // next other vertex. +// // Sample the paths in the caller's thread. +// for(Callable<Path> task : tasks) { +// +// paths.add(task.call()); +// +// } - } // next vertex - + // Sample the paths in parallel. + final List<Future<Path>> futures = queryEngine.getIndexManager() + .getExecutorService().invokeAll(tasks); + + // Check future, collecting new paths from each task. + for (Future<Path> f : futures) { + + paths.add(f.get()); + + } + return paths.toArray(new Path[paths.size()]); } + + /** + * Cutoff sample an initial join path consisting of two vertices. 
+ * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ + private class CutoffJoinTask implements Callable<Path> { + private final QueryEngine queryEngine; + private final int limit; + private final Vertex v; + private final Vertex vp; + private final boolean pathIsComplete; + + public CutoffJoinTask(final QueryEngine queryEngine, final int limit, + final Vertex v, final Vertex vp, final boolean pathIsComplete) { + this.queryEngine = queryEngine; + this.limit = limit; + this.v = v; + this.vp = vp; + this.pathIsComplete = pathIsComplete; + } + + @Override + public Path call() throws Exception { + + // The path segment + final IPredicate<?>[] preds = new IPredicate[] { v.pred, vp.pred }; + + // cutoff join of the edge (v,vp) + final EdgeSample edgeSample = Path.cutoffJoin(queryEngine,// + limit, // sample limit + preds, // ordered path segment. + C, // constraints + pathIsComplete,// + v.sample // sourceSample + ); + + final Path p = new Path(v, vp, edgeSample); + + return p; + + } + + } + /** * Prune paths which are dominated by other paths. Paths are extended in * each round. Paths from previous rounds are always pruned. Of the new @@ -1607,10 +1716,10 @@ final Path Pj = a[j]; if (Pj.edgeSample == null) throw new RuntimeException("Not sampled: " + Pj); - if (pruned.contains(Pj)) { - // already pruned. + if (pruned.contains(Pj)) { + // already pruned. continue; - } + } final boolean isPiSuperSet = Pi.isUnorderedVariant(Pj); if (!isPiSuperSet) { // Can not directly compare these join paths. 
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/joinGraph/rto/Path.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/joinGraph/rto/Path.java 2014-01-05 17:35:43 UTC (rev 7728) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/joinGraph/rto/Path.java 2014-01-05 19:23:06 UTC (rev 7729) @@ -1,3 +1,26 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2011. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ package com.bigdata.bop.joinGraph.rto; import java.util.Arrays; @@ -847,7 +870,7 @@ final List<IBindingSet> result = new LinkedList<IBindingSet>(); try { - int nresults = 0; + int nresults = 0; try { IBindingSet bset = null; // Figure out the #of source samples consumed. @@ -862,7 +885,7 @@ } } } finally { - // ensure terminated regardless. + // ensure terminated regardless. runningQuery.cancel(true/* mayInterruptIfRunning */); } } finally { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |