From: <tho...@us...> - 2014-02-24 01:52:19
|
Revision: 7878 http://sourceforge.net/p/bigdata/code/7878 Author: thompsonbry Date: 2014-02-24 01:52:13 +0000 (Mon, 24 Feb 2014) Log Message: ----------- Checkpoint on the GASService. See #810. Modified Paths: -------------- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASContext.java branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASOptions.java branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASProgram.java branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IReducer.java branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/CC.java branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/PR.java branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/BaseGASProgram.java branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASContext.java branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASEngine.java branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASState.java branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/ram/RAMGASEngine.java branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/sail/SAILGASEngine.java branches/RDR/bigdata-gas/src/test/com/bigdata/rdf/graph/impl/ram/TestGather.java branches/RDR/bigdata-gas/src/test/com/bigdata/rdf/graph/impl/sail/TestGather.java branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/BigdataGASEngine.java branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASService.java branches/RDR/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/bd/TestBFS.java branches/RDR/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/bd/TestGather.java branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/sparql/PrefixDeclProcessor.java Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASContext.java =================================================================== --- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASContext.java 2014-02-23 23:24:57 UTC (rev 7877) +++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASContext.java 2014-02-24 01:52:13 UTC (rev 7878) @@ -18,6 +18,11 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import org.openrdf.model.URI; +import org.openrdf.model.Value; + +import cutthecrap.utils.striterators.IStriterator; + /** * Execution context for an {@link IGASProgram}. This is distinct from the * {@link IGASEngine} so we can support distributed evaluation and concurrent @@ -36,6 +41,15 @@ * the generic type for the per-edge state, but that is not always * true. The SUM type is scoped to the GATHER + SUM operation (NOT * the computation). + * + * TODO Add option to order the vertices to provide a serializable + * execution plan (like GraphChi). I believe that this reduces to + * computing a DAG over the frontier before executing the GATHER and + * then executing the frontier such that the parallel execution is + * constrained by arcs in the DAG that do not have mutual + * dependencies. This would have to place a partial ordering over the + * vertices in the frontier and then process the frontier with + * limited parallelism based on that partial ordering. */ public interface IGASContext<VS, ES, ST> extends Callable<IGASStats> { @@ -90,6 +104,73 @@ int getMaxVisited(); /** + * Return non-<code>null</code> iff there is a single link type to be + * visited. This corresponds to a view of the graph as sparse connectivity + * matrix. The {@link IGASEngine} can optimize traversal patterns using the + * <code>POS</code> index. + * <p> + * Note: When this option is used, the scatter and gather will not visit the + * property set for the vertex. Instead, the graph is treated as if it were + * an unattributed graph and only mined for the connectivity data. + * + * @return The {@link Value} for the predicate that identifies the desired + * link type (there can be many types of links - the return value + * specifies which attribute is of interest). + * + * FIXME define getLinkAttribType() (RDR) + */ + URI getLinkType(); + + /** + * Set an optional constraint on the type of the visited links. + * <p> + * Note: When this option is used, the scatter and gather will not visit the + * property set for the vertex. Instead, the graph is treated as if it were + * an unattributed graph and only mined for the connectivity data (which may + * include a link weight). + * + * @param linkType + * The link type to visit (optional). When <code>null</code>, all + * link types are visited. + */ + void setLinkType(URI linkType); + + /** + * Set an optional {@link IReducer} that will run after the + * {@link IGASProgram} is terminated. This may be used to extract results + * from the visited vertices. + * + * @param afterOp + * The {@link IReducer}. + */ + <T> void setRunAfterOp(IReducer<VS, ES, ST, T> afterOp); + + /** + * Return an optional {@link IReducer} that will run after the + * {@link IGASProgram} is terminated. This may be used to extract results + * from the visited vertices. + */ + <T> IReducer<VS, ES, ST, T> getRunAfterOp(); + + /** + * Hook to impose a constraint on the visited edges and/or property values. + * + * @param itr + * The iterator visiting those edges and/or property values. + * + * @return Either the same iterator or a constrained iterator. + * + * TODO Rename as constrainEdgeFilter or even split into a + * constrainGatherFilter and a constraintScatterFilter. + * + * FIXME APPLY : If we need access to the vertex property values in + * APPLY (which we probably do, at least optionally), then there + * should be a similar method to decide whether the property values + * for the vertex are made available during the APPLY. + */ + IStriterator constrainFilter(IStriterator eitr); + + /** * Execute one iteration. * * @param stats Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASOptions.java =================================================================== --- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASOptions.java 2014-02-23 23:24:57 UTC (rev 7877) +++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASOptions.java 2014-02-24 01:52:13 UTC (rev 7878) @@ -16,24 +16,15 @@ package com.bigdata.rdf.graph; import org.openrdf.model.Statement; -import org.openrdf.model.URI; import org.openrdf.model.Value; -import cutthecrap.utils.striterators.IStriterator; +import com.bigdata.rdf.graph.analytics.CC; +import com.bigdata.rdf.graph.impl.util.GASRunnerBase; /** * Interface for options that are understood by the {@link IGASEngine} and which * may be declared by the {@link IGASProgram}. * - * TODO Add option to order the vertices to provide a serializable execution - * plan (like GraphChi). I believe that this reduces to computing a DAG over the - * frontier before executing the GATHER and then executing the frontier such - * that the parallel execution is constrained by arcs in the DAG that do not - * have mutual dependencies. This is really an option that would be implemented - * by the {@link IGASContext}, which would have to place a partial ordering over - * the vertices in the frontier and then process the frontier with limited - * parallelism based on that partial ordering. - * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> */ public interface IGASOptions<VS, ES, ST> { @@ -51,6 +42,10 @@ * sample all vertices regardless of their edges, specify * {@value EdgesEnum#NoEdges}. To require that each vertex has at least one * in-edge and one out-edge, specify {@link EdgesEnum#AllEdges}. + * + * FIXME This should be moved into {@link GASRunnerBase}. The only class + * that customizes this is {@link CC}. (For {@link CC} we need to put all + * vertices into the frontier, even those without edges.) */ EdgesEnum getSampleEdgesFilter(); @@ -86,40 +81,4 @@ */ Factory<Statement, ES> getEdgeStateFactory(); - /** - * Return non-<code>null</code> iff there is a single link type to be - * visited. This corresponds to a view of the graph as sparse connectivity - * matrix. The {@link IGASEngine} can optimize traversal patterns using the - * <code>POS</code> index. - * <p> - * Note: When this option is used, the scatter and gather will not visit the - * property set for the vertex. The graph is treated as if it were an - * unattributed graph and only mined for the connectivity data. - * - * @return The {@link Value} for the predicate that identifies the desired - * link type (there can be many types of links - the return value - * specifies which attribute is of interest). - * - * @see #getLinkAttribType() - */ - URI getLinkType(); - - /** - * Hook to impose a constraint on the visited edges and/or property values. - * - * @param itr - * The iterator visiting those edges and/or property values. - * - * @return Either the same iterator or a constrained iterator. - * - * TODO Rename as constrainEdgeFilter or even split into a - * constrainGatherFilter and a constraintScatterFilter. - * - * FIXME APPLY : If we need access to the vertex property values in - * APPLY (which we probably do, at least optionally), then there - * should be a similar method to decide whether the property values - * for the vertex are made available during the APPLY. - */ - IStriterator constrainFilter(IGASContext<VS, ES, ST> ctx, IStriterator eitr); - } \ No newline at end of file Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASProgram.java =================================================================== --- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASProgram.java 2014-02-23 23:24:57 UTC (rev 7877) +++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASProgram.java 2014-02-24 01:52:13 UTC (rev 7878) @@ -15,8 +15,11 @@ */ package com.bigdata.rdf.graph; +import java.util.List; + import org.openrdf.model.Statement; import org.openrdf.model.Value; +import org.openrdf.model.ValueFactory; /** * Abstract interface for GAS programs. @@ -51,12 +54,13 @@ void before(IGASContext<VS, ES, ST> ctx); /** - * One time initialization after the {@link IGASProgram} is executed. + * Return a default reduction that will be applied after the + * {@link IGASProgram} is executed. * - * @param ctx - * The evaluation context. + * @return The default reduction -or- <code>null</code> if no such reduction + * is defined. */ - void after(IGASContext<VS, ES, ST> ctx); + <T> IReducer<VS, ES, ST, T> getDefaultAfterOp(); /** * Callback to initialize the state for each vertex in the initial frontier @@ -200,5 +204,42 @@ * the frontier is non-empty). */ boolean nextRound(IGASContext<VS, ES, ST> ctx); + + /** + * Return a list of interfaces that may be used to extract variable bindings + * for the vertices visited by the algorithm. + */ + List<IBinder<VS, ES, ST>> getBinderList(); + /** + * An interface that may be used to extract variable bindings for the + * vertices visited by the algorithm. + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + */ + public interface IBinder<VS, ES, ST> { + + /** + * The ordinal index of the variable that is bound by this + * {@link IBinder}. By convention, index ZERO is the vertex. Indices + * greater than ZERO are typically aspects of the state of the vertex. + */ + int getIndex(); + + /** + * @param vf + * The {@link ValueFactory} used to create the return + * {@link Value}. + * @param u + * The vertex. + * + * @return The {@link Value} for that ordinal variable or + * <code>null</code> if there is no binding for that ordinal + * variable. + */ + Value bind(ValueFactory vf, final IGASState<VS, ES, ST> state, Value u); + + } + } \ No newline at end of file Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IReducer.java =================================================================== --- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IReducer.java 2014-02-23 23:24:57 UTC (rev 7877) +++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IReducer.java 2014-02-24 01:52:13 UTC (rev 7878) @@ -26,7 +26,7 @@ * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id: IResultHandler.java 2265 2009-10-26 12:51:06Z thompsonbry $ */ -public interface IReducer<VS,ES, ST, T> { +public interface IReducer<VS, ES, ST, T> { /** * Method is invoked for each result and is responsible for combining the Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java =================================================================== --- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java 2014-02-23 23:24:57 UTC (rev 7877) +++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java 2014-02-24 01:52:13 UTC (rev 7878) @@ -15,8 +15,8 @@ */ package com.bigdata.rdf.graph.analytics; -import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -24,6 +24,7 @@ import org.openrdf.model.Statement; import org.openrdf.model.Value; +import org.openrdf.model.ValueFactory; import com.bigdata.rdf.graph.EdgesEnum; import com.bigdata.rdf.graph.Factory; @@ -34,8 +35,6 @@ import com.bigdata.rdf.graph.IReducer; import com.bigdata.rdf.graph.impl.BaseGASProgram; -import cutthecrap.utils.striterators.IStriterator; - /** * Breadth First Search (BFS) is an iterative graph traversal primitive. The * frontier is expanded iteratively until no new vertices are discovered. Each @@ -158,19 +157,6 @@ } /** - * {@inheritDoc} - * <p> - * Overridden to only visit the edges of the graph. - */ - @Override - public IStriterator constrainFilter( - final IGASContext<BFS.VS, BFS.ES, Void> ctx, final IStriterator itr) { - - return itr.addFilter(getEdgeOnlyFilter(ctx)); - - } - - /** * Not used. */ @Override @@ -260,6 +246,39 @@ } /** + * {@inheritDoc} + * <p> + * <dl> + * <dt>1</dt> + * <dd>The depth at which the vertex was first encountered during traversal.</dd> + * </dl> + */ + @Override + public List<IBinder<BFS.VS, BFS.ES, Void>> getBinderList() { + + final List<IBinder<BFS.VS, BFS.ES, Void>> tmp = super.getBinderList(); + + tmp.add(new IBinder<BFS.VS, BFS.ES, Void>() { + + @Override + public int getIndex() { + return 1; + } + + @Override + public Value bind(final ValueFactory vf, + final IGASState<BFS.VS, BFS.ES, Void> state, final Value u) { + + return vf.createLiteral(state.getState(u).depth.get()); + + } + }); + + return tmp; + + } + + /** * Reduce the active vertex state, returning a histogram reporting the #of * vertices at each distance from the starting vertex. There will always be * one vertex at depth zero - this is the starting vertex. For each @@ -272,11 +291,9 @@ * Thompson</a> * * TODO Do another reducer that reports the actual BFS tree rather - * than a histogram. For each depth, it needs to have the set of - * vertices that are at that number of hops from the starting - * vertex. So, there is an outer map from depth to set. The inner - * set should also be concurrent if we allow concurrent reduction of - * the activated vertex state. + * than a histogram. We need to store the predecessor for this. That + * will allow us to trivially report the BFS route between any two + * vertices. */ protected static class HistogramReducer implements IReducer<VS, ES, Void, Map<Integer, AtomicLong>> { @@ -323,54 +340,71 @@ } - @Override - public void after(final IGASContext<BFS.VS, BFS.ES, Void> ctx) { +// @Override +// public <T> IReducer<VS, ES, Void, T> getDefaultAfterOp() { +// +// class NV implements Comparable<NV> { +// public final int n; +// public final long v; +// public NV(final int n, final long v) { +// this.n = n; +// this.v = v; +// } +// @Override +// public int compareTo(final NV o) { +// if (o.n > this.n) +// return -1; +// if (o.n < this.n) +// return 1; +// return 0; +// } +// } +// +// final IReducer<VS, ES, Void, T> outerReducer = new IReducer<VS, ES, Void, T>() { +// +// final HistogramReducer innerReducer = new HistogramReducer(); +// +// @Override +// public void visit(IGASState<VS, ES, Void> state, Value u) { +// +// innerReducer.visit(state, u); +// +// } +// +// @Override +// public T get() { +// +// final Map<Integer, AtomicLong> h = innerReducer.get(); +// +// final NV[] a = new NV[h.size()]; +// +// int i = 0; +// +// for (Map.Entry<Integer, AtomicLong> e : h.entrySet()) { +// +// a[i++] = new NV(e.getKey().intValue(), e.getValue().get()); +// +// } +// +// Arrays.sort(a); +// +// System.out.println("distance, frontierSize, sumFrontierSize"); +// long sum = 0L; +// for (NV t : a) { +// +// System.out.println(t.n + ", " + t.v + ", " + sum); +// +// sum += t.v; +// +// } +// +// return null; +// } +// +// }; +// +// return outerReducer; +// +// } - final HistogramReducer r = new HistogramReducer(); - - ctx.getGASState().reduce(r); - - class NV implements Comparable<NV> { - public final int n; - public final long v; - public NV(final int n, final long v) { - this.n = n; - this.v = v; - } - @Override - public int compareTo(final NV o) { - if (o.n > this.n) - return -1; - if (o.n < this.n) - return 1; - return 0; - } - } - - final Map<Integer, AtomicLong> h = r.get(); - - final NV[] a = new NV[h.size()]; - - int i = 0; - - for (Map.Entry<Integer, AtomicLong> e : h.entrySet()) { - - a[i++] = new NV(e.getKey().intValue(), e.getValue().get()); - - } - - Arrays.sort(a); - - System.out.println("distance, frontierSize, sumFrontierSize"); - long sum = 0L; - for (NV t : a) { - - System.out.println(t.n + ", " + t.v + ", " + sum); - - sum += t.v; - - } - - } - } Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/CC.java =================================================================== --- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/CC.java 2014-02-23 23:24:57 UTC (rev 7877) +++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/CC.java 2014-02-24 01:52:13 UTC (rev 7878) @@ -15,7 +15,6 @@ */ package com.bigdata.rdf.graph.analytics; -import java.util.Arrays; import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -29,14 +28,11 @@ import com.bigdata.rdf.graph.EdgesEnum; import com.bigdata.rdf.graph.Factory; import com.bigdata.rdf.graph.FrontierEnum; -import com.bigdata.rdf.graph.IGASContext; import com.bigdata.rdf.graph.IGASScheduler; import com.bigdata.rdf.graph.IGASState; import com.bigdata.rdf.graph.IReducer; import com.bigdata.rdf.graph.impl.BaseGASProgram; -import cutthecrap.utils.striterators.IStriterator; - /** * Connected components computes the distinct sets of non-overlapping subgraphs * within a graph. All vertices within a connected component are connected along @@ -190,19 +186,6 @@ /** * {@inheritDoc} * <p> - * Overridden to only visit the edges of the graph. - */ - @Override - public IStriterator constrainFilter( - final IGASContext<CC.VS, CC.ES, Value> ctx, final IStriterator itr) { - - return itr.addFilter(getEdgeOnlyFilter(ctx)); - - } - - /** - * {@inheritDoc} - * <p> * Return the label of the remote vertex. */ @Override @@ -325,87 +308,95 @@ * Returns a map containing the labels assigned to each connected component * (which gives you a vertex in that connected component) and the #of * vertices in each connected component. + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> */ - public Map<Value, AtomicInteger> getConnectedComponents( - final IGASState<CC.VS, CC.ES, Value> state) { + public class ConnectedComponentsReducer implements IReducer<CC.VS,CC.ES,Value,Map<Value,AtomicInteger>> { final ConcurrentHashMap<Value, AtomicInteger> labels = new ConcurrentHashMap<Value, AtomicInteger>(); - return state - .reduce(new IReducer<CC.VS, CC.ES, Value, Map<Value, AtomicInteger>>() { + @Override + public void visit(final IGASState<VS, ES, Value> state, final Value u) { - @Override - public void visit(final IGASState<VS, ES, Value> state, - final Value u) { + final VS us = state.getState(u); - final VS us = state.getState(u); + if (us != null) { - if (us != null) { + final Value label = us.getLabel(); - final Value label = us.getLabel(); + if (log.isDebugEnabled()) + log.debug("v=" + u + ", label=" + label); - if (log.isDebugEnabled()) - log.debug("v=" + u + ", label=" + label); + final AtomicInteger oldval = labels.putIfAbsent(label, + new AtomicInteger(1)); - final AtomicInteger oldval = labels.putIfAbsent( - label, new AtomicInteger(1)); + if (oldval != null) { - if (oldval != null) { + // lost race. increment existing counter. + oldval.incrementAndGet(); - // lost race. increment existing counter. - oldval.incrementAndGet(); - - } - - } + } - } + } - @Override - public Map<Value, AtomicInteger> get() { + } - return Collections.unmodifiableMap(labels); + @Override + public Map<Value, AtomicInteger> get() { - } - }); + return Collections.unmodifiableMap(labels); + } + } - @Override - public void after(final IGASContext<CC.VS, CC.ES, Value> ctx) { + /** + * Returns a map containing the labels assigned to each connected component + * (which gives you a vertex in that connected component) and the #of + * vertices in each connected component. + */ + public Map<Value, AtomicInteger> getConnectedComponents( + final IGASState<CC.VS, CC.ES, Value> state) { - final Map<Value, AtomicInteger> labels = getConnectedComponents(ctx - .getGASState()); - - System.out.println("There are " + labels.size() - + " connected components"); - - class NV implements Comparable<NV> { - public final int n; - public final Value v; - public NV(int n, Value v) { - this.n = n; - this.v = v; - } - @Override - public int compareTo(final NV o) { - return o.n - this.n; - } - } - - final NV[] a = new NV[labels.size()]; - int i = 0; - for (Map.Entry<Value, AtomicInteger> e : labels.entrySet()) { - a[i++] = new NV(e.getValue().intValue(), e.getKey()); - } - - Arrays.sort(a); - - System.out.println("size, label"); - for(NV t : a) { - System.out.println(t.n + ", " + t.v); - } - + return state.reduce(new ConnectedComponentsReducer()); } +// @Override +// public void after(final IGASContext<CC.VS, CC.ES, Value> ctx) { +// +// final Map<Value, AtomicInteger> labels = getConnectedComponents(ctx +// .getGASState()); +// +// System.out.println("There are " + labels.size() +// + " connected components"); +// +// class NV implements Comparable<NV> { +// public final int n; +// public final Value v; +// public NV(int n, Value v) { +// this.n = n; +// this.v = v; +// } +// @Override +// public int compareTo(final NV o) { +// return o.n - this.n; +// } +// } +// +// final NV[] a = new NV[labels.size()]; +// int i = 0; +// for (Map.Entry<Value, AtomicInteger> e : labels.entrySet()) { +// a[i++] = new NV(e.getValue().intValue(), e.getKey()); +// } +// +// Arrays.sort(a); +// +// System.out.println("size, label"); +// for(NV t : a) { +// System.out.println(t.n + ", " + t.v); +// } +// +// } + } Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/PR.java =================================================================== --- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/PR.java 2014-02-23 23:24:57 UTC (rev 7877) +++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/PR.java 2014-02-24 01:52:13 UTC (rev 7878) @@ -15,7 +15,6 @@ */ package com.bigdata.rdf.graph.analytics; -import java.util.Arrays; import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -33,8 +32,6 @@ import com.bigdata.rdf.graph.IReducer; import com.bigdata.rdf.graph.impl.BaseGASProgram; -import cutthecrap.utils.striterators.IStriterator; - /** * Page rank assigns weights to the vertices in a graph based by on the relative * "importance" as determined by the patterns of directed links in the graph. @@ -186,19 +183,6 @@ /** * {@inheritDoc} * <p> - * Overridden to only visit the edges of the graph. - */ - @Override - public IStriterator constrainFilter( - final IGASContext<PR.VS, PR.ES, Double> ctx, final IStriterator itr) { - - return itr.addFilter(getEdgeOnlyFilter(ctx)); - - } - - /** - * {@inheritDoc} - * <p> * Each vertex is initialized to the reset probability. * * FIXME We need to do this efficiently. E.g., using a scan to find all of @@ -332,97 +316,107 @@ } - @Override - public void after(final IGASContext<PR.VS, PR.ES, Double> ctx) { + /** + * Class reports a map containing the page rank associated with each visited + * vertex. + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + */ + public class PageRankReducer implements IReducer<PR.VS, PR.ES, Double, Map<Value,Double>> { - final ConcurrentHashMap<Value, Double> values = new ConcurrentHashMap<Value, Double>(); + private final ConcurrentHashMap<Value, Double> values = new ConcurrentHashMap<Value, Double>(); + + @Override + public void visit(final IGASState<VS, ES, Double> state, + final Value u) { - ctx.getGASState().reduce( - new IReducer<PR.VS, PR.ES, Double, Map<Value, Double>>() { + final VS us = state.getState(u); - @Override - public void visit(final IGASState<VS, ES, Double> state, - final Value u) { + if (us != null) { - final VS us = state.getState(u); + final double pageRank = us.getValue(); - if (us != null) { + // FIXME Why are NaNs showing up? + if (Double.isNaN(pageRank)) + return; - final double pageRank = us.getValue(); + // FIXME Do infinite values show up? + if (Double.isInfinite(pageRank)) + return; + + if (pageRank < minPageRank) { + // Ignore small values. + return; + } - // FIXME Why are NaNs showing up? - if (Double.isNaN(pageRank)) - return; + /* + * Only report the larger ranked values. + */ - // FIXME Do infinite values show up? - if (Double.isInfinite(pageRank)) - return; - - if (pageRank < minPageRank) { - // Ignore small values. - return; - } + if (log.isDebugEnabled()) + log.debug("v=" + u + ", pageRank=" + pageRank); - /* - * Only report the larger ranked values. - */ + values.put(u, Double.valueOf(pageRank)); - if (log.isDebugEnabled()) - log.debug("v=" + u + ", pageRank=" + pageRank); - - values.put(u, Double.valueOf(pageRank)); - - } - - } - - @Override - public Map<Value, Double> get() { - - return Collections.unmodifiableMap(values); - - } - }); - - class NV implements Comparable<NV> { - public final double n; - public final Value v; - public NV(double n, Value v) { - this.n = n; - this.v = v; } - @Override - public int compareTo(final NV o) { - if (o.n > this.n) - return 1; - if (o.n < this.n) - return -1; - return 0; - } - } - final NV[] a = new NV[values.size()]; - - int i = 0; - - for (Map.Entry<Value, Double> e : values.entrySet()) { - - a[i++] = new NV(e.getValue().doubleValue(), e.getKey()); - } - Arrays.sort(a); + @Override + public Map<Value, Double> get() { - System.out.println("rank, pageRank, vertex"); - i = 0; - for (NV t : a) { + return Collections.unmodifiableMap(values); - System.out.println(i + ", " + t.n + ", " + t.v); - - i++; - } - + } + +// @Override +// public void after(final IGASContext<PR.VS, PR.ES, Double> ctx) { +// +// final Map<Value, Double> values = ctx.getGASState().reduce( +// new PageRankReducer()); +// +// class NV implements Comparable<NV> { +// public final double n; +// public final Value v; +// public NV(double n, Value v) { +// this.n = n; +// this.v = v; +// } +// @Override +// public int compareTo(final NV o) { +// if (o.n > this.n) +// return 1; +// if (o.n < this.n) +// return -1; +// return 0; +// } +// } +// +// final NV[] a = new NV[values.size()]; +// +// int i = 0; +// +// for (Map.Entry<Value, Double> e : values.entrySet()) { +// +// a[i++] = new NV(e.getValue().doubleValue(), e.getKey()); +// +// } +// +// Arrays.sort(a); +// +// System.out.println("rank, pageRank, vertex"); +// i = 0; +// for (NV t : a) { +// +// System.out.println(i + ", " + t.n + ", " + t.v); +// +// i++; +// +// } +// +// } } Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java =================================================================== --- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java 2014-02-23 23:24:57 UTC (rev 7877) +++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java 2014-02-24 01:52:13 UTC (rev 7878) @@ -15,9 +15,12 @@ */ package com.bigdata.rdf.graph.analytics; +import java.util.List; + import org.apache.log4j.Logger; import org.openrdf.model.Statement; import org.openrdf.model.Value; +import org.openrdf.model.ValueFactory; import com.bigdata.rdf.graph.EdgesEnum; import com.bigdata.rdf.graph.Factory; @@ -27,8 +30,6 @@ import com.bigdata.rdf.graph.IGASState; import com.bigdata.rdf.graph.impl.BaseGASProgram; -import cutthecrap.utils.striterators.IStriterator; - /** * SSSP (Single Source, Shortest Path). This analytic computes the shortest path * to each connected vertex in the graph starting from the given vertex. Only @@ -52,9 +53,10 @@ * phase is executed to update the state of the distinct vertices in the * frontier. * - * TODO Add a reducer to report the actual minimum length paths. This is - * similar to a BFS tree, but the path lengths are not integer values so - * we need a different data structure to collect them. + * FIXME Add a reducer to report the actual minimum length paths. This + * is similar to a BFS tree, but the path lengths are not integer values + * so we need a different data structure to collect them (we need to + * store the predecesor when we run SSSP to do this). */ public class SSSP extends BaseGASProgram<SSSP.VS, SSSP.ES, Integer/* dist */> { @@ -200,20 +202,6 @@ } /** - * {@inheritDoc} - * <p> - * Overridden to only visit the edges of the graph. - */ - @Override - public IStriterator constrainFilter( - final IGASContext<SSSP.VS, SSSP.ES, Integer> ctx, - final IStriterator itr) { - - return itr.addFilter(getEdgeOnlyFilter(ctx)); - - } - - /** * Set the {@link VS#dist()} to ZERO (0). * <p> * {@inheritDoc} @@ -394,4 +382,39 @@ } + /** + * {@inheritDoc} + * <p> + * <dl> + * <dt>1</dt> + * <dd>The shortest distance from the initial frontier to the vertex.</dd> + * </dl> + */ + @Override + public List<IBinder<SSSP.VS, SSSP.ES, Integer>> getBinderList() { + + final List<IBinder<SSSP.VS, SSSP.ES, Integer>> tmp = super + .getBinderList(); + + tmp.add(new IBinder<SSSP.VS, SSSP.ES, Integer>() { + + @Override + public int getIndex() { + return 1; + } + + @Override + public Value bind(final ValueFactory vf, + final IGASState<SSSP.VS, SSSP.ES, Integer> state, + final Value u) { + + return vf.createLiteral(state.getState(u).dist()); + + } + }); + + return tmp; + + } + } Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/BaseGASProgram.java =================================================================== --- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/BaseGASProgram.java 2014-02-23 23:24:57 UTC (rev 7877) +++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/BaseGASProgram.java 2014-02-24 01:52:13 UTC (rev 7878) @@ -16,13 +16,15 @@ package com.bigdata.rdf.graph.impl; import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; import java.util.Random; import org.apache.log4j.Logger; import org.openrdf.model.Resource; import org.openrdf.model.Statement; -import org.openrdf.model.URI; import org.openrdf.model.Value; +import org.openrdf.model.ValueFactory; import com.bigdata.rdf.graph.EdgesEnum; import com.bigdata.rdf.graph.Factory; @@ -30,12 +32,9 @@ import com.bigdata.rdf.graph.IGASContext; import com.bigdata.rdf.graph.IGASProgram; import com.bigdata.rdf.graph.IGASState; +import com.bigdata.rdf.graph.IReducer; import com.bigdata.rdf.graph.impl.util.VertexDistribution; -import cutthecrap.utils.striterators.Filter; -import cutthecrap.utils.striterators.IFilter; -import cutthecrap.utils.striterators.IStriterator; - /** * Abstract base class with some useful defaults. * @@ -49,103 +48,6 @@ private static final Logger log = Logger.getLogger(BaseGASProgram.class); - /** - * {@inheritDoc} - * <p> - * The default implementation does not restrict the visitation to a - * connectivity matrix (returns <code>null</code>). - */ - @Override - public URI getLinkType() { - - return null; - - } - - /** - * {@inheritDoc} - * <p> - * The default implementation returns its argument. - */ - @Override - public IStriterator constrainFilter(final IGASContext<VS, ES, ST> ctx, - final IStriterator itr) { - - return itr; - - } - - /** - * Return an {@link IFilter} that will only visit the edges of the graph. - * - * @see IGASState#isEdge(Statement) - */ - protected IFilter getEdgeOnlyFilter(final IGASContext<VS, ES, ST> ctx) { - - return new EdgeOnlyFilter(ctx); - - } - - /** - * Filter visits only edges (filters out attribute values). - * <p> - * Note: This filter is pushed down onto the AP and evaluated close to the - * data. - */ - private class EdgeOnlyFilter extends Filter { - private static final long serialVersionUID = 1L; - private final IGASState<VS, ES, ST> gasState; - private EdgeOnlyFilter(final IGASContext<VS, ES, ST> ctx) { - this.gasState = ctx.getGASState(); - } - @Override - public boolean isValid(final Object e) { - return gasState.isEdge((Statement) e); - } - }; - - /** - * Return a filter that only visits the edges of graph that are instances of - * the specified link attribute type. - * <p> - * Note: For bigdata, the visited edges can be decoded to recover the - * original link as well. - * - * @see IGASState#isLinkAttrib(Statement, URI) - * @see IGASState#decodeStatement(Value) - */ - protected IFilter getLinkAttribFilter(final IGASContext<VS, ES, ST> ctx, - final URI linkAttribType) { - - return new LinkAttribFilter(ctx, linkAttribType); - - } - - /** - * Filter visits only edges where the {@link Statement} is an instance of - * the specified link attribute type. For bigdata, the visited edges can be - * decoded to recover the original link as well. - */ - private class LinkAttribFilter extends Filter { - private static final long serialVersionUID = 1L; - - private final IGASState<VS, ES, ST> gasState; - private final URI linkAttribType; - - public LinkAttribFilter(final IGASContext<VS, ES, ST> ctx, - final URI linkAttribType) { - if (linkAttribType == null) - throw new IllegalArgumentException(); - this.gasState = ctx.getGASState(); - this.linkAttribType = linkAttribType; - } - - @Override - public boolean isValid(final Object e) { - return gasState.isLinkAttrib((Statement) e, linkAttribType); - } - } - // /** // * If the vertex is actually an edge, then return the decoded edge. // * @@ -229,9 +131,9 @@ * The default implementation is a NOP. */ @Override - public void after(final IGASContext<VS, ES, ST> ctx) { + public <T> IReducer<VS, ES, ST, T> getDefaultAfterOp() { - // NOP + return null; // NOP } @@ -319,4 +221,49 @@ } + /** + * Return an {@link IBinder} for the vertex itself + */ + private IBinder<VS, ES, ST> getBinder0() { + + return new IBinder<VS, ES, ST>() { + + @Override + public int getIndex() { + + return 0; + + } + + @Override + public Value bind(final ValueFactory vf, + final IGASState<VS, ES, ST> state, final Value u) { + + return u; + + } + + }; + + } + + /** + * {@inheritDoc} + * <p> + * <dl> + * <dt>0</dt> + * <dd>The visited vertex itself.</dd> + * </dl> + */ + @Override + public List<IBinder<VS, ES, ST>> getBinderList() { + + final List<IBinder<VS, ES, ST>> tmp = new LinkedList<IBinder<VS, ES, ST>>(); + + tmp.add(getBinder0()); + + return tmp; + + } + } Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASContext.java =================================================================== --- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASContext.java 2014-02-23 23:24:57 UTC (rev 7877) +++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASContext.java 2014-02-24 01:52:13 UTC (rev 7878) @@ -20,9 +20,11 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.log4j.Logger; import org.openrdf.model.Statement; +import org.openrdf.model.URI; import org.openrdf.model.Value; import com.bigdata.rdf.graph.EdgesEnum; @@ -36,6 +38,10 @@ import com.bigdata.rdf.graph.IStaticFrontier; import com.bigdata.rdf.graph.util.GASUtil; +import cutthecrap.utils.striterators.Filter; +import cutthecrap.utils.striterators.IFilter; +import cutthecrap.utils.striterators.IStriterator; + public class GASContext<VS, ES, ST> implements IGASContext<VS, ES, ST> { private static final Logger log = Logger.getLogger(GASContext.class); @@ -70,6 +76,18 @@ Integer.MAX_VALUE); /** + * An optional constraint on the type of the visited links. + */ + private final AtomicReference<URI> linkType = new AtomicReference<URI>(null); + + /** + * An optional {@link IReducer} that will executed after the + * {@link IGASProgram}. + */ + private final AtomicReference<IReducer<VS, ES, ST, ?>> afterOp = new AtomicReference<IReducer<VS, ES, ST, ?>>( + null); + + /** * * @param namespace * The namespace of the graph (KB instance). @@ -168,8 +186,19 @@ gasState.traceState(); - program.after(this); - + // Optional post-reduction. + { + + final IReducer<VS, ES, ST, ?> op = getRunAfterOp(); + + if (op != null) { + + gasState.reduce(op); + + } + + } + // Done return total; @@ -374,26 +403,93 @@ /** * Do APPLY. * - * TODO The apply() should be parallelized. For some algorithms, there is a - * moderate amount of work per vertex in apply(). Use {@link #nthreads} to - * set the parallelism. - * <p> - * Note: This is very similar to the {@link IGASState#reduce(IReducer)} - * operation. This operates over the frontier. reduce() operates over the - * activated vertices. Both need fine grained parallelism. Both can have - * either light or moderately heavy operations (a dot product would be an - * example of a heavier operation). + * @return The #of vertices for which the operation was executed. + * + * @throws Exception */ - private void apply(final IStaticFrontier f) { + private void apply(final IStaticFrontier f) throws Exception { - for (Value u : f) { +// for (Value u : f) { +// +// program.apply(gasState, u, null/* sum */); +// +// } - program.apply(gasState, u, null/* sum */); + // Note: Return value of ApplyReducer is currently ignored. + reduceOverFrontier(f, new ApplyReducer<Void>()); + + } + private class ApplyReducer<T> implements IReducer<VS, ES, ST, T> { + + @Override + public void visit(final IGASState<VS, ES, ST> state, final Value u) { + + program.apply(state, u, null/* sum */); + } + @Override + public T get() { + + // Note: Nothing returned right now. + return null; + + } + } + + /** + * Reduce over the frontier (used for apply()). + * + * @param f + * The frontier. + * @param op + * The {@link IReducer}. + * + * @return The {@link IReducer#get() result}. + * + * @throws Exception + */ + public <T> T reduceOverFrontier(final IStaticFrontier f, + final IReducer<VS, ES, ST, T> op) throws Exception { + if (f == null) + throw new IllegalArgumentException(); + + if (op == null) + throw new IllegalArgumentException(); + + class ReduceVertexTaskFactory implements VertexTaskFactory<Long> { + + @Override + public Callable<Long> newVertexTask(final Value u) { + + return new Callable<Long>() { + + @Override + public Long call() { + + // program.apply(gasState, u, null/* sum */); + op.visit(gasState, u); + + // Nothing returned by visit(). + return ONE; + + }; + }; + + }; + } + + gasEngine.newFrontierStrategy(new ReduceVertexTaskFactory(), f).call(); + + // Return reduction. + return op.get(); + + } + private static final Long ONE = Long.valueOf(1L); + /** * @param inEdges * when <code>true</code> the GATHER is over the in-edges. @@ -728,4 +824,122 @@ } + /** + * {@inheritDoc} + * <p> + * The default implementation does not restrict the visitation to a + * connectivity matrix (returns <code>null</code>). + */ + @Override + public URI getLinkType() { + + return linkType.get(); + + } + + @Override + public void setLinkType(final URI linkType) { + + this.linkType.set(linkType); + + } + + /** + * {@inheritDoc} + * <p> + * The default implementation only visits the edges. + */ + @Override + public IStriterator constrainFilter(final IStriterator itr) { + + return itr.addFilter(getEdgeOnlyFilter()); + + } + + /** + * Return an {@link IFilter} that will only visit the edges of the graph. + * + * @see IGASState#isEdge(Statement) + */ + protected IFilter getEdgeOnlyFilter() { + + return new EdgeOnlyFilter(this); + + } + + /** + * Filter visits only edges (filters out attribute values). + * <p> + * Note: This filter is pushed down onto the AP and evaluated close to the + * data. + */ + private class EdgeOnlyFilter extends Filter { + private static final long serialVersionUID = 1L; + private final IGASState<VS, ES, ST> gasState; + private EdgeOnlyFilter(final IGASContext<VS, ES, ST> ctx) { + this.gasState = ctx.getGASState(); + } + @Override + public boolean isValid(final Object e) { + return gasState.isEdge((Statement) e); + } + }; + + /** + * Return a filter that only visits the edges of graph that are instances of + * the specified link attribute type. + * <p> + * Note: For bigdata, the visited edges can be decoded to recover the + * original link as well. + * + * @see IGASState#isLinkAttrib(Statement, URI) + * @see IGASState#decodeStatement(Value) + */ + protected IFilter getLinkAttribFilter(final IGASContext<VS, ES, ST> ctx, + final URI linkAttribType) { + + return new LinkAttribFilter(ctx, linkAttribType); + + } + + /** + * Filter visits only edges where the {@link Statement} is an instance of + * the specified link attribute type. For bigdata, the visited edges can be + * decoded to recover the original link as well. + */ + private class LinkAttribFilter extends Filter { + private static final long serialVersionUID = 1L; + + private final IGASState<VS, ES, ST> gasState; + private final URI linkAttribType; + + public LinkAttribFilter(final IGASContext<VS, ES, ST> ctx, + final URI linkAttribType) { + if (linkAttribType == null) + throw new IllegalArgumentException(); + this.gasState = ctx.getGASState(); + this.linkAttribType = linkAttribType; + } + + @Override + public boolean isValid(final Object e) { + return gasState.isLinkAttrib((Statement) e, linkAttribType); + } + } + + @Override + public <T> void setRunAfterOp(final IReducer<VS, ES, ST, T> afterOp) { + + this.afterOp.set(afterOp); + + } + + @SuppressWarnings("unchecked") + @Override + public <T> IReducer<VS, ES, ST, T> getRunAfterOp() { + + return (IReducer<VS, ES, ST, T>) afterOp.get(); + + } + } // GASContext Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASEngine.java =================================================================== --- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASEngine.java 2014-02-23 23:24:57 UTC (rev 7877) +++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASEngine.java 2014-02-24 01:52:13 UTC (rev 7878) @@ -210,6 +210,7 @@ } + @Override public Long call() throws Exception { long nedges = 0L; Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASState.java =================================================================== --- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASState.java 2014-02-23 23:24:57 UTC (rev 7877) +++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASState.java 2014-02-24 01:52:13 UTC (rev 7878) @@ -320,6 +320,10 @@ * * TODO REDUCE : parallelize with nthreads. The reduce operations are often * lightweight, so maybe a fork/join pool would work better? + * <p> + * Note: We can not do a parallel reduction right now because the backing + * class does not expose a parallel iterator, e.g., a segment-wise iterator. + * The reduction over the {@link #vertexState} is quite slow as a result. */ @Override public <T> T reduce(final IReducer<VS, ES, ST, T> op) { Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/ram/RAMGASEngine.java =================================================================== --- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/ram/RAMGASEngine.java 2014-02-23 23:24:57 UTC (rev 7877) +++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/ram/RAMGASEngine.java 2014-02-24 01:52:13 UTC (rev 7878) @@ -32,7 +32,6 @@ import com.bigdata.rdf.graph.EdgesEnum; import com.bigdata.rdf.graph.IGASContext; -import com.bigdata.rdf.graph.IGASProgram; import com.bigdata.rdf.graph.IGraphAccessor; import com.bigdata.rdf.graph.impl.GASEngine; import com.bigdata.rdf.graph.impl.util.VertexDistribution; @@ -325,12 +324,11 @@ } - @SuppressWarnings({ "unchecked", "rawtypes" }) private IStriterator getEdges(final boolean inEdges, final IGASContext<?, ?, ?> ctx, final Value u) throws SailException { - final URI linkTypeIV = (URI) ctx.getGASProgram().getLinkType(); + final URI linkTypeIV = (URI) ctx.getLinkType(); if(linkTypeIV != null) { /* * FIXME RDR: We need to use a union of access paths for link @@ -351,8 +349,7 @@ /* * Optionally wrap the program specified filter. */ - return ((IGASProgram) ctx.getGASProgram()).constrainFilter(ctx, - sitr); + return ctx.constrainFilter(sitr); } Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/sail/SAILGASEngine.java =================================================================== --- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/sail/SAILGASEngine.java 2014-02-23 23:24:57 UTC (rev 7877) +++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/sail/SAILGASEngine.java 2014-02-24 01:52:13 UTC (rev 7878) @@ -31,7 +31,6 @@ import com.bigdata.rdf.graph.EdgesEnum; import com.bigdata.rdf.graph.IGASContext; -import com.bigdata.rdf.graph.IGASProgram; import com.bigdata.rdf.graph.IGraphAccessor; import com.bigdata.rdf.graph.impl.GASEngine; import com.bigdata.rdf.graph.impl.util.VertexDistribution; @@ -148,7 +147,7 @@ final IGASContext<?, ?, ?> ctx, final Value u) throws SailException { - final URI linkTypeIV = (URI) ctx.getGASProgram().getLinkType(); + final URI linkTypeIV = (URI) ctx.getLinkType(); if(linkTypeIV != null) { /* * FIXME RDR: We need to use a union of access paths for link @@ -176,7 +175,7 @@ * since only one is optimized. */ final boolean posOptimization = linkTypeIV != null - && !inEdges; + && inEdges; final CloseableIteration<? extends Statement, SailException> citr; if (posOptimization) { @@ -238,9 +237,9 @@ * much more efficient. (If the index is local, then simply stacking * striterators is just as efficient.) */ - return ((IGASProgram) ctx.getGASProgram()).constrainFilter(ctx, - sitr); + return ctx.constrainFilter(sitr); + } @Override Modified: branches/RDR/bigdata-gas/src/test/com/bigdata/rdf/graph/impl/ram/TestGather.java =================================================================== --- branches/RDR/bigdata-gas/src/test/com/bigdata/rdf/graph/impl/ram/TestGather.java 2014-02-23 23:24:57 UTC (rev 7877) +++ branches/RDR/bigdata-gas/src/test/com/bigdata/rdf/graph/impl/ram/TestGather.java 2014-02-24 01:52:13 UTC (rev 7878) @@ -36,8 +36,6 @@ import com.bigdata.rdf.graph.impl.ram.RAMGASEngine.RAMGraph; import com.bigdata.rdf.graph.impl.ram.RAMGASEngine.RAMGraphAccessor; -import cutthecrap.utils.striterators.IStriterator; - /** * Test class for GATHER. * @@ -89,21 +87,7 @@ return EdgesEnum.NoEdges; } - /** - * {@inheritDoc} - * <p> - * Overridden to only visit the edges of the graph. - */ @Override - public IStriterator constrainFilter( - final IGASContext<Set<Statement>, Set<Statement>, Set<Statement>> ctx, - final IStriterator itr) { - - return itr.addFilter(getEdgeOnlyFilter(ctx)); - - } - - @Override public Factory<Value, Set<Statement>> getVertexStateFactory() { return new Factory<Value, Set<Statement>>() { @Override Modified: branches/RDR/bigdata-gas/src/test/com/bigdata/rdf/graph/impl/sail/TestGather.java =================================================================== --- branches/RDR/bigdata-gas/src/test/com/bigdata/rdf/graph/impl/sail/TestGather.java 2014-02-23 23:24:57 UTC (rev 7877) +++ branches/RDR/bigdata-gas/src/test/com/bigdata/rdf/graph/impl/sail/TestGather.java 2014-02-24 01:52:13 UTC (rev 7878) @@ -35,8 +35,6 @@ import com.bigdata.rdf.graph.impl.BaseGASProgram; import com.bigdata.rdf.graph.impl.GASStats; -import cutthecrap.utils.striterators.IStriterator; - /** * Test class for GATHER. * @@ -87,22 +85,8 @@ public EdgesEnum getScatterEdges() { return EdgesEnum.NoEdges; } - - /** - * {@inheritDoc} - * <p> - * Overridden to only visit the edges of the graph. - */ + @Override - public IStriterator constrainFilter( - final IGASContext<Set<Statement>, Set<Statement>, Set<Statement>> ctx, - final IStriterator itr) { - - return itr.addFilter(getEdgeOnlyFilter(ctx)); - - } - - @Override public Factory<Value, Set<Statement>> getVertexStateFactory() { return new Factory<Value, Set<Statement>>() { @Override Modified: branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/BigdataGASEngine.java =================================================================== --- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/BigdataGASEngine.java 2014-02-23 23:24:57 UTC (rev 7877) +++ branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/BigdataGASEngine.java 2014-02-24 01:52:13 UTC (rev 7878) @@ -35,6 +35,8 @@ import com.bigdata.rdf.graph.impl.util.VertexDistribution; import com.bigdata.rdf.internal.IV; import com.bigdata.rdf.internal.IVUtility; +import com.bigdata.rdf.internal.NotMaterializedException; +import com.bigdata.rdf.model.BigdataValue; import com.bigdata.rdf.sail.BigdataSail; import com.bigdata.rdf.spo.ISPO; import com.bigdata.rdf.spo.SPOKeyOrder; @@ -361,7 +363,7 @@ this.ctx = ctx; this.u = u; - linkTypeIV = (IV) ctx.getGASProgram().getLinkType(); + linkTypeIV = getIV(ctx.getLinkType()); final IKeyBuilder keyBuilder; /* @@ -371,7 +373,7 @@ * * [u] gets bound on O. * - * We use... [truncated message content] |