From: <tho...@us...> - 2014-03-14 23:44:41
|
Revision: 7972 http://sourceforge.net/p/bigdata/code/7972 Author: thompsonbry Date: 2014-03-14 23:44:38 +0000 (Fri, 14 Mar 2014) Log Message: ----------- Added support for control over directed versus undirected edge traversal semantics. There is a new isDirectedTraversal() option and setDirectedTraversal() option on IGASContext and a gas:directedTraversal option for the GASService. I have written a test of this functionality for BFS. I found and fixed some assumptions in BFS and SSSP where they used e.getSubject() or e.getObject() rather than u and gasState.getOtherVertex(u,e). The former do not correctly handle the case where the traversal assumptions change from either in-edges or out-edges to all-edges. The latter (u and getOtherVertex(u,e)) does. See #810 (GAS Service) Modified Paths: -------------- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/EdgesEnum.java branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASContext.java branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASOptions.java branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/CC.java branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/BaseGASProgram.java branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASContext.java branches/RDR/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestBFS.java branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASService.java Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/EdgesEnum.java =================================================================== --- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/EdgesEnum.java 2014-03-14 23:21:07 UTC (rev 7971) +++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/EdgesEnum.java 2014-03-14 23:44:38 UTC (rev 7972) @@ -83,4 +83,27 @@ } } + /** + * Promote an {@link 
EdgesEnum} value that was specified with the assumption + * that the graph is directed into an {@link EdgesEnum} value that should be + * used when the graph is undirected. There is no change for + * {@link #NoEdges} and {@link #AllEdges}. If the value is either + * {@link #InEdges} or {@link #OutEdges} then it is promoted to + * {@link #AllEdges}. + */ + public EdgesEnum asUndirectedTraversal() { + switch (this) { + case NoEdges: + case AllEdges: + // No change. + return this; + case InEdges: + case OutEdges: + // promote to AllEdges. + return AllEdges; + default: + throw new UnsupportedOperationException(); + } + } + } \ No newline at end of file Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASContext.java =================================================================== --- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASContext.java 2014-03-14 23:21:07 UTC (rev 7971) +++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASContext.java 2014-03-14 23:44:38 UTC (rev 7972) @@ -67,6 +67,22 @@ IGraphAccessor getGraphAccessor(); /** + * Specify whether the visited edges of the graph are to be interpreted as + * directed or undirected (default <code>directed</code>). + * <p> + * The value specified here is used to determine how the {@link EdgesEnum} + * will be interpreted for the GATHER and SCATTER phases. See + * {@link EdgesEnum#asUndirectedTraversal()}. + */ + void setDirectedTraversal(boolean newVal); + + /** + * Return <code>true</code> if the graph should be interpreted as a directed + * graph. + */ + boolean isDirectedTraversal(); + + /** * Specify the maximum number of iterations for the algorithm. A value of * ONE means that the algorithm will halt after the first round. 
* Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASOptions.java =================================================================== --- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASOptions.java 2014-03-14 23:21:07 UTC (rev 7971) +++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASOptions.java 2014-03-14 23:44:38 UTC (rev 7972) @@ -50,8 +50,10 @@ EdgesEnum getSampleEdgesFilter(); /** - * Return the set of edges to which the GATHER is applied -or- - * {@link EdgesEnum#NoEdges} to skip the GATHER phase. + * Return the set of edges to which the GATHER is applied for a + * <em>directed</em> graph -or- {@link EdgesEnum#NoEdges} to skip the GATHER + * phase. This will be interpreted based on the value reported by + * {@link IGASContext#isDirectedTraversal()}. * * TODO We may need to set dynamically when visting the vertex in the * frontier rather than having it be a one-time property of the vertex @@ -60,8 +62,10 @@ EdgesEnum getGatherEdges(); /** - * Return the set of edges to which the SCATTER is applied -or- - * {@link EdgesEnum#NoEdges} to skip the SCATTER phase. + * Return the set of edges to which the SCATTER is applied for a + * <em>directed</em> graph -or- {@link EdgesEnum#NoEdges} to skip the + * SCATTER phase. This will be interpreted based on the value reported by + * {@link IGASContext#isDirectedTraversal()}. */ EdgesEnum getScatterEdges(); Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java =================================================================== --- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java 2014-03-14 23:21:07 UTC (rev 7971) +++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java 2014-03-14 23:44:38 UTC (rev 7972) @@ -239,7 +239,9 @@ final IGASScheduler sch, final Value u, final Statement e) { // remote vertex state. 
- final VS otherState = state.getState(e.getObject()/* v */); + final Value v = state.getOtherVertex(u, e); + final VS otherState = state.getState(v); +// final VS otherState = state.getState(e.getObject()/* v */); // visit. if (otherState.visit(state.round() + 1, u/* predecessor */)) { @@ -249,7 +251,7 @@ * schedule for the next iteration. */ - sch.schedule(e.getObject()); + sch.schedule(v); } Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/CC.java =================================================================== --- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/CC.java 2014-03-14 23:21:07 UTC (rev 7971) +++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/CC.java 2014-03-14 23:44:38 UTC (rev 7972) @@ -291,10 +291,6 @@ /** * The remote vertex is scheduled for activation unless it has already been * visited. - * <p> - * Note: We are scattering to out-edges. Therefore, this vertex is - * {@link Statement#getSubject()}. The remote vertex is - * {@link Statement#getObject()}. */ @Override public void scatter(final IGASState<CC.VS, CC.ES, Value> state, Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java =================================================================== --- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java 2014-03-14 23:21:07 UTC (rev 7971) +++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java 2014-03-14 23:44:38 UTC (rev 7972) @@ -296,9 +296,10 @@ public Integer gather(final IGASState<SSSP.VS, SSSP.ES, Integer> state, final Value u, final Statement e) { -// assert e.o().equals(u); +// assert e.getObject().equals(u); - final VS src = state.getState(e.getSubject()); +// final VS src = state.getState(e.getSubject()); + final VS src = state.getState(u); final int d = src.dist(); @@ -420,7 +421,7 @@ + newDist); // Then add the remote vertex to the next frontier. 
- sch.schedule(e.getObject()); + sch.schedule(other); } Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/BaseGASProgram.java =================================================================== --- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/BaseGASProgram.java 2014-03-14 23:21:07 UTC (rev 7971) +++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/BaseGASProgram.java 2014-03-14 23:44:38 UTC (rev 7972) @@ -32,7 +32,6 @@ import com.bigdata.rdf.graph.IGASContext; import com.bigdata.rdf.graph.IGASProgram; import com.bigdata.rdf.graph.IGASState; -import com.bigdata.rdf.graph.IReducer; import com.bigdata.rdf.graph.impl.util.VertexDistribution; /** @@ -65,6 +64,8 @@ * The default implementation returns {@link #getGatherEdges()} and the * {@link #getScatterEdges()} if {@link #getGatherEdges()} returns * {@value EdgesEnum#NoEdges}. + * + * TODO This ignores {@link IGASContext#isDirectedTraversal()} */ @Override public EdgesEnum getSampleEdgesFilter() { Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASContext.java =================================================================== --- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASContext.java 2014-03-14 23:21:07 UTC (rev 7971) +++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASContext.java 2014-03-14 23:44:38 UTC (rev 7972) @@ -19,6 +19,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -63,6 +64,12 @@ private final IGASProgram<VS, ES, ST> program; /** + * Whether or not the edges of the graph will be traversed with directed + * graph semantics (default is TRUE). 
+ */ + private final AtomicBoolean directedGraph = new AtomicBoolean(true); + + /** * The maximum number of iterations (defaults to {@link Integer#MAX_VALUE}). */ private final AtomicInteger maxIterations = new AtomicInteger( @@ -251,8 +258,12 @@ * APPLY is done before the SCATTER - this would not work if we pushed * down the APPLY into the SCATTER). */ - final EdgesEnum gatherEdges = program.getGatherEdges(); - final EdgesEnum scatterEdges = program.getScatterEdges(); + final EdgesEnum gatherEdges = isDirectedTraversal() ? program + .getGatherEdges() : program.getGatherEdges() + .asUndirectedTraversal(); + final EdgesEnum scatterEdges = isDirectedTraversal() ? program + .getScatterEdges() : program.getScatterEdges() + .asUndirectedTraversal(); final boolean pushDownApplyInGather; final boolean pushDownApplyInScatter; final boolean runApplyStage; @@ -805,6 +816,20 @@ } @Override + public boolean isDirectedTraversal() { + + return directedGraph.get(); + + } + + @Override + public void setDirectedTraversal(final boolean newVal) { + + directedGraph.set(newVal); + + } + + @Override public int getMaxIterations() { return maxIterations.get(); Modified: branches/RDR/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestBFS.java =================================================================== --- branches/RDR/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestBFS.java 2014-03-14 23:21:07 UTC (rev 7971) +++ branches/RDR/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestBFS.java 2014-03-14 23:44:38 UTC (rev 7972) @@ -102,4 +102,151 @@ } + /** + * Variant test in which we choose a vertex (<code>foaf:person</code>) in + * the middle of the graph and insist on directed edges. Since the edges + * point from the person to the <code>foaf:person</code> vertex, this BFS + * traversal does not discover any connected vertices. 
+ */ + public void testBFS_directed() throws Exception { + + final SmallGraphProblem p = setupSmallGraphProblem(); + + final IGASEngine gasEngine = getGraphFixture() + .newGASEngine(1/* nthreads */); + + try { + + final SailConnection cxn = getGraphFixture().getSail() + .getConnection(); + + try { + + final IGraphAccessor graphAccessor = getGraphFixture() + .newGraphAccessor(cxn); + + final IGASContext<BFS.VS, BFS.ES, Void> gasContext = gasEngine + .newGASContext(graphAccessor, new BFS()); + + final IGASState<BFS.VS, BFS.ES, Void> gasState = gasContext + .getGASState(); + + // Initialize the frontier. + gasState.setFrontier(gasContext, p.getFoafPerson()); + + // directed traversal. + gasContext.setDirectedTraversal(true); + + // Converge. + gasContext.call(); + + // starting vertex at (0,null). + assertEquals(0, gasState.getState(p.getFoafPerson()).depth()); + assertEquals(null, gasState.getState(p.getFoafPerson()) + .predecessor()); + + // no other vertices are visited. + assertEquals(-1, gasState.getState(p.getMike()).depth()); + assertEquals(null, gasState.getState(p.getMike()).predecessor()); + + assertEquals(-1, gasState.getState(p.getBryan()).depth()); + assertEquals(null, gasState.getState(p.getBryan()) + .predecessor()); + + assertEquals(-1, gasState.getState(p.getMartyn()).depth()); + assertEquals(null, gasState.getState(p.getMartyn()) + .predecessor()); + + } finally { + + try { + cxn.rollback(); + } finally { + cxn.close(); + } + + } + + } finally { + + gasEngine.shutdownNow(); + + } + + } + + /** + * Variant test in which we choose a vertex (<code>foaf:person</code>) in + * the middle of the graph and allow undirected traversal. Although the edges + * point from the person to the <code>foaf:person</code> vertex, this BFS + * traversal discovers every connected person vertex at a depth of one. 
+ */ + public void testBFS_undirected() throws Exception { + + final SmallGraphProblem p = setupSmallGraphProblem(); + + final IGASEngine gasEngine = getGraphFixture() + .newGASEngine(1/* nthreads */); + + try { + + final SailConnection cxn = getGraphFixture().getSail() + .getConnection(); + + try { + + final IGraphAccessor graphAccessor = getGraphFixture() + .newGraphAccessor(cxn); + + final IGASContext<BFS.VS, BFS.ES, Void> gasContext = gasEngine + .newGASContext(graphAccessor, new BFS()); + + final IGASState<BFS.VS, BFS.ES, Void> gasState = gasContext + .getGASState(); + + // Initialize the frontier. + gasState.setFrontier(gasContext, p.getFoafPerson()); + + // undirected traversal. + gasContext.setDirectedTraversal(false); + + // Converge. + gasContext.call(); + + // starting vertex at (0,null). + assertEquals(0, gasState.getState(p.getFoafPerson()).depth()); + assertEquals(null, gasState.getState(p.getFoafPerson()) + .predecessor()); + + // All other vertices are 1-hop. + assertEquals(1, gasState.getState(p.getMike()).depth()); + assertEquals(p.getFoafPerson(), gasState.getState(p.getMike()) + .predecessor()); + + assertEquals(1, gasState.getState(p.getBryan()).depth()); + assertEquals(p.getFoafPerson(), gasState.getState(p.getBryan()) + .predecessor()); + + assertEquals(1, gasState.getState(p.getMartyn()).depth()); + assertEquals(p.getFoafPerson(), gasState + .getState(p.getMartyn()).predecessor()); + + } finally { + + try { + cxn.rollback(); + } finally { + cxn.close(); + } + + } + + } finally { + + gasEngine.shutdownNow(); + + } + + } + } Modified: branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASService.java =================================================================== --- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASService.java 2014-03-14 23:21:07 UTC (rev 7971) +++ branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASService.java 2014-03-14 23:44:38 UTC (rev 7972) @@ -185,6 +185,16 @@ int 
DEFAULT_NTHREADS = 4; /** + * This option determines whether the traversal of the graph will + * interpret the edges as directed or undirected. + * + * @see IGASContext#setDirectedTraversal(boolean) + */ + URI DIRECTED_TRAVERSAL = new URIImpl(NAMESPACE + "directedTraversal"); + + boolean DEFAULT_DIRECTED_TRAVERSAL = true; + + /** * The maximum #of iterations for the GAS program (optional, default * {@value #DEFAULT_MAX_ITERATIONS}). * @@ -375,6 +385,7 @@ // options extracted from the SERVICE's graph pattern. private final int nthreads; + private final boolean directedTraversal; private final int maxIterations; private final int maxVisited; private final URI linkType, linkAttrType; @@ -408,6 +419,11 @@ store.getValueFactory().createLiteral( Options.DEFAULT_NTHREADS))).intValue(); + this.directedTraversal = ((Literal) getOnlyArg(Options.PROGRAM, + Options.DIRECTED_TRAVERSAL, store.getValueFactory() + .createLiteral(Options.DEFAULT_DIRECTED_TRAVERSAL))) + .booleanValue(); + this.maxIterations = ((Literal) getOnlyArg(Options.PROGRAM, Options.MAX_ITERATIONS, store.getValueFactory() .createLiteral(Options.DEFAULT_MAX_ITERATIONS))) @@ -728,6 +744,8 @@ final IGASContext<VS, ES, ST> gasContext = gasEngine.newGASContext( graphAccessor, gasProgram); + gasContext.setDirectedTraversal(directedTraversal); + gasContext.setMaxIterations(maxIterations); gasContext.setMaxVisited(maxVisited); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |