|
From: <tho...@us...> - 2014-03-14 23:44:41
|
Revision: 7972
http://sourceforge.net/p/bigdata/code/7972
Author: thompsonbry
Date: 2014-03-14 23:44:38 +0000 (Fri, 14 Mar 2014)
Log Message:
-----------
Added support for the control over directed versus undirected edge traversal semantics. There is a new isDirectedTraversal() option and setDirectedTraversal() option on IGASContext and a gas:directedTraversal option for the GASService.
I have written a test of this functionality for BFS. I found and fixed some assumptions in BFS and SSP where they used e.getSubject() or e.getObject() rather than u and gasState.getOtherVertex(u,e). The former do not correctly handle the case where the traversal assumptions change from either in-edges or out-edges to all-edges. The latter (u and getOtherVertex(u,e)) does.
See #810 (GAS Service)
Modified Paths:
--------------
branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/EdgesEnum.java
branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASContext.java
branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASOptions.java
branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java
branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/CC.java
branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java
branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/BaseGASProgram.java
branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASContext.java
branches/RDR/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestBFS.java
branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASService.java
Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/EdgesEnum.java
===================================================================
--- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/EdgesEnum.java 2014-03-14 23:21:07 UTC (rev 7971)
+++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/EdgesEnum.java 2014-03-14 23:44:38 UTC (rev 7972)
@@ -83,4 +83,27 @@
}
}
+ /**
+ * Promote an {@link EdgesEnum} value that was specified with the assumption
+ * that the graph is directed into an {@link EdgesEnum} value that should be
+ * used when the graph is undirected. There is no change for
+ * {@link #NoEdges} and {@link #AllEdges}. If the value is either
+ * {@link #InEdges} or {@link #OutEdges} then it is promoted to
+ * {@link #AllEdges}.
+ */
+ public EdgesEnum asUndirectedTraversal() {
+ switch (this) {
+ case NoEdges:
+ case AllEdges:
+ // No change.
+ return this;
+ case InEdges:
+ case OutEdges:
+ // promote to AllEdges.
+ return AllEdges;
+ default:
+ throw new UnsupportedOperationException();
+ }
+ }
+
}
\ No newline at end of file
Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASContext.java
===================================================================
--- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASContext.java 2014-03-14 23:21:07 UTC (rev 7971)
+++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASContext.java 2014-03-14 23:44:38 UTC (rev 7972)
@@ -67,6 +67,22 @@
IGraphAccessor getGraphAccessor();
/**
+ * Specify whether the visited edges of the graph are to be interpreted as
+ * directed or undirected (default <code>directed</code>).
+ * <p>
+ * The value specified here is used to determine how the {@link EdgesEnum}
+ * will be interpreted for the GATHER and SCATTER phases. See
+ * {@link EdgesEnum#asUndirectedTraversal()}.
+ */
+ void setDirectedTraversal(boolean newVal);
+
+ /**
+ * Return <code>true</code> if the graph should be interpreted as a directed
+ * graph.
+ */
+ boolean isDirectedTraversal();
+
+ /**
* Specify the maximum number of iterations for the algorithm. A value of
* ONE means that the algorithm will halt after the first round.
*
Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASOptions.java
===================================================================
--- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASOptions.java 2014-03-14 23:21:07 UTC (rev 7971)
+++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASOptions.java 2014-03-14 23:44:38 UTC (rev 7972)
@@ -50,8 +50,10 @@
EdgesEnum getSampleEdgesFilter();
/**
- * Return the set of edges to which the GATHER is applied -or-
- * {@link EdgesEnum#NoEdges} to skip the GATHER phase.
+ * Return the set of edges to which the GATHER is applied for a
+ * <em>directed</em> graph -or- {@link EdgesEnum#NoEdges} to skip the GATHER
+ * phase. This will be interpreted based on the value reported by
+ * {@link IGASContext#isDirectedTraversal()}.
*
* TODO We may need to set dynamically when visting the vertex in the
* frontier rather than having it be a one-time property of the vertex
@@ -60,8 +62,10 @@
EdgesEnum getGatherEdges();
/**
- * Return the set of edges to which the SCATTER is applied -or-
- * {@link EdgesEnum#NoEdges} to skip the SCATTER phase.
+ * Return the set of edges to which the SCATTER is applied for a
+ * <em>directed</em> graph -or- {@link EdgesEnum#NoEdges} to skip the
+ * SCATTER phase. This will be interpreted based on the value reported by
+ * {@link IGASContext#isDirectedTraversal()}.
*/
EdgesEnum getScatterEdges();
Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java
===================================================================
--- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java 2014-03-14 23:21:07 UTC (rev 7971)
+++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java 2014-03-14 23:44:38 UTC (rev 7972)
@@ -239,7 +239,9 @@
final IGASScheduler sch, final Value u, final Statement e) {
// remote vertex state.
- final VS otherState = state.getState(e.getObject()/* v */);
+ final Value v = state.getOtherVertex(u, e);
+ final VS otherState = state.getState(v);
+// final VS otherState = state.getState(e.getObject()/* v */);
// visit.
if (otherState.visit(state.round() + 1, u/* predecessor */)) {
@@ -249,7 +251,7 @@
* schedule for the next iteration.
*/
- sch.schedule(e.getObject());
+ sch.schedule(v);
}
Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/CC.java
===================================================================
--- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/CC.java 2014-03-14 23:21:07 UTC (rev 7971)
+++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/CC.java 2014-03-14 23:44:38 UTC (rev 7972)
@@ -291,10 +291,6 @@
/**
* The remote vertex is scheduled for activation unless it has already been
* visited.
- * <p>
- * Note: We are scattering to out-edges. Therefore, this vertex is
- * {@link Statement#getSubject()}. The remote vertex is
- * {@link Statement#getObject()}.
*/
@Override
public void scatter(final IGASState<CC.VS, CC.ES, Value> state,
Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java
===================================================================
--- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java 2014-03-14 23:21:07 UTC (rev 7971)
+++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java 2014-03-14 23:44:38 UTC (rev 7972)
@@ -296,9 +296,10 @@
public Integer gather(final IGASState<SSSP.VS, SSSP.ES, Integer> state,
final Value u, final Statement e) {
-// assert e.o().equals(u);
+// assert e.getObject().equals(u);
- final VS src = state.getState(e.getSubject());
+// final VS src = state.getState(e.getSubject());
+ final VS src = state.getState(u);
final int d = src.dist();
@@ -420,7 +421,7 @@
+ newDist);
// Then add the remote vertex to the next frontier.
- sch.schedule(e.getObject());
+ sch.schedule(other);
}
Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/BaseGASProgram.java
===================================================================
--- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/BaseGASProgram.java 2014-03-14 23:21:07 UTC (rev 7971)
+++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/BaseGASProgram.java 2014-03-14 23:44:38 UTC (rev 7972)
@@ -32,7 +32,6 @@
import com.bigdata.rdf.graph.IGASContext;
import com.bigdata.rdf.graph.IGASProgram;
import com.bigdata.rdf.graph.IGASState;
-import com.bigdata.rdf.graph.IReducer;
import com.bigdata.rdf.graph.impl.util.VertexDistribution;
/**
@@ -65,6 +64,8 @@
* The default implementation returns {@link #getGatherEdges()} and the
* {@link #getScatterEdges()} if {@link #getGatherEdges()} returns
* {@value EdgesEnum#NoEdges}.
+ *
+ * TODO This ignores {@link IGASContext#isDirectedTraversal()}
*/
@Override
public EdgesEnum getSampleEdgesFilter() {
Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASContext.java
===================================================================
--- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASContext.java 2014-03-14 23:21:07 UTC (rev 7971)
+++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASContext.java 2014-03-14 23:44:38 UTC (rev 7972)
@@ -19,6 +19,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -63,6 +64,12 @@
private final IGASProgram<VS, ES, ST> program;
/**
+ * Whether or not the edges of the graph will be traversed with directed
+ * graph semantics (default is TRUE).
+ */
+ private final AtomicBoolean directedGraph = new AtomicBoolean(true);
+
+ /**
* The maximum number of iterations (defaults to {@link Integer#MAX_VALUE}).
*/
private final AtomicInteger maxIterations = new AtomicInteger(
@@ -251,8 +258,12 @@
* APPLY is done before the SCATTER - this would not work if we pushed
* down the APPLY into the SCATTER).
*/
- final EdgesEnum gatherEdges = program.getGatherEdges();
- final EdgesEnum scatterEdges = program.getScatterEdges();
+ final EdgesEnum gatherEdges = isDirectedTraversal() ? program
+ .getGatherEdges() : program.getGatherEdges()
+ .asUndirectedTraversal();
+ final EdgesEnum scatterEdges = isDirectedTraversal() ? program
+ .getScatterEdges() : program.getScatterEdges()
+ .asUndirectedTraversal();
final boolean pushDownApplyInGather;
final boolean pushDownApplyInScatter;
final boolean runApplyStage;
@@ -805,6 +816,20 @@
}
@Override
+ public boolean isDirectedTraversal() {
+
+ return directedGraph.get();
+
+ }
+
+ @Override
+ public void setDirectedTraversal(final boolean newVal) {
+
+ directedGraph.set(newVal);
+
+ }
+
+ @Override
public int getMaxIterations() {
return maxIterations.get();
Modified: branches/RDR/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestBFS.java
===================================================================
--- branches/RDR/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestBFS.java 2014-03-14 23:21:07 UTC (rev 7971)
+++ branches/RDR/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestBFS.java 2014-03-14 23:44:38 UTC (rev 7972)
@@ -102,4 +102,151 @@
}
+ /**
+ * Variant test in which we choose a vertex (<code>foaf:person</code>) in
+ * the middle of the graph and insist on directed edges. Since the edges
+ * point from the person to the <code>foaf:person</code> vertex, this BSF
+ * traversal does not discover any connected vertices.
+ */
+ public void testBFS_directed() throws Exception {
+
+ final SmallGraphProblem p = setupSmallGraphProblem();
+
+ final IGASEngine gasEngine = getGraphFixture()
+ .newGASEngine(1/* nthreads */);
+
+ try {
+
+ final SailConnection cxn = getGraphFixture().getSail()
+ .getConnection();
+
+ try {
+
+ final IGraphAccessor graphAccessor = getGraphFixture()
+ .newGraphAccessor(cxn);
+
+ final IGASContext<BFS.VS, BFS.ES, Void> gasContext = gasEngine
+ .newGASContext(graphAccessor, new BFS());
+
+ final IGASState<BFS.VS, BFS.ES, Void> gasState = gasContext
+ .getGASState();
+
+ // Initialize the froniter.
+ gasState.setFrontier(gasContext, p.getFoafPerson());
+
+ // directed traversal.
+ gasContext.setDirectedTraversal(true);
+
+ // Converge.
+ gasContext.call();
+
+ // starting vertex at (0,null).
+ assertEquals(0, gasState.getState(p.getFoafPerson()).depth());
+ assertEquals(null, gasState.getState(p.getFoafPerson())
+ .predecessor());
+
+ // no other vertices are visited.
+ assertEquals(-1, gasState.getState(p.getMike()).depth());
+ assertEquals(null, gasState.getState(p.getMike()).predecessor());
+
+ assertEquals(-1, gasState.getState(p.getBryan()).depth());
+ assertEquals(null, gasState.getState(p.getBryan())
+ .predecessor());
+
+ assertEquals(-1, gasState.getState(p.getMartyn()).depth());
+ assertEquals(null, gasState.getState(p.getMartyn())
+ .predecessor());
+
+ } finally {
+
+ try {
+ cxn.rollback();
+ } finally {
+ cxn.close();
+ }
+
+ }
+
+ } finally {
+
+ gasEngine.shutdownNow();
+
+ }
+
+ }
+
+ /**
+ * Variant test in which we choose a vertex (<code>foaf:person</code>) in
+ * the middle of the graph and insist on directed edges. Since the edges
+ * point from the person to the <code>foaf:person</code> vertex, this BSF
+ * traversal does not discover any connected vertices.
+ */
+ public void testBFS_undirected() throws Exception {
+
+ final SmallGraphProblem p = setupSmallGraphProblem();
+
+ final IGASEngine gasEngine = getGraphFixture()
+ .newGASEngine(1/* nthreads */);
+
+ try {
+
+ final SailConnection cxn = getGraphFixture().getSail()
+ .getConnection();
+
+ try {
+
+ final IGraphAccessor graphAccessor = getGraphFixture()
+ .newGraphAccessor(cxn);
+
+ final IGASContext<BFS.VS, BFS.ES, Void> gasContext = gasEngine
+ .newGASContext(graphAccessor, new BFS());
+
+ final IGASState<BFS.VS, BFS.ES, Void> gasState = gasContext
+ .getGASState();
+
+ // Initialize the froniter.
+ gasState.setFrontier(gasContext, p.getFoafPerson());
+
+ // undirected traversal.
+ gasContext.setDirectedTraversal(false);
+
+ // Converge.
+ gasContext.call();
+
+ // starting vertex at (0,null).
+ assertEquals(0, gasState.getState(p.getFoafPerson()).depth());
+ assertEquals(null, gasState.getState(p.getFoafPerson())
+ .predecessor());
+
+ // All other vertices are 1-hop.
+ assertEquals(1, gasState.getState(p.getMike()).depth());
+ assertEquals(p.getFoafPerson(), gasState.getState(p.getMike())
+ .predecessor());
+
+ assertEquals(1, gasState.getState(p.getBryan()).depth());
+ assertEquals(p.getFoafPerson(), gasState.getState(p.getBryan())
+ .predecessor());
+
+ assertEquals(1, gasState.getState(p.getMartyn()).depth());
+ assertEquals(p.getFoafPerson(), gasState
+ .getState(p.getMartyn()).predecessor());
+
+ } finally {
+
+ try {
+ cxn.rollback();
+ } finally {
+ cxn.close();
+ }
+
+ }
+
+ } finally {
+
+ gasEngine.shutdownNow();
+
+ }
+
+ }
+
}
Modified: branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASService.java
===================================================================
--- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASService.java 2014-03-14 23:21:07 UTC (rev 7971)
+++ branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASService.java 2014-03-14 23:44:38 UTC (rev 7972)
@@ -185,6 +185,16 @@
int DEFAULT_NTHREADS = 4;
/**
+ * This option determines whether the traversal of the graph will
+ * interpret the edges as directed or undirected.
+ *
+ * @see IGASContext#setDirectedTraversal(boolean)
+ */
+ URI DIRECTED_TRAVERSAL = new URIImpl(NAMESPACE + "directedTraversal");
+
+ boolean DEFAULT_DIRECTED_TRAVERSAL = true;
+
+ /**
* The maximum #of iterations for the GAS program (optional, default
* {@value #DEFAULT_MAX_ITERATIONS}).
*
@@ -375,6 +385,7 @@
// options extracted from the SERVICE's graph pattern.
private final int nthreads;
+ private final boolean directedTraversal;
private final int maxIterations;
private final int maxVisited;
private final URI linkType, linkAttrType;
@@ -408,6 +419,11 @@
store.getValueFactory().createLiteral(
Options.DEFAULT_NTHREADS))).intValue();
+ this.directedTraversal = ((Literal) getOnlyArg(Options.PROGRAM,
+ Options.DIRECTED_TRAVERSAL, store.getValueFactory()
+ .createLiteral(Options.DEFAULT_DIRECTED_TRAVERSAL)))
+ .booleanValue();
+
this.maxIterations = ((Literal) getOnlyArg(Options.PROGRAM,
Options.MAX_ITERATIONS, store.getValueFactory()
.createLiteral(Options.DEFAULT_MAX_ITERATIONS)))
@@ -728,6 +744,8 @@
final IGASContext<VS, ES, ST> gasContext = gasEngine.newGASContext(
graphAccessor, gasProgram);
+ gasContext.setDirectedTraversal(directedTraversal);
+
gasContext.setMaxIterations(maxIterations);
gasContext.setMaxVisited(maxVisited);
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|