From: <tho...@us...> - 2014-03-14 20:59:29
|
Revision: 7967 http://sourceforge.net/p/bigdata/code/7967 Author: thompsonbry Date: 2014-03-14 20:59:25 +0000 (Fri, 14 Mar 2014) Log Message: ----------- Added the ability to extract the predecessor from BFS. We can not do this yet for SSSP because the algorithm is using a gather phase. The predecessor would have to be communicated over the gather phase along with the distance. However, rather than do this, I want to change SSSP to use a push style scatter (1/2 the traversed edges). See #810 (Expose a GAS SERVICE). Modified Paths: -------------- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/BaseGASProgram.java branches/RDR/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestBFS.java branches/RDR/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/bd/TestBFS.java Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java =================================================================== --- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java 2014-03-14 16:43:25 UTC (rev 7966) +++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java 2014-03-14 20:59:25 UTC (rev 7967) @@ -21,6 +21,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import org.openrdf.model.Statement; import org.openrdf.model.Value; @@ -62,8 +63,14 @@ * scheduled. */ private final AtomicInteger depth = new AtomicInteger(-1); - + /** + * The predecessor is the first source vertex to visit a given target + * vertex. + */ + private final AtomicReference<Value> predecessor = new AtomicReference<Value>(); + + /** * The depth at which this vertex was first visited (origin ZERO) and * <code>-1</code> if the vertex has not been visited. */ @@ -74,6 +81,15 @@ } /** + * Return the first vertex to discover this vertex during BFS traversal. + */ + public Value predecessor() { + + return predecessor.get(); + + } + + /** * Note: This marks the vertex at the current traversal depth. * * @return <code>true</code> if the vertex was visited for the first @@ -81,8 +97,9 @@ * first visited the vertex (this helps to avoid multiple * scheduling of a vertex). */ - public boolean visit(final int depth) { + public boolean visit(final int depth, final Value predecessor) { if (this.depth.compareAndSet(-1/* expect */, depth/* newValue */)) { + this.predecessor.set(predecessor); // Scheduled by this thread. return true; } @@ -163,8 +180,8 @@ public void initVertex(final IGASContext<BFS.VS, BFS.ES, Void> ctx, final IGASState<BFS.VS, BFS.ES, Void> state, final Value u) { - state.getState(u).visit(0); - + state.getState(u).visit(0, null/* predecessor */); + } /** @@ -222,10 +239,10 @@ final IGASScheduler sch, final Value u, final Statement e) { // remote vertex state. - final VS otherState = state.getState(e.getObject()); + final VS otherState = state.getState(e.getObject()/* v */); // visit. - if (otherState.visit(state.round() + 1)) { + if (otherState.visit(state.round() + 1, u/* predecessor */)) { /* * This is the first visit for the remote vertex. Add it to the @@ -249,8 +266,12 @@ * {@inheritDoc} * <p> * <dl> - * <dt>1</dt> - * <dd>The depth at which the vertex was first encountered during traversal.</dd> + * <dt>{@value Bindings#DEPTH}</dt> + * <dd>The depth at which the vertex was first encountered during traversal. + * </dd> + * <dt>{@value Bindings#PREDECESSOR}</dt> + * <dd>The predecessor is the first vertex that discovers a given vertex + * during traversal.</dd> * </dl> */ @Override @@ -262,7 +283,7 @@ @Override public int getIndex() { - return 1; + return Bindings.DEPTH; } @Override @@ -274,11 +295,47 @@ } }); + tmp.add(new IBinder<BFS.VS, BFS.ES, Void>() { + + @Override + public int getIndex() { + return Bindings.PREDECESSOR; + } + + @Override + public Value bind(final ValueFactory vf, + final IGASState<BFS.VS, BFS.ES, Void> state, final Value u) { + + return state.getState(u).predecessor.get(); + + } + }); + return tmp; } /** + * Additional {@link IBinder}s exposed by {@link BFS}. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ + public interface Bindings extends BaseGASProgram.Bindings { + + /** + * The depth at which the vertex was visited. + */ + int DEPTH = 1; + + /** + * The BFS predecessor is the first vertex to discover a given vertex. + * + */ + int PREDECESSOR = 2; + + } + + /** * Reduce the active vertex state, returning a histogram reporting the #of * vertices at each distance from the starting vertex. There will always be * one vertex at depth zero - this is the starting vertex. For each Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java =================================================================== --- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java 2014-03-14 16:43:25 UTC (rev 7966) +++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java 2014-03-14 20:59:25 UTC (rev 7967) @@ -146,6 +146,77 @@ } + /** + * Mark this as a starting vertex (distance:=ZERO, changed:=true). + */ + synchronized private void setStartingVertex() { + + // Set distance to zero for starting vertex. + dist = 0; + + // Must be true to trigger scatter in the 1st round! + changed = true; + + } + + /** + * Update the vertex state to the minimum of the combined sum and its + * current state. + * + * @param u + * The vertex that is the owner of this {@link VS vertex + * state} (used only for debug info). + * @param sum + * The combined sum from the gather phase. + * + * @return <code>this</code> iff the vertex state was modified. + * + * FIXME PREDECESSOR: We can not track the predecessor because + * the SSSP algorithm currently uses a GATHER phase and a + * SCATTER phase rather than doing all the work in a push-style + * SCATTER phase. + */ + synchronized private VS apply(final Value u, final Integer sum) { + + final int minDist = sum; + + changed = false; + if (dist > minDist) { + dist = minDist; + changed = true; + if (log.isDebugEnabled()) + log.debug("u=" + u + ", us=" + this + ", minDist=" + + minDist); + return this; + } + + return null; + + } + + /** + * Update the vertex state to the new (reduced) distance. + * + * @param predecessor + * The vertex that propagated the update to this vertex. + * @param newDist + * The new distance. + * + * @return <code>true</code> iff this vertex state was changed. + */ + synchronized private boolean scatter(final Value predecessor, + final int newDist) { + /* + * Validate that the distance has decreased while holding the lock. + */ + if (newDist < dist) { + dist = newDist; + changed = true; + return true; + } + return false; + } + }// class VS /** @@ -212,15 +283,7 @@ final VS us = state.getState(u); - synchronized (us) { - - // Set distance to zero for starting vertex. - us.dist = 0; - - // Must be true to trigger scatter in the 1st round! - us.changed = true; - - } + us.setStartingVertex(); } @@ -278,18 +341,8 @@ // Get the state for that vertex. final SSSP.VS us = state.getState(u); - final int minDist = sum; - - synchronized(us) { - us.changed = false; - if (us.dist > minDist) { - us.dist = minDist; - us.changed = true; - if (log.isDebugEnabled()) - log.debug("u=" + u + ", us=" + us + ", minDist=" + minDist); - return us; - } - } + return us.apply(u, sum); + } // No change. @@ -351,26 +404,26 @@ final VS otherState = state.getState(other); - // last observed distance for the remote vertex. - final int otherDist = otherState.dist(); - // new distance for the remote vertex. final int newDist = selfState.dist() + EDGE_LENGTH; + // last observed distance for the remote vertex. + final int otherDist = otherState.dist(); + if (newDist < otherDist) { - synchronized (otherState) { - otherState.dist = newDist; - otherState.changed = true; + if (otherState.scatter(u/* predecessor */, newDist)) { + + if (log.isDebugEnabled()) + log.debug("u=" + u + " @ " + selfState.dist() + + ", scheduling: " + other + " with newDist=" + + newDist); + + // Then add the remote vertex to the next frontier. + sch.schedule(e.getObject()); + } - - if (log.isDebugEnabled()) - log.debug("u=" + u + " @ " + selfState.dist() - + ", scheduling: " + other + " with newDist=" + newDist); - // Then add the remote vertex to the next frontier. - sch.schedule(e.getObject()); - } } @@ -400,7 +453,7 @@ @Override public int getIndex() { - return 1; + return Bindings.DISTANCE; } @Override @@ -417,4 +470,18 @@ } + /** + * Additional {@link IBinder}s exposed by {@link SSSP}. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ + public interface Bindings extends BaseGASProgram.Bindings { + + /** + * The shortest distance to the vertex. + */ + int DISTANCE = 1; + + } + } Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/BaseGASProgram.java =================================================================== --- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/BaseGASProgram.java 2014-03-14 16:43:25 UTC (rev 7966) +++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/BaseGASProgram.java 2014-03-14 20:59:25 UTC (rev 7967) @@ -222,48 +222,55 @@ } /** - * Return an {@link IBinder} for the vertex itself + * {@inheritDoc} + * <p> + * <dl> + * <dt>{@value Bindings#VISITED}</dt> + * <dd>The visited vertex itself.</dd> + * </dl> */ - private IBinder<VS, ES, ST> getBinder0() { + @Override + public List<IBinder<VS, ES, ST>> getBinderList() { - return new IBinder<VS, ES, ST>() { + final List<IBinder<VS, ES, ST>> tmp = new LinkedList<IBinder<VS, ES, ST>>(); + tmp.add(new IBinder<VS, ES, ST>() { + @Override public int getIndex() { - - return 0; - + + return Bindings.VISITED; + } @Override public Value bind(final ValueFactory vf, final IGASState<VS, ES, ST> state, final Value u) { - + return u; - + } - }; - + }); + + return tmp; + } /** - * {@inheritDoc} - * <p> - * <dl> - * <dt>0</dt> - * <dd>The visited vertex itself.</dd> - * </dl> + * Interface declares symbolic constants for the {@link IBinder}s reported + * by {@link BaseGASProgram#getBinderList()}. + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> */ - @Override - public List<IBinder<VS, ES, ST>> getBinderList() { + public interface Bindings { + + /** + * The visited vertex identifier. + */ + int VISITED = 0; - final List<IBinder<VS, ES, ST>> tmp = new LinkedList<IBinder<VS, ES, ST>>(); - - tmp.add(getBinder0()); - - return tmp; - } - + } Modified: branches/RDR/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestBFS.java =================================================================== --- branches/RDR/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestBFS.java 2014-03-14 16:43:25 UTC (rev 7966) +++ branches/RDR/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestBFS.java 2014-03-14 20:59:25 UTC (rev 7967) @@ -70,12 +70,19 @@ gasContext.call(); assertEquals(0, gasState.getState(p.getMike()).depth()); + assertEquals(null, gasState.getState(p.getMike()).predecessor()); assertEquals(1, gasState.getState(p.getFoafPerson()).depth()); + assertEquals(p.getMike(), gasState.getState(p.getFoafPerson()) + .predecessor()); assertEquals(1, gasState.getState(p.getBryan()).depth()); + assertEquals(p.getMike(), gasState.getState(p.getBryan()) + .predecessor()); assertEquals(2, gasState.getState(p.getMartyn()).depth()); + assertEquals(p.getBryan(), gasState.getState(p.getMartyn()) + .predecessor()); } finally { Modified: branches/RDR/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/bd/TestBFS.java =================================================================== --- branches/RDR/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/bd/TestBFS.java 2014-03-14 16:43:25 UTC (rev 7966) +++ branches/RDR/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/bd/TestBFS.java 2014-03-14 20:59:25 UTC (rev 7967) @@ -71,12 +71,19 @@ gasContext.call(); assertEquals(0, gasState.getState(p.getMike()).depth()); + assertEquals(null, gasState.getState(p.getMike()).predecessor()); assertEquals(1, gasState.getState(p.getFoafPerson()).depth()); + assertEquals(p.getMike(), gasState.getState(p.getFoafPerson()) + .predecessor()); assertEquals(1, gasState.getState(p.getBryan()).depth()); + assertEquals(p.getMike(), gasState.getState(p.getBryan()) + .predecessor()); assertEquals(2, gasState.getState(p.getMartyn()).depth()); + assertEquals(p.getBryan(), gasState.getState(p.getMartyn()) + .predecessor()); } finally { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |