|
From: <tho...@us...> - 2013-10-19 20:02:02
|
Revision: 7461
http://bigdata.svn.sourceforge.net/bigdata/?rev=7461&view=rev
Author: thompsonbry
Date: 2013-10-19 20:01:51 +0000 (Sat, 19 Oct 2013)
Log Message:
-----------
Added CC and PR implementations and a reducer to report the histogram over the #of vertices at each depth for BFS. See #629 (Graph Processing API)
Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASContext.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASOptions.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASProgram.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASState.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGraphAccessor.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IReducer.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/BaseGASProgram.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASContext.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASState.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/ram/RAMGASEngine.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/sail/SAILGASEngine.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/util/GASRunnerBase.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/util/VertexDistribution.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/AbstractGraphTestCase.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestAll.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestBFS.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestSSSP.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/impl/ram/TestGather.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/impl/sail/AbstractSailGraphTestCase.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/impl/sail/TestGather.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/BigdataGASEngine.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/BigdataGASState.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/bd/TestBFS.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/bd/TestGather.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/bd/TestSSSP.java
Added Paths:
-----------
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/FrontierEnum.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/CC.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/PR.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestCC.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestPR.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/data/ssspGraph.png
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/data/ssspGraph.ttl
Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/FrontierEnum.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/FrontierEnum.java (rev 0)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/FrontierEnum.java 2013-10-19 20:01:51 UTC (rev 7461)
@@ -0,0 +1,36 @@
+/**
+ Copyright (C) SYSTAP, LLC 2006-2012. All rights reserved.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+package com.bigdata.rdf.graph;
+
+/**
+ * Type-safe enumeration characterizing the assumptions of an algorithm
+ * concerning its initial frontier.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ */
+public enum FrontierEnum {
+
+ /**
+ * The initial frontier is a single vertex.
+ */
+ SingleVertex,
+
+ /**
+ * The initial frontier is all vertices.
+ */
+ AllVertices;
+
+}
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASContext.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASContext.java 2013-10-18 15:34:43 UTC (rev 7460)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASContext.java 2013-10-19 20:01:51 UTC (rev 7461)
@@ -50,6 +50,11 @@
IGASState<VS, ES, ST> getGASState();
/**
+ * The graph access object.
+ */
+ IGraphAccessor getGraphAccessor();
+
+ /**
* Execute one iteration.
*
* @param stats
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASOptions.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASOptions.java 2013-10-18 15:34:43 UTC (rev 7460)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASOptions.java 2013-10-19 20:01:51 UTC (rev 7461)
@@ -39,8 +39,28 @@
public interface IGASOptions<VS, ES, ST> {
/**
+ * Return the nature of the initial frontier for this algorithm.
+ */
+ FrontierEnum getInitialFrontierEnum();
+
+ /**
+ * Return the type of edges that must exist when sampling the vertices of
+ * the graph. If {@link EdgesEnum#InEdges} is specified, then each sampled
+ * vertex will have at least one in-edge. If {@link EdgesEnum#OutEdges} is
+ * specified, then each sampled vertex will have at least one out-edge. To
+ * sample all vertices regardless of their edges, specify
+ * {@link EdgesEnum#NoEdges}. To require that each vertex has at least one
+ * in-edge and one out-edge, specify {@link EdgesEnum#AllEdges}.
+ */
+ EdgesEnum getSampleEdgesFilter();
+
+ /**
* Return the set of edges to which the GATHER is applied -or-
* {@link EdgesEnum#NoEdges} to skip the GATHER phase.
+ *
+ * TODO We may need to set this dynamically when visiting the vertex in the
+ * frontier rather than having it be a one-time property of the vertex
+ * program.
*/
EdgesEnum getGatherEdges();
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASProgram.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASProgram.java 2013-10-18 15:34:43 UTC (rev 7460)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASProgram.java 2013-10-19 20:01:51 UTC (rev 7461)
@@ -32,21 +32,46 @@
* the generic type for the per-edge state, but that is not always
* true. The SUM type is scoped to the GATHER + SUM operation (NOT
* the computation).
- *
+ *
* @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ *
+ * TODO DESIGN: The broad problem with this approach is that it is
+ * overly coupled with the Java object model. Instead it needs to expose
+ * an API that is aimed at vectored (for GPU) execution with 2D
+ * partitioning (for out-of-core, multi-node).
*/
public interface IGASProgram<VS, ES, ST> extends IGASOptions<VS, ES, ST> {
/**
+ * One time initialization before the {@link IGASProgram} is executed.
+ *
+ * @param ctx
+ * The evaluation context.
+ */
+ void before(IGASContext<VS, ES, ST> ctx);
+
+ /**
+ * One time finalization after the {@link IGASProgram} is executed.
+ *
+ * @param ctx
+ * The evaluation context.
+ */
+ void after(IGASContext<VS, ES, ST> ctx);
+
+ /**
* Callback to initialize the state for each vertex in the initial frontier
* before the first iteration. A typical use case is to set the distance of
* the starting vertex to ZERO (0).
*
* @param u
* The vertex.
+ *
+ * TODO We do not need both the {@link IGASContext} and the
+ * {@link IGASState}. The latter is available from the former.
*/
- void init(IGASState<VS, ES, ST> state, Value u);
-
+ void initVertex(IGASContext<VS, ES, ST> ctx, IGASState<VS, ES, ST> state,
+ Value u);
+
/**
* GATHER is a map/reduce over the edges of the vertex. The SUM provides
* pair-wise reduction over the edges visited by the GATHER.
@@ -94,8 +119,14 @@
* TODO DESIGN: Rather than pair-wise reduction, why not use
* vectored reduction? That way we could use an array of primitives
* as well as objects.
+ *
+ * TODO DESIGN: This should be a reduced interface since we only
+ * need access to the comparator semantics while the [state]
+ * provides random access to vertex and edge state. The comparator
+ * is necessary for MIN semantics for the {@link Value}
+ * implementation of the backend. E.g., Value versus IV.
*/
- ST sum(ST left, ST right);
+ ST sum(final IGASState<VS, ES, ST> state, ST left, ST right);
/**
* Apply the reduced aggregation computed by GATHER + SUM to the vertex.
@@ -155,7 +186,7 @@
* Return <code>true</code> iff the algorithm should continue. This is
* invoked after every iteration, once the new frontier has been computed
* and {@link IGASState#round()} has been advanced. An implementation may
- * simple return <code>true</code>, in which case the algorithm will
+ * simply return <code>true</code>, in which case the algorithm will
* continue IFF the current frontier is not empty.
* <p>
* Note: While this can be used to make custom decisions concerning the
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASState.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASState.java 2013-10-18 15:34:43 UTC (rev 7460)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASState.java 2013-10-19 20:01:51 UTC (rev 7461)
@@ -51,6 +51,8 @@
/**
* {@link #reset()} the computation state and populate the initial frontier.
*
+ * @param ctx
+ * The execution context.
* @param v
* One or more vertices that will be included in the initial
* frontier.
@@ -58,7 +60,7 @@
* @throws IllegalArgumentException
* if no vertices are specified.
*/
- void init(Value... v);
+ void setFrontier(IGASContext<VS, ES, ST> ctx, Value... v);
/**
* Discard computation state (the frontier, vertex state, and edge state)
@@ -227,5 +229,19 @@
* TODO RDR : Link to an RDR wiki page as well.
*/
Statement decodeStatement(Value v);
+
+ /**
+ * Return -1, 0, or 1 if <code>u</code> is LT, EQ, or GT <code>v</code>. A
+ * number of GAS programs depend on the ability to place an order over the
+ * vertex identifiers, as does 2D partitioning. The ordering provided by
+ * this method MAY be arbitrary, but it MUST be total and stable across the
+ * life-cycle of the GAS program evaluation.
+ *
+ * @param u
+ * A vertex.
+ * @param v
+ * Another vertex.
+ */
+ int compareTo(Value u, Value v);
}
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGraphAccessor.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGraphAccessor.java 2013-10-18 15:34:43 UTC (rev 7460)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGraphAccessor.java 2013-10-19 20:01:51 UTC (rev 7461)
@@ -31,18 +31,37 @@
public interface IGraphAccessor {
/**
- * Return the edges for the vertex.
+ * Return the #of edges of the specified type for the given vertex.
+ * <p>
+ * Note: This is not always a flyweight operation due to the need to filter
+ * for only the observable edge types. If this information is required, it
+ * may be best to cache it on the vertex state object for a given
+ * {@link IGASProgram}.
*
- * @param p
+ * @param ctx
* The {@link IGASContext}.
* @param u
* The vertex.
* @param edges
* Typesafe enumeration indicating which edges should be visited.
- *
+ *
* @return The #of edges of the specified type for that vertex.
*/
- Iterator<Statement> getEdges(IGASContext<?, ?, ?> p, Value u,
+ long getEdgeCount(IGASContext<?, ?, ?> ctx, Value u, EdgesEnum edges);
+
+ /**
+ * Return the edges for the given vertex.
+ *
+ * @param ctx
+ * The {@link IGASContext}.
+ * @param u
+ * The vertex.
+ * @param edges
+ * Typesafe enumeration indicating which edges should be visited.
+ *
+ * @return An iterator that will visit the edges for that vertex.
+ */
+ Iterator<Statement> getEdges(IGASContext<?, ?, ?> ctx, Value u,
EdgesEnum edges);
/**
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IReducer.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IReducer.java 2013-10-18 15:34:43 UTC (rev 7460)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IReducer.java 2013-10-19 20:01:51 UTC (rev 7461)
@@ -37,7 +37,7 @@
* The result from applying the procedure to a single index
* partition.
*/
- public void visit(IGASState<VS, ES, ST> ctx, Value u);
+ public void visit(IGASState<VS, ES, ST> state, Value u);
/**
* Return the aggregated results as an implementation dependent object.
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java 2013-10-18 15:34:43 UTC (rev 7460)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java 2013-10-19 20:01:51 UTC (rev 7461)
@@ -15,16 +15,23 @@
*/
package com.bigdata.rdf.graph.analytics;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.openrdf.model.Statement;
import org.openrdf.model.Value;
import com.bigdata.rdf.graph.EdgesEnum;
import com.bigdata.rdf.graph.Factory;
+import com.bigdata.rdf.graph.FrontierEnum;
import com.bigdata.rdf.graph.IGASContext;
import com.bigdata.rdf.graph.IGASScheduler;
import com.bigdata.rdf.graph.IGASState;
+import com.bigdata.rdf.graph.IReducer;
import com.bigdata.rdf.graph.impl.BaseGASProgram;
import cutthecrap.utils.striterators.IStriterator;
@@ -39,6 +46,8 @@
*/
public class BFS extends BaseGASProgram<BFS.VS, BFS.ES, Void> {
+// private static final Logger log = Logger.getLogger(BFS.class);
+
public static class VS {
/**
@@ -128,6 +137,13 @@
}
@Override
+ public FrontierEnum getInitialFrontierEnum() {
+
+ return FrontierEnum.SingleVertex;
+
+ }
+
+ @Override
public EdgesEnum getGatherEdges() {
return EdgesEnum.NoEdges;
@@ -158,7 +174,8 @@
* Not used.
*/
@Override
- public void init(final IGASState<BFS.VS, BFS.ES, Void> state, final Value u) {
+ public void initVertex(final IGASContext<BFS.VS, BFS.ES, Void> ctx,
+ final IGASState<BFS.VS, BFS.ES, Void> state, final Value u) {
state.getState(u).visit(0);
@@ -169,15 +186,20 @@
*/
@Override
public Void gather(IGASState<BFS.VS, BFS.ES, Void> state, Value u, Statement e) {
+
throw new UnsupportedOperationException();
+
}
/**
* Not used.
*/
@Override
- public Void sum(Void left, Void right) {
+ public Void sum(final IGASState<BFS.VS, BFS.ES, Void> state,
+ final Void left, final Void right) {
+
throw new UnsupportedOperationException();
+
}
/**
@@ -231,10 +253,124 @@
}
@Override
- public boolean nextRound(IGASContext<BFS.VS, BFS.ES, Void> ctx) {
+ public boolean nextRound(final IGASContext<BFS.VS, BFS.ES, Void> ctx) {
return true;
}
+ /**
+ * Reduce the active vertex state, returning a histogram reporting the #of
+ * vertices at each distance from the starting vertex. There will always be
+ * one vertex at depth zero - this is the starting vertex. For each
+ * successive depth, the #of vertices that were labeled at that depth is
+ * reported. This is essentially the same as reporting the size of the
+ * frontier in each round of the traversal, but the histogram is reported
+ * based on the vertex state.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan
+ * Thompson</a>
+ *
+ * TODO Do another reducer that reports the actual BFS tree rather
+ * than a histogram. For each depth, it needs to have the set of
+ * vertices that are at that number of hops from the starting
+ * vertex. So, there is an outer map from depth to set. The inner
+ * set should also be concurrent if we allow concurrent reduction of
+ * the activated vertex state.
+ */
+ protected static class HistogramReducer implements
+ IReducer<VS, ES, Void, Map<Integer, AtomicLong>> {
+
+ private final ConcurrentHashMap<Integer, AtomicLong> values = new ConcurrentHashMap<Integer, AtomicLong>();
+
+ @Override
+ public void visit(final IGASState<VS, ES, Void> state, final Value u) {
+
+ final VS us = state.getState(u);
+
+ if (us != null) {
+
+ final Integer depth = Integer.valueOf(us.depth());
+
+ AtomicLong newval = values.get(depth);
+
+ if (newval == null) {
+
+ final AtomicLong oldval = values.putIfAbsent(depth,
+ newval = new AtomicLong());
+
+ if (oldval != null) {
+
+ // lost data race.
+ newval = oldval;
+
+ }
+
+ }
+
+ newval.incrementAndGet();
+
+ }
+
+ }
+
+ @Override
+ public Map<Integer, AtomicLong> get() {
+
+ return Collections.unmodifiableMap(values);
+
+ }
+
+ }
+
+ @Override
+ public void after(final IGASContext<BFS.VS, BFS.ES, Void> ctx) {
+
+ final HistogramReducer r = new HistogramReducer();
+
+ ctx.getGASState().reduce(r);
+
+ class NV implements Comparable<NV> {
+ public final int n;
+ public final long v;
+ public NV(final int n, final long v) {
+ this.n = n;
+ this.v = v;
+ }
+ @Override
+ public int compareTo(final NV o) {
+ if (o.n > this.n)
+ return -1;
+ if (o.n < this.n)
+ return 1;
+ return 0;
+ }
+ }
+
+ final Map<Integer, AtomicLong> h = r.get();
+
+ final NV[] a = new NV[h.size()];
+
+ int i = 0;
+
+ for (Map.Entry<Integer, AtomicLong> e : h.entrySet()) {
+
+ a[i++] = new NV(e.getKey().intValue(), e.getValue().get());
+
+ }
+
+ Arrays.sort(a);
+
+ System.out.println("distance, frontierSize, sumFrontierSize");
+ long sum = 0L;
+ for (NV t : a) {
+
+ System.out.println(t.n + ", " + t.v + ", " + sum);
+
+ sum += t.v;
+
+ }
+
+ }
+
}
Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/CC.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/CC.java (rev 0)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/CC.java 2013-10-19 20:01:51 UTC (rev 7461)
@@ -0,0 +1,411 @@
+/**
+ Copyright (C) SYSTAP, LLC 2006-2012. All rights reserved.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+package com.bigdata.rdf.graph.analytics;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.log4j.Logger;
+import org.openrdf.model.Statement;
+import org.openrdf.model.Value;
+
+import com.bigdata.rdf.graph.EdgesEnum;
+import com.bigdata.rdf.graph.Factory;
+import com.bigdata.rdf.graph.FrontierEnum;
+import com.bigdata.rdf.graph.IGASContext;
+import com.bigdata.rdf.graph.IGASScheduler;
+import com.bigdata.rdf.graph.IGASState;
+import com.bigdata.rdf.graph.IReducer;
+import com.bigdata.rdf.graph.impl.BaseGASProgram;
+
+import cutthecrap.utils.striterators.IStriterator;
+
+/**
+ * Connected components computes the distinct sets of non-overlapping subgraphs
+ * within a graph. All vertices within a connected component are connected along
+ * at least one path.
+ * <p>
+ * The implementation works by assigning a label to each vertex. The label is
+ * initially the vertex identifier for that vertex. The labels in the graph are
+ * then relaxed with each vertex taking the minimum of its one-hop neighbors'
+ * labels. The algorithm halts when no vertex label has changed state in a given
+ * iteration.
+ *
+ * <dl>
+ * <dt>init</dt>
+ * <dd>All vertices are inserted into the initial frontier.</dd>
+ * <dt>Gather</dt>
+ * <dd>Report the source vertex label (not its identifier)</dd>
+ * <dt>Apply</dt>
+ * <dd>label = min(label,gatherLabel)</dd>
+ * <dt>Scatter</dt>
+ * <dd>iff the label has changed</dd>
+ * </dl>
+ *
+ * FIXME CC : Implement. Should push the updates through the scatter function.
+ * Find an abstraction to support this pattern. It is used by both CC and SSSP.
+ * (We can initially implement this as a Gather (over all edges) plus a
+ * conditional Scatter (over all edges iff the vertex label has changed). We can
+ * then refactor both this class and SSSP to push the updates through a Scatter
+ * (what I think of as a Gather to a remote vertex).)
+ *
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ */
+public class CC extends BaseGASProgram<CC.VS, CC.ES, Value> {
+
+ private static final Logger log = Logger.getLogger(CC.class);
+
+ public static class VS {
+
+ /**
+ * The label for the vertex. This value is initially the vertex
+ * identifier. It is relaxed by the computation until it is the minimum
+ * vertex identifier for the connected component.
+ */
+ private final AtomicReference<Value> label;
+
+ /**
+ * <code>true</code> iff the label was modified.
+ */
+ private boolean changed = false;
+
+ public VS(final Value v) {
+
+ this.label = new AtomicReference<Value>(v);
+
+ }
+
+ /**
+ * The assigned label for this vertex. Once converged, all vertices in a
+ * given connected component will have the same label and the labels
+ * assigned to the vertices in each connected component will be
+ * distinct. The labels themselves are just the identifier of a vertex
+ * in that connected component. Conceptually, either the MIN or the MAX
+ * over the vertex identifiers in the connected component can be used by
+ * the algorithm since both will provide a unique labeling strategy.
+ */
+ public Value getLabel() {
+
+ return label.get();
+
+ }
+
+ private void setLabel(final Value v) {
+
+ label.set(v);
+
+ }
+
+ @Override
+ public String toString() {
+ return "{label=" + label + ",changed=" + changed + "}";
+ }
+
+ }// class VS
+
+ /**
+ * Edge state is not used.
+ */
+ public static class ES {
+
+ }
+
+ private static final Factory<Value, CC.VS> vertexStateFactory = new Factory<Value, CC.VS>() {
+
+ @Override
+ public CC.VS initialValue(final Value value) {
+
+ return new VS(value);
+
+ }
+
+ };
+
+ @Override
+ public Factory<Value, CC.VS> getVertexStateFactory() {
+
+ return vertexStateFactory;
+
+ }
+
+ @Override
+ public Factory<Statement, CC.ES> getEdgeStateFactory() {
+
+ return null;
+
+ }
+
+ @Override
+ public FrontierEnum getInitialFrontierEnum() {
+
+ return FrontierEnum.AllVertices;
+
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Overridden to not impose any filter on the sampled vertices (it does not
+ * matter whether they have any connected edges since we need to put all
+ * vertices into the initial frontier).
+ */
+ @Override
+ public EdgesEnum getSampleEdgesFilter() {
+
+ return EdgesEnum.NoEdges;
+
+ }
+
+ @Override
+ public EdgesEnum getGatherEdges() {
+
+ return EdgesEnum.AllEdges;
+
+ }
+
+ @Override
+ public EdgesEnum getScatterEdges() {
+
+ return EdgesEnum.AllEdges;
+
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Overridden to only visit the edges of the graph.
+ */
+ @Override
+ public IStriterator constrainFilter(
+ final IGASContext<CC.VS, CC.ES, Value> ctx, final IStriterator itr) {
+
+ return itr.addFilter(getEdgeOnlyFilter(ctx));
+
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Return the label of the remote vertex.
+ */
+ @Override
+ public Value gather(final IGASState<CC.VS, CC.ES, Value> state,
+ final Value u, final Statement e) {
+
+ final Value v = state.getOtherVertex(u, e);
+
+ final CC.VS vs = state.getState(v);
+
+ return vs.getLabel();
+
+ }
+
+ /**
+ * MIN
+ * <p>
+ * {@inheritDoc}
+ */
+ @Override
+ public Value sum(final IGASState<CC.VS, CC.ES, Value> state,
+ final Value left, final Value right) {
+
+ // MIN(left,right)
+ if (state.compareTo(left, right) < 0) {
+
+ return left;
+
+ }
+
+ return right;
+
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Compute the new value for this vertex, making a note of the last change
+ * for this vertex.
+ */
+ @Override
+ public CC.VS apply(final IGASState<CC.VS, CC.ES, Value> state,
+ final Value u, final Value sum) {
+
+ final CC.VS us = state.getState(u);
+
+ if (sum == null) {
+
+ /*
+ * Nothing visited by Gather. No change. Vertex will be dropped from
+ * the frontier.
+ */
+
+ us.changed = false;
+
+ return null;
+
+ }
+
+ final Value oldval = us.getLabel();
+
+ // MIN(oldval,gatherSum)
+ if (state.compareTo(oldval, sum) <= 0) {
+
+ us.changed = false;
+
+ if (log.isDebugEnabled())
+ log.debug(" NO CHANGE: " + u + ", val=" + oldval);
+
+ } else {
+
+ us.setLabel(sum);
+
+ us.changed = true;
+
+ if (log.isDebugEnabled())
+ log.debug("DID CHANGE: " + u + ", old=" + oldval + ", new="
+ + sum);
+
+ }
+
+ return us;
+
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Returns <code>true</code> iff the label was changed in the current round.
+ */
+ @Override
+ public boolean isChanged(final IGASState<VS, ES, Value> state,
+ final Value u) {
+
+ final CC.VS us = state.getState(u);
+
+ return us.changed;
+
+ }
+
+ /**
+ * The remote vertex is unconditionally scheduled for activation.
+ * <p>
+ * Note: We are scattering over all edges (see {@link #getScatterEdges()}).
+ * Therefore, this vertex may be either the {@link Statement#getSubject()}
+ * or the {@link Statement#getObject()} of the edge. The remote vertex is
+ * resolved using {@link IGASState#getOtherVertex(Value, Statement)}.
+ */
+ @Override
+ public void scatter(final IGASState<CC.VS, CC.ES, Value> state,
+ final IGASScheduler sch, final Value u, final Statement e) {
+
+ final Value v = state.getOtherVertex(u, e);
+
+ sch.schedule(v);
+
+ }
+
+ /**
+ * Returns a map containing the labels assigned to each connected component
+ * (which gives you a vertex in that connected component) and the #of
+ * vertices in each connected component.
+ */
+ public Map<Value, AtomicInteger> getConnectedComponents(
+ final IGASState<CC.VS, CC.ES, Value> state) {
+
+ final ConcurrentHashMap<Value, AtomicInteger> labels = new ConcurrentHashMap<Value, AtomicInteger>();
+
+ return state
+ .reduce(new IReducer<CC.VS, CC.ES, Value, Map<Value, AtomicInteger>>() {
+
+ @Override
+ public void visit(final IGASState<VS, ES, Value> state,
+ final Value u) {
+
+ final VS us = state.getState(u);
+
+ if (us != null) {
+
+ final Value label = us.getLabel();
+
+ if (log.isDebugEnabled())
+ log.debug("v=" + u + ", label=" + label);
+
+ final AtomicInteger oldval = labels.putIfAbsent(
+ label, new AtomicInteger(1));
+
+ if (oldval != null) {
+
+ // lost race. increment existing counter.
+ oldval.incrementAndGet();
+
+ }
+
+ }
+
+ }
+
+ @Override
+ public Map<Value, AtomicInteger> get() {
+
+ return Collections.unmodifiableMap(labels);
+
+ }
+ });
+
+ }
+
+ @Override
+ public void after(final IGASContext<CC.VS, CC.ES, Value> ctx) {
+
+ final Map<Value, AtomicInteger> labels = getConnectedComponents(ctx
+ .getGASState());
+
+ System.out.println("There are " + labels.size()
+ + " connected components");
+
+ class NV implements Comparable<NV> {
+ public final int n;
+ public final Value v;
+ public NV(int n, Value v) {
+ this.n = n;
+ this.v = v;
+ }
+ @Override
+ public int compareTo(final NV o) {
+ return o.n - this.n;
+ }
+ }
+
+ final NV[] a = new NV[labels.size()];
+ int i = 0;
+ for (Map.Entry<Value, AtomicInteger> e : labels.entrySet()) {
+ a[i++] = new NV(e.getValue().intValue(), e.getKey());
+ }
+
+ Arrays.sort(a);
+
+ System.out.println("size, label");
+ for(NV t : a) {
+ System.out.println(t.n + ", " + t.v);
+ }
+
+ }
+
+}
Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/PR.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/PR.java (rev 0)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/PR.java 2013-10-19 20:01:51 UTC (rev 7461)
@@ -0,0 +1,428 @@
+/**
+ Copyright (C) SYSTAP, LLC 2006-2012. All rights reserved.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+package com.bigdata.rdf.graph.analytics;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+import org.openrdf.model.Statement;
+import org.openrdf.model.Value;
+
+import com.bigdata.rdf.graph.EdgesEnum;
+import com.bigdata.rdf.graph.Factory;
+import com.bigdata.rdf.graph.FrontierEnum;
+import com.bigdata.rdf.graph.IGASContext;
+import com.bigdata.rdf.graph.IGASScheduler;
+import com.bigdata.rdf.graph.IGASState;
+import com.bigdata.rdf.graph.IReducer;
+import com.bigdata.rdf.graph.impl.BaseGASProgram;
+
+import cutthecrap.utils.striterators.IStriterator;
+
+/**
+ * Page rank assigns weights to the vertices in a graph based on their relative
+ * "importance" as determined by the patterns of directed links in the graph.
+ * The algorithm is usually stated in terms of a computation that is iterated until
+ * the delta in the computed values for the vertices is within <i>epsilon</i> of
+ * ZERO. However, in practice, convergence based on <i>epsilon</i> is
+ * problematic due to the manner in which the results of the floating point
+ * operations depend on the sequence of those operations (which is why this
+ * implementation uses <code>double</code> precision). Thus, page rank is
+ * typically executed for a specific number of iterations, e.g., 50 or 100. If
+ * convergence is based on epsilon, then it is possible that the computation
+ * will never converge, especially for smaller values of epsilon.
+ * <dl>
+ * <dt>init</dt>
+ * <dd>All vertices are inserted into the initial frontier.</dd>
+ * <dt>Gather</dt>
+ * <dd>sum( neighbor_value / neighbor_num_out_edges ) over the in-edges of the
+ * graph.</dd>
+ * <dt>Apply</dt>
+ * <dd>value = <i>resetProb</i> + (1.0 - <i>resetProb</i>) * gatherSum</dd>
+ * <dt>Scatter</dt>
+ * <dd>if (a) value has significantly changed <code>(fabs(old-new) GT
+ * <i>epsilon</i>)</code>; or (b) iterations LT limit</dd>
+ * </dl>
+ * <ul>
+ * <li>where <i>resetProb</i> is a value that determines a random reset
+ * probability and defaults to {@value PR#DEFAULT_RESET_PROB}.</li>
+ * <li>
+ * where <i>epsilon</i> controls the degree of convergence before the algorithm
+ * terminates and defaults to {@value PR#DEFAULT_EPSILON}.</li>
+ * </ul>
+ *
+ * FIXME PR UNIT TEST. Verify computed values (within variance) and max
+ * iterations. Ground truth from GL?
+ *
+ * FIXME PR: The out-edges can be taken directly from the vertex distribution.
+ * That will reduce the initialization overhead for PR.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ */
+public class PR extends BaseGASProgram<PR.VS, PR.ES, Double> {
+
+ private static final Logger log = Logger.getLogger(PR.class);
+
+ // TODO javadoc and config.
+ protected static final int DEFAULT_LIMIT = 100;
+ protected static final double DEFAULT_RESET_PROB = 0.15d;
+ protected static final double DEFAULT_EPSILON = 0.01d;
+ protected static final double DEFAULT_MIN_PAGE_RANK = 1d;
+
+ private final double resetProb = DEFAULT_RESET_PROB;
+ private final double epsilon = DEFAULT_EPSILON;
+ private final int limit = DEFAULT_LIMIT;
+ private final double minPageRank = DEFAULT_MIN_PAGE_RANK;
+
+ /**
+ * The per-vertex state for page rank: the current value, the number of
+ * out-edges, and the last observed change in the value.
+ */
+ public static class VS {
+
+ /**
+ * The current computed value for this vertex.
+ * <p>
+ * All vertices are initialized to the reset probability. They are then
+ * updated in each iteration to the new estimated value by apply().
+ */
+ private double value;
+
+ /**
+ * The number of out-edges. This is computed once, when the vertex state
+ * is initialized.
+ */
+ private long outEdges;
+
+ /**
+ * The last delta observed for this vertex.
+ */
+ private double lastChange = 0d;
+
+ /**
+ * The current computed value for this vertex.
+ * <p>
+ * All vertices are initialized to the reset probability. They are then
+ * updated in each iteration to the new estimated value by apply().
+ */
+ public double getValue() {
+
+ return value;
+
+ }
+
+ /**
+ * Reports the current value and the last change (the #of out-edges is
+ * not included).
+ */
+ @Override
+ public String toString() {
+ return "{value=" + value + ",lastChange=" + lastChange + "}";
+ }
+
+ }// class VS
+
+ /**
+ * Edge state is not used.
+ */
+ public static class ES {
+
+ }
+
+ /**
+ * Factory for the vertex state. The vertex argument is ignored: a new
+ * {@link PR.VS} with default field values is returned for every vertex.
+ */
+ private static final Factory<Value, PR.VS> vertexStateFactory = new Factory<Value, PR.VS>() {
+
+ @Override
+ public PR.VS initialValue(final Value value) {
+
+ return new VS();
+
+ }
+
+ };
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Returns the shared factory that creates a new {@link PR.VS} per vertex.
+ */
+ @Override
+ public Factory<Value, PR.VS> getVertexStateFactory() {
+
+ return vertexStateFactory;
+
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Returns <code>null</code> since edge state is not used by page rank.
+ */
+ @Override
+ public Factory<Statement, PR.ES> getEdgeStateFactory() {
+
+ return null;
+
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Page rank starts from {@link FrontierEnum#AllVertices}.
+ */
+ @Override
+ public FrontierEnum getInitialFrontierEnum() {
+
+ return FrontierEnum.AllVertices;
+
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Page rank gathers over the {@link EdgesEnum#InEdges}.
+ */
+ @Override
+ public EdgesEnum getGatherEdges() {
+
+ return EdgesEnum.InEdges;
+
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Page rank scatters over the {@link EdgesEnum#OutEdges}.
+ */
+ @Override
+ public EdgesEnum getScatterEdges() {
+
+ return EdgesEnum.OutEdges;
+
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Overridden to only visit the edges of the graph. This is done by adding
+ * the filter obtained from <code>getEdgeOnlyFilter(ctx)</code> to the
+ * given iterator.
+ */
+ @Override
+ public IStriterator constrainFilter(
+ final IGASContext<PR.VS, PR.ES, Double> ctx, final IStriterator itr) {
+
+ return itr.addFilter(getEdgeOnlyFilter(ctx));
+
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Sets the vertex value to the reset probability and records the number of
+ * out-edges of the vertex as reported by the graph accessor.
+ *
+ * FIXME We need to do this efficiently. E.g., using a scan to find all of
+ * the vertices together with their in-degree or out-degree. That should be
+ * done to populate the frontier, initializing the #of out-edges at the same
+ * time.
+ */
+ @Override
+ public void initVertex(final IGASContext<PR.VS, PR.ES, Double> ctx,
+ final IGASState<PR.VS, PR.ES, Double> state, final Value u) {
+
+ final PR.VS vertexState = state.getState(u);
+
+ // Every vertex starts at the reset probability.
+ vertexState.value = resetProb;
+
+ // The out-degree is looked up once, here.
+ final long outDegree = ctx.getGraphAccessor().getEdgeCount(ctx, u,
+ EdgesEnum.OutEdges);
+
+ vertexState.outEdges = outDegree;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Returns the contribution of the vertex at the other end of the edge: its
+ * current value divided by its recorded number of out-edges.
+ */
+ @Override
+ public Double gather(final IGASState<PR.VS, PR.ES, Double> state,
+ final Value u, final Statement e) {
+
+ final PR.VS neighborState = state.getState(state.getOtherVertex(u, e));
+
+ /*
+ * Note: Division by zero should not be possible here since the edge
+ * used to discover the neighbor is an out-edge of that neighbor.
+ *
+ * NOTE(review): a VS whose outEdges is still 0 (the field default until
+ * initVertex() assigns it) yields Infinity here, or NaN when its value
+ * is also 0 - possibly the source of the NaNs that after() filters out.
+ * TODO confirm.
+ */
+ final double share = neighborState.value / neighborState.outEdges;
+
+ return share;
+
+ }
+
+ /**
+ * SUM
+ * <p>
+ * {@inheritDoc}
+ * <p>
+ * Adds the two gathered contributions.
+ */
+ @Override
+ public Double sum(final IGASState<PR.VS, PR.ES, Double> state,
+ final Double left, final Double right) {
+
+ final double total = left.doubleValue() + right.doubleValue();
+
+ return Double.valueOf(total);
+
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Computes the new value for the vertex from the gathered sum and records
+ * the (signed) difference from the previous value as the last change.
+ *
+ * @return The updated vertex state -or- <code>null</code> if the sum was
+ * <code>null</code>.
+ */
+ @Override
+ public PR.VS apply(final IGASState<PR.VS, PR.ES, Double> state,
+ final Value u, final Double sum) {
+
+ final PR.VS vertexState = state.getState(u);
+
+ if (sum != null) {
+
+ final double updated = resetProb + (1.0 - resetProb) * sum;
+
+ // Note the delta before overwriting the old value.
+ vertexState.lastChange = (updated - vertexState.value);
+
+ vertexState.value = updated;
+
+ return vertexState;
+
+ }
+
+ /*
+ * No in-edges visited by Gather. No change. Vertex will be dropped from
+ * the frontier.
+ */
+
+ vertexState.lastChange = 0d;
+
+ return null;
+
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Returns <code>true</code> iff the magnitude of the last change for the
+ * vertex was greater than epsilon.
+ */
+ @Override
+ public boolean isChanged(final IGASState<VS, ES, Double> state,
+ final Value u) {
+
+ final PR.VS us = state.getState(u);
+
+ // lastChange is a signed delta (new - old), so compare its absolute
+ // value. Otherwise a decrease in the value would never be reported as
+ // a change.
+ return Math.abs(us.lastChange) > epsilon;
+
+ }
+
+ /**
+ * The remote vertex is always scheduled for activation (there is no check
+ * for whether it has already been visited).
+ * <p>
+ * Note: We are scattering to out-edges. Therefore, this vertex is
+ * {@link Statement#getSubject()}. The remote vertex is
+ * {@link Statement#getObject()}.
+ */
+ @Override
+ public void scatter(final IGASState<PR.VS, PR.ES, Double> state,
+ final IGASScheduler sch, final Value u, final Statement e) {
+
+ // The vertex at the other end of the edge from [u].
+ final Value v = state.getOtherVertex(u, e);
+
+ sch.schedule(v);
+
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Continue unless the iteration limit has been reached.
+ *
+ * @return <code>true</code> iff the current round is less than the
+ * configured limit.
+ */
+ @Override
+ public boolean nextRound(final IGASContext<PR.VS, PR.ES, Double> ctx) {
+
+ return ctx.getGASState().round() < limit;
+
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Collects the page rank of each visited vertex, skipping NaN, infinite,
+ * and values less than the minimum page rank, and writes the remaining
+ * vertices on stdout in order of decreasing page rank.
+ */
+ @Override
+ public void after(final IGASContext<PR.VS, PR.ES, Double> ctx) {
+
+ // vertex => page rank for the vertices that will be reported.
+ final ConcurrentHashMap<Value, Double> values = new ConcurrentHashMap<Value, Double>();
+
+ // Note: The value returned by reduce() is not used. The results are
+ // read directly from [values] below.
+ ctx.getGASState().reduce(
+ new IReducer<PR.VS, PR.ES, Double, Map<Value, Double>>() {
+
+ @Override
+ public void visit(final IGASState<VS, ES, Double> state,
+ final Value u) {
+
+ final VS us = state.getState(u);
+
+ if (us != null) {
+
+ final double pageRank = us.getValue();
+
+ // FIXME Why are NaNs showing up?
+ if (Double.isNaN(pageRank))
+ return;
+
+ // FIXME Do infinite values show up?
+ if (Double.isInfinite(pageRank))
+ return;
+
+ if (pageRank < minPageRank) {
+ // Ignore small values.
+ return;
+ }
+
+ /*
+ * Only report the larger ranked values.
+ */
+
+ if (log.isDebugEnabled())
+ log.debug("v=" + u + ", pageRank=" + pageRank);
+
+ values.put(u, Double.valueOf(pageRank));
+
+ }
+
+ }
+
+ @Override
+ public Map<Value, Double> get() {
+
+ return Collections.unmodifiableMap(values);
+
+ }
+ });
+
+ // (pageRank, vertex) pair whose natural order is descending by
+ // pageRank.
+ class NV implements Comparable<NV> {
+ public final double n;
+ public final Value v;
+ public NV(double n, Value v) {
+ this.n = n;
+ this.v = v;
+ }
+ @Override
+ public int compareTo(final NV o) {
+ if (o.n > this.n)
+ return 1;
+ if (o.n < this.n)
+ return -1;
+ return 0;
+ }
+ }
+
+ final NV[] a = new NV[values.size()];
+
+ int i = 0;
+
+ for (Map.Entry<Value, Double> e : values.entrySet()) {
+
+ a[i++] = new NV(e.getValue().doubleValue(), e.getKey());
+
+ }
+
+ Arrays.sort(a);
+
+ System.out.println("rank, pageRank, vertex");
+ // [i] is reused as the zero-based rank of the reported vertex.
+ i = 0;
+ for (NV t : a) {
+
+ System.out.println(i + ", " + t.n + ", " + t.v);
+
+ i++;
+
+ }
+
+ }
+
+}
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java 2013-10-18 15:34:43 UTC (rev 7460)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java 2013-10-19 20:01:51 UTC (rev 7461)
@@ -20,6 +20,7 @@
import org.openrdf.model.Value;
import com.bigdata.rdf.graph.Factory;
+import com.bigdata.rdf.graph.FrontierEnum;
import com.bigdata.rdf.graph.IGASContext;
import com.bigdata.rdf.graph.IGASScheduler;
import com.bigdata.rdf.graph.IGASState;
@@ -29,22 +30,24 @@
/**
* SSSP (Single Source, Shortest Path). This analytic computes the shortest path
- * to each vertex in the graph starting from the given vertex. Only connected
- * vertices are visited by this implementation (the frontier never leaves the
- * connected component in which the starting vertex is located).
+ * to each connected vertex in the graph starting from the given vertex. Only
+ * connected vertices are visited by this implementation (the frontier never
+ * leaves the connected component in which the starting vertex is located).
*
* @author <a href="mailto:tho...@us...">Bryan Thompson</a>
*
- * TODO There is no reason to do a gather on the first round. Add
- * isGather()? (parallel to isChanged() for scatter?)
- *
- * TODO Add reducer pattern for finding the maximum degree vertex.
- *
* TODO Add parameter for directed versus undirected SSSP. When
* undirected, the gather and scatter are for AllEdges. Otherwise,
* gather on in-edges and scatter on out-edges. Also, we need to use a
* getOtherVertex(e) method to figure out the other vertex when using
* undirected scatter/gather. Add unit test for undirected.
+ *
+ * FIXME New SSSP (push style scatter abstraction with new test case
+ * based on graph example developed for this)
+ *
+ * TODO Add a reducer to report the actual minimum length paths. This is
+ * similar to a BFS tree, but the path lengths are not integer values so
+ * we need a different data structure to collect them.
*/
public class SSSP extends BaseGASProgram<SSSP.VS, SSSP.ES, Integer/* dist */> {
@@ -76,8 +79,12 @@
* The minimum observed distance (in hops) from the source to this
* vertex and initially {@link Integer#MAX_VALUE}. When this value is
* modified, the {@link #changed} flag is set as a side-effect.
+ *
+ * FIXME This really needs to be a floating point value, probably
+ * double. We also need tests with non-integer weights and non-positive
+ * weights.
*/
- private int dist = Integer.MAX_VALUE;
+ private Integer dist = Integer.MAX_VALUE;
private boolean changed = false;
@@ -147,6 +154,13 @@
}
+ @Override
+ public FrontierEnum getInitialFrontierEnum() {
+
+ return FrontierEnum.SingleVertex;
+
+ }
+
// @Override
// public Factory<ISPO, SSSP.ES> getEdgeStateFactory() {
//
@@ -188,8 +202,8 @@
* {@inheritDoc}
*/
@Override
- public void init(final IGASState<SSSP.VS, SSSP.ES, Integer> state,
- final Value u) {
+ public void initVertex(final IGASContext<SSSP.VS, SSSP.ES, Integer> ctx,
+ final IGASState<SSSP.VS, SSSP.ES, Integer> state, final Value u) {
final VS us = state.getState(u);
@@ -235,7 +249,8 @@
* MIN
*/
@Override
- public Integer sum(final Integer left, final Integer right) {
+ public Integer sum(final IGASState<SSSP.VS, SSSP.ES, Integer> state,
+ final Integer left, final Integer right) {
return Math.min(left, right);
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/BaseGASProgram.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/BaseGASProgram.java 2013-10-18 15:34:43 UTC (rev 7460)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/BaseGASProgram.java 2013-10-19 20:01:51 UTC (rev 7461)
@@ -15,15 +15,22 @@
*/
package com.bigdata.rdf.graph.impl;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.log4j.Logger;
+import org.openrdf.model.Resource;
import org.openrdf.model.Statement;
import org.openrdf.model.URI;
import org.openrdf.model.Value;
import com.bigdata.rdf.graph.EdgesEnum;
import com.bigdata.rdf.graph.Factory;
+import com.bigdata.rdf.graph.FrontierEnum;
import com.bigdata.rdf.graph.IGASContext;
import com.bigdata.rdf.graph.IGASProgram;
import com.bigdata.rdf.graph.IGASState;
+import com.bigdata.rdf.graph.impl.util.VertexDistribution;
import cutthecrap.utils.striterators.Filter;
import cutthecrap.utils.striterators.IFilter;
@@ -40,6 +47,8 @@
abstract public class BaseGASProgram<VS, ES, ST> implements
IGASProgram<VS, ES, ST> {
+ private static final Logger log = Logger.getLogger(BaseGASProgram.class);
+
/**
* {@inheritDoc}
* <p>
@@ -151,6 +160,30 @@
/**
* {@inheritDoc}
* <p>
+ * The default implementation returns {@link #getGatherEdges()}, or
+ * {@link #getScatterEdges()} if {@link #getGatherEdges()} returns
+ * {@link EdgesEnum#NoEdges}.
+ */
+ @Override
+ public EdgesEnum getSampleEdgesFilter() {
+
+ // Assume that a GATHER will be done for each starting vertex.
+ EdgesEnum edges = getGatherEdges();
+
+ if (edges == EdgesEnum.NoEdges) {
+
+ // If no GATHER is performed, then use the SCATTER edges.
+ edges = getScatterEdges();
+
+ }
+
+ return edges;
+
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
* The default gathers on the {@link EdgesEnum#InEdges}.
*/
@Override
@@ -175,10 +208,72 @@
/**
* {@inheritDoc}
* <p>
+ * The default implementation populates the frontier IFF this is an
+ * {@link FrontierEnum#AllVertices} {@link IGASProgram}.
+ */
+ @Override
+ public void before(final IGASContext<VS, ES, ST> ctx) {
+
+ switch (getInitialFrontierEnum()) {
+ case AllVertices: {
+ addAllVerticesToFrontier(ctx);
+ break;
+ }
+ }
+
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The default implementation is a NOP.
+ */
+ @Override
+ public void after(final IGASContext<VS, ES, ST> ctx) {
+
+ // NOP
+
+ }
+
+ /**
+ * Populate the initial frontier using all vertices in the graph.
+ *
+ * @param ctx
+ * The graph evaluation context.
+ *
+ * TODO This has a random number generator whose initial seed is
+ * not controlled by the caller. However, the desired use case
+ * here is to produce a distribution over ALL vertices so the
+ * random number should be ignored - perhaps we should pass it in
+ * as <code>null</code>?
+ */
+ private void addAllVerticesToFrontier(final IGASContext<VS, ES, ST> ctx) {
+
+ final IGASState<VS, ES, ST> gasState = ctx.getGASState();
+
+ final EdgesEnum sampleEdges = getSampleEdgesFilter();
+
+ final VertexDistribution dist = ctx.getGraphAccessor().getDistribution(
+ new Random());
+
+ final Resource[] initialFrontier = dist.getUnweightedSample(
+ dist.size(), sampleEdges);
+
+ if (log.isDebugEnabled())
+ log.debug("initialFrontier=" + Arrays.toString(initialFrontier));
+
+ gasState.setFrontier(ctx, initialFrontier);
+
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
* The default is a NOP.
*/
@Override
- public void init(final IGASState<VS, ES, ST> state, final Value u) {
+ public void initVertex(final IGASContext<VS, ES, ST> ctx,
+ final IGASState<VS, ES, ST> state, final Value u) {
// NOP
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASContext.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASContext.java 2013-10-18 15:34:43 UTC (rev 7460)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASContext.java 2013-10-19 20:01:51 UTC (rev 7461)
@@ -104,10 +104,17 @@
}
@Override
+ public IGraphAccessor getGraphAccessor() {
+ return graphAccessor;
+ }
+
+ @Override
public IGASStats call() throws Exception {
final GASStats total = new GASStats();
+ program.before(this);
+
while (!gasState.frontier().isEmpty()) {
final GASStats roundStats = new GASStats();
@@ -123,6 +130,8 @@
gasState.traceState();
+ program.after(this);
+
// Done
return total;
@@ -160,7 +169,7 @@
* SCATTER depending on some characteristics of the algorithm. Is this
* worth while?
*
- * TODO The ability to pushd down the APPLY for AllEdges for the GATHER
+ * Note: The ability to push down the APPLY for AllEdges for the GATHER
* depends on our using the union of the in-edges and out-edges
* iterators to visit those edges. That union means that we do not have
* to preserve the accumulant across the in-edges and out-edges aspects
@@ -177,23 +186,18 @@
final boolean pushDownApplyInScatter;
final boolean runApplyStage;
- if (scatterEdges == EdgesEnum.NoEdges) {
+ if (gatherEdges != EdgesEnum.NoEdges) {
// Do APPLY() in GATHER.
pushDownApplyInGather = true;
pushDownApplyInScatter = false;
runApplyStage = false;
- } else if (gatherEdges == EdgesEnum.NoEdges) {
+ } else if (scatterEdges != EdgesEnum.NoEdges) {
// APPLY() in SCATTER.
pushDownApplyInGather = false;
pushDownApplyInScatter = true;
runApplyStage = false;
} else {
- /*
- * Do not push down the APPLY.
- *
- * TODO We could still push down the apply into the GATHER if we are
- * doing both stages.
- */
+ // Do not push down the APPLY.
pushDownApplyInGather = false;
pushDownApplyInScatter = false;
runApplyStage = true;
@@ -620,7 +624,7 @@
} else {
- left = program.sum(left, right);
+ left = program.sum(gasState, left, right);
}
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASState.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASState.java 2013-10-18 15:34:43 UTC (rev 7460)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASState.java 2013-10-19 20:01:51 UTC (rev 7461)
@@ -15,6 +15,7 @@
*/
package com.bigdata.rdf.graph.impl;
+import java.util.Comparator;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -26,8 +27,10 @@
import org.openrdf.model.Statement;
import org.openrdf.model.URI;
import org.openrdf.model.Value;
+import org.openrdf.query.algebra.evaluation.util.ValueComparator;
import com.bigdata.rdf.graph.Factory;
+import com.bigdata.rdf.graph.IGASContext;
import com.bigdata.rdf.graph.IGASEngine;
import com.bigdata.rdf.graph.IGASProgram;
import com.bigdata.rdf.graph.IGASSchedulerImpl;
@@ -108,8 +111,13 @@
* Provides access to the backing graph. Used to decode vertices and edges
* for {@link #traceState()}.
*/
- private IGraphAccessor graphAccessor;
+ private final IGraphAccessor graphAccessor;
+ /**
+ * Used to establish a total ordering over RDF {@link Value}s.
+ */
+ private final Comparator<Value> valueComparator;
+
public GASState(final IGASEngine gasEngine,//
final IGraphAccessor graphAccessor, //
final IStaticFrontier frontier,//
@@ -146,6 +154,13 @@
this.scheduler = gasScheduler;
+ /*
+ * TODO This is the SPARQL value ordering. It might not be total or
+ * stable. If not, we can use an ordering over the string values of the
+ * RDF Values, but that will put pressure on the heap.
+ */
+ this.valueComparator = new ValueComparator();
+
}
/**
@@ -242,7 +257,7 @@
}
@Override
- public void init(final Value... vertices) {
+ public void setFrontier(final IGASContext<VS, ES, ST> ctx, final Value... vertices) {
if (vertices == null)
throw new IllegalArgumentException();
@@ -263,7 +278,7 @@
*/
for (Value v : tmp) {
- gasProgram.init(this, v);
+ gasProgram.initVertex(ctx, this, v);
}
@@ -272,7 +287,7 @@
&& sortFrontier, tmp.iterator());
}
-
+
@Override
public void traceState() {
@@ -392,4 +407,15 @@
return null;
}
+ @Override
+ public int compareTo(final Value u, final Valu...
[truncated message content] |