|
From: <tho...@us...> - 2014-04-02 20:53:55
|
Revision: 8029
http://sourceforge.net/p/bigdata/code/8029
Author: thompsonbry
Date: 2014-04-02 20:53:52 +0000 (Wed, 02 Apr 2014)
Log Message:
-----------
Changed SSSP to use the push-style scatter pattern and added a predecessor that is tracked during the SSSP evaluation.
Modified Paths:
--------------
branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java
Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java
===================================================================
--- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java 2014-04-02 16:13:03 UTC (rev 8028)
+++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java 2014-04-02 20:53:52 UTC (rev 8029)
@@ -16,6 +16,7 @@
package com.bigdata.rdf.graph.analytics;
import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.log4j.Logger;
import org.openrdf.model.Statement;
@@ -40,21 +41,6 @@
*
* @author <a href="mailto:tho...@us...">Bryan Thompson</a>
*
- * TODO Add parameter for directed versus undirected SSSP. When
- * undirected, the gather and scatter are for AllEdges. Otherwise,
- * gather on in-edges and scatter on out-edges. Also, we need to use a
- * getOtherVertex(e) method to figure out the other edge when using
- * undirected scatter/gather. Add unit test for undirected.
- *
- * FIXME New SSSP (push style scatter abstraction with new test case
- * based on graph example developed for this). Note: The push style
- * scatter on the GPU is implemented by capturing each (src,edge) pair
- * as a distinct entry in the frontier. This gives us all of the
- * necessary variety. We then reduce that variety, applying the binary
- * operator to combine the intermediate results. Finally, an APPLY()
- * phase is executed to update the state of the distinct vertices in the
- * frontier.
- *
* FIXME Add a reducer to report the actual minimum length paths. This
* is similar to a BFS tree, but the path lengths are not integer values
* so we need a different data structure to collect them (we need to
@@ -97,17 +83,23 @@
*/
private Integer dist = Integer.MAX_VALUE;
+// /**
+// * Note: This flag is cleared by apply() and then conditionally set
+// * iff the {@link #dist()} is replaced by the new value from the
+// * gather. Thus, if the gather does not reduce the value, then the
+// * propagation of the algorithm is halted. However, this causes the
+// * algorithm to NOT scatter for round zero, which causes it to halt.
+// * I plan to fix the algorithm by doing the "push" style update in
+// * the scatter phase. That will completely remove the gather phase
+// * of the algorithm.
+// */
+// private boolean changed = false;
+
/**
- * Note: This flag is cleared by apply() and then conditionally set
- * iff the {@link #dist()} is replaced by the new value from the
- * gather. Thus, if the gather does not reduce the value, then the
- * propagation of the algorithm is halted. However, this causes the
- * algorithm to NOT scatter for round zero, which causes it to halt.
- * I plan to fix the algorithm by doing the "push" style update in
- * the scatter phase. That will completely remove the gather phase
- * of the algorithm.
+ * The predecessor is the source vertex to visit a given target vertex
+ * with the minimum observed distance.
*/
- private boolean changed = false;
+ private final AtomicReference<Value> predecessor = new AtomicReference<Value>();
// /**
// * Set the distance for the vertex to ZERO. This is done for the
@@ -120,15 +112,15 @@
// }
// }
- /**
- * Return <code>true</code> if the {@link #dist()} was updated by the
- * last APPLY.
- */
- public boolean isChanged() {
- synchronized (this) {
- return changed;
- }
- }
+// /**
+// * Return <code>true</code> if the {@link #dist()} was updated by the
+// * last APPLY.
+// */
+// public boolean isChanged() {
+// synchronized (this) {
+// return changed;
+// }
+// }
/**
* The current estimate of the minimum distance from the starting vertex
@@ -144,7 +136,9 @@
@Override
public String toString() {
- return "{dist=" + dist() + ", changed=" + isChanged() + "}";
+ return "{dist=" + dist() + ", predecessor=" + predecessor.get()
+// + ", changed=" + isChanged()
+ + "}";
}
@@ -155,47 +149,48 @@
// Set distance to zero for starting vertex.
dist = 0;
+ this.predecessor.set(null);
- // Must be true to trigger scatter in the 1st round!
- changed = true;
+// // Must be true to trigger scatter in the 1st round!
+// changed = true;
}
- /**
- * Update the vertex state to the minimum of the combined sum and its
- * current state.
- *
- * @param u
- * The vertex that is the owner of this {@link VS vertex
- * state} (used only for debug info).
- * @param sum
- * The combined sum from the gather phase.
- *
- * @return <code>this</code> iff the vertex state was modified.
- *
- * FIXME PREDECESSOR: We can not track the predecessor because
- * the SSSP algorithm currently uses a GATHER phase and a
- * SCATTER phase rather than doing all the work in a push-style
- * SCATTER phase.
- */
- synchronized private VS apply(final Value u, final Integer sum) {
+// /**
+// * Update the vertex state to the minimum of the combined sum and its
+// * current state.
+// *
+// * @param u
+// * The vertex that is the owner of this {@link VS vertex
+// * state} (used only for debug info).
+// * @param sum
+// * The combined sum from the gather phase.
+// *
+// * @return <code>this</code> iff the vertex state was modified.
+// *
+// * FIXME PREDECESSOR: We can not track the predecessor because
+// * the SSSP algorithm currently uses a GATHER phase and a
+// * SCATTER phase rather than doing all the work in a push-style
+// * SCATTER phase.
+// */
+// synchronized private VS apply(final Value u, final Integer sum) {
+//
+// final int minDist = sum;
+//
+// changed = false;
+// if (dist > minDist) {
+// dist = minDist;
+// changed = true;
+// if (log.isDebugEnabled())
+// log.debug("u=" + u + ", us=" + this + ", minDist="
+// + minDist);
+// return this;
+// }
+//
+// return null;
+//
+// }
- final int minDist = sum;
-
- changed = false;
- if (dist > minDist) {
- dist = minDist;
- changed = true;
- if (log.isDebugEnabled())
- log.debug("u=" + u + ", us=" + this + ", minDist="
- + minDist);
- return this;
- }
-
- return null;
-
- }
-
/**
* Update the vertex state to the new (reduced) distance.
*
@@ -213,7 +208,8 @@
*/
if (newDist < dist) {
dist = newDist;
- changed = true;
+ this.predecessor.set(predecessor);
+// changed = true;
return true;
}
return false;
@@ -263,7 +259,8 @@
@Override
public EdgesEnum getGatherEdges() {
- return EdgesEnum.InEdges;
+// return EdgesEnum.InEdges;
+ return EdgesEnum.NoEdges;
}
@@ -297,57 +294,57 @@
@Override
public Integer gather(final IGASState<SSSP.VS, SSSP.ES, Integer> state,
final Value u, final Statement e) {
+ throw new UnsupportedOperationException();
-// assert e.getObject().equals(u);
+//// assert e.getObject().equals(u);
+//
+//// final VS src = state.getState(e.getSubject());
+// final VS src = state.getState(u);
+//
+// final int d = src.dist();
+//
+// if (d == Integer.MAX_VALUE) {
+//
+// // Note: Avoids overflow (wrapping around to a negative value).
+// return d;
+//
+// }
+//
+// return d + EDGE_LENGTH;
-// final VS src = state.getState(e.getSubject());
- final VS src = state.getState(u);
-
- final int d = src.dist();
-
- if (d == Integer.MAX_VALUE) {
-
- // Note: Avoids overflow (wrapping around to a negative value).
- return d;
-
- }
-
- return d + EDGE_LENGTH;
-
}
/**
- * MIN
+ * UNUSED.
*/
@Override
public Integer sum(final IGASState<SSSP.VS, SSSP.ES, Integer> state,
final Integer left, final Integer right) {
+ throw new UnsupportedOperationException();
+// return Math.min(left, right);
- return Math.min(left, right);
-
}
- /**
- * Update the {@link VS#dist()} and {@link VS#isChanged()} based on the new
- * <i>sum</i>.
- * <p>
- * {@inheritDoc}
- */
+ /** NOP. */
+// * Update the {@link VS#dist()} and {@link VS#isChanged()} based on the new
+// * <i>sum</i>.
+// * <p>
+// * {@inheritDoc}
@Override
public SSSP.VS apply(final IGASState<SSSP.VS, SSSP.ES, Integer> state,
final Value u, final Integer sum) {
- if (sum != null) {
+// if (sum != null) {
+//
+//// log.error("u=" + u + ", us=" + us + ", sum=" + sum);
+//
+// // Get the state for that vertex.
+// final SSSP.VS us = state.getState(u);
+//
+// return us.apply(u, sum);
+//
+// }
-// log.error("u=" + u + ", us=" + us + ", sum=" + sum);
-
- // Get the state for that vertex.
- final SSSP.VS us = state.getState(u);
-
- return us.apply(u, sum);
-
- }
-
// No change.
return null;
@@ -370,32 +367,9 @@
// }
/**
- * The remote vertex is scheduled if this vertex is changed.
- * <p>
- * Note: We are scattering to out-edges. Therefore, this vertex is
- * {@link Statement#getSubject()}. The remote vertex is
- * {@link Statement#getObject()}.
- * <p>
- * {@inheritDoc}
- *
- * FIXME OPTIMIZE: Test both variations on a variety of data sets and see
- * which is better (actually, just replace with a push style Scatter of the
- * updates):
- *
- * <p>
- * Zhisong wrote: In the original GASengine, the scatter operator only need
- * to access the status of the source: src.changes.
- *
- * To check the status of destination, it needs to load destination data:
- * dst.dist and edge data: e. And then check if new dist is different from
- * the old value.
- *
- * Bryan wrote: I will have to think about this more. It sounds like it
- * depends on the fan-out of the scatter at time t versus the fan-in of the
- * gather at time t+1. The optimization might only benefit if a reasonable
- * fraction of the destination vertices wind up NOT being retriggered. I
- * will try on these variations in the Java code as well.
- * </p>
+ * The remote vertex is scheduled iff the weight of the edge from this vertex
+ * to the remote vertex plus the weight on this vertex is less than the weight
+ * on the remote vertex.
*/
@Override
public void scatter(final IGASState<SSSP.VS, SSSP.ES, Integer> state,
@@ -413,8 +387,10 @@
// last observed distance for the remote vertex.
final int otherDist = otherState.dist();
+ // Note: test first without lock.
if (newDist < otherDist) {
+ // Tested again inside VS while holding lock.
if (otherState.scatter(u/* predecessor */, newDist)) {
if (log.isDebugEnabled())
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|