From: <tho...@us...> - 2014-03-15 13:31:38
|
Revision: 7981 http://sourceforge.net/p/bigdata/code/7981 Author: thompsonbry Date: 2014-03-15 13:31:34 +0000 (Sat, 15 Mar 2014) Log Message: ----------- Added the IPredecessor interface. This interface can be used to remove vertices from the visited vertex set if they do not lie along a path to a specified target vertex. This interface is only supported by BFS right now since SSSP does not yet support the concept of a predecessor (we need to reimplement SSSP as a push-style scatter). See #810 (GAS Service) Modified Paths: -------------- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASState.java branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASState.java branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASService.java Added Paths: ----------- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IPredecessor.java Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASState.java =================================================================== --- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASState.java 2014-03-15 10:58:29 UTC (rev 7980) +++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASState.java 2014-03-15 13:31:34 UTC (rev 7981) @@ -15,6 +15,8 @@ */ package com.bigdata.rdf.graph; +import java.util.Set; + import org.openrdf.model.Statement; import org.openrdf.model.URI; import org.openrdf.model.Value; @@ -260,4 +262,12 @@ */ int compareTo(Value u, Value v); + /** + * Retain only those vertices in the visited set that are found in the + * specified collection. + * + * @param retainSet The set of vertices to be retained. 
+ */ + void retainAll(Set<Value> retainSet); + } Added: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IPredecessor.java =================================================================== --- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IPredecessor.java (rev 0) +++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IPredecessor.java 2014-03-15 13:31:34 UTC (rev 7981) @@ -0,0 +1,45 @@ +/** + Copyright (C) SYSTAP, LLC 2006-2012. All rights reserved. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ +package com.bigdata.rdf.graph; + +import org.openrdf.model.Value; + +/** + * An interface for {@link IGASProgram}s that compute paths and track a + * predecessor relationship among the visited vertices. This interface can be + * used to eliminate vertices from the visited set that are not on a path to a + * set of specified target vertices. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public interface IPredecessor<VS, ES, ST> { + + /** + * Remove any vertices from the visited set that do not lie on a path that + * leads to at least one of the target vertices. + * + * @param ctx + * The {@link IGASContext}. + * @param targetVertices + * An array of zero or more target vertices. + * + * @throws IllegalArgumentException + * if either argument is <code>null</code>. 
+ */ + public void prunePaths(final IGASContext<VS, ES, ST> ctx, + final Value[] targetVertices); + +} Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java =================================================================== --- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java 2014-03-15 10:58:29 UTC (rev 7980) +++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java 2014-03-15 13:31:34 UTC (rev 7981) @@ -15,12 +15,10 @@ */ package com.bigdata.rdf.graph.analytics; -import java.util.Collections; +import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import org.openrdf.model.Statement; @@ -35,7 +33,7 @@ import com.bigdata.rdf.graph.IGASContext; import com.bigdata.rdf.graph.IGASScheduler; import com.bigdata.rdf.graph.IGASState; -import com.bigdata.rdf.graph.IReducer; +import com.bigdata.rdf.graph.IPredecessor; import com.bigdata.rdf.graph.impl.BaseGASProgram; /** @@ -46,7 +44,8 @@ * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> */ -public class BFS extends BaseGASProgram<BFS.VS, BFS.ES, Void> { +public class BFS extends BaseGASProgram<BFS.VS, BFS.ES, Void> implements + IPredecessor<BFS.VS, BFS.ES, Void> { // private static final Logger log = Logger.getLogger(BFS.class); @@ -339,68 +338,112 @@ } - /** - * Reduce the active vertex state, returning a histogram reporting the #of - * vertices at each distance from the starting vertex. There will always be - * one vertex at depth zero - this is the starting vertex. For each - * successive depth, the #of vertices that were labeled at that depth is - * reported. 
This is essentially the same as reporting the size of the - * frontier in each round of the traversal, but the histograph is reported - * based on the vertex state. - * - * @author <a href="mailto:tho...@us...">Bryan - * Thompson</a> - * - * TODO Do another reducer that reports the actual BFS tree rather - * than a histogram. We need to store the predecessor for this. That - * will allow us to trivially report the BFS route between any two - * vertices. +// /** +// * Reduce the active vertex state, returning a histogram reporting the #of +// * vertices at each distance from the starting vertex. There will always be +// * one vertex at depth zero - this is the starting vertex. For each +// * successive depth, the #of vertices that were labeled at that depth is +// * reported. This is essentially the same as reporting the size of the +// * frontier in each round of the traversal, but the histograph is reported +// * based on the vertex state. +// * +// * @author <a href="mailto:tho...@us...">Bryan +// * Thompson</a> +// */ +// protected static class HistogramReducer implements +// IReducer<VS, ES, Void, Map<Integer, AtomicLong>> { +// +// private final ConcurrentHashMap<Integer, AtomicLong> values = new ConcurrentHashMap<Integer, AtomicLong>(); +// +// @Override +// public void visit(final IGASState<VS, ES, Void> state, final Value u) { +// +// final VS us = state.getState(u); +// +// if (us != null) { +// +// final Integer depth = Integer.valueOf(us.depth()); +// +// AtomicLong newval = values.get(depth); +// +// if (newval == null) { +// +// final AtomicLong oldval = values.putIfAbsent(depth, +// newval = new AtomicLong()); +// +// if (oldval != null) { +// +// // lost data race. 
+// newval = oldval; +// +// } +// +// } +// +// newval.incrementAndGet(); +// +// } +// +// } +// +// @Override +// public Map<Integer, AtomicLong> get() { +// +// return Collections.unmodifiableMap(values); +// +// } +// +// } + + /* + * TODO Do this in parallel for each specified target vertex. */ - protected static class HistogramReducer implements - IReducer<VS, ES, Void, Map<Integer, AtomicLong>> { + @Override + public void prunePaths(final IGASContext<VS, ES, Void> ctx, + final Value[] targetVertices) { - private final ConcurrentHashMap<Integer, AtomicLong> values = new ConcurrentHashMap<Integer, AtomicLong>(); + if (ctx == null) + throw new IllegalArgumentException(); - @Override - public void visit(final IGASState<VS, ES, Void> state, final Value u) { + if (targetVertices == null) + throw new IllegalArgumentException(); + + final IGASState<BFS.VS, BFS.ES, Void> gasState = ctx.getGASState(); - final VS us = state.getState(u); + final Set<Value> retainSet = new HashSet<Value>(); - if (us != null) { + for (Value v : targetVertices) { - final Integer depth = Integer.valueOf(us.depth()); + if (!gasState.isVisited(v)) { - AtomicLong newval = values.get(depth); + // This target was not reachable. + continue; - if (newval == null) { + } - final AtomicLong oldval = values.putIfAbsent(depth, - newval = new AtomicLong()); + /* + * Walk the predecessors back to a starting vertex. + */ + Value current = v; - if (oldval != null) { + while (current != null) { - // lost data race. - newval = oldval; + retainSet.add(current); - } + final BFS.VS currentState = gasState.getState(current); - } + final Value predecessor = currentState.predecessor(); - newval.incrementAndGet(); + current = predecessor; } - - } - - @Override - public Map<Integer, AtomicLong> get() { - - return Collections.unmodifiableMap(values); - } + } // next target vertex. 
+ gasState.retainAll(retainSet); + } - + // @Override // public <T> IReducer<VS, ES, Void, T> getDefaultAfterOp() { // Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASState.java =================================================================== --- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASState.java 2014-03-15 10:58:29 UTC (rev 7980) +++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASState.java 2014-03-15 13:31:34 UTC (rev 7981) @@ -241,7 +241,25 @@ } + /* + * TODO batch parallel in java 8. + */ @Override + public void retainAll(final Set<Value> retainSet) { + + for (Value v : vertexState.keySet()) { + + if (!retainSet.contains(v)) { + + vertexState.remove(v); + + } + + } + + } + + @Override public int round() { return round.get(); Modified: branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASService.java =================================================================== --- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASService.java 2014-03-15 10:58:29 UTC (rev 7980) +++ branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASService.java 2014-03-15 13:31:34 UTC (rev 7981) @@ -53,7 +53,10 @@ import com.bigdata.rdf.graph.IGASState; import com.bigdata.rdf.graph.IGASStats; import com.bigdata.rdf.graph.IGraphAccessor; +import com.bigdata.rdf.graph.IPredecessor; import com.bigdata.rdf.graph.IReducer; +import com.bigdata.rdf.graph.analytics.CC; +import com.bigdata.rdf.graph.analytics.PR; import com.bigdata.rdf.graph.impl.GASEngine; import com.bigdata.rdf.graph.impl.GASState; import com.bigdata.rdf.graph.impl.bd.BigdataGASEngine.BigdataGraphAccessor; @@ -112,41 +115,20 @@ * } * </pre> * - * FIXME Also allow the execution of gas workflows, such as FuzzySSSP. A workflow - * would be more along the lines of a Callable, but one where the initial source - * and/or target vertices could be identified. 
Or have an interface that wraps - * the analytics (including things like FuzzySSSP) so they can declare their own - * arguments for invocation as a SERVICE. + * FIXME Also allow the execution of gas workflows, such as FuzzySSSP. A + * workflow would be more along the lines of a Callable, but one where the + * initial source and/or target vertices could be identified. Or have an + * interface that wraps the analytics (including things like FuzzySSSP) so they + * can declare their own arguments for invocation as a SERVICE. * * TODO The input frontier could be a variable, in which case we would pull out * the column for that variable rather than running the algorithm once per * source binding set, right? Or maybe not. * - * TODO Allow {@link IReducer} that binds the visited vertex and also the - * dynamic state associated with that vertex. For BFS and SSSP, this could be - * depth/distance and the predecessor (for path information). For BFS and SSSP, - * we could also have a specific target vertex (or vertices) and then report out - * the path for that vertex/vertices. This would significantly reduce the data - * reported back. (Could we run SSSP in both directions to accelerate the - * convergence?) + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * - * TODO Also support export. This could be easily done using a SPARQL SELECT - * - * <pre> - * SELECT ?src ?tgt ?edgeWeight { - * <<?src linkType ?tgt> propertyType ?edgeWeight> - * } - * </pre> - * - * or (if you have a simple topology without edge weights) - * - * <pre> - * SELECT ?src ?tgt bind(?edgeWeight,1) { - * ?src linkType ?tgt - * } - * </pre> - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @see <a href="http://wiki.bigdata.com/wiki/index.php/RDF_GAS_API">RDF GAS + * API</a> */ public class GASService implements CustomServiceFactory { @@ -248,11 +230,42 @@ Class<? 
extends IGASSchedulerImpl> DEFAULT_SCHEDULER = CHMScheduler.class; /** - * Magic predicate used to specify a vertex in the initial frontier. + * Magic predicate used to specify one (or more) vertices in the initial + * frontier. + * <p> + * Note: Algorithms such as {@link CC} and {@link PR} automatically + * place all vertices into the initial frontier. For such algorithms, + * you do not need to specify {@link #IN}. */ URI IN = new URIImpl(NAMESPACE + "in"); /** + * Magic predicate used to specify one (or more) target vertices. This + * may be used in combination with algorithms that compute paths in a + * graph to filter the visited vertices after the traversal in order to + * remove any vertex that is not part of a path to one or more of the + * specified target vertices. + * <p> + * In order to support this, the algorithm has to have a concept of a + * <code>predecessor</code>. For each <code>target</code>, the set of + * visited vertices is checked to see if the target was reachable. If it + * was reachable, then the predecessors are walked backwards until a + * starting vertex is reached (predecessor:=null). Each such predecessor + * is added to a list of vertices to be retained. This is repeated for + * each target. Once we have identified the combined list of vertices to + * be retained, all vertices NOT in that list are removed from the + * visited vertex state. This causes the algorithm to only report on + * those paths that lead to at least one of the specified target + * vertices. + * <p> + * Note: If you do not care about the distance between two vertices, but + * only whether they are reachable from one another, you can put both + * vertices into the initial frontier. The algorithm will then work from + * both points which can accelerate convergence. + */ + URI TARGET = new URIImpl(NAMESPACE + "target"); + + /** * Magic predicate used to specify a variable that will become bound to * each vertex in the visited set for the analytic. 
{@link #OUT} is * always bound to the visited vertices. The other "out" variables are @@ -392,6 +405,7 @@ private final Class<IGASProgram<VS, ES, ST>> gasClass; private final Class<IGASSchedulerImpl> schedulerClass; private final Value[] initialFrontier; + private final Value[] targetVertices; private final IVariable<?>[] outVars; public GASServiceCall(final AbstractTripleStore store, @@ -506,6 +520,9 @@ // Initial frontier. this.initialFrontier = getArg(Options.PROGRAM, Options.IN); + // Target vertices + this.targetVertices = getArg(Options.PROGRAM, Options.TARGET); + /* * The output variable (bound to the visited set). * @@ -760,10 +777,6 @@ final IGASState<VS, ES, ST> gasState = gasContext.getGASState(); - // TODO We should look at this when extracting the parameters from the SERVICE's graph pattern. -// final FrontierEnum frontierEnum = gasProgram -// .getInitialFrontierEnum(); - if (initialFrontier != null) { /* @@ -774,16 +787,9 @@ * necessary since this is an internal, high performance, * and close to the indices operation. */ - final IV[] tmp = new IV[initialFrontier.length]; - - // Setup the initial frontier. - int i = 0; - for (Value startingVertex : initialFrontier) { - - tmp[i++] = ((BigdataValue) startingVertex).getIV(); - - } - + @SuppressWarnings("rawtypes") + final IV[] tmp = toIV(initialFrontier); + // set the frontier. gasState.setFrontier(gasContext, tmp); @@ -792,6 +798,32 @@ // Run the analytic. final IGASStats stats = (IGASStats) gasContext.call(); + if (targetVertices != null + && gasProgram instanceof IPredecessor) { + + /* + * Remove vertices from the visited set that are not on a + * path leading to at least one of the specified target + * vertices. + * + * FIXME Why can't we pass in the Value (with a defined IV) + * and not the IV? This should work. Passing in the IV is + * against the grain of the API and the generalized + * abstraction as Values. 
Of course, having the IV is + * necessary since this is an internal, high performance, + * and close to the indices operation. + */ + + @SuppressWarnings("rawtypes") + final IV[] tmp = toIV(targetVertices); + + @SuppressWarnings("unchecked") + final IPredecessor<VS, ES, ST> t = (IPredecessor<VS, ES, ST>) gasProgram; + + t.prunePaths(gasContext, tmp); + + } + if (log.isInfoEnabled()) { final StringBuilder sb = new StringBuilder(); sb.append("GAS"); @@ -828,6 +860,27 @@ } /** + * Convert a {@link Value}[] of {@link BigdataValue} instances into an + * {@link IV}[]. + */ + private static IV[] toIV(final Value[] values) { + + @SuppressWarnings("rawtypes") + final IV[] tmp = new IV[values.length]; + + // Setup the initial frontier. + int i = 0; + for (Value v : values) { + + tmp[i++] = ((BigdataValue) v).getIV(); + + } + + return tmp; + + } + + /** * Class used to report {@link IBindingSet}s to the {@link GASService}. * {@link IGASProgram}s can customize the way in which they interpret * the declared variables by subclassing this class. This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |