From: <tho...@us...> - 2013-10-19 20:02:02
|
Revision: 7461 http://bigdata.svn.sourceforge.net/bigdata/?rev=7461&view=rev Author: thompsonbry Date: 2013-10-19 20:01:51 +0000 (Sat, 19 Oct 2013) Log Message: ----------- Added CC and PR implementations and a reducer to report the histogram over the #of vertices at each depth for BFS. See #629 (Graph Processing API) Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASContext.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASOptions.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASProgram.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASState.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGraphAccessor.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IReducer.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/BaseGASProgram.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASContext.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASState.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/ram/RAMGASEngine.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/sail/SAILGASEngine.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/util/GASRunnerBase.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/util/VertexDistribution.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/AbstractGraphTestCase.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestAll.java 
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestBFS.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestSSSP.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/impl/ram/TestGather.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/impl/sail/AbstractSailGraphTestCase.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/impl/sail/TestGather.java branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/BigdataGASEngine.java branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/BigdataGASState.java branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/bd/TestBFS.java branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/bd/TestGather.java branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/bd/TestSSSP.java Added Paths: ----------- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/FrontierEnum.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/CC.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/PR.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestCC.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestPR.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/data/ssspGraph.png branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/data/ssspGraph.ttl Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/FrontierEnum.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/FrontierEnum.java (rev 0) +++ 
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/FrontierEnum.java 2013-10-19 20:01:51 UTC (rev 7461) @@ -0,0 +1,36 @@ +/** + Copyright (C) SYSTAP, LLC 2006-2012. All rights reserved. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ +package com.bigdata.rdf.graph; + +/** + * Type-safe enumeration characterizing the assumptions of an algorithm + * concerning its initial frontier. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public enum FrontierEnum { + + /** + * The initial frontier is a single vertex. + */ + SingleVertex, + + /** + * The initial frontier is all vertices. + */ + AllVertices; + +} Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASContext.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASContext.java 2013-10-18 15:34:43 UTC (rev 7460) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASContext.java 2013-10-19 20:01:51 UTC (rev 7461) @@ -50,6 +50,11 @@ IGASState<VS, ES, ST> getGASState(); /** + * The graph access object. + */ + IGraphAccessor getGraphAccessor(); + + /** * Execute one iteration. 
* * @param stats Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASOptions.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASOptions.java 2013-10-18 15:34:43 UTC (rev 7460) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASOptions.java 2013-10-19 20:01:51 UTC (rev 7461) @@ -39,8 +39,28 @@ public interface IGASOptions<VS, ES, ST> { /** + * Return the nature of the initial frontier for this algorithm. + */ + FrontierEnum getInitialFrontierEnum(); + + /** + * Return the type of edges that must exist when sampling the vertices of + * the graph. If {@link EdgesEnum#InEdges} is specified, then each sampled + * vertex will have at least one in-edge. If {@link EdgesEnum#OutEdges} is + * specified, then each sampled vertex will have at least one out-edge. To + * sample all vertices regardless of their edges, specify + * {@link EdgesEnum#NoEdges}. To require that each vertex has at least one + * in-edge and one out-edge, specify {@link EdgesEnum#AllEdges}. + */ + EdgesEnum getSampleEdgesFilter(); + + /** * Return the set of edges to which the GATHER is applied -or- * {@link EdgesEnum#NoEdges} to skip the GATHER phase. + * + * TODO We may need to set this dynamically when visiting the vertex in the + * frontier rather than having it be a one-time property of the vertex + * program. 
*/ EdgesEnum getGatherEdges(); Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASProgram.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASProgram.java 2013-10-18 15:34:43 UTC (rev 7460) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASProgram.java 2013-10-19 20:01:51 UTC (rev 7461) @@ -32,21 +32,46 @@ * the generic type for the per-edge state, but that is not always * true. The SUM type is scoped to the GATHER + SUM operation (NOT * the computation). - * + * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * + * TODO DESIGN: The broad problem with this approach is that it is + * overly coupled with the Java object model. Instead it needs to expose + * an API that is aimed at vectored (for GPU) execution with 2D + * partitioning (for out-of-core, multi-node). */ public interface IGASProgram<VS, ES, ST> extends IGASOptions<VS, ES, ST> { /** + * One time initialization before the {@link IGASProgram} is executed. + * + * @param ctx + * The evaluation context. + */ + void before(IGASContext<VS, ES, ST> ctx); + + /** + * One time initialization after the {@link IGASProgram} is executed. + * + * @param ctx + * The evaluation context. + */ + void after(IGASContext<VS, ES, ST> ctx); + + /** * Callback to initialize the state for each vertex in the initial frontier * before the first iteration. A typical use case is to set the distance of * the starting vertex to ZERO (0). * * @param u * The vertex. + * + * TODO We do not need both the {@link IGASContext} and the + * {@link IGASState}. The latter is available from the former. */ - void init(IGASState<VS, ES, ST> state, Value u); - + void initVertex(IGASContext<VS, ES, ST> ctx, IGASState<VS, ES, ST> state, + Value u); + /** * GATHER is a map/reduce over the edges of the vertex. 
The SUM provides * pair-wise reduction over the edges visited by the GATHER. @@ -94,8 +119,14 @@ * TODO DESIGN: Rather than pair-wise reduction, why not use * vectored reduction? That way we could use an array of primitives * as well as objects. + * + * TODO DESIGN: This should be a reduced interface since we only + * need access to the comparator semantics while the [state] + * provides random access to vertex and edge state. The comparator + * is necessary for MIN semantics for the {@link Value} + * implementation of the backend. E.g., Value versus IV. */ - ST sum(ST left, ST right); + ST sum(final IGASState<VS, ES, ST> state, ST left, ST right); /** * Apply the reduced aggregation computed by GATHER + SUM to the vertex. @@ -155,7 +186,7 @@ * Return <code>true</code> iff the algorithm should continue. This is * invoked after every iteration, once the new frontier has been computed * and {@link IGASState#round()} has been advanced. An implementation may - * simple return <code>true</code>, in which case the algorithm will + * simply return <code>true</code>, in which case the algorithm will * continue IFF the current frontier is not empty. * <p> * Note: While this can be used to make custom decisions concerning the Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASState.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASState.java 2013-10-18 15:34:43 UTC (rev 7460) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASState.java 2013-10-19 20:01:51 UTC (rev 7461) @@ -51,6 +51,8 @@ /** * {@link #reset()} the computation state and populate the initial frontier. * + * @param ctx + * The execution context. * @param v * One or more vertices that will be included in the initial * frontier. @@ -58,7 +60,7 @@ * @throws IllegalArgumentException * if no vertices are specified. */ - void init(Value... 
v); + void setFrontier(IGASContext<VS, ES, ST> ctx, Value... v); /** * Discard computation state (the frontier, vertex state, and edge state) @@ -227,5 +229,19 @@ * TODO RDR : Link to an RDR wiki page as well. */ Statement decodeStatement(Value v); + + /** + * Return -1, 0, or 1 if <code>u</code> is LT, EQ, or GT <code>v</code>. A + * number of GAS programs depend on the ability to place an order over the + * vertex identifiers, as does 2D partitioning. The ordering provided by + * this method MAY be arbitrary, but it MUST be total and stable across the + * life-cycle of the GAS program evaluation. + * + * @param u + * A vertex. + * @param v + * Another vertex. + */ + int compareTo(Value u, Value v); } Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGraphAccessor.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGraphAccessor.java 2013-10-18 15:34:43 UTC (rev 7460) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGraphAccessor.java 2013-10-19 20:01:51 UTC (rev 7461) @@ -31,18 +31,37 @@ public interface IGraphAccessor { /** - * Return the edges for the vertex. + * Return the #of edges of the specified type for the given vertex. + * <p> + * Note: This is not always a flyweight operation due to the need to filter + * for only the observable edge types. If this information is required, it + * may be best to cache it on the vertex state object for a given + * {@link IGASProgram}. * - * @param p + * @param ctx * The {@link IGASContext}. * @param u * The vertex. * @param edges * Typesafe enumeration indicating which edges should be visited. - * + * * @return An iterator that will visit the edges for that vertex. */ - Iterator<Statement> getEdges(IGASContext<?, ?, ?> p, Value u, + long getEdgeCount(IGASContext<?, ?, ?> ctx, Value u, EdgesEnum edges); + + /** + * Return the edges for the given vertex. 
+ * + * @param ctx + * The {@link IGASContext}. + * @param u + * The vertex. + * @param edges + * Typesafe enumeration indicating which edges should be visited. + * + * @return An iterator that will visit the edges for that vertex. + */ + Iterator<Statement> getEdges(IGASContext<?, ?, ?> ctx, Value u, EdgesEnum edges); /** Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IReducer.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IReducer.java 2013-10-18 15:34:43 UTC (rev 7460) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IReducer.java 2013-10-19 20:01:51 UTC (rev 7461) @@ -37,7 +37,7 @@ * The result from applying the procedure to a single index * partition. */ - public void visit(IGASState<VS, ES, ST> ctx, Value u); + public void visit(IGASState<VS, ES, ST> state, Value u); /** * Return the aggregated results as an implementation dependent object. 
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java 2013-10-18 15:34:43 UTC (rev 7460) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java 2013-10-19 20:01:51 UTC (rev 7461) @@ -15,16 +15,23 @@ */ package com.bigdata.rdf.graph.analytics; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.openrdf.model.Statement; import org.openrdf.model.Value; import com.bigdata.rdf.graph.EdgesEnum; import com.bigdata.rdf.graph.Factory; +import com.bigdata.rdf.graph.FrontierEnum; import com.bigdata.rdf.graph.IGASContext; import com.bigdata.rdf.graph.IGASScheduler; import com.bigdata.rdf.graph.IGASState; +import com.bigdata.rdf.graph.IReducer; import com.bigdata.rdf.graph.impl.BaseGASProgram; import cutthecrap.utils.striterators.IStriterator; @@ -39,6 +46,8 @@ */ public class BFS extends BaseGASProgram<BFS.VS, BFS.ES, Void> { +// private static final Logger log = Logger.getLogger(BFS.class); + public static class VS { /** @@ -128,6 +137,13 @@ } @Override + public FrontierEnum getInitialFrontierEnum() { + + return FrontierEnum.SingleVertex; + + } + + @Override public EdgesEnum getGatherEdges() { return EdgesEnum.NoEdges; @@ -158,7 +174,8 @@ * Not used. 
*/ @Override - public void init(final IGASState<BFS.VS, BFS.ES, Void> state, final Value u) { + public void initVertex(final IGASContext<BFS.VS, BFS.ES, Void> ctx, + final IGASState<BFS.VS, BFS.ES, Void> state, final Value u) { state.getState(u).visit(0); @@ -169,15 +186,20 @@ */ @Override public Void gather(IGASState<BFS.VS, BFS.ES, Void> state, Value u, Statement e) { + throw new UnsupportedOperationException(); + } /** * Not used. */ @Override - public Void sum(Void left, Void right) { + public Void sum(final IGASState<BFS.VS, BFS.ES, Void> state, + final Void left, final Void right) { + throw new UnsupportedOperationException(); + } /** @@ -231,10 +253,124 @@ } @Override - public boolean nextRound(IGASContext<BFS.VS, BFS.ES, Void> ctx) { + public boolean nextRound(final IGASContext<BFS.VS, BFS.ES, Void> ctx) { return true; } + /** + * Reduce the active vertex state, returning a histogram reporting the #of + * vertices at each distance from the starting vertex. There will always be + * one vertex at depth zero - this is the starting vertex. For each + * successive depth, the #of vertices that were labeled at that depth is + * reported. This is essentially the same as reporting the size of the + * frontier in each round of the traversal, but the histogram is reported + * based on the vertex state. + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + * + * TODO Do another reducer that reports the actual BFS tree rather + * than a histogram. For each depth, it needs to have the set of + * vertices that are at that number of hops from the starting + * vertex. So, there is an outer map from depth to set. The inner + * set should also be concurrent if we allow concurrent reduction of + * the activated vertex state. 
+ */ + protected static class HistogramReducer implements + IReducer<VS, ES, Void, Map<Integer, AtomicLong>> { + + private final ConcurrentHashMap<Integer, AtomicLong> values = new ConcurrentHashMap<Integer, AtomicLong>(); + + @Override + public void visit(final IGASState<VS, ES, Void> state, final Value u) { + + final VS us = state.getState(u); + + if (us != null) { + + final Integer depth = Integer.valueOf(us.depth()); + + AtomicLong newval = values.get(depth); + + if (newval == null) { + + final AtomicLong oldval = values.putIfAbsent(depth, + newval = new AtomicLong()); + + if (oldval != null) { + + // lost data race. + newval = oldval; + + } + + } + + newval.incrementAndGet(); + + } + + } + + @Override + public Map<Integer, AtomicLong> get() { + + return Collections.unmodifiableMap(values); + + } + + } + + @Override + public void after(final IGASContext<BFS.VS, BFS.ES, Void> ctx) { + + final HistogramReducer r = new HistogramReducer(); + + ctx.getGASState().reduce(r); + + class NV implements Comparable<NV> { + public final int n; + public final long v; + public NV(final int n, final long v) { + this.n = n; + this.v = v; + } + @Override + public int compareTo(final NV o) { + if (o.n > this.n) + return -1; + if (o.n < this.n) + return 1; + return 0; + } + } + + final Map<Integer, AtomicLong> h = r.get(); + + final NV[] a = new NV[h.size()]; + + int i = 0; + + for (Map.Entry<Integer, AtomicLong> e : h.entrySet()) { + + a[i++] = new NV(e.getKey().intValue(), e.getValue().get()); + + } + + Arrays.sort(a); + + System.out.println("distance, frontierSize, sumFrontierSize"); + long sum = 0L; + for (NV t : a) { + + System.out.println(t.n + ", " + t.v + ", " + sum); + + sum += t.v; + + } + + } + } Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/CC.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/CC.java (rev 0) +++ 
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/CC.java 2013-10-19 20:01:51 UTC (rev 7461) @@ -0,0 +1,411 @@ +/** + Copyright (C) SYSTAP, LLC 2006-2012. All rights reserved. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ +package com.bigdata.rdf.graph.analytics; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.log4j.Logger; +import org.openrdf.model.Statement; +import org.openrdf.model.Value; + +import com.bigdata.rdf.graph.EdgesEnum; +import com.bigdata.rdf.graph.Factory; +import com.bigdata.rdf.graph.FrontierEnum; +import com.bigdata.rdf.graph.IGASContext; +import com.bigdata.rdf.graph.IGASScheduler; +import com.bigdata.rdf.graph.IGASState; +import com.bigdata.rdf.graph.IReducer; +import com.bigdata.rdf.graph.impl.BaseGASProgram; + +import cutthecrap.utils.striterators.IStriterator; + +/** + * Connected components computes the distinct sets of non-overlapping subgraphs + * within a graph. All vertices within a connected component are connected along + * at least one path. + * <p> + * The implementation works by assigning a label to each vertex. The label is + * initially the vertex identifier for that vertex. The labels in the graph are + * then relaxed with each vertex taking the minimum of its one-hop neighbors' + * labels. 
The algorithm halts when no vertex label has changed state in a given + * iteration. + * + * <dl> + * <dt>init</dt> + * <dd>All vertices are inserted into the initial frontier.</dd> + * <dt>Gather</dt> + * <dd>Report the source vertex label (not its identifier)</dd> + * <dt>Apply</dt> + * <dd>label = min(label,gatherLabel)</dd> + * <dt>Scatter</dt> + * <dd>iff the label has changed</dd> + * </dl> + * + * FIXME CC : Implement. Should push the updates through the scatter function. + * Find an abstraction to support this pattern. It is used by both CC and SSSP. + * (We can initially implement this as a Gather (over all edges) plus a + * conditional Scatter (over all edges iff the vertex label has changed). We can + * then refactor both this class and SSSP to push the updates through a Scatter + * (what I think of as a Gather to a remote vertex).) + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public class CC extends BaseGASProgram<CC.VS, CC.ES, Value> { + + private static final Logger log = Logger.getLogger(CC.class); + + public static class VS { + + /** + * The label for the vertex. This value is initially the vertex + * identifier. It is relaxed by the computation until it is the minimum + * vertex identifier for the connected component. + */ + private final AtomicReference<Value> label; + + /** + * <code>true</code> iff the label was modified. + */ + private boolean changed = false; + + public VS(final Value v) { + + this.label = new AtomicReference<Value>(v); + + } + + /** + * The assigned label for this vertex. Once converged, all vertices in a + * given connected component will have the same label and the labels + * assigned to the vertices in each connected component will be + * distinct. The labels themselves are just the identifier of a vertex + * in that connected component. 
Conceptually, either the MIN or the MAX + * over the vertex identifiers in the connected component can be used by + * the algorithm since both will provide a unique labeling strategy. + */ + public Value getLabel() { + + return label.get(); + + } + + private void setLabel(final Value v) { + + label.set(v); + + } + + @Override + public String toString() { + return "{label=" + label + ",changed=" + changed + "}"; + } + + }// class VS + + /** + * Edge state is not used. + */ + public static class ES { + + } + + private static final Factory<Value, CC.VS> vertexStateFactory = new Factory<Value, CC.VS>() { + + @Override + public CC.VS initialValue(final Value value) { + + return new VS(value); + + } + + }; + + @Override + public Factory<Value, CC.VS> getVertexStateFactory() { + + return vertexStateFactory; + + } + + @Override + public Factory<Statement, CC.ES> getEdgeStateFactory() { + + return null; + + } + + @Override + public FrontierEnum getInitialFrontierEnum() { + + return FrontierEnum.AllVertices; + + } + + /** + * {@inheritDoc} + * <p> + * Overridden to not impose any filter on the sampled vertices (it does not + * matter whether they have any connected edges since we need to put all + * vertices into the initial frontier). + */ + @Override + public EdgesEnum getSampleEdgesFilter() { + + return EdgesEnum.NoEdges; + + } + + @Override + public EdgesEnum getGatherEdges() { + + return EdgesEnum.AllEdges; + + } + + @Override + public EdgesEnum getScatterEdges() { + + return EdgesEnum.AllEdges; + + } + + /** + * {@inheritDoc} + * <p> + * Overridden to only visit the edges of the graph. + */ + @Override + public IStriterator constrainFilter( + final IGASContext<CC.VS, CC.ES, Value> ctx, final IStriterator itr) { + + return itr.addFilter(getEdgeOnlyFilter(ctx)); + + } + + /** + * {@inheritDoc} + * <p> + * Return the label of the remote vertex. 
+ */ + @Override + public Value gather(final IGASState<CC.VS, CC.ES, Value> state, + final Value u, final Statement e) { + + final Value v = state.getOtherVertex(u, e); + + final CC.VS vs = state.getState(v); + + return vs.getLabel(); + + } + + /** + * MIN + * <p> + * {@inheritDoc} + */ + @Override + public Value sum(final IGASState<CC.VS, CC.ES, Value> state, + final Value left, final Value right) { + + // MIN(left,right) + if (state.compareTo(left, right) < 0) { + + return left; + + } + + return right; + + } + + /** + * {@inheritDoc} + * <p> + * Compute the new value for this vertex, making a note of the last change + * for this vertex. + */ + @Override + public CC.VS apply(final IGASState<CC.VS, CC.ES, Value> state, + final Value u, final Value sum) { + + final CC.VS us = state.getState(u); + + if (sum == null) { + + /* + * Nothing visited by Gather. No change. Vertex will be dropped from + * the frontier. + */ + + us.changed = false; + + return null; + + } + + final Value oldval = us.getLabel(); + + // MIN(oldval,gatherSum) + if (state.compareTo(oldval, sum) <= 0) { + + us.changed = false; + + if (log.isDebugEnabled()) + log.debug(" NO CHANGE: " + u + ", val=" + oldval); + + } else { + + us.setLabel(sum); + + us.changed = true; + + if (log.isDebugEnabled()) + log.debug("DID CHANGE: " + u + ", old=" + oldval + ", new=" + + sum); + + } + + return us; + + } + + /** + * {@inheritDoc} + * <p> + * Returns <code>true</code> iff the label was changed in the current round. + */ + @Override + public boolean isChanged(final IGASState<VS, ES, Value> state, + final Value u) { + + final CC.VS us = state.getState(u); + + return us.changed; + + } + + /** + * The remote vertex is scheduled for activation unless it has already been + * visited. + * <p> + * Note: We are scattering to out-edges. Therefore, this vertex is + * {@link Statement#getSubject()}. The remote vertex is + * {@link Statement#getObject()}. 
+ */ + @Override + public void scatter(final IGASState<CC.VS, CC.ES, Value> state, + final IGASScheduler sch, final Value u, final Statement e) { + + final Value v = state.getOtherVertex(u, e); + + sch.schedule(v); + + } + + /** + * Returns a map containing the labels assigned to each connected component + * (which gives you a vertex in that connected component) and the #of + * vertices in each connected component. + */ + public Map<Value, AtomicInteger> getConnectedComponents( + final IGASState<CC.VS, CC.ES, Value> state) { + + final ConcurrentHashMap<Value, AtomicInteger> labels = new ConcurrentHashMap<Value, AtomicInteger>(); + + return state + .reduce(new IReducer<CC.VS, CC.ES, Value, Map<Value, AtomicInteger>>() { + + @Override + public void visit(final IGASState<VS, ES, Value> state, + final Value u) { + + final VS us = state.getState(u); + + if (us != null) { + + final Value label = us.getLabel(); + + if (log.isDebugEnabled()) + log.debug("v=" + u + ", label=" + label); + + final AtomicInteger oldval = labels.putIfAbsent( + label, new AtomicInteger(1)); + + if (oldval != null) { + + // lost race. increment existing counter. 
+ oldval.incrementAndGet(); + + } + + } + + } + + @Override + public Map<Value, AtomicInteger> get() { + + return Collections.unmodifiableMap(labels); + + } + }); + + } + + @Override + public void after(final IGASContext<CC.VS, CC.ES, Value> ctx) { + + final Map<Value, AtomicInteger> labels = getConnectedComponents(ctx + .getGASState()); + + System.out.println("There are " + labels.size() + + " connected components"); + + class NV implements Comparable<NV> { + public final int n; + public final Value v; + public NV(int n, Value v) { + this.n = n; + this.v = v; + } + @Override + public int compareTo(final NV o) { + return o.n - this.n; + } + } + + final NV[] a = new NV[labels.size()]; + int i = 0; + for (Map.Entry<Value, AtomicInteger> e : labels.entrySet()) { + a[i++] = new NV(e.getValue().intValue(), e.getKey()); + } + + Arrays.sort(a); + + System.out.println("size, label"); + for(NV t : a) { + System.out.println(t.n + ", " + t.v); + } + + } + +} Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/PR.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/PR.java (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/PR.java 2013-10-19 20:01:51 UTC (rev 7461) @@ -0,0 +1,428 @@ +/** + Copyright (C) SYSTAP, LLC 2006-2012. All rights reserved. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ +package com.bigdata.rdf.graph.analytics; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.log4j.Logger; +import org.openrdf.model.Statement; +import org.openrdf.model.Value; + +import com.bigdata.rdf.graph.EdgesEnum; +import com.bigdata.rdf.graph.Factory; +import com.bigdata.rdf.graph.FrontierEnum; +import com.bigdata.rdf.graph.IGASContext; +import com.bigdata.rdf.graph.IGASScheduler; +import com.bigdata.rdf.graph.IGASState; +import com.bigdata.rdf.graph.IReducer; +import com.bigdata.rdf.graph.impl.BaseGASProgram; + +import cutthecrap.utils.striterators.IStriterator; + +/** + * Page rank assigns weights to the vertices in a graph based on their relative + * "importance" as determined by the patterns of directed links in the graph. + * The algorithm is stated in terms of a computation that is iterated until + * the delta in the computed values for the vertices is within <i>epsilon</i> of + * ZERO. However, in practice convergence based on <i>epsilon</i> is + * problematic due to the manner in which the results of the floating point + * operations depend on the sequence of those operations (which is why this + * implementation uses <code>double</code> precision). Thus, page rank is + * typically executed a specific number of iterations, e.g., 50 or 100. If + * convergence is based on epsilon, then it is possible that the computation + * will never converge, especially for smaller values of epsilon. 
+ * <dl> + * <dt>init</dt> + * <dd>All vertices are inserted into the initial frontier.</dd> + * <dt>Gather</dt> + * <dd>sum( neighbor_value / neighbor_num_out_edges ) over the in-edges of the + * graph.</dd> + * <dt>Apply</dt> + * <dd>value = <i>resetProb</i> + (1.0 - <i>resetProb</i>) * gatherSum</dd> + * <dt>Scatter</dt> + * <dd>if (a) value has significantly changed <code>(fabs(old-new) GT + * <i>epsilon</i>)</code>; or (b) iterations LT limit</dd> + * </dl> + * <ul> + * <li>where <i>resetProb</i> is a value that determines a random reset + * probability and defaults to {@value PR#DEFAULT_RESET_PROB}.</li> + * <li> + * where <i>epsilon</i> controls the degree of convergence before the algorithm + * terminates and defaults to {@value PR#DEFAULT_EPSILON}.</li> + * </ul> + * + * FIXME PR UNIT TEST. Verify computed values (within variance) and max + * iterations. Ground truth from GL? + * + * FIXME PR: The out-edges can be taken directly from the vertex distribution. + * That will reduce the initialization overhead for PR. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public class PR extends BaseGASProgram<PR.VS, PR.ES, Double> { + + private static final Logger log = Logger.getLogger(PR.class); + + // TODO javadoc and config. + protected static final int DEFAULT_LIMIT = 100; + protected static final double DEFAULT_RESET_PROB = 0.15d; + protected static final double DEFAULT_EPSILON = 0.01d; + protected static final double DEFAULT_MIN_PAGE_RANK = 1d; + + private final double resetProb = DEFAULT_RESET_PROB; + private final double epsilon = DEFAULT_EPSILON; + private final int limit = DEFAULT_LIMIT; + private final double minPageRank = DEFAULT_MIN_PAGE_RANK; + + public static class VS { + + /** + * The current computed value for this vertex. + * <p> + * All vertices are initialized to the reset probability. They are then + * updated in each iteration to the new estimated value by apply(). 
+ */ + private double value; + + /** + * The number of out-edges. This is computed once, when the vertex state + * is initialized. + */ + private long outEdges; + + /** + * The last delta observed for this vertex. + */ + private double lastChange = 0d; + + /** + * The current computed value for this vertex. + * <p> + * All vertices are initialized to the reset probability. They are then + * updated in each iteration to the new estimated value by apply(). + */ + public double getValue() { + + return value; + + } + + @Override + public String toString() { + return "{value=" + value + ",lastChange=" + lastChange + "}"; + } + + }// class VS + + /** + * Edge state is not used. + */ + public static class ES { + + } + + private static final Factory<Value, PR.VS> vertexStateFactory = new Factory<Value, PR.VS>() { + + @Override + public PR.VS initialValue(final Value value) { + + return new VS(); + + } + + }; + + @Override + public Factory<Value, PR.VS> getVertexStateFactory() { + + return vertexStateFactory; + + } + + @Override + public Factory<Statement, PR.ES> getEdgeStateFactory() { + + return null; + + } + + @Override + public FrontierEnum getInitialFrontierEnum() { + + return FrontierEnum.AllVertices; + + } + + @Override + public EdgesEnum getGatherEdges() { + + return EdgesEnum.InEdges; + + } + + @Override + public EdgesEnum getScatterEdges() { + + return EdgesEnum.OutEdges; + + } + + /** + * {@inheritDoc} + * <p> + * Overridden to only visit the edges of the graph. + */ + @Override + public IStriterator constrainFilter( + final IGASContext<PR.VS, PR.ES, Double> ctx, final IStriterator itr) { + + return itr.addFilter(getEdgeOnlyFilter(ctx)); + + } + + /** + * {@inheritDoc} + * <p> + * Each vertex is initialized to the reset probability. + * + * FIXME We need to do this efficiently. E.g., using a scan to find all of + * the vertices together with their in-degree or out-degree. 
That should be + * done to populate the frontier, initializing the #of out-edges at the same + * time. + */ + @Override + public void initVertex(final IGASContext<PR.VS, PR.ES, Double> ctx, + final IGASState<PR.VS, PR.ES, Double> state, final Value u) { + + final PR.VS us = state.getState(u); + + us.value = resetProb; + + us.outEdges = ctx.getGraphAccessor().getEdgeCount(ctx, u, + EdgesEnum.OutEdges); + } + + /** + * {@inheritDoc} + * <p> + */ + @Override + public Double gather(final IGASState<PR.VS, PR.ES, Double> state, + final Value u, final Statement e) { + + final Value v = state.getOtherVertex(u, e); + + final PR.VS vs = state.getState(v); + + /* + * Note: Division by zero should not be possible here since the edge + * that we used to discover [v] is an out-edge of [v]. + */ + + return (vs.value / vs.outEdges); + + } + + /** + * SUM + * <p> + * {@inheritDoc} + */ + @Override + public Double sum(final IGASState<PR.VS, PR.ES, Double> state, + final Double left, final Double right) { + + return left + right; + + } + + /** + * {@inheritDoc} + * <p> + * Compute the new value for this vertex, making a note of the last change + * for this vertex. + */ + @Override + public PR.VS apply(final IGASState<PR.VS, PR.ES, Double> state, + final Value u, final Double sum) { + + final PR.VS us = state.getState(u); + + if (sum == null) { + + /* + * No in-edges visited by Gather. No change. Vertex will be dropped + * from the frontier. + */ + + us.lastChange = 0d; + + return null; + + } + + final double newval = resetProb + (1.0 - resetProb) * sum; + + us.lastChange = (newval - us.value); + + us.value = newval; + + return us; + + } + + /** + * {@inheritDoc} + * <p> + * Returns <code>true</code> iff the last change was greater than epsilon.
+ */ + @Override + public boolean isChanged(final IGASState<VS, ES, Double> state, + final Value u) { + + final PR.VS us = state.getState(u); + + return us.lastChange > epsilon; + + } + + /** + * The remote vertex is scheduled for activation unless it has already been + * visited. + * <p> + * Note: We are scattering to out-edges. Therefore, this vertex is + * {@link Statement#getSubject()}. The remote vertex is + * {@link Statement#getObject()}. + */ + @Override + public void scatter(final IGASState<PR.VS, PR.ES, Double> state, + final IGASScheduler sch, final Value u, final Statement e) { + + final Value v = state.getOtherVertex(u, e); + + sch.schedule(v); + + } + + /** + * {@inheritDoc} + * <p> + * Continue unless the iteration limit has been reached. + */ + @Override + public boolean nextRound(final IGASContext<PR.VS, PR.ES, Double> ctx) { + + return ctx.getGASState().round() < limit; + + } + + @Override + public void after(final IGASContext<PR.VS, PR.ES, Double> ctx) { + + final ConcurrentHashMap<Value, Double> values = new ConcurrentHashMap<Value, Double>(); + + ctx.getGASState().reduce( + new IReducer<PR.VS, PR.ES, Double, Map<Value, Double>>() { + + @Override + public void visit(final IGASState<VS, ES, Double> state, + final Value u) { + + final VS us = state.getState(u); + + if (us != null) { + + final double pageRank = us.getValue(); + + // FIXME Why are NaNs showing up? + if (Double.isNaN(pageRank)) + return; + + // FIXME Do infinite values show up? + if (Double.isInfinite(pageRank)) + return; + + if (pageRank < minPageRank) { + // Ignore small values. + return; + } + + /* + * Only report the larger ranked values. 
+ */ + + if (log.isDebugEnabled()) + log.debug("v=" + u + ", pageRank=" + pageRank); + + values.put(u, Double.valueOf(pageRank)); + + } + + } + + @Override + public Map<Value, Double> get() { + + return Collections.unmodifiableMap(values); + + } + }); + + class NV implements Comparable<NV> { + public final double n; + public final Value v; + public NV(double n, Value v) { + this.n = n; + this.v = v; + } + @Override + public int compareTo(final NV o) { + if (o.n > this.n) + return 1; + if (o.n < this.n) + return -1; + return 0; + } + } + + final NV[] a = new NV[values.size()]; + + int i = 0; + + for (Map.Entry<Value, Double> e : values.entrySet()) { + + a[i++] = new NV(e.getValue().doubleValue(), e.getKey()); + + } + + Arrays.sort(a); + + System.out.println("rank, pageRank, vertex"); + i = 0; + for (NV t : a) { + + System.out.println(i + ", " + t.n + ", " + t.v); + + i++; + + } + + } + +} Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java 2013-10-18 15:34:43 UTC (rev 7460) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java 2013-10-19 20:01:51 UTC (rev 7461) @@ -20,6 +20,7 @@ import org.openrdf.model.Value; import com.bigdata.rdf.graph.Factory; +import com.bigdata.rdf.graph.FrontierEnum; import com.bigdata.rdf.graph.IGASContext; import com.bigdata.rdf.graph.IGASScheduler; import com.bigdata.rdf.graph.IGASState; @@ -29,22 +30,24 @@ /** * SSSP (Single Source, Shortest Path). This analytic computes the shortest path - * to each vertex in the graph starting from the given vertex. Only connected - * vertices are visited by this implementation (the frontier never leaves the - * connected component in which the starting vertex is located). + * to each connected vertex in the graph starting from the given vertex. 
Only + * connected vertices are visited by this implementation (the frontier never + * leaves the connected component in which the starting vertex is located). * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * - * TODO There is no reason to do a gather on the first round. Add - * isGather()? (parallel to isChanged() for scatter?) - * - * TODO Add reducer pattern for finding the maximum degree vertex. - * * TODO Add parameter for directed versus undirected SSSP. When * undirected, the gather and scatter are for AllEdges. Otherwise, * gather on in-edges and scatter on out-edges. Also, we need to use a * getOtherVertex(e) method to figure out the other edge when using * undirected scatter/gather. Add unit test for undirected. + * + * FIXME New SSSP (push style scatter abstraction with new test case + * based on graph example developed for this) + * + * TODO Add a reducer to report the actual minimum length paths. This is + * similar to a BFS tree, but the path lengths are not integer values so + * we need a different data structure to collect them. */ public class SSSP extends BaseGASProgram<SSSP.VS, SSSP.ES, Integer/* dist */> { @@ -76,8 +79,12 @@ * The minimum observed distance (in hops) from the source to this * vertex and initially {@link Integer#MAX_VALUE}. When this value is * modified, the {@link #changed} flag is set as a side-effect. + * + * FIXME This really needs to be a floating point value, probably + * double. We also need tests with non-integer weights and non- positive + * weights. 
*/ - private int dist = Integer.MAX_VALUE; + private Integer dist = Integer.MAX_VALUE; private boolean changed = false; @@ -147,6 +154,13 @@ } + @Override + public FrontierEnum getInitialFrontierEnum() { + + return FrontierEnum.SingleVertex; + + } + // @Override // public Factory<ISPO, SSSP.ES> getEdgeStateFactory() { // @@ -188,8 +202,8 @@ * {@inheritDoc} */ @Override - public void init(final IGASState<SSSP.VS, SSSP.ES, Integer> state, - final Value u) { + public void initVertex(final IGASContext<SSSP.VS, SSSP.ES, Integer> ctx, + final IGASState<SSSP.VS, SSSP.ES, Integer> state, final Value u) { final VS us = state.getState(u); @@ -235,7 +249,8 @@ * MIN */ @Override - public Integer sum(final Integer left, final Integer right) { + public Integer sum(final IGASState<SSSP.VS, SSSP.ES, Integer> state, + final Integer left, final Integer right) { return Math.min(left, right); Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/BaseGASProgram.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/BaseGASProgram.java 2013-10-18 15:34:43 UTC (rev 7460) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/BaseGASProgram.java 2013-10-19 20:01:51 UTC (rev 7461) @@ -15,15 +15,22 @@ */ package com.bigdata.rdf.graph.impl; +import java.util.Arrays; +import java.util.Random; + +import org.apache.log4j.Logger; +import org.openrdf.model.Resource; import org.openrdf.model.Statement; import org.openrdf.model.URI; import org.openrdf.model.Value; import com.bigdata.rdf.graph.EdgesEnum; import com.bigdata.rdf.graph.Factory; +import com.bigdata.rdf.graph.FrontierEnum; import com.bigdata.rdf.graph.IGASContext; import com.bigdata.rdf.graph.IGASProgram; import com.bigdata.rdf.graph.IGASState; +import com.bigdata.rdf.graph.impl.util.VertexDistribution; import cutthecrap.utils.striterators.Filter; import 
cutthecrap.utils.striterators.IFilter; @@ -40,6 +47,8 @@ abstract public class BaseGASProgram<VS, ES, ST> implements IGASProgram<VS, ES, ST> { + private static final Logger log = Logger.getLogger(BaseGASProgram.class); + /** * {@inheritDoc} * <p> @@ -151,6 +160,30 @@ /** * {@inheritDoc} * <p> + * The default implementation returns {@link #getGatherEdges()} and the + * {@link #getScatterEdges()} if {@link #getGatherEdges()} returns + * {@value EdgesEnum#NoEdges}. + */ + @Override + public EdgesEnum getSampleEdgesFilter() { + + // Assume that a GATHER will be done for each starting vertex. + EdgesEnum edges = getGatherEdges(); + + if (edges == EdgesEnum.NoEdges) { + + // If no GATHER is performed, then use the SCATTER edges. + edges = getScatterEdges(); + + } + + return edges; + + } + + /** + * {@inheritDoc} + * <p> * The default gathers on the {@link EdgesEnum#InEdges}. */ @Override @@ -175,10 +208,72 @@ /** * {@inheritDoc} * <p> + * The default implementation populates the frontier IFF this is an + * {@link FrontierEnum#AllVertices} {@link IGASProgram}. + */ + @Override + public void before(final IGASContext<VS, ES, ST> ctx) { + + switch (getInitialFrontierEnum()) { + case AllVertices: { + addAllVerticesToFrontier(ctx); + break; + } + } + + } + + /** + * {@inheritDoc} + * <p> + * The default implementation is a NOP. + */ + @Override + public void after(final IGASContext<VS, ES, ST> ctx) { + + // NOP + + } + + /** + * Populate the initial frontier using all vertices in the graph. + * + * @param ctx + * The graph evaluation context. + * + * TODO This has a random number generator whose initial seed is + * not controlled by the caller. However, the desired use case + * here is to produce a distribution over ALL vertices so the + * random number should be ignored - perhaps we should pass it in + * as <code>null</code>? 
+ */ + private void addAllVerticesToFrontier(final IGASContext<VS, ES, ST> ctx) { + + final IGASState<VS, ES, ST> gasState = ctx.getGASState(); + + final EdgesEnum sampleEdges = getSampleEdgesFilter(); + + final VertexDistribution dist = ctx.getGraphAccessor().getDistribution( + new Random()); + + final Resource[] initialFrontier = dist.getUnweightedSample( + dist.size(), sampleEdges); + + if (log.isDebugEnabled()) + log.debug("initialFrontier=" + Arrays.toString(initialFrontier)); + + gasState.setFrontier(ctx, initialFrontier); + + } + + /** + * {@inheritDoc} + * <p> * The default is a NOP. */ @Override - public void init(final IGASState<VS, ES, ST> state, final Value u) { + public void initVertex(final IGASContext<VS, ES, ST> ctx, + final IGASState<VS, ES, ST> state, final Value u) { // NOP Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASContext.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASContext.java 2013-10-18 15:34:43 UTC (rev 7460) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASContext.java 2013-10-19 20:01:51 UTC (rev 7461) @@ -104,10 +104,17 @@ } @Override + public IGraphAccessor getGraphAccessor() { + return graphAccessor; + } + + @Override public IGASStats call() throws Exception { final GASStats total = new GASStats(); + program.before(this); + while (!gasState.frontier().isEmpty()) { final GASStats roundStats = new GASStats(); @@ -123,6 +130,8 @@ gasState.traceState(); + program.after(this); + // Done return total; @@ -160,7 +169,7 @@ * SCATTER depending on some characteristics of the algorithm. Is this * worth while? * - * TODO The ability to pushd down the APPLY for AllEdges for the GATHER + * Note: The ability to push down the APPLY for AllEdges for the GATHER * depends on our using the union of the in-edges and out-edges * iterators to visit those edges. 
That union means that we do not have * to preserve the accumulant across the in-edges and out-edges aspects @@ -177,23 +186,18 @@ final boolean pushDownApplyInScatter; final boolean runApplyStage; - if (scatterEdges == EdgesEnum.NoEdges) { + if (gatherEdges != EdgesEnum.NoEdges) { // Do APPLY() in GATHER. pushDownApplyInGather = true; pushDownApplyInScatter = false; runApplyStage = false; - } else if (gatherEdges == EdgesEnum.NoEdges) { + } else if (scatterEdges != EdgesEnum.NoEdges) { // APPLY() in SCATTER. pushDownApplyInGather = false; pushDownApplyInScatter = true; runApplyStage = false; } else { - /* - * Do not push down the APPLY. - * - * TODO We could still push down the apply into the GATHER if we are - * doing both stages. - */ + // Do not push down the APPLY. pushDownApplyInGather = false; pushDownApplyInScatter = false; runApplyStage = true; @@ -620,7 +624,7 @@ } else { - left = program.sum(left, right); + left = program.sum(gasState, left, right); } Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASState.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASState.java 2013-10-18 15:34:43 UTC (rev 7460) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASState.java 2013-10-19 20:01:51 UTC (rev 7461) @@ -15,6 +15,7 @@ */ package com.bigdata.rdf.graph.impl; +import java.util.Comparator; import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -26,8 +27,10 @@ import org.openrdf.model.Statement; import org.openrdf.model.URI; import org.openrdf.model.Value; +import org.openrdf.query.algebra.evaluation.util.ValueComparator; import com.bigdata.rdf.graph.Factory; +import com.bigdata.rdf.graph.IGASContext; import com.bigdata.rdf.graph.IGASEngine; import com.bigdata.rdf.graph.IGASProgram; import com.bigdata.rdf.graph.IGASSchedulerImpl; @@ -108,8 
+111,13 @@ * Provides access to the backing graph. Used to decode vertices and edges * for {@link #traceState()}. */ - private IGraphAccessor graphAccessor; + private final IGraphAccessor graphAccessor; + /** + * Used to establish a total ordering over RDF {@link Value}s. + */ + private final Comparator<Value> valueComparator; + public GASState(final IGASEngine gasEngine,// final IGraphAccessor graphAccessor, // final IStaticFrontier frontier,// @@ -146,6 +154,13 @@ this.scheduler = gasScheduler; + /* + * TODO This is the SPARQL value ordering. It might be not be total or + * stable. If not, we can use an ordering over the string values of the + * RDF Values, but that will push the heap. + */ + this.valueComparator = new ValueComparator(); + } /** @@ -242,7 +257,7 @@ } @Override - public void init(final Value... vertices) { + public void setFrontier(final IGASContext<VS, ES, ST> ctx, final Value... vertices) { if (vertices == null) throw new IllegalArgumentException(); @@ -263,7 +278,7 @@ */ for (Value v : tmp) { - gasProgram.init(this, v); + gasProgram.initVertex(ctx, this, v); } @@ -272,7 +287,7 @@ && sortFrontier, tmp.iterator()); } - + @Override public void traceState() { @@ -392,4 +407,15 @@ return null; } + @Override + public int compareTo(final Value u, final Valu... [truncated message content] |