From: <tho...@us...> - 2013-12-30 15:36:09
|
Revision: 7700 http://bigdata.svn.sourceforge.net/bigdata/?rev=7700&view=rev Author: thompsonbry Date: 2013-12-30 15:36:02 +0000 (Mon, 30 Dec 2013) Log Message: ----------- Added parallel expansion of the join paths. Each join path is expanded in a separate thread. However, if there are multiple possible expansions for a given join path, then those expansions are processed in sequence. Note: This change required the introduction of a thread-safe collection for the edgeSamples map within the scope of the expand() method. @see #64 (RTO) Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/joinGraph/rto/JGraph.java branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/joinGraph/rto/PathIds.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/joinGraph/rto/JGraph.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/joinGraph/rto/JGraph.java 2013-12-30 15:05:21 UTC (rev 7699) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/joinGraph/rto/JGraph.java 2013-12-30 15:36:02 UTC (rev 7700) @@ -1004,7 +1004,7 @@ */ public Path[] expand(final QueryEngine queryEngine, int limitIn, final int round, final Path[] a, - final Map<PathIds, EdgeSample> edgeSamples) throws Exception { + Map<PathIds, EdgeSample> edgeSamples) throws Exception { if (queryEngine == null) throw new IllegalArgumentException(); @@ -1016,7 +1016,14 @@ throw new IllegalArgumentException(); if (a.length == 0) throw new IllegalArgumentException(); - + + /* + * Ensure that we use a synchronized view of this collection since we + * will write on it from parallel threads when we expand the join paths + * in parallel. + */ + edgeSamples = Collections.synchronizedMap(edgeSamples); + // // increment the limit by itself in each round. // final int limit = (round + 1) * limitIn; @@ -1031,164 +1038,214 @@ if (log.isDebugEnabled()) log.debug("Expanding paths: #paths(in)=" + a.length); - final List<Path> tmp = new LinkedList<Path>(); + // The new set of paths to be explored. + final List<Path> tmpAll = new LinkedList<Path>(); + // Setup tasks to expand the current join paths. + final List<Callable<List<Path>>> tasks = new LinkedList<Callable<List<Path>>>(); for (Path x : a) { - /* - * We already increased the sample limit for the path in the loop - * above. - */ - final int limit = x.edgeSample.limit; + tasks.add(new ExpandPathTask(queryEngine, x, edgeSamples)); - /* - * The set of vertices used to expand this path in this round. - */ - final Set<Vertex> used = new LinkedHashSet<Vertex>(); + } - { + // Expand paths in parallel. + final List<Future<List<Path>>> futures = queryEngine.getIndexManager().getExecutorService() + .invokeAll(tasks); + + // Check future, collecting new paths from each task. + for(Future<List<Path>> f : futures) { - /* - * Any vertex which (a) does not appear in the path to be - * extended; (b) has not already been used to extend the path; - * and (c) does not share any variables indirectly via - * constraints is added to this collection. - * - * If we are not able to extend the path at least once using a - * constrained join then we will use this collection as the - * source of unconnected edges which need to be used to extend - * the path. - */ - final Set<Vertex> nothingShared = new LinkedHashSet<Vertex>(); - - // Consider all vertices. - for (Vertex tVertex : V) { + tmpAll.addAll(f.get()); + + } - // Figure out which vertices are already part of this path. - final boolean vFound = x.contains(tVertex); + /* + * Now examine the set of generated and sampled join paths. If any paths + * span the same vertices then they are alternatives and we can pick the + * best alternative now and prune the other alternatives for those + * vertices. + */ + final Path[] paths_tp1 = tmpAll.toArray(new Path[tmpAll.size()]); - if (vFound) { - // Vertex is already part of this path. - if (log.isTraceEnabled()) - log.trace("Vertex: " + tVertex - + " - already part of this path."); - continue; - } + final Path[] paths_tp1_pruned = pruneJoinPaths(paths_tp1, edgeSamples); - if (used.contains(tVertex)) { - // Vertex already used to extend this path. - if (log.isTraceEnabled()) - log - .trace("Vertex: " - + tVertex - + " - already used to extend this path."); - continue; - } + if (log.isDebugEnabled()) // shows which paths were pruned. + log.info("\n*** round=" + round + ": paths{in=" + a.length + + ",considered=" + paths_tp1.length + ",out=" + + paths_tp1_pruned.length + "}\n" + + JGraph.showTable(paths_tp1, paths_tp1_pruned)); - // FIXME RTO: Replace with StaticAnalysis. - if (!PartitionedJoinGroup.canJoinUsingConstraints(// - x.getPredicates(),// path - tVertex.pred,// vertex - C// constraints - )) { - /* - * Vertex does not share variables either directly - * or indirectly. - */ - if (log.isTraceEnabled()) - log - .trace("Vertex: " - + tVertex - + " - unconstrained join for this path."); - nothingShared.add(tVertex); - continue; - } + if (log.isInfoEnabled()) // only shows the surviving paths. + log.info("\n*** round=" + round + + ": paths{in=" + a.length + ",considered=" + + paths_tp1.length + ",out=" + paths_tp1_pruned.length + + "}\n" + JGraph.showTable(paths_tp1_pruned)); - // add the new vertex to the set of used vertices. - used.add(tVertex); + return paths_tp1_pruned; - // Extend the path to the new vertex. - final Path p = x - .addEdge(queryEngine, limit, tVertex, /* dynamicEdge, */ - C, x.getVertexCount() + 1 == V.length/* pathIsComplete */); + } - // Add to the set of paths for this round. - tmp.add(p); + /** + * Task expands a path by one edge into one or more new paths. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ + private class ExpandPathTask implements Callable<List<Path>> { - // Record the sample for the new path. - if (edgeSamples.put(new PathIds(p.getVertexIds()), - p.edgeSample) != null) - throw new AssertionError(); + private final QueryEngine queryEngine; + private final Path x; + /** + * Note: The collection provided by the caller MUST be thread-safe since + * this task will be run by parallel threads over the different join + * paths from the last round. There will not be any conflict over writes + * on this map since each {@link PathIds} instance resulting from the + * expansion will be unique, but we still need to use a thread-safe + * collection since there will be concurrent modifications to this map. + */ + private final Map<PathIds, EdgeSample> edgeSamples; - if (log.isTraceEnabled()) - log.trace("Extended path with dynamic edge: vnew=" - + tVertex.pred.getId() + ", new path=" + p); + public ExpandPathTask(final QueryEngine queryEngine, final Path x, + final Map<PathIds, EdgeSample> edgeSamples) { + this.queryEngine = queryEngine; + this.x = x; + this.edgeSamples = edgeSamples; + } + + @Override + public List<Path> call() throws Exception { + /* + * We already increased the sample limit for the path in the loop + * above. + */ + final int limit = x.edgeSample.limit; - } // next vertex. + /* + * The set of vertices used to expand this path in this round. + */ + final Set<Vertex> used = new LinkedHashSet<Vertex>(); - if (tmp.isEmpty()) { + /* + * Any vertex which (a) does not appear in the path to be + * extended; (b) has not already been used to extend the path; + * and (c) does not share any variables indirectly via + * constraints is added to this collection. + * + * If we are not able to extend the path at least once using a + * constrained join then we will use this collection as the + * source of unconnected edges which need to be used to extend + * the path. + */ + final Set<Vertex> nothingShared = new LinkedHashSet<Vertex>(); + + // The new set of paths to be explored as extensions to this path. + final List<Path> tmp = new LinkedList<Path>(); + + // Consider all vertices. + for (Vertex tVertex : V) { - /* - * No constrained joins were identified so we must consider - * edges which represent fully unconstrained joins. - */ + // Figure out which vertices are already part of this path. + final boolean vFound = x.contains(tVertex); - assert !nothingShared.isEmpty(); + if (vFound) { + // Vertex is already part of this path. + if (log.isTraceEnabled()) + log.trace("Vertex: " + tVertex + + " - already part of this path."); + continue; + } + if (used.contains(tVertex)) { + // Vertex already used to extend this path. + if (log.isTraceEnabled()) + log + .trace("Vertex: " + + tVertex + + " - already used to extend this path."); + continue; + } + + // FIXME RTO: Replace with StaticAnalysis. + if (!PartitionedJoinGroup.canJoinUsingConstraints(// + x.getPredicates(),// path + tVertex.pred,// vertex + C// constraints + )) { /* - * Choose any vertex from the set of those which do - * not share any variables with the join path. Since - * all of these are fully unconstrained joins we do - * not want to expand the join path along multiple - * edges in this iterator, just along a single - * unconstrained edge. + * Vertex does not share variables either directly + * or indirectly. */ - final Vertex tVertex = nothingShared.iterator().next(); - - // Extend the path to the new vertex. - final Path p = x - .addEdge(queryEngine, limit, tVertex,/* dynamicEdge */ - C, x.getVertexCount() + 1 == V.length/* pathIsComplete */); + if (log.isTraceEnabled()) + log + .trace("Vertex: " + + tVertex + + " - unconstrained join for this path."); + nothingShared.add(tVertex); + continue; + } - // Add to the set of paths for this round. - tmp.add(p); + // add the new vertex to the set of used vertices. + used.add(tVertex); - if (log.isTraceEnabled()) - log.trace("Extended path with dynamic edge: vnew=" - + tVertex.pred.getId() + ", new path=" + p); + // Extend the path to the new vertex. + final Path p = x + .addEdge(queryEngine, limit, tVertex, /* dynamicEdge, */ + C, x.getVertexCount() + 1 == V.length/* pathIsComplete */); - } + // Add to the set of paths for this round. + tmp.add(p); - } + // Record the sample for the new path. + if (edgeSamples.put(new PathIds(p.getVertexIds()), + p.edgeSample) != null) + throw new AssertionError(); - } // next path + if (log.isTraceEnabled()) + log.trace("Extended path with dynamic edge: vnew=" + + tVertex.pred.getId() + ", new path=" + p); - /* - * Now examine the set of generated and sampled join paths. If any paths - * span the same vertices then they are alternatives and we can pick the - * best alternative now and prune the other alternatives for those - * vertices. - */ - final Path[] paths_tp1 = tmp.toArray(new Path[tmp.size()]); + } // next target vertex. - final Path[] paths_tp1_pruned = pruneJoinPaths(paths_tp1, edgeSamples); + if (tmp.isEmpty()) { - if (log.isDebugEnabled()) // shows which paths were pruned. - log.info("\n*** round=" + round + ": paths{in=" + a.length - + ",considered=" + paths_tp1.length + ",out=" - + paths_tp1_pruned.length + "}\n" - + JGraph.showTable(paths_tp1, paths_tp1_pruned)); + /* + * No constrained joins were identified as extensions of this + * join path, so we must consider edges which represent fully + * unconstrained joins. + */ - if (log.isInfoEnabled()) // only shows the surviving paths. - log.info("\n*** round=" + round - + ": paths{in=" + a.length + ",considered=" - + paths_tp1.length + ",out=" + paths_tp1_pruned.length - + "}\n" + JGraph.showTable(paths_tp1_pruned)); + assert !nothingShared.isEmpty(); - return paths_tp1_pruned; + /* + * Choose any vertex from the set of those which do + * not share any variables with the join path. Since + * all of these are fully unconstrained joins we do + * not want to expand the join path along multiple + * edges in this iterator, just along a single + * unconstrained edge. + */ + final Vertex tVertex = nothingShared.iterator().next(); + + // Extend the path to the new vertex. + final Path p = x + .addEdge(queryEngine, limit, tVertex,/* dynamicEdge */ + C, x.getVertexCount() + 1 == V.length/* pathIsComplete */); + // Add to the set of paths for this round. + tmp.add(p); + + if (log.isTraceEnabled()) + log.trace("Extended path with dynamic edge: vnew=" + + tVertex.pred.getId() + ", new path=" + p); + + } // if(tmp.isEmpty()) + + return tmp; + + } + } - + /** * Return the {@link Vertex} whose {@link IPredicate} is associated with * the given {@link BOp.Annotations#BOP_ID}. Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/joinGraph/rto/PathIds.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/joinGraph/rto/PathIds.java 2013-12-30 15:05:21 UTC (rev 7699) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/joinGraph/rto/PathIds.java 2013-12-30 15:36:02 UTC (rev 7700) @@ -83,6 +83,7 @@ * vertices may be expressed and also recognizes that the vertex hash codes * are based on the bop ids, which are often small integers. */ + @Override public int hashCode() { int h = hash; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |