From: <tho...@us...> - 2013-12-30 16:02:25
Revision: 7701
          http://bigdata.svn.sourceforge.net/bigdata/?rev=7701&view=rev
Author:   thompsonbry
Date:     2013-12-30 16:02:18 +0000 (Mon, 30 Dec 2013)

Log Message:
-----------
Added parallel resampling of the vertices in the join graph. See #64 (RTO)

Modified Paths:
--------------
    branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/joinGraph/rto/JGraph.java

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/joinGraph/rto/JGraph.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/joinGraph/rto/JGraph.java	2013-12-30 15:36:02 UTC (rev 7700)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/joinGraph/rto/JGraph.java	2013-12-30 16:02:18 UTC (rev 7701)
@@ -799,16 +799,22 @@
 
         }
 
-        for (Path x : a) {
+        // re-sample vertices.
+        sampleVertices(queryEngine, vertexLimit);
+
+//        for (Map.Entry<Vertex, AtomicInteger> e : vertexLimit.entrySet()) {
+//
+////            final Vertex v = x.vertices[0];
+////            final int limit = vertexLimit.get(v).intValue();
+//
+//            final Vertex v = e.getKey();
+//
+//            final int limit = e.getValue().get();
+//
+//            v.sample(queryEngine, limit, sampleType);
+//
+//        }
 
-            final Vertex v = x.vertices[0];
-
-            final int limit = vertexLimit.get(v).intValue();
-
-            v.sample(queryEngine, limit, sampleType);
-
-        }
-
     }
 
     /*
@@ -830,15 +836,15 @@
         int nunderflow = 0;
 
         for (Path x : a) {
-            /*
-             * Get the new sample limit for the path.
-             *
-             * TODO We only need to increase the sample limit starting at the
-             * vertex where we have a cardinality underflow or variability in
-             * the cardinality estimate. This is increasing the limit in each
-             * round of expansion, which means that we are reading more data
-             * than we really need to read.
-             */
+            /*
+             * Get the new sample limit for the path.
+             *
+             * TODO We only need to increase the sample limit starting at the
+             * vertex where we have a cardinality underflow or variability in
+             * the cardinality estimate. This is increasing the limit in each
+             * round of expansion, which means that we are reading more data
+             * than we really need to read.
+             */
             final int limit = x.getNewLimit(limitIn);
 
             // The cutoff join sample of the one step shorter path segment.
@@ -1289,10 +1295,44 @@
      */
     public void sampleAllVertices(final QueryEngine queryEngine, final int limit) {
 
+        final Map<Vertex, AtomicInteger> vertexLimit = new LinkedHashMap<Vertex, AtomicInteger>();
+
+        for (Vertex v : V) {
+
+            vertexLimit.put(v,new AtomicInteger(limit));
+
+        }
+
+        sampleVertices(queryEngine, vertexLimit);
+
+    }
+
+    /**
+     * (Re-)sample a set of vertices. Sampling is done in parallel.
+     * <p>
+     * Note: A request to re-sample a vertex is a NOP unless the limit has been
+     * increased since the last time the vertex was sampled. It is also a NOP if
+     * the vertex has been fully materialized.
+     *
+     * @param queryEngine
+     * @param vertexLimit
+     *            A map whose keys are the {@link Vertex vertices} to be
+     *            (re-)samples and whose values are the <code>limit</code> to be
+     *            used when sampling the associated vertex. This map is
+     *            read-only so it only needs to be thread-safe for concurrent
+     *            readers.
+     */
+    private void sampleVertices(final QueryEngine queryEngine,
+            final Map<Vertex, AtomicInteger> vertexLimit) {
+
         // Setup tasks to sample vertices.
         final List<Callable<Void>> tasks = new LinkedList<Callable<Void>>();
-        for (Vertex v : V) {
+        for (Map.Entry<Vertex, AtomicInteger> e : vertexLimit.entrySet()) {
 
+            final Vertex v = e.getKey();
+
+            final int limit = e.getValue().get();
+
             tasks.add(new SampleVertexTask(queryEngine, v, limit, sampleType));
 
         }
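For anyone reading the diff outside the bigdata source tree, here is a minimal, self-contained sketch of the pattern the commit introduces: build a read-only map of vertex -> sample limit, wrap one sampling task per vertex in a Callable, and run the tasks in parallel, where a re-sample is a NOP unless the limit has grown. The ParallelResampleSketch class, the simplified Vertex type, and the ExecutorService/invokeAll execution below are illustrative assumptions only, not bigdata code; the real JGraph delegates to SampleVertexTask and the bigdata QueryEngine, whose task execution is not shown in this diff excerpt.

import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Illustrative sketch only (not bigdata code): parallel (re-)sampling of a
 * set of vertices, each with its own sample limit.
 */
public class ParallelResampleSketch {

    /** Simplified stand-in for the RTO Vertex: remembers its last sample limit. */
    static final class Vertex {
        final String name;
        volatile int sampledAtLimit = 0;
        Vertex(final String name) { this.name = name; }
        void sample(final int limit) {
            // A re-sample is a NOP unless the limit has been increased
            // since the last time the vertex was sampled.
            if (limit <= sampledAtLimit)
                return;
            sampledAtLimit = limit;
            System.out.println(name + ": sampled at limit=" + limit);
        }
    }

    /**
     * (Re-)sample the given vertices in parallel: one Callable per vertex,
     * each reading its own limit from the (read-only) map.
     */
    static void sampleVertices(final ExecutorService executor,
            final Map<Vertex, AtomicInteger> vertexLimit) throws Exception {

        final List<Callable<Void>> tasks = new LinkedList<Callable<Void>>();

        for (Map.Entry<Vertex, AtomicInteger> e : vertexLimit.entrySet()) {
            final Vertex v = e.getKey();
            final int limit = e.getValue().get();
            tasks.add(new Callable<Void>() {
                public Void call() {
                    v.sample(limit);
                    return null;
                }
            });
        }

        // Run all sampling tasks and surface the first failure, if any.
        for (Future<Void> f : executor.invokeAll(tasks))
            f.get();
    }

    public static void main(final String[] args) throws Exception {
        final ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            final Map<Vertex, AtomicInteger> vertexLimit =
                    new LinkedHashMap<Vertex, AtomicInteger>();
            vertexLimit.put(new Vertex("v1"), new AtomicInteger(100));
            vertexLimit.put(new Vertex("v2"), new AtomicInteger(200));
            // First pass samples both vertices; a second call with the same
            // limits would be a NOP for each vertex.
            sampleVertices(executor, vertexLimit);
        } finally {
            executor.shutdown();
        }
    }
}

A note on the map values: the commit keys the limits as AtomicInteger, presumably so that later rounds of the RTO can bump a vertex's limit in place and re-invoke sampleVertices without rebuilding the map; the method itself only reads the values.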