|
From: <tho...@us...> - 2014-03-15 13:31:38
|
Revision: 7981
http://sourceforge.net/p/bigdata/code/7981
Author: thompsonbry
Date: 2014-03-15 13:31:34 +0000 (Sat, 15 Mar 2014)
Log Message:
-----------
Added the IPredecessor interface. This interface can be used to remove vertices from the visited vertex set if they do not lie along a path to a specified target vertex. This interface is only supported by BFS right now since SSSP does not yet support the concept of a predecessor (we need to reimplement SSSP as a push-style scatter).
See #810 (GAS Service)
Modified Paths:
--------------
branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASState.java
branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java
branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASState.java
branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASService.java
Added Paths:
-----------
branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IPredecessor.java
Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASState.java
===================================================================
--- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASState.java 2014-03-15 10:58:29 UTC (rev 7980)
+++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASState.java 2014-03-15 13:31:34 UTC (rev 7981)
@@ -15,6 +15,8 @@
*/
package com.bigdata.rdf.graph;
+import java.util.Set;
+
import org.openrdf.model.Statement;
import org.openrdf.model.URI;
import org.openrdf.model.Value;
@@ -260,4 +262,12 @@
*/
int compareTo(Value u, Value v);
+ /**
+ * Retain only those vertices in the visited set that are found in the
+ * specified set. Every visited vertex that is NOT in <i>retainSet</i> is
+ * removed from the visited set. Vertices in <i>retainSet</i> that were
+ * never visited are not added by this operation.
+ * <p>
+ * This is the hook used by {@link IPredecessor} implementations to prune
+ * the visited set down to the vertices on paths to the target vertices.
+ *
+ * @param retainSet The set of vertices to be retained.
+ */
+ void retainAll(Set<Value> retainSet);
+
}
Added: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IPredecessor.java
===================================================================
--- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IPredecessor.java (rev 0)
+++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IPredecessor.java 2014-03-15 13:31:34 UTC (rev 7981)
@@ -0,0 +1,45 @@
+/**
+ Copyright (C) SYSTAP, LLC 2006-2012. All rights reserved.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+package com.bigdata.rdf.graph;
+
+import org.openrdf.model.Value;
+
+/**
+ * An interface for {@link IGASProgram}s that compute paths and track a
+ * predecessor relationship among the visited vertices. This interface can be
+ * used to eliminate vertices from the visited set that are not on a path to a
+ * set of specified target vertices.
+ * <p>
+ * The type parameters are those of the {@link IGASContext} accepted by
+ * {@link #prunePaths(IGASContext, Value[])}.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ */
+public interface IPredecessor<VS, ES, ST> {
+
+ /**
+ * Remove any vertices from the visited set that do not lie on a path that
+ * leads to at least one of the target vertices.
+ * <p>
+ * This operates on the final visited set, so it is intended to be invoked
+ * after the traversal has completed.
+ *
+ * @param ctx
+ * The {@link IGASContext}.
+ * @param targetVertices
+ * An array of zero or more target vertices.
+ *
+ * @throws IllegalArgumentException
+ * if either argument is <code>null</code>.
+ */
+ public void prunePaths(final IGASContext<VS, ES, ST> ctx,
+ final Value[] targetVertices);
+
+}
Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java
===================================================================
--- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java 2014-03-15 10:58:29 UTC (rev 7980)
+++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java 2014-03-15 13:31:34 UTC (rev 7981)
@@ -15,12 +15,10 @@
*/
package com.bigdata.rdf.graph.analytics;
-import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.openrdf.model.Statement;
@@ -35,7 +33,7 @@
import com.bigdata.rdf.graph.IGASContext;
import com.bigdata.rdf.graph.IGASScheduler;
import com.bigdata.rdf.graph.IGASState;
-import com.bigdata.rdf.graph.IReducer;
+import com.bigdata.rdf.graph.IPredecessor;
import com.bigdata.rdf.graph.impl.BaseGASProgram;
/**
@@ -46,7 +44,8 @@
*
* @author <a href="mailto:tho...@us...">Bryan Thompson</a>
*/
-public class BFS extends BaseGASProgram<BFS.VS, BFS.ES, Void> {
+public class BFS extends BaseGASProgram<BFS.VS, BFS.ES, Void> implements
+ IPredecessor<BFS.VS, BFS.ES, Void> {
// private static final Logger log = Logger.getLogger(BFS.class);
@@ -339,68 +338,112 @@
}
- /**
- * Reduce the active vertex state, returning a histogram reporting the #of
- * vertices at each distance from the starting vertex. There will always be
- * one vertex at depth zero - this is the starting vertex. For each
- * successive depth, the #of vertices that were labeled at that depth is
- * reported. This is essentially the same as reporting the size of the
- * frontier in each round of the traversal, but the histograph is reported
- * based on the vertex state.
- *
- * @author <a href="mailto:tho...@us...">Bryan
- * Thompson</a>
- *
- * TODO Do another reducer that reports the actual BFS tree rather
- * than a histogram. We need to store the predecessor for this. That
- * will allow us to trivially report the BFS route between any two
- * vertices.
+// /**
+// * Reduce the active vertex state, returning a histogram reporting the #of
+// * vertices at each distance from the starting vertex. There will always be
+// * one vertex at depth zero - this is the starting vertex. For each
+// * successive depth, the #of vertices that were labeled at that depth is
+// * reported. This is essentially the same as reporting the size of the
+// * frontier in each round of the traversal, but the histograph is reported
+// * based on the vertex state.
+// *
+// * @author <a href="mailto:tho...@us...">Bryan
+// * Thompson</a>
+// */
+// protected static class HistogramReducer implements
+// IReducer<VS, ES, Void, Map<Integer, AtomicLong>> {
+//
+// private final ConcurrentHashMap<Integer, AtomicLong> values = new ConcurrentHashMap<Integer, AtomicLong>();
+//
+// @Override
+// public void visit(final IGASState<VS, ES, Void> state, final Value u) {
+//
+// final VS us = state.getState(u);
+//
+// if (us != null) {
+//
+// final Integer depth = Integer.valueOf(us.depth());
+//
+// AtomicLong newval = values.get(depth);
+//
+// if (newval == null) {
+//
+// final AtomicLong oldval = values.putIfAbsent(depth,
+// newval = new AtomicLong());
+//
+// if (oldval != null) {
+//
+// // lost data race.
+// newval = oldval;
+//
+// }
+//
+// }
+//
+// newval.incrementAndGet();
+//
+// }
+//
+// }
+//
+// @Override
+// public Map<Integer, AtomicLong> get() {
+//
+// return Collections.unmodifiableMap(values);
+//
+// }
+//
+// }
+
+ /*
+ * Prune the visited set down to the union of the predecessor chains of
+ * the reachable target vertices. For each target that was visited, walk
+ * its predecessor chain back until a vertex whose predecessor is null is
+ * passed, adding every vertex on the way to the retain set; then call
+ * IGASState.retainAll() with that set. Targets that were not visited
+ * contribute nothing, so if no target was visited the retain set is
+ * empty and every vertex is removed from the visited set.
+ *
+ * TODO Do this in parallel for each specified target vertex.
*/
- protected static class HistogramReducer implements
- IReducer<VS, ES, Void, Map<Integer, AtomicLong>> {
+ @Override
+ public void prunePaths(final IGASContext<VS, ES, Void> ctx,
+ final Value[] targetVertices) {
- private final ConcurrentHashMap<Integer, AtomicLong> values = new ConcurrentHashMap<Integer, AtomicLong>();
+ if (ctx == null)
+ throw new IllegalArgumentException();
- @Override
- public void visit(final IGASState<VS, ES, Void> state, final Value u) {
+ if (targetVertices == null)
+ throw new IllegalArgumentException();
+
+ final IGASState<BFS.VS, BFS.ES, Void> gasState = ctx.getGASState();
- final VS us = state.getState(u);
+ final Set<Value> retainSet = new HashSet<Value>();
- if (us != null) {
+ for (Value v : targetVertices) {
- final Integer depth = Integer.valueOf(us.depth());
+ if (!gasState.isVisited(v)) {
- AtomicLong newval = values.get(depth);
+ // This target was not reachable.
+ continue;
- if (newval == null) {
+ }
- final AtomicLong oldval = values.putIfAbsent(depth,
- newval = new AtomicLong());
+ /*
+ * Walk the predecessors back to a starting vertex (the walk ends
+ * once a null predecessor is reached).
+ */
+ Value current = v;
- if (oldval != null) {
+ while (current != null) {
- // lost data race.
- newval = oldval;
+ // NOTE(review): add() returns false when the vertex is already
+ // retained (its chain was walked for an earlier target). Breaking
+ // out at that point would avoid re-walking shared prefixes and
+ // would also guard against a cyclic predecessor chain, which this
+ // loop would otherwise never leave. Not done here.
+ retainSet.add(current);
- }
+ // NOTE(review): currentState is dereferenced below without a null
+ // check; this relies on every vertex on the chain having state.
+ final BFS.VS currentState = gasState.getState(current);
- }
+ final Value predecessor = currentState.predecessor();
- newval.incrementAndGet();
+ current = predecessor;
}
-
- }
-
- @Override
- public Map<Integer, AtomicLong> get() {
-
- return Collections.unmodifiableMap(values);
- }
+ } // next target vertex.
+ gasState.retainAll(retainSet);
+
}
-
+
// @Override
// public <T> IReducer<VS, ES, Void, T> getDefaultAfterOp() {
//
Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASState.java
===================================================================
--- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASState.java 2014-03-15 10:58:29 UTC (rev 7980)
+++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASState.java 2014-03-15 13:31:34 UTC (rev 7981)
@@ -241,7 +241,25 @@
}
+ /**
+ * Retain only those vertices in the visited set that are found in the
+ * specified set, discarding the state of every other vertex.
+ * <p>
+ * Removal goes through the key-set view's iterator (via
+ * {@link java.util.Collection#retainAll(java.util.Collection)}) rather
+ * than calling {@code vertexState.remove(...)} while iterating
+ * {@code vertexState.keySet()}; the latter throws
+ * {@link java.util.ConcurrentModificationException} whenever the backing
+ * map is not a concurrent map.
+ * <p>
+ * TODO batch parallel in java 8.
+ *
+ * @param retainSet
+ * The set of vertices to be retained.
+ *
+ * @throws IllegalArgumentException
+ * if <i>retainSet</i> is <code>null</code>.
+ */
@Override
+ public void retainAll(final Set<Value> retainSet) {
+
+ if (retainSet == null)
+ throw new IllegalArgumentException();
+
+ // Iterator-safe bulk removal of every key not in retainSet.
+ vertexState.keySet().retainAll(retainSet);
+
+ }
+
+ @Override
public int round() {
return round.get();
Modified: branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASService.java
===================================================================
--- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASService.java 2014-03-15 10:58:29 UTC (rev 7980)
+++ branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASService.java 2014-03-15 13:31:34 UTC (rev 7981)
@@ -53,7 +53,10 @@
import com.bigdata.rdf.graph.IGASState;
import com.bigdata.rdf.graph.IGASStats;
import com.bigdata.rdf.graph.IGraphAccessor;
+import com.bigdata.rdf.graph.IPredecessor;
import com.bigdata.rdf.graph.IReducer;
+import com.bigdata.rdf.graph.analytics.CC;
+import com.bigdata.rdf.graph.analytics.PR;
import com.bigdata.rdf.graph.impl.GASEngine;
import com.bigdata.rdf.graph.impl.GASState;
import com.bigdata.rdf.graph.impl.bd.BigdataGASEngine.BigdataGraphAccessor;
@@ -112,41 +115,20 @@
* }
* </pre>
*
- * FIXME Also allow the execution of gas workflows, such as FuzzySSSP. A workflow
- * would be more along the lines of a Callable, but one where the initial source
- * and/or target vertices could be identified. Or have an interface that wraps
- * the analytics (including things like FuzzySSSP) so they can declare their own
- * arguments for invocation as a SERVICE.
+ * FIXME Also allow the execution of gas workflows, such as FuzzySSSP. A
+ * workflow would be more along the lines of a Callable, but one where the
+ * initial source and/or target vertices could be identified. Or have an
+ * interface that wraps the analytics (including things like FuzzySSSP) so they
+ * can declare their own arguments for invocation as a SERVICE.
*
* TODO The input frontier could be a variable, in which case we would pull out
* the column for that variable rather than running the algorithm once per
* source binding set, right? Or maybe not.
*
- * TODO Allow {@link IReducer} that binds the visited vertex and also the
- * dynamic state associated with that vertex. For BFS and SSSP, this could be
- * depth/distance and the predecessor (for path information). For BFS and SSSP,
- * we could also have a specific target vertex (or vertices) and then report out
- * the path for that vertex/vertices. This would significantly reduce the data
- * reported back. (Could we run SSSP in both directions to accelerate the
- * convergence?)
+ * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
*
- * TODO Also support export. This could be easily done using a SPARQL SELECT
- *
- * <pre>
- * SELECT ?src ?tgt ?edgeWeight {
- * <<?src linkType ?tgt> propertyType ?edgeWeight>
- * }
- * </pre>
- *
- * or (if you have a simple topology without edge weights)
- *
- * <pre>
- * SELECT ?src ?tgt bind(?edgeWeight,1) {
- * ?src linkType ?tgt
- * }
- * </pre>
- *
- * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+ * @see <a href="http://wiki.bigdata.com/wiki/index.php/RDF_GAS_API">RDF GAS
+ * API</a>
*/
public class GASService implements CustomServiceFactory {
@@ -248,11 +230,42 @@
Class<? extends IGASSchedulerImpl> DEFAULT_SCHEDULER = CHMScheduler.class;
/**
- * Magic predicate used to specify a vertex in the initial frontier.
+ * Magic predicate used to specify one (or more) vertices in the initial
+ * frontier.
+ * <p>
+ * Note: Algorithms such as {@link CC} and {@link PR} automatically
+ * place all vertices into the initial frontier. For such algorithms,
+ * you do not need to specify {@link #IN}.
*/
URI IN = new URIImpl(NAMESPACE + "in");
/**
+ * Magic predicate used to specify one (or more) target vertices. This
+ * may be used in combination with algorithms that compute paths in a
+ * graph to filter the visited vertices after the traversal in order to
+ * remove any vertex that is not part of a path to one or more of the
+ * specified target vertices.
+ * <p>
+ * In order to support this, the algorithm has to have a concept of a
+ * <code>predecessor</code>. For each <code>target</code>, the set of
+ * visited vertices is checked to see if the target was reachable. If it
+ * was reachable, then the predecessors are walked backwards until a
+ * starting vertex is reached (predecessor:=null). Each such predecessor
+ * is added to a list of vertices to be retained. This is repeated for
+ * each target. Once we have identified the combined list of vertices to
+ * be reained, all vertices NOT in that list are removed from the
+ * visited vertex state. This causes the algorithm to only report on
+ * those paths that lead to at least one of the specified target
+ * vertices.
+ * <p>
+ * Note: If you do not care about the distance between two vertices, but
+ * only whether they are reachable from one another, you can put both
+ * vertices into the initial frontier. The algorithm will then work from
+ * both points which can accelerate convergence.
+ */
+ URI TARGET = new URIImpl(NAMESPACE + "target");
+
+ /**
* Magic predicate used to specify a variable that will become bound to
* each vertex in the visited set for the analytic. {@link #OUT} is
* always bound to the visited vertices. The other "out" variables are
@@ -392,6 +405,7 @@
private final Class<IGASProgram<VS, ES, ST>> gasClass;
private final Class<IGASSchedulerImpl> schedulerClass;
private final Value[] initialFrontier;
+ private final Value[] targetVertices;
private final IVariable<?>[] outVars;
public GASServiceCall(final AbstractTripleStore store,
@@ -506,6 +520,9 @@
// Initial frontier.
this.initialFrontier = getArg(Options.PROGRAM, Options.IN);
+ // Target vertices
+ this.targetVertices = getArg(Options.PROGRAM, Options.TARGET);
+
/*
* The output variable (bound to the visited set).
*
@@ -760,10 +777,6 @@
final IGASState<VS, ES, ST> gasState = gasContext.getGASState();
- // TODO We should look at this when extracting the parameters from the SERVICE's graph pattern.
-// final FrontierEnum frontierEnum = gasProgram
-// .getInitialFrontierEnum();
-
if (initialFrontier != null) {
/*
@@ -774,16 +787,9 @@
* necessary since this is an internal, high performance,
* and close to the indices operation.
*/
- final IV[] tmp = new IV[initialFrontier.length];
-
- // Setup the initial frontier.
- int i = 0;
- for (Value startingVertex : initialFrontier) {
-
- tmp[i++] = ((BigdataValue) startingVertex).getIV();
-
- }
-
+ @SuppressWarnings("rawtypes")
+ final IV[] tmp = toIV(initialFrontier);
+
// set the frontier.
gasState.setFrontier(gasContext, tmp);
@@ -792,6 +798,32 @@
// Run the analytic.
final IGASStats stats = (IGASStats) gasContext.call();
+ if (targetVertices != null
+ && gasProgram instanceof IPredecessor) {
+
+ /*
+ * Remove vertices from the visited set that are not on a
+ * path leading to at least one of the specified target
+ * vertices.
+ *
+ * FIXME Why can't we pass in the Value (with a defined IV)
+ * and not the IV? This should work. Passing in the IV is
+ * against the grain of the API and the generalized
+ * abstraction as Values. Of course, having the IV is
+ * necessary since this is an internal, high performance,
+ * and close to the indices operation.
+ */
+
+ @SuppressWarnings("rawtypes")
+ final IV[] tmp = toIV(targetVertices);
+
+ @SuppressWarnings("unchecked")
+ final IPredecessor<VS, ES, ST> t = (IPredecessor<VS, ES, ST>) gasProgram;
+
+ t.prunePaths(gasContext, tmp);
+
+ }
+
if (log.isInfoEnabled()) {
final StringBuilder sb = new StringBuilder();
sb.append("GAS");
@@ -828,6 +860,27 @@
}
/**
+ * Convert a {@link Value}[] of {@link BigdataValue} instances into an
+ * {@link IV}[] by extracting the {@link IV} of each value, preserving
+ * order. This is used for both the initial frontier and the target
+ * vertices.
+ *
+ * @param values
+ * The values. Each element must be a {@link BigdataValue}.
+ *
+ * @return A new array whose i-th element is the {@link IV} of the i-th
+ * value.
+ *
+ * @throws ClassCastException
+ * if any element is not a {@link BigdataValue}.
+ */
+ @SuppressWarnings("rawtypes") // covers the raw IV[] return type too
+ private static IV[] toIV(final Value[] values) {
+
+ final IV[] tmp = new IV[values.length];
+
+ for (int i = 0; i < values.length; i++) {
+
+ tmp[i] = ((BigdataValue) values[i]).getIV();
+
+ }
+
+ return tmp;
+
+ }
+
+ /**
* Class used to report {@link IBindingSet}s to the {@link GASService}.
* {@link IGASProgram}s can customize the way in which they interpret
* the declared variables by subclassing this class.
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|