From: <mrp...@us...> - 2014-04-03 21:17:43
|
Revision: 8036 http://sourceforge.net/p/bigdata/code/8036 Author: mrpersonick Date: 2014-04-03 21:17:40 +0000 (Thu, 03 Apr 2014) Log Message: ----------- added an analytic to produce a connected subgraph between a source and one or more targets Added Paths: ----------- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/PATHS.java Added: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/PATHS.java =================================================================== --- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/PATHS.java (rev 0) +++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/PATHS.java 2014-04-03 21:17:40 UTC (rev 8036) @@ -0,0 +1,561 @@ +/** + Copyright (C) SYSTAP, LLC 2006-2012. All rights reserved. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ +package com.bigdata.rdf.graph.analytics; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.log4j.Logger; +import org.openrdf.model.Statement; +import org.openrdf.model.Value; +import org.openrdf.model.ValueFactory; + +import com.bigdata.rdf.graph.EdgesEnum; +import com.bigdata.rdf.graph.Factory; +import com.bigdata.rdf.graph.FrontierEnum; +import com.bigdata.rdf.graph.IBinder; +import com.bigdata.rdf.graph.IBindingExtractor; +import com.bigdata.rdf.graph.IGASContext; +import com.bigdata.rdf.graph.IGASScheduler; +import com.bigdata.rdf.graph.IGASState; +import com.bigdata.rdf.graph.IPredecessor; +import com.bigdata.rdf.graph.impl.BaseGASProgram; + +/** + * PATHS is an analytic that produces the connected subgraph between a source + * vertex and one or more target vertices. It performs a breadth first + * traversal that records every predecessor of each visited vertex so that + * {@link #prunePaths(IGASContext, Value[])} can walk back from each reachable + * target and retain only the vertices found along the way. + * <p> + * Breadth First Search (BFS) is an iterative graph traversal primitive. The + * frontier is expanded iteratively until no new vertices are discovered. Each + * visited vertex is marked with the round (origin ZERO) in which it was + * visited. This is its distance from the initial frontier. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public class PATHS extends BaseGASProgram<PATHS.VS, PATHS.ES, Void> implements + IPredecessor<PATHS.VS, PATHS.ES, Void> { + + private static final Logger log = Logger.getLogger(PATHS.class); + + public static class VS { + + /** + * <code>-1</code> until visited. When visited, set to the current round + * in order to assign each vertex its traversal depth. + * <p> + * Note: It is possible that the same vertex may be visited multiple + * times in a given expansion (from one or more source vertices that all + * target the same destination vertex). However, in this case the same + * value will be assigned by each visitor. Thus, synchronization is only + * required for visibility of the update within the round. 
As long as + * one thread reports that it modified the depth, the vertex will be + * scheduled. + */ + private final AtomicInteger depth = new AtomicInteger(-1); + + /** + * The predecessors are all of the source vertices that visited this + * vertex during traversal, not just the first one to discover it. The + * set preserves the order in which predecessors were first added. + */ + private final Set<Value> predecessors = + Collections.synchronizedSet(new LinkedHashSet<Value>()); + + /** + * The depth at which this vertex was first visited (origin ZERO) and + * <code>-1</code> if the vertex has not been visited. + */ + public int depth() { +// synchronized (this) { + return depth.get(); +// } + } + + /** + * Return the set of vertices that visited this vertex during traversal. + * This is the live synchronized set held by the vertex state, not a copy. + */ + public Set<Value> predecessors() { + + return predecessors; + + } + + /** + * Note: This marks the vertex at the current traversal depth. The + * predecessor (when non-null) is recorded on every call, even when the + * vertex has already been visited. + * + * @return <code>true</code> if the vertex was visited for the first + * time in this round and the calling thread is the thread that + * first visited the vertex (this helps to avoid multiple + * scheduling of a vertex). + */ + public boolean visit(final int depth, final Value predecessor) { + if (predecessor != null) + this.predecessors.add(predecessor); + if (this.depth.compareAndSet(-1/* expect */, depth/* newValue */)) { + // Scheduled by this thread. + return true; + } + return false; +// synchronized (this) { +// if (this.depth == -1) { +// this.depth = depth; +// return true; +// } +// return false; +// } + } + + @Override + public String toString() { + return "{depth=" + depth() + "}"; + } + + }// class VS + + /** + * Edge state is not used. 
+ */ + public static class ES { + + } + + private static final Factory<Value, PATHS.VS> vertexStateFactory = new Factory<Value, PATHS.VS>() { + + @Override + public PATHS.VS initialValue(final Value value) { + + return new VS(); + + } + + }; + + @Override + public Factory<Value, PATHS.VS> getVertexStateFactory() { + + return vertexStateFactory; + + } + + @Override + public Factory<Statement, PATHS.ES> getEdgeStateFactory() { + + return null; + + } + + @Override + public FrontierEnum getInitialFrontierEnum() { + + return FrontierEnum.SingleVertex; + + } + + @Override + public EdgesEnum getGatherEdges() { + + return EdgesEnum.NoEdges; + + } + + @Override + public EdgesEnum getScatterEdges() { + + return EdgesEnum.OutEdges; + + } + + /** + * Marks the given vertex as visited at depth ZERO with no predecessor. + */ + @Override + public void initVertex(final IGASContext<PATHS.VS, PATHS.ES, Void> ctx, + final IGASState<PATHS.VS, PATHS.ES, Void> state, final Value u) { + + state.getState(u).visit(0, null/* predecessor */); + + } + + /** + * Not used. + */ + @Override + public Void gather(IGASState<PATHS.VS, PATHS.ES, Void> state, Value u, Statement e) { + + throw new UnsupportedOperationException(); + + } + + /** + * Not used. + */ + @Override + public Void sum(final IGASState<PATHS.VS, PATHS.ES, Void> state, + final Void left, final Void right) { + + throw new UnsupportedOperationException(); + + } + + /** + * NOP + */ + @Override + public PATHS.VS apply(final IGASState<PATHS.VS, PATHS.ES, Void> state, final Value u, + final Void sum) { + + return null; + + } + + /** + * Returns <code>true</code>. + */ + @Override + public boolean isChanged(IGASState<VS, ES, Void> state, Value u) { + + return true; + + } + + /** + * The remote vertex is scheduled for activation unless it has already been + * visited. + * <p> + * Note: We are scattering to out-edges. Therefore, this vertex is + * {@link Statement#getSubject()}. The remote vertex is + * {@link Statement#getObject()}. 
+ */ + @Override + public void scatter(final IGASState<PATHS.VS, PATHS.ES, Void> state, + final IGASScheduler sch, final Value u, final Statement e) { + +// if (state.getTargetVertices().contains(u)) { +// // don't schedule any more vertices, we've hit a target +// return; +// } + + // remote vertex state. + final Value v = state.getOtherVertex(u, e); + + final VS otherState = state.getState(v); +// final VS otherState = state.getState(e.getObject()/* v */); + + // visit. + if (otherState.visit(state.round() + 1, u/* predecessor */)) { + + /* + * This is the first visit for the remote vertex. Add it to the + * schedule for the next iteration. + */ + + sch.schedule(v); + + } + + } + + @Override + public boolean nextRound(final IGASContext<PATHS.VS, PATHS.ES, Void> ctx) { + + return true; + + } + + /** + * {@inheritDoc} + * <p> + * <dl> + * <dt>{@value Bindings#DEPTH}</dt> + * <dd>The depth at which the vertex was first encountered during traversal. + * </dd> + * <dt>{@value Bindings#PREDECESSOR}</dt> + * <dd>The predecessor is the first vertex that discovers a given vertex + * during traversal.</dd> + * </dl> + */ + @Override + public List<IBinder<PATHS.VS, PATHS.ES, Void>> getBinderList() { + + final List<IBinder<PATHS.VS, PATHS.ES, Void>> tmp = super.getBinderList(); + + tmp.add(new IBinder<PATHS.VS, PATHS.ES, Void>() { + + @Override + public int getIndex() { + return Bindings.DEPTH; + } + + @Override + public Value bind(final ValueFactory vf, + final IGASState<PATHS.VS, PATHS.ES, Void> state, final Value u) { + + return vf.createLiteral(state.getState(u).depth.get()); + + } + }); + +// tmp.add(new IBinder<PATHS.VS, PATHS.ES, Void>() { +// +// @Override +// public int getIndex() { +// return Bindings.PREDECESSOR; +// } +// +// @Override +// public Value bind(final ValueFactory vf, +// final IGASState<PATHS.VS, PATHS.ES, Void> state, final Value u) { +// +// return state.getState(u).predecessor.get(); +// +// } +// }); + + return tmp; + + } + + /** + * Additional 
{@link IBindingExtractor.IBinder}s exposed by {@link PATHS}. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ + public interface Bindings extends BaseGASProgram.Bindings { + + /** + * The depth at which the vertex was visited. + */ + int DEPTH = 1; + +// /** +// * The BFS predecessor is the first vertex to discover a given vertex. +// * +// */ +// int PREDECESSOR = 2; + + } + +// /** +// * Reduce the active vertex state, returning a histogram reporting the #of +// * vertices at each distance from the starting vertex. There will always be +// * one vertex at depth zero - this is the starting vertex. For each +// * successive depth, the #of vertices that were labeled at that depth is +// * reported. This is essentially the same as reporting the size of the +// * frontier in each round of the traversal, but the histograph is reported +// * based on the vertex state. +// * +// * @author <a href="mailto:tho...@us...">Bryan +// * Thompson</a> +// */ +// protected static class HistogramReducer implements +// IReducer<VS, ES, Void, Map<Integer, AtomicLong>> { +// +// private final ConcurrentHashMap<Integer, AtomicLong> values = new ConcurrentHashMap<Integer, AtomicLong>(); +// +// @Override +// public void visit(final IGASState<VS, ES, Void> state, final Value u) { +// +// final VS us = state.getState(u); +// +// if (us != null) { +// +// final Integer depth = Integer.valueOf(us.depth()); +// +// AtomicLong newval = values.get(depth); +// +// if (newval == null) { +// +// final AtomicLong oldval = values.putIfAbsent(depth, +// newval = new AtomicLong()); +// +// if (oldval != null) { +// +// // lost data race. +// newval = oldval; +// +// } +// +// } +// +// newval.incrementAndGet(); +// +// } +// +// } +// +// @Override +// public Map<Integer, AtomicLong> get() { +// +// return Collections.unmodifiableMap(values); +// +// } +// +// } + + /* + * TODO Do this in parallel for each specified target vertex. 
+ */ + @Override + public void prunePaths(final IGASContext<VS, ES, Void> ctx, + final Value[] targetVertices) { + + if (ctx == null) + throw new IllegalArgumentException(); + + if (targetVertices == null) + throw new IllegalArgumentException(); + + final IGASState<PATHS.VS, PATHS.ES, Void> gasState = ctx.getGASState(); + +// for (Value v : gasState.values()) { +// log.trace(v); +// } + + final Set<Value> retainSet = new HashSet<Value>(); + + for (Value v : targetVertices) { + + if (!gasState.isVisited(v)) { + + // This target was not reachable. + continue; + + } + + /* + * Walk the predecessors back to a starting vertex. + */ + retainSet.add(v); + + visitPredecessors(gasState, v, retainSet); + +// Value current = v; +// +// while (current != null) { +// +// retainSet.add(current); +// +// final PATHS.VS currentState = gasState.getState(current); +// +// final Value predecessor = currentState.predecessor(); +// +// current = predecessor; +// +// } + + } // next target vertex. + + gasState.retainAll(retainSet); + + } + + protected void visitPredecessors( + final IGASState<PATHS.VS, PATHS.ES, Void> gasState, final Value v, + final Set<Value> retainSet) { + + final PATHS.VS currentState = gasState.getState(v); + + for (Value pred : currentState.predecessors()) { + + if (pred == null) { + + continue; + + } + + if (retainSet.contains(pred)) { + + continue; + + } + + retainSet.add(pred); + + visitPredecessors(gasState, pred, retainSet); + + } + + } + +// @Override +// public <T> IReducer<VS, ES, Void, T> getDefaultAfterOp() { +// +// class NV implements Comparable<NV> { +// public final int n; +// public final long v; +// public NV(final int n, final long v) { +// this.n = n; +// this.v = v; +// } +// @Override +// public int compareTo(final NV o) { +// if (o.n > this.n) +// return -1; +// if (o.n < this.n) +// return 1; +// return 0; +// } +// } +// +// final IReducer<VS, ES, Void, T> outerReducer = new IReducer<VS, ES, Void, T>() { +// +// final HistogramReducer 
innerReducer = new HistogramReducer(); +// +// @Override +// public void visit(IGASState<VS, ES, Void> state, Value u) { +// +// innerReducer.visit(state, u); +// +// } +// +// @Override +// public T get() { +// +// final Map<Integer, AtomicLong> h = innerReducer.get(); +// +// final NV[] a = new NV[h.size()]; +// +// int i = 0; +// +// for (Map.Entry<Integer, AtomicLong> e : h.entrySet()) { +// +// a[i++] = new NV(e.getKey().intValue(), e.getValue().get()); +// +// } +// +// Arrays.sort(a); +// +// System.out.println("distance, frontierSize, sumFrontierSize"); +// long sum = 0L; +// for (NV t : a) { +// +// System.out.println(t.n + ", " + t.v + ", " + sum); +// +// sum += t.v; +// +// } +// +// return null; +// } +// +// }; +// +// return outerReducer; +// +// } + +} Property changes on: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/PATHS.java ___________________________________________________________________ Added: svn:mime-type ## -0,0 +1 ## +text/plain \ No newline at end of property This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |