|
From: <tho...@us...> - 2014-03-14 20:59:29
|
Revision: 7967
http://sourceforge.net/p/bigdata/code/7967
Author: thompsonbry
Date: 2014-03-14 20:59:25 +0000 (Fri, 14 Mar 2014)
Log Message:
-----------
Added the ability to extract the predecessor from BFS. We can not do this yet for SSSP because the algorithm is using a gather phase. The predecessor would have to be communicated over the gather phase along with the distance. However, rather than do this, I want to change SSSP to use a push style scatter (1/2 the traversed edges).
See #810 (Expose a GAS SERVICE).
Modified Paths:
--------------
branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java
branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java
branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/BaseGASProgram.java
branches/RDR/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestBFS.java
branches/RDR/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/bd/TestBFS.java
Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java
===================================================================
--- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java 2014-03-14 16:43:25 UTC (rev 7966)
+++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java 2014-03-14 20:59:25 UTC (rev 7967)
@@ -21,6 +21,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import org.openrdf.model.Statement;
import org.openrdf.model.Value;
@@ -62,8 +63,14 @@
* scheduled.
*/
private final AtomicInteger depth = new AtomicInteger(-1);
-
+
/**
+ * The predecessor is the first source vertex to visit a given target
+ * vertex.
+ */
+ private final AtomicReference<Value> predecessor = new AtomicReference<Value>();
+
+ /**
* The depth at which this vertex was first visited (origin ZERO) and
* <code>-1</code> if the vertex has not been visited.
*/
@@ -74,6 +81,15 @@
}
/**
+ * Return the first vertex to discover this vertex during BFS traversal.
+ */
+ public Value predecessor() {
+
+ return predecessor.get();
+
+ }
+
+ /**
* Note: This marks the vertex at the current traversal depth.
*
* @return <code>true</code> if the vertex was visited for the first
@@ -81,8 +97,9 @@
* first visited the vertex (this helps to avoid multiple
* scheduling of a vertex).
*/
- public boolean visit(final int depth) {
+ public boolean visit(final int depth, final Value predecessor) {
if (this.depth.compareAndSet(-1/* expect */, depth/* newValue */)) {
+ this.predecessor.set(predecessor);
// Scheduled by this thread.
return true;
}
@@ -163,8 +180,8 @@
public void initVertex(final IGASContext<BFS.VS, BFS.ES, Void> ctx,
final IGASState<BFS.VS, BFS.ES, Void> state, final Value u) {
- state.getState(u).visit(0);
-
+ state.getState(u).visit(0, null/* predecessor */);
+
}
/**
@@ -222,10 +239,10 @@
final IGASScheduler sch, final Value u, final Statement e) {
// remote vertex state.
- final VS otherState = state.getState(e.getObject());
+ final VS otherState = state.getState(e.getObject()/* v */);
// visit.
- if (otherState.visit(state.round() + 1)) {
+ if (otherState.visit(state.round() + 1, u/* predecessor */)) {
/*
* This is the first visit for the remote vertex. Add it to the
@@ -249,8 +266,12 @@
* {@inheritDoc}
* <p>
* <dl>
- * <dt>1</dt>
- * <dd>The depth at which the vertex was first encountered during traversal.</dd>
+ * <dt>{@value Bindings#DEPTH}</dt>
+ * <dd>The depth at which the vertex was first encountered during traversal.
+ * </dd>
+ * <dt>{@value Bindings#PREDECESSOR}</dt>
+ * <dd>The predecessor is the first vertex that discovers a given vertex
+ * during traversal.</dd>
* </dl>
*/
@Override
@@ -262,7 +283,7 @@
@Override
public int getIndex() {
- return 1;
+ return Bindings.DEPTH;
}
@Override
@@ -274,11 +295,47 @@
}
});
+ tmp.add(new IBinder<BFS.VS, BFS.ES, Void>() {
+
+ @Override
+ public int getIndex() {
+ return Bindings.PREDECESSOR;
+ }
+
+ @Override
+ public Value bind(final ValueFactory vf,
+ final IGASState<BFS.VS, BFS.ES, Void> state, final Value u) {
+
+ return state.getState(u).predecessor.get();
+
+ }
+ });
+
return tmp;
}
/**
+ * Additional {@link IBinder}s exposed by {@link BFS}.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ */
+ public interface Bindings extends BaseGASProgram.Bindings {
+
+ /**
+ * The depth at which the vertex was visited.
+ */
+ int DEPTH = 1;
+
+ /**
+ * The BFS predecessor is the first vertex to discover a given vertex.
+ *
+ */
+ int PREDECESSOR = 2;
+
+ }
+
+ /**
* Reduce the active vertex state, returning a histogram reporting the #of
* vertices at each distance from the starting vertex. There will always be
* one vertex at depth zero - this is the starting vertex. For each
Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java
===================================================================
--- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java 2014-03-14 16:43:25 UTC (rev 7966)
+++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java 2014-03-14 20:59:25 UTC (rev 7967)
@@ -146,6 +146,77 @@
}
+ /**
+ * Mark this as a starting vertex (distance:=ZERO, changed:=true).
+ */
+ synchronized private void setStartingVertex() {
+
+ // Set distance to zero for starting vertex.
+ dist = 0;
+
+ // Must be true to trigger scatter in the 1st round!
+ changed = true;
+
+ }
+
+ /**
+ * Update the vertex state to the minimum of the combined sum and its
+ * current state.
+ *
+ * @param u
+ * The vertex that is the owner of this {@link VS vertex
+ * state} (used only for debug info).
+ * @param sum
+ * The combined sum from the gather phase.
+ *
+ * @return <code>this</code> iff the vertex state was modified.
+ *
+ * FIXME PREDECESSOR: We can not track the predecessor because
+ * the SSSP algorithm currently uses a GATHER phase and a
+ * SCATTER phase rather than doing all the work in a push-style
+ * SCATTER phase.
+ */
+ synchronized private VS apply(final Value u, final Integer sum) {
+
+ final int minDist = sum;
+
+ changed = false;
+ if (dist > minDist) {
+ dist = minDist;
+ changed = true;
+ if (log.isDebugEnabled())
+ log.debug("u=" + u + ", us=" + this + ", minDist="
+ + minDist);
+ return this;
+ }
+
+ return null;
+
+ }
+
+ /**
+ * Update the vertex state to the new (reduced) distance.
+ *
+ * @param predecessor
+ * The vertex that propagated the update to this vertex.
+ * @param newDist
+ * The new distance.
+ *
+ * @return <code>true</code> iff this vertex state was changed.
+ */
+ synchronized private boolean scatter(final Value predecessor,
+ final int newDist) {
+ /*
+ * Validate that the distance has decreased while holding the lock.
+ */
+ if (newDist < dist) {
+ dist = newDist;
+ changed = true;
+ return true;
+ }
+ return false;
+ }
+
}// class VS
/**
@@ -212,15 +283,7 @@
final VS us = state.getState(u);
- synchronized (us) {
-
- // Set distance to zero for starting vertex.
- us.dist = 0;
-
- // Must be true to trigger scatter in the 1st round!
- us.changed = true;
-
- }
+ us.setStartingVertex();
}
@@ -278,18 +341,8 @@
// Get the state for that vertex.
final SSSP.VS us = state.getState(u);
- final int minDist = sum;
-
- synchronized(us) {
- us.changed = false;
- if (us.dist > minDist) {
- us.dist = minDist;
- us.changed = true;
- if (log.isDebugEnabled())
- log.debug("u=" + u + ", us=" + us + ", minDist=" + minDist);
- return us;
- }
- }
+ return us.apply(u, sum);
+
}
// No change.
@@ -351,26 +404,26 @@
final VS otherState = state.getState(other);
- // last observed distance for the remote vertex.
- final int otherDist = otherState.dist();
-
// new distance for the remote vertex.
final int newDist = selfState.dist() + EDGE_LENGTH;
+ // last observed distance for the remote vertex.
+ final int otherDist = otherState.dist();
+
if (newDist < otherDist) {
- synchronized (otherState) {
- otherState.dist = newDist;
- otherState.changed = true;
+ if (otherState.scatter(u/* predecessor */, newDist)) {
+
+ if (log.isDebugEnabled())
+ log.debug("u=" + u + " @ " + selfState.dist()
+ + ", scheduling: " + other + " with newDist="
+ + newDist);
+
+ // Then add the remote vertex to the next frontier.
+ sch.schedule(e.getObject());
+
}
-
- if (log.isDebugEnabled())
- log.debug("u=" + u + " @ " + selfState.dist()
- + ", scheduling: " + other + " with newDist=" + newDist);
- // Then add the remote vertex to the next frontier.
- sch.schedule(e.getObject());
-
}
}
@@ -400,7 +453,7 @@
@Override
public int getIndex() {
- return 1;
+ return Bindings.DISTANCE;
}
@Override
@@ -417,4 +470,18 @@
}
+ /**
+ * Additional {@link IBinder}s exposed by {@link SSSP}.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ */
+ public interface Bindings extends BaseGASProgram.Bindings {
+
+ /**
+ * The shortest distance to the vertex.
+ */
+ int DISTANCE = 1;
+
+ }
+
}
Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/BaseGASProgram.java
===================================================================
--- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/BaseGASProgram.java 2014-03-14 16:43:25 UTC (rev 7966)
+++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/BaseGASProgram.java 2014-03-14 20:59:25 UTC (rev 7967)
@@ -222,48 +222,55 @@
}
/**
- * Return an {@link IBinder} for the vertex itself
+ * {@inheritDoc}
+ * <p>
+ * <dl>
+ * <dt>{@value Bindings#VISITED}</dt>
+ * <dd>The visited vertex itself.</dd>
+ * </dl>
*/
- private IBinder<VS, ES, ST> getBinder0() {
+ @Override
+ public List<IBinder<VS, ES, ST>> getBinderList() {
- return new IBinder<VS, ES, ST>() {
+ final List<IBinder<VS, ES, ST>> tmp = new LinkedList<IBinder<VS, ES, ST>>();
+ tmp.add(new IBinder<VS, ES, ST>() {
+
@Override
public int getIndex() {
-
- return 0;
-
+
+ return Bindings.VISITED;
+
}
@Override
public Value bind(final ValueFactory vf,
final IGASState<VS, ES, ST> state, final Value u) {
-
+
return u;
-
+
}
- };
-
+ });
+
+ return tmp;
+
}
/**
- * {@inheritDoc}
- * <p>
- * <dl>
- * <dt>0</dt>
- * <dd>The visited vertex itself.</dd>
- * </dl>
+ * Interface declares symbolic constants for the {@link IBinder}s reported
+ * by {@link BaseGASProgram#getBinderList()}.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan
+ * Thompson</a>
*/
- @Override
- public List<IBinder<VS, ES, ST>> getBinderList() {
+ public interface Bindings {
+
+ /**
+ * The visited vertex identifier.
+ */
+ int VISITED = 0;
- final List<IBinder<VS, ES, ST>> tmp = new LinkedList<IBinder<VS, ES, ST>>();
-
- tmp.add(getBinder0());
-
- return tmp;
-
}
-
+
}
Modified: branches/RDR/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestBFS.java
===================================================================
--- branches/RDR/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestBFS.java 2014-03-14 16:43:25 UTC (rev 7966)
+++ branches/RDR/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestBFS.java 2014-03-14 20:59:25 UTC (rev 7967)
@@ -70,12 +70,19 @@
gasContext.call();
assertEquals(0, gasState.getState(p.getMike()).depth());
+ assertEquals(null, gasState.getState(p.getMike()).predecessor());
assertEquals(1, gasState.getState(p.getFoafPerson()).depth());
+ assertEquals(p.getMike(), gasState.getState(p.getFoafPerson())
+ .predecessor());
assertEquals(1, gasState.getState(p.getBryan()).depth());
+ assertEquals(p.getMike(), gasState.getState(p.getBryan())
+ .predecessor());
assertEquals(2, gasState.getState(p.getMartyn()).depth());
+ assertEquals(p.getBryan(), gasState.getState(p.getMartyn())
+ .predecessor());
} finally {
Modified: branches/RDR/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/bd/TestBFS.java
===================================================================
--- branches/RDR/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/bd/TestBFS.java 2014-03-14 16:43:25 UTC (rev 7966)
+++ branches/RDR/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/bd/TestBFS.java 2014-03-14 20:59:25 UTC (rev 7967)
@@ -71,12 +71,19 @@
gasContext.call();
assertEquals(0, gasState.getState(p.getMike()).depth());
+ assertEquals(null, gasState.getState(p.getMike()).predecessor());
assertEquals(1, gasState.getState(p.getFoafPerson()).depth());
+ assertEquals(p.getMike(), gasState.getState(p.getFoafPerson())
+ .predecessor());
assertEquals(1, gasState.getState(p.getBryan()).depth());
+ assertEquals(p.getMike(), gasState.getState(p.getBryan())
+ .predecessor());
assertEquals(2, gasState.getState(p.getMartyn()).depth());
+ assertEquals(p.getBryan(), gasState.getState(p.getMartyn())
+ .predecessor());
} finally {
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|