From: <tho...@us...> - 2014-04-02 20:53:55
|
Revision: 8029 http://sourceforge.net/p/bigdata/code/8029 Author: thompsonbry Date: 2014-04-02 20:53:52 +0000 (Wed, 02 Apr 2014) Log Message: ----------- Changed SSSP to use the push-style scatter pattern and added a predecessor that is tracked during the SSSP evaluation. Modified Paths: -------------- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java =================================================================== --- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java 2014-04-02 16:13:03 UTC (rev 8028) +++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java 2014-04-02 20:53:52 UTC (rev 8029) @@ -16,6 +16,7 @@ package com.bigdata.rdf.graph.analytics; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; import org.apache.log4j.Logger; import org.openrdf.model.Statement; @@ -40,21 +41,6 @@ * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * - * TODO Add parameter for directed versus undirected SSSP. When - * undirected, the gather and scatter are for AllEdges. Otherwise, - * gather on in-edges and scatter on out-edges. Also, we need to use a - * getOtherVertex(e) method to figure out the other edge when using - * undirected scatter/gather. Add unit test for undirected. - * - * FIXME New SSSP (push style scatter abstraction with new test case - * based on graph example developed for this). Note: The push style - * scatter on the GPU is implemented by capturing each (src,edge) pair - * as a distint entry in the frontier. This gives us all of the - * necessary variety. We then reduce that variety, applying the binary - * operator to combine the intermediate results. Finally, an APPLY() - * phase is executed to update the state of the distinct vertices in the - * frontier. - * * FIXME Add a reducer to report the actual minimum length paths. This * is similar to a BFS tree, but the path lengths are not integer values * so we need a different data structure to collect them (we need to @@ -97,17 +83,23 @@ */ private Integer dist = Integer.MAX_VALUE; +// /** +// * Note: This flag is cleared by apply() and then conditionally set +// * iff the {@link #dist()} is replaced by the new value from the +// * gather. Thus, if the gather does not reduce the value, then the +// * propagation of the algorithm is halted. However, this causes the +// * algorithm to NOT scatter for round zero, which causes it to halt. +// * I plan to fix the algorithm by doing the "push" style update in +// * the scatter phase. That will completely remove the gather phase +// * of the algorithm. +// */ +// private boolean changed = false; + /** - * Note: This flag is cleared by apply() and then conditionally set - * iff the {@link #dist()} is replaced by the new value from the - * gather. Thus, if the gather does not reduce the value, then the - * propagation of the algorithm is halted. However, this causes the - * algorithm to NOT scatter for round zero, which causes it to halt. - * I plan to fix the algorithm by doing the "push" style update in - * the scatter phase. That will completely remove the gather phase - * of the algorithm. + * The predecessor is the source vertex to visit a given target vertex + * with the minimum observed distance. */ - private boolean changed = false; + private final AtomicReference<Value> predecessor = new AtomicReference<Value>(); // /** // * Set the distance for the vertex to ZERO. This is done for the @@ -120,15 +112,15 @@ // } // } - /** - * Return <code>true</code> if the {@link #dist()} was updated by the - * last APPLY. - */ - public boolean isChanged() { - synchronized (this) { - return changed; - } - } +// /** +// * Return <code>true</code> if the {@link #dist()} was updated by the +// * last APPLY. +// */ +// public boolean isChanged() { +// synchronized (this) { +// return changed; +// } +// } /** * The current estimate of the minimum distance from the starting vertex @@ -144,7 +136,9 @@ @Override public String toString() { - return "{dist=" + dist() + ", changed=" + isChanged() + "}"; + return "{dist=" + dist() + ", predecessor=" + predecessor.get() +// + ", changed=" + isChanged() + + "}"; } @@ -155,47 +149,48 @@ // Set distance to zero for starting vertex. dist = 0; + this.predecessor.set(null); - // Must be true to trigger scatter in the 1st round! - changed = true; +// // Must be true to trigger scatter in the 1st round! +// changed = true; } - /** - * Update the vertex state to the minimum of the combined sum and its - * current state. - * - * @param u - * The vertex that is the owner of this {@link VS vertex - * state} (used only for debug info). - * @param sum - * The combined sum from the gather phase. - * - * @return <code>this</code> iff the vertex state was modified. - * - * FIXME PREDECESSOR: We can not track the predecessor because - * the SSSP algorithm currently uses a GATHER phase and a - * SCATTER phase rather than doing all the work in a push-style - * SCATTER phase. - */ - synchronized private VS apply(final Value u, final Integer sum) { +// /** +// * Update the vertex state to the minimum of the combined sum and its +// * current state. +// * +// * @param u +// * The vertex that is the owner of this {@link VS vertex +// * state} (used only for debug info). +// * @param sum +// * The combined sum from the gather phase. +// * +// * @return <code>this</code> iff the vertex state was modified. +// * +// * FIXME PREDECESSOR: We can not track the predecessor because +// * the SSSP algorithm currently uses a GATHER phase and a +// * SCATTER phase rather than doing all the work in a push-style +// * SCATTER phase. +// */ +// synchronized private VS apply(final Value u, final Integer sum) { +// +// final int minDist = sum; +// +// changed = false; +// if (dist > minDist) { +// dist = minDist; +// changed = true; +// if (log.isDebugEnabled()) +// log.debug("u=" + u + ", us=" + this + ", minDist=" +// + minDist); +// return this; +// } +// +// return null; +// +// } - final int minDist = sum; - - changed = false; - if (dist > minDist) { - dist = minDist; - changed = true; - if (log.isDebugEnabled()) - log.debug("u=" + u + ", us=" + this + ", minDist=" - + minDist); - return this; - } - - return null; - - } - /** * Update the vertex state to the new (reduced) distance. * @@ -213,7 +208,8 @@ */ if (newDist < dist) { dist = newDist; - changed = true; + this.predecessor.set(predecessor); +// changed = true; return true; } return false; @@ -263,7 +259,8 @@ @Override public EdgesEnum getGatherEdges() { - return EdgesEnum.InEdges; +// return EdgesEnum.InEdges; + return EdgesEnum.NoEdges; } @@ -297,57 +294,57 @@ @Override public Integer gather(final IGASState<SSSP.VS, SSSP.ES, Integer> state, final Value u, final Statement e) { + throw new UnsupportedOperationException(); -// assert e.getObject().equals(u); +//// assert e.getObject().equals(u); +// +//// final VS src = state.getState(e.getSubject()); +// final VS src = state.getState(u); +// +// final int d = src.dist(); +// +// if (d == Integer.MAX_VALUE) { +// +// // Note: Avoids overflow (wrapping around to a negative value). +// return d; +// +// } +// +// return d + EDGE_LENGTH; -// final VS src = state.getState(e.getSubject()); - final VS src = state.getState(u); - - final int d = src.dist(); - - if (d == Integer.MAX_VALUE) { - - // Note: Avoids overflow (wrapping around to a negative value). - return d; - - } - - return d + EDGE_LENGTH; - } /** - * MIN + * UNUSED. */ @Override public Integer sum(final IGASState<SSSP.VS, SSSP.ES, Integer> state, final Integer left, final Integer right) { + throw new UnsupportedOperationException(); +// return Math.min(left, right); - return Math.min(left, right); - } - /** - * Update the {@link VS#dist()} and {@link VS#isChanged()} based on the new - * <i>sum</i>. - * <p> - * {@inheritDoc} - */ + /** NOP. */ +// * Update the {@link VS#dist()} and {@link VS#isChanged()} based on the new +// * <i>sum</i>. +// * <p> +// * {@inheritDoc} @Override public SSSP.VS apply(final IGASState<SSSP.VS, SSSP.ES, Integer> state, final Value u, final Integer sum) { - if (sum != null) { +// if (sum != null) { +// +//// log.error("u=" + u + ", us=" + us + ", sum=" + sum); +// +// // Get the state for that vertex. +// final SSSP.VS us = state.getState(u); +// +// return us.apply(u, sum); +// +// } -// log.error("u=" + u + ", us=" + us + ", sum=" + sum); - - // Get the state for that vertex. - final SSSP.VS us = state.getState(u); - - return us.apply(u, sum); - - } - // No change. return null; @@ -370,32 +367,9 @@ // } /** - * The remote vertex is scheduled if this vertex is changed. - * <p> - * Note: We are scattering to out-edges. Therefore, this vertex is - * {@link Statement#getSubect()}. The remote vertex is - * {@link Statement#getObject()}. - * <p> - * {@inheritDoc} - * - * FIXME OPTIMIZE: Test both variations on a variety of data sets and see - * which is better (actually, just replace with a push style Scatter of the - * updates): - * - * <p> - * Zhisong wrote: In the original GASengine, the scatter operator only need - * to access the status of the source: src.changes. - * - * To check the status of destination, it needs to load destination data: - * dst.dist and edge data: e. And then check if new dist is different from - * the old value. - * - * Bryan wrote: I will have to think about this more. It sounds like it - * depends on the fan-out of the scatter at time t versus the fan-in of the - * gather at time t+1. The optimization might only benefit if a reasonable - * fraction of the destination vertices wind up NOT being retriggered. I - * will try on these variations in the Java code as well. - * </p> + * The remote vertex is scheduled the weighted edge from this vertex to the + * remote vertex plus the weight on this vertex is less than the weight on + * the remote vertex. */ @Override public void scatter(final IGASState<SSSP.VS, SSSP.ES, Integer> state, @@ -413,8 +387,10 @@ // last observed distance for the remote vertex. final int otherDist = otherState.dist(); + // Note: test first without lock. if (newDist < otherDist) { + // Tested again inside VS while holding lock. if (otherState.scatter(u/* predecessor */, newDist)) { if (log.isDebugEnabled()) This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |