|
From: <tho...@us...> - 2014-02-24 01:52:19
|
Revision: 7878
http://sourceforge.net/p/bigdata/code/7878
Author: thompsonbry
Date: 2014-02-24 01:52:13 +0000 (Mon, 24 Feb 2014)
Log Message:
-----------
Checkpoint on the GASService. See #810.
Modified Paths:
--------------
branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASContext.java
branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASOptions.java
branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASProgram.java
branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IReducer.java
branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java
branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/CC.java
branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/PR.java
branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java
branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/BaseGASProgram.java
branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASContext.java
branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASEngine.java
branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASState.java
branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/ram/RAMGASEngine.java
branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/sail/SAILGASEngine.java
branches/RDR/bigdata-gas/src/test/com/bigdata/rdf/graph/impl/ram/TestGather.java
branches/RDR/bigdata-gas/src/test/com/bigdata/rdf/graph/impl/sail/TestGather.java
branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/BigdataGASEngine.java
branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASService.java
branches/RDR/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/bd/TestBFS.java
branches/RDR/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/bd/TestGather.java
branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/sparql/PrefixDeclProcessor.java
Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASContext.java
===================================================================
--- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASContext.java 2014-02-23 23:24:57 UTC (rev 7877)
+++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASContext.java 2014-02-24 01:52:13 UTC (rev 7878)
@@ -18,6 +18,11 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+
+import cutthecrap.utils.striterators.IStriterator;
+
/**
* Execution context for an {@link IGASProgram}. This is distinct from the
* {@link IGASEngine} so we can support distributed evaluation and concurrent
@@ -36,6 +41,15 @@
* the generic type for the per-edge state, but that is not always
* true. The SUM type is scoped to the GATHER + SUM operation (NOT
* the computation).
+ *
+ * TODO Add option to order the vertices to provide a serializable
+ * execution plan (like GraphChi). I believe that this reduces to
+ * computing a DAG over the frontier before executing the GATHER and
+ * then executing the frontier such that the parallel execution is
+ * constrained by arcs in the DAG that do not have mutual
+ * dependencies. This would have to place a partial ordering over the
+ * vertices in the frontier and then process the frontier with
+ * limited parallelism based on that partial ordering.
*/
public interface IGASContext<VS, ES, ST> extends Callable<IGASStats> {
@@ -90,6 +104,73 @@
int getMaxVisited();
/**
+ * Return non-<code>null</code> iff there is a single link type to be
+ * visited. This corresponds to a view of the graph as a sparse connectivity
+ * matrix. The {@link IGASEngine} can optimize traversal patterns using the
+ * <code>POS</code> index.
+ * <p>
+ * Note: When this option is used, the scatter and gather will not visit the
+ * property set for the vertex. Instead, the graph is treated as if it were
+ * an unattributed graph and only mined for the connectivity data.
+ *
+ * @return The {@link Value} for the predicate that identifies the desired
+ * link type (there can be many types of links - the return value
+ * specifies which attribute is of interest).
+ *
+ * FIXME define getLinkAttribType() (RDR)
+ */
+ URI getLinkType();
+
+ /**
+ * Set an optional constraint on the type of the visited links.
+ * <p>
+ * Note: When this option is used, the scatter and gather will not visit the
+ * property set for the vertex. Instead, the graph is treated as if it were
+ * an unattributed graph and only mined for the connectivity data (which may
+ * include a link weight).
+ *
+ * @param linkType
+ * The link type to visit (optional). When <code>null</code>, all
+ * link types are visited.
+ */
+ void setLinkType(URI linkType);
+
+ /**
+ * Set an optional {@link IReducer} that will run after the
+ * {@link IGASProgram} is terminated. This may be used to extract results
+ * from the visited vertices.
+ *
+ * @param afterOp
+ * The {@link IReducer}.
+ */
+ <T> void setRunAfterOp(IReducer<VS, ES, ST, T> afterOp);
+
+ /**
+ * Return an optional {@link IReducer} that will run after the
+ * {@link IGASProgram} is terminated. This may be used to extract results
+ * from the visited vertices.
+ */
+ <T> IReducer<VS, ES, ST, T> getRunAfterOp();
+
+ /**
+ * Hook to impose a constraint on the visited edges and/or property values.
+ *
+ * @param eitr
+ * The iterator visiting those edges and/or property values.
+ *
+ * @return Either the same iterator or a constrained iterator.
+ *
+ * TODO Rename as constrainEdgeFilter or even split into a
+ * constrainGatherFilter and a constrainScatterFilter.
+ *
+ * FIXME APPLY : If we need access to the vertex property values in
+ * APPLY (which we probably do, at least optionally), then there
+ * should be a similar method to decide whether the property values
+ * for the vertex are made available during the APPLY.
+ */
+ IStriterator constrainFilter(IStriterator eitr);
+
+ /**
* Execute one iteration.
*
* @param stats
Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASOptions.java
===================================================================
--- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASOptions.java 2014-02-23 23:24:57 UTC (rev 7877)
+++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASOptions.java 2014-02-24 01:52:13 UTC (rev 7878)
@@ -16,24 +16,15 @@
package com.bigdata.rdf.graph;
import org.openrdf.model.Statement;
-import org.openrdf.model.URI;
import org.openrdf.model.Value;
-import cutthecrap.utils.striterators.IStriterator;
+import com.bigdata.rdf.graph.analytics.CC;
+import com.bigdata.rdf.graph.impl.util.GASRunnerBase;
/**
* Interface for options that are understood by the {@link IGASEngine} and which
* may be declared by the {@link IGASProgram}.
*
- * TODO Add option to order the vertices to provide a serializable execution
- * plan (like GraphChi). I believe that this reduces to computing a DAG over the
- * frontier before executing the GATHER and then executing the frontier such
- * that the parallel execution is constrained by arcs in the DAG that do not
- * have mutual dependencies. This is really an option that would be implemented
- * by the {@link IGASContext}, which would have to place a partial ordering over
- * the vertices in the frontier and then process the frontier with limited
- * parallelism based on that partial ordering.
- *
* @author <a href="mailto:tho...@us...">Bryan Thompson</a>
*/
public interface IGASOptions<VS, ES, ST> {
@@ -51,6 +42,10 @@
* sample all vertices regardless of their edges, specify
* {@value EdgesEnum#NoEdges}. To require that each vertex has at least one
* in-edge and one out-edge, specify {@link EdgesEnum#AllEdges}.
+ *
+ * FIXME This should be moved into {@link GASRunnerBase}. The only class
+ * that customizes this is {@link CC}. (For {@link CC} we need to put all
+ * vertices into the frontier, even those without edges.)
*/
EdgesEnum getSampleEdgesFilter();
@@ -86,40 +81,4 @@
*/
Factory<Statement, ES> getEdgeStateFactory();
- /**
- * Return non-<code>null</code> iff there is a single link type to be
- * visited. This corresponds to a view of the graph as sparse connectivity
- * matrix. The {@link IGASEngine} can optimize traversal patterns using the
- * <code>POS</code> index.
- * <p>
- * Note: When this option is used, the scatter and gather will not visit the
- * property set for the vertex. The graph is treated as if it were an
- * unattributed graph and only mined for the connectivity data.
- *
- * @return The {@link Value} for the predicate that identifies the desired
- * link type (there can be many types of links - the return value
- * specifies which attribute is of interest).
- *
- * @see #getLinkAttribType()
- */
- URI getLinkType();
-
- /**
- * Hook to impose a constraint on the visited edges and/or property values.
- *
- * @param itr
- * The iterator visiting those edges and/or property values.
- *
- * @return Either the same iterator or a constrained iterator.
- *
- * TODO Rename as constrainEdgeFilter or even split into a
- * constrainGatherFilter and a constraintScatterFilter.
- *
- * FIXME APPLY : If we need access to the vertex property values in
- * APPLY (which we probably do, at least optionally), then there
- * should be a similar method to decide whether the property values
- * for the vertex are made available during the APPLY.
- */
- IStriterator constrainFilter(IGASContext<VS, ES, ST> ctx, IStriterator eitr);
-
}
\ No newline at end of file
Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASProgram.java
===================================================================
--- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASProgram.java 2014-02-23 23:24:57 UTC (rev 7877)
+++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASProgram.java 2014-02-24 01:52:13 UTC (rev 7878)
@@ -15,8 +15,11 @@
*/
package com.bigdata.rdf.graph;
+import java.util.List;
+
import org.openrdf.model.Statement;
import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
/**
* Abstract interface for GAS programs.
@@ -51,12 +54,13 @@
void before(IGASContext<VS, ES, ST> ctx);
/**
- * One time initialization after the {@link IGASProgram} is executed.
+ * Return a default reduction that will be applied after the
+ * {@link IGASProgram} is executed.
*
- * @param ctx
- * The evaluation context.
+ * @return The default reduction -or- <code>null</code> if no such reduction
+ * is defined.
*/
- void after(IGASContext<VS, ES, ST> ctx);
+ <T> IReducer<VS, ES, ST, T> getDefaultAfterOp();
/**
* Callback to initialize the state for each vertex in the initial frontier
@@ -200,5 +204,42 @@
* the frontier is non-empty).
*/
boolean nextRound(IGASContext<VS, ES, ST> ctx);
+
+ /**
+ * Return a list of interfaces that may be used to extract variable bindings
+ * for the vertices visited by the algorithm.
+ */
+ List<IBinder<VS, ES, ST>> getBinderList();
+ /**
+ * An interface that may be used to extract variable bindings for the
+ * vertices visited by the algorithm.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan
+ * Thompson</a>
+ */
+ public interface IBinder<VS, ES, ST> {
+
+ /**
+ * The ordinal index of the variable that is bound by this
+ * {@link IBinder}. By convention, index ZERO is the vertex. Indices
+ * greater than ZERO are typically aspects of the state of the vertex.
+ */
+ int getIndex();
+
+ /**
+ * @param vf
+ * The {@link ValueFactory} used to create the return
+ * {@link Value}.
+ * @param u
+ * The vertex.
+ *
+ * @return The {@link Value} for that ordinal variable or
+ * <code>null</code> if there is no binding for that ordinal
+ * variable.
+ */
+ Value bind(ValueFactory vf, final IGASState<VS, ES, ST> state, Value u);
+
+ }
+
}
\ No newline at end of file
Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IReducer.java
===================================================================
--- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IReducer.java 2014-02-23 23:24:57 UTC (rev 7877)
+++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IReducer.java 2014-02-24 01:52:13 UTC (rev 7878)
@@ -26,7 +26,7 @@
* @author <a href="mailto:tho...@us...">Bryan Thompson</a>
* @version $Id: IResultHandler.java 2265 2009-10-26 12:51:06Z thompsonbry $
*/
-public interface IReducer<VS,ES, ST, T> {
+public interface IReducer<VS, ES, ST, T> {
/**
* Method is invoked for each result and is responsible for combining the
Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java
===================================================================
--- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java 2014-02-23 23:24:57 UTC (rev 7877)
+++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java 2014-02-24 01:52:13 UTC (rev 7878)
@@ -15,8 +15,8 @@
*/
package com.bigdata.rdf.graph.analytics;
-import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -24,6 +24,7 @@
import org.openrdf.model.Statement;
import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
import com.bigdata.rdf.graph.EdgesEnum;
import com.bigdata.rdf.graph.Factory;
@@ -34,8 +35,6 @@
import com.bigdata.rdf.graph.IReducer;
import com.bigdata.rdf.graph.impl.BaseGASProgram;
-import cutthecrap.utils.striterators.IStriterator;
-
/**
* Breadth First Search (BFS) is an iterative graph traversal primitive. The
* frontier is expanded iteratively until no new vertices are discovered. Each
@@ -158,19 +157,6 @@
}
/**
- * {@inheritDoc}
- * <p>
- * Overridden to only visit the edges of the graph.
- */
- @Override
- public IStriterator constrainFilter(
- final IGASContext<BFS.VS, BFS.ES, Void> ctx, final IStriterator itr) {
-
- return itr.addFilter(getEdgeOnlyFilter(ctx));
-
- }
-
- /**
* Not used.
*/
@Override
@@ -260,6 +246,39 @@
}
/**
+ * {@inheritDoc}
+ * <p>
+ * <dl>
+ * <dt>1</dt>
+ * <dd>The depth at which the vertex was first encountered during traversal.</dd>
+ * </dl>
+ */
+ @Override
+ public List<IBinder<BFS.VS, BFS.ES, Void>> getBinderList() {
+
+ final List<IBinder<BFS.VS, BFS.ES, Void>> tmp = super.getBinderList();
+
+ tmp.add(new IBinder<BFS.VS, BFS.ES, Void>() {
+
+ @Override
+ public int getIndex() {
+ return 1;
+ }
+
+ @Override
+ public Value bind(final ValueFactory vf,
+ final IGASState<BFS.VS, BFS.ES, Void> state, final Value u) {
+
+ return vf.createLiteral(state.getState(u).depth.get());
+
+ }
+ });
+
+ return tmp;
+
+ }
+
+ /**
* Reduce the active vertex state, returning a histogram reporting the #of
* vertices at each distance from the starting vertex. There will always be
* one vertex at depth zero - this is the starting vertex. For each
@@ -272,11 +291,9 @@
* Thompson</a>
*
* TODO Do another reducer that reports the actual BFS tree rather
- * than a histogram. For each depth, it needs to have the set of
- * vertices that are at that number of hops from the starting
- * vertex. So, there is an outer map from depth to set. The inner
- * set should also be concurrent if we allow concurrent reduction of
- * the activated vertex state.
+ * than a histogram. We need to store the predecessor for this. That
+ * will allow us to trivially report the BFS route between any two
+ * vertices.
*/
protected static class HistogramReducer implements
IReducer<VS, ES, Void, Map<Integer, AtomicLong>> {
@@ -323,54 +340,71 @@
}
- @Override
- public void after(final IGASContext<BFS.VS, BFS.ES, Void> ctx) {
+// @Override
+// public <T> IReducer<VS, ES, Void, T> getDefaultAfterOp() {
+//
+// class NV implements Comparable<NV> {
+// public final int n;
+// public final long v;
+// public NV(final int n, final long v) {
+// this.n = n;
+// this.v = v;
+// }
+// @Override
+// public int compareTo(final NV o) {
+// if (o.n > this.n)
+// return -1;
+// if (o.n < this.n)
+// return 1;
+// return 0;
+// }
+// }
+//
+// final IReducer<VS, ES, Void, T> outerReducer = new IReducer<VS, ES, Void, T>() {
+//
+// final HistogramReducer innerReducer = new HistogramReducer();
+//
+// @Override
+// public void visit(IGASState<VS, ES, Void> state, Value u) {
+//
+// innerReducer.visit(state, u);
+//
+// }
+//
+// @Override
+// public T get() {
+//
+// final Map<Integer, AtomicLong> h = innerReducer.get();
+//
+// final NV[] a = new NV[h.size()];
+//
+// int i = 0;
+//
+// for (Map.Entry<Integer, AtomicLong> e : h.entrySet()) {
+//
+// a[i++] = new NV(e.getKey().intValue(), e.getValue().get());
+//
+// }
+//
+// Arrays.sort(a);
+//
+// System.out.println("distance, frontierSize, sumFrontierSize");
+// long sum = 0L;
+// for (NV t : a) {
+//
+// System.out.println(t.n + ", " + t.v + ", " + sum);
+//
+// sum += t.v;
+//
+// }
+//
+// return null;
+// }
+//
+// };
+//
+// return outerReducer;
+//
+// }
- final HistogramReducer r = new HistogramReducer();
-
- ctx.getGASState().reduce(r);
-
- class NV implements Comparable<NV> {
- public final int n;
- public final long v;
- public NV(final int n, final long v) {
- this.n = n;
- this.v = v;
- }
- @Override
- public int compareTo(final NV o) {
- if (o.n > this.n)
- return -1;
- if (o.n < this.n)
- return 1;
- return 0;
- }
- }
-
- final Map<Integer, AtomicLong> h = r.get();
-
- final NV[] a = new NV[h.size()];
-
- int i = 0;
-
- for (Map.Entry<Integer, AtomicLong> e : h.entrySet()) {
-
- a[i++] = new NV(e.getKey().intValue(), e.getValue().get());
-
- }
-
- Arrays.sort(a);
-
- System.out.println("distance, frontierSize, sumFrontierSize");
- long sum = 0L;
- for (NV t : a) {
-
- System.out.println(t.n + ", " + t.v + ", " + sum);
-
- sum += t.v;
-
- }
-
- }
-
}
Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/CC.java
===================================================================
--- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/CC.java 2014-02-23 23:24:57 UTC (rev 7877)
+++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/CC.java 2014-02-24 01:52:13 UTC (rev 7878)
@@ -15,7 +15,6 @@
*/
package com.bigdata.rdf.graph.analytics;
-import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -29,14 +28,11 @@
import com.bigdata.rdf.graph.EdgesEnum;
import com.bigdata.rdf.graph.Factory;
import com.bigdata.rdf.graph.FrontierEnum;
-import com.bigdata.rdf.graph.IGASContext;
import com.bigdata.rdf.graph.IGASScheduler;
import com.bigdata.rdf.graph.IGASState;
import com.bigdata.rdf.graph.IReducer;
import com.bigdata.rdf.graph.impl.BaseGASProgram;
-import cutthecrap.utils.striterators.IStriterator;
-
/**
* Connected components computes the distinct sets of non-overlapping subgraphs
* within a graph. All vertices within a connected component are connected along
@@ -190,19 +186,6 @@
/**
* {@inheritDoc}
* <p>
- * Overridden to only visit the edges of the graph.
- */
- @Override
- public IStriterator constrainFilter(
- final IGASContext<CC.VS, CC.ES, Value> ctx, final IStriterator itr) {
-
- return itr.addFilter(getEdgeOnlyFilter(ctx));
-
- }
-
- /**
- * {@inheritDoc}
- * <p>
* Return the label of the remote vertex.
*/
@Override
@@ -325,87 +308,95 @@
* Returns a map containing the labels assigned to each connected component
* (which gives you a vertex in that connected component) and the #of
* vertices in each connected component.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan
+ * Thompson</a>
*/
- public Map<Value, AtomicInteger> getConnectedComponents(
- final IGASState<CC.VS, CC.ES, Value> state) {
+ public class ConnectedComponentsReducer implements IReducer<CC.VS,CC.ES,Value,Map<Value,AtomicInteger>> {
final ConcurrentHashMap<Value, AtomicInteger> labels = new ConcurrentHashMap<Value, AtomicInteger>();
- return state
- .reduce(new IReducer<CC.VS, CC.ES, Value, Map<Value, AtomicInteger>>() {
+ @Override
+ public void visit(final IGASState<VS, ES, Value> state, final Value u) {
- @Override
- public void visit(final IGASState<VS, ES, Value> state,
- final Value u) {
+ final VS us = state.getState(u);
- final VS us = state.getState(u);
+ if (us != null) {
- if (us != null) {
+ final Value label = us.getLabel();
- final Value label = us.getLabel();
+ if (log.isDebugEnabled())
+ log.debug("v=" + u + ", label=" + label);
- if (log.isDebugEnabled())
- log.debug("v=" + u + ", label=" + label);
+ final AtomicInteger oldval = labels.putIfAbsent(label,
+ new AtomicInteger(1));
- final AtomicInteger oldval = labels.putIfAbsent(
- label, new AtomicInteger(1));
+ if (oldval != null) {
- if (oldval != null) {
+ // lost race. increment existing counter.
+ oldval.incrementAndGet();
- // lost race. increment existing counter.
- oldval.incrementAndGet();
-
- }
-
- }
+ }
- }
+ }
- @Override
- public Map<Value, AtomicInteger> get() {
+ }
- return Collections.unmodifiableMap(labels);
+ @Override
+ public Map<Value, AtomicInteger> get() {
- }
- });
+ return Collections.unmodifiableMap(labels);
+ }
+
}
- @Override
- public void after(final IGASContext<CC.VS, CC.ES, Value> ctx) {
+ /**
+ * Returns a map containing the labels assigned to each connected component
+ * (which gives you a vertex in that connected component) and the #of
+ * vertices in each connected component.
+ */
+ public Map<Value, AtomicInteger> getConnectedComponents(
+ final IGASState<CC.VS, CC.ES, Value> state) {
- final Map<Value, AtomicInteger> labels = getConnectedComponents(ctx
- .getGASState());
-
- System.out.println("There are " + labels.size()
- + " connected components");
-
- class NV implements Comparable<NV> {
- public final int n;
- public final Value v;
- public NV(int n, Value v) {
- this.n = n;
- this.v = v;
- }
- @Override
- public int compareTo(final NV o) {
- return o.n - this.n;
- }
- }
-
- final NV[] a = new NV[labels.size()];
- int i = 0;
- for (Map.Entry<Value, AtomicInteger> e : labels.entrySet()) {
- a[i++] = new NV(e.getValue().intValue(), e.getKey());
- }
-
- Arrays.sort(a);
-
- System.out.println("size, label");
- for(NV t : a) {
- System.out.println(t.n + ", " + t.v);
- }
-
+ return state.reduce(new ConnectedComponentsReducer());
}
+// @Override
+// public void after(final IGASContext<CC.VS, CC.ES, Value> ctx) {
+//
+// final Map<Value, AtomicInteger> labels = getConnectedComponents(ctx
+// .getGASState());
+//
+// System.out.println("There are " + labels.size()
+// + " connected components");
+//
+// class NV implements Comparable<NV> {
+// public final int n;
+// public final Value v;
+// public NV(int n, Value v) {
+// this.n = n;
+// this.v = v;
+// }
+// @Override
+// public int compareTo(final NV o) {
+// return o.n - this.n;
+// }
+// }
+//
+// final NV[] a = new NV[labels.size()];
+// int i = 0;
+// for (Map.Entry<Value, AtomicInteger> e : labels.entrySet()) {
+// a[i++] = new NV(e.getValue().intValue(), e.getKey());
+// }
+//
+// Arrays.sort(a);
+//
+// System.out.println("size, label");
+// for(NV t : a) {
+// System.out.println(t.n + ", " + t.v);
+// }
+//
+// }
+
}
Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/PR.java
===================================================================
--- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/PR.java 2014-02-23 23:24:57 UTC (rev 7877)
+++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/PR.java 2014-02-24 01:52:13 UTC (rev 7878)
@@ -15,7 +15,6 @@
*/
package com.bigdata.rdf.graph.analytics;
-import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -33,8 +32,6 @@
import com.bigdata.rdf.graph.IReducer;
import com.bigdata.rdf.graph.impl.BaseGASProgram;
-import cutthecrap.utils.striterators.IStriterator;
-
/**
* Page rank assigns weights to the vertices in a graph based by on the relative
* "importance" as determined by the patterns of directed links in the graph.
@@ -186,19 +183,6 @@
/**
* {@inheritDoc}
* <p>
- * Overridden to only visit the edges of the graph.
- */
- @Override
- public IStriterator constrainFilter(
- final IGASContext<PR.VS, PR.ES, Double> ctx, final IStriterator itr) {
-
- return itr.addFilter(getEdgeOnlyFilter(ctx));
-
- }
-
- /**
- * {@inheritDoc}
- * <p>
* Each vertex is initialized to the reset probability.
*
* FIXME We need to do this efficiently. E.g., using a scan to find all of
@@ -332,97 +316,107 @@
}
- @Override
- public void after(final IGASContext<PR.VS, PR.ES, Double> ctx) {
+ /**
+ * Class reports a map containing the page rank associated with each visited
+ * vertex.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan
+ * Thompson</a>
+ */
+ public class PageRankReducer implements IReducer<PR.VS, PR.ES, Double, Map<Value,Double>> {
- final ConcurrentHashMap<Value, Double> values = new ConcurrentHashMap<Value, Double>();
+ private final ConcurrentHashMap<Value, Double> values = new ConcurrentHashMap<Value, Double>();
+
+ @Override
+ public void visit(final IGASState<VS, ES, Double> state,
+ final Value u) {
- ctx.getGASState().reduce(
- new IReducer<PR.VS, PR.ES, Double, Map<Value, Double>>() {
+ final VS us = state.getState(u);
- @Override
- public void visit(final IGASState<VS, ES, Double> state,
- final Value u) {
+ if (us != null) {
- final VS us = state.getState(u);
+ final double pageRank = us.getValue();
- if (us != null) {
+ // FIXME Why are NaNs showing up?
+ if (Double.isNaN(pageRank))
+ return;
- final double pageRank = us.getValue();
+ // FIXME Do infinite values show up?
+ if (Double.isInfinite(pageRank))
+ return;
+
+ if (pageRank < minPageRank) {
+ // Ignore small values.
+ return;
+ }
- // FIXME Why are NaNs showing up?
- if (Double.isNaN(pageRank))
- return;
+ /*
+ * Only report the larger ranked values.
+ */
- // FIXME Do infinite values show up?
- if (Double.isInfinite(pageRank))
- return;
-
- if (pageRank < minPageRank) {
- // Ignore small values.
- return;
- }
+ if (log.isDebugEnabled())
+ log.debug("v=" + u + ", pageRank=" + pageRank);
- /*
- * Only report the larger ranked values.
- */
+ values.put(u, Double.valueOf(pageRank));
- if (log.isDebugEnabled())
- log.debug("v=" + u + ", pageRank=" + pageRank);
-
- values.put(u, Double.valueOf(pageRank));
-
- }
-
- }
-
- @Override
- public Map<Value, Double> get() {
-
- return Collections.unmodifiableMap(values);
-
- }
- });
-
- class NV implements Comparable<NV> {
- public final double n;
- public final Value v;
- public NV(double n, Value v) {
- this.n = n;
- this.v = v;
}
- @Override
- public int compareTo(final NV o) {
- if (o.n > this.n)
- return 1;
- if (o.n < this.n)
- return -1;
- return 0;
- }
- }
- final NV[] a = new NV[values.size()];
-
- int i = 0;
-
- for (Map.Entry<Value, Double> e : values.entrySet()) {
-
- a[i++] = new NV(e.getValue().doubleValue(), e.getKey());
-
}
- Arrays.sort(a);
+ @Override
+ public Map<Value, Double> get() {
- System.out.println("rank, pageRank, vertex");
- i = 0;
- for (NV t : a) {
+ return Collections.unmodifiableMap(values);
- System.out.println(i + ", " + t.n + ", " + t.v);
-
- i++;
-
}
-
+
}
+
+// @Override
+// public void after(final IGASContext<PR.VS, PR.ES, Double> ctx) {
+//
+// final Map<Value, Double> values = ctx.getGASState().reduce(
+// new PageRankReducer());
+//
+// class NV implements Comparable<NV> {
+// public final double n;
+// public final Value v;
+// public NV(double n, Value v) {
+// this.n = n;
+// this.v = v;
+// }
+// @Override
+// public int compareTo(final NV o) {
+// if (o.n > this.n)
+// return 1;
+// if (o.n < this.n)
+// return -1;
+// return 0;
+// }
+// }
+//
+// final NV[] a = new NV[values.size()];
+//
+// int i = 0;
+//
+// for (Map.Entry<Value, Double> e : values.entrySet()) {
+//
+// a[i++] = new NV(e.getValue().doubleValue(), e.getKey());
+//
+// }
+//
+// Arrays.sort(a);
+//
+// System.out.println("rank, pageRank, vertex");
+// i = 0;
+// for (NV t : a) {
+//
+// System.out.println(i + ", " + t.n + ", " + t.v);
+//
+// i++;
+//
+// }
+//
+// }
}
Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java
===================================================================
--- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java 2014-02-23 23:24:57 UTC (rev 7877)
+++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java 2014-02-24 01:52:13 UTC (rev 7878)
@@ -15,9 +15,12 @@
*/
package com.bigdata.rdf.graph.analytics;
+import java.util.List;
+
import org.apache.log4j.Logger;
import org.openrdf.model.Statement;
import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
import com.bigdata.rdf.graph.EdgesEnum;
import com.bigdata.rdf.graph.Factory;
@@ -27,8 +30,6 @@
import com.bigdata.rdf.graph.IGASState;
import com.bigdata.rdf.graph.impl.BaseGASProgram;
-import cutthecrap.utils.striterators.IStriterator;
-
/**
* SSSP (Single Source, Shortest Path). This analytic computes the shortest path
* to each connected vertex in the graph starting from the given vertex. Only
@@ -52,9 +53,10 @@
* phase is executed to update the state of the distinct vertices in the
* frontier.
*
- * TODO Add a reducer to report the actual minimum length paths. This is
- * similar to a BFS tree, but the path lengths are not integer values so
- * we need a different data structure to collect them.
+ * FIXME Add a reducer to report the actual minimum length paths. This
+ * is similar to a BFS tree, but the path lengths are not integer values
+ * so we need a different data structure to collect them (we need to
+ * store the predecessor when we run SSSP to do this).
*/
public class SSSP extends BaseGASProgram<SSSP.VS, SSSP.ES, Integer/* dist */> {
@@ -200,20 +202,6 @@
}
/**
- * {@inheritDoc}
- * <p>
- * Overridden to only visit the edges of the graph.
- */
- @Override
- public IStriterator constrainFilter(
- final IGASContext<SSSP.VS, SSSP.ES, Integer> ctx,
- final IStriterator itr) {
-
- return itr.addFilter(getEdgeOnlyFilter(ctx));
-
- }
-
- /**
* Set the {@link VS#dist()} to ZERO (0).
* <p>
* {@inheritDoc}
@@ -394,4 +382,39 @@
}
+ /**
+ * {@inheritDoc}
+ * <p>
+ * <dl>
+ * <dt>1</dt>
+ * <dd>The shortest distance from the initial frontier to the vertex.</dd>
+ * </dl>
+ */
+ @Override
+ public List<IBinder<SSSP.VS, SSSP.ES, Integer>> getBinderList() {
+
+ final List<IBinder<SSSP.VS, SSSP.ES, Integer>> tmp = super
+ .getBinderList();
+
+ tmp.add(new IBinder<SSSP.VS, SSSP.ES, Integer>() {
+
+ @Override
+ public int getIndex() {
+ return 1;
+ }
+
+ @Override
+ public Value bind(final ValueFactory vf,
+ final IGASState<SSSP.VS, SSSP.ES, Integer> state,
+ final Value u) {
+
+ return vf.createLiteral(state.getState(u).dist());
+
+ }
+ });
+
+ return tmp;
+
+ }
+
}
Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/BaseGASProgram.java
===================================================================
--- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/BaseGASProgram.java 2014-02-23 23:24:57 UTC (rev 7877)
+++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/BaseGASProgram.java 2014-02-24 01:52:13 UTC (rev 7878)
@@ -16,13 +16,15 @@
package com.bigdata.rdf.graph.impl;
import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Random;
import org.apache.log4j.Logger;
import org.openrdf.model.Resource;
import org.openrdf.model.Statement;
-import org.openrdf.model.URI;
import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
import com.bigdata.rdf.graph.EdgesEnum;
import com.bigdata.rdf.graph.Factory;
@@ -30,12 +32,9 @@
import com.bigdata.rdf.graph.IGASContext;
import com.bigdata.rdf.graph.IGASProgram;
import com.bigdata.rdf.graph.IGASState;
+import com.bigdata.rdf.graph.IReducer;
import com.bigdata.rdf.graph.impl.util.VertexDistribution;
-import cutthecrap.utils.striterators.Filter;
-import cutthecrap.utils.striterators.IFilter;
-import cutthecrap.utils.striterators.IStriterator;
-
/**
* Abstract base class with some useful defaults.
*
@@ -49,103 +48,6 @@
private static final Logger log = Logger.getLogger(BaseGASProgram.class);
- /**
- * {@inheritDoc}
- * <p>
- * The default implementation does not restrict the visitation to a
- * connectivity matrix (returns <code>null</code>).
- */
- @Override
- public URI getLinkType() {
-
- return null;
-
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * The default implementation returns its argument.
- */
- @Override
- public IStriterator constrainFilter(final IGASContext<VS, ES, ST> ctx,
- final IStriterator itr) {
-
- return itr;
-
- }
-
- /**
- * Return an {@link IFilter} that will only visit the edges of the graph.
- *
- * @see IGASState#isEdge(Statement)
- */
- protected IFilter getEdgeOnlyFilter(final IGASContext<VS, ES, ST> ctx) {
-
- return new EdgeOnlyFilter(ctx);
-
- }
-
- /**
- * Filter visits only edges (filters out attribute values).
- * <p>
- * Note: This filter is pushed down onto the AP and evaluated close to the
- * data.
- */
- private class EdgeOnlyFilter extends Filter {
- private static final long serialVersionUID = 1L;
- private final IGASState<VS, ES, ST> gasState;
- private EdgeOnlyFilter(final IGASContext<VS, ES, ST> ctx) {
- this.gasState = ctx.getGASState();
- }
- @Override
- public boolean isValid(final Object e) {
- return gasState.isEdge((Statement) e);
- }
- };
-
- /**
- * Return a filter that only visits the edges of graph that are instances of
- * the specified link attribute type.
- * <p>
- * Note: For bigdata, the visited edges can be decoded to recover the
- * original link as well.
- *
- * @see IGASState#isLinkAttrib(Statement, URI)
- * @see IGASState#decodeStatement(Value)
- */
- protected IFilter getLinkAttribFilter(final IGASContext<VS, ES, ST> ctx,
- final URI linkAttribType) {
-
- return new LinkAttribFilter(ctx, linkAttribType);
-
- }
-
- /**
- * Filter visits only edges where the {@link Statement} is an instance of
- * the specified link attribute type. For bigdata, the visited edges can be
- * decoded to recover the original link as well.
- */
- private class LinkAttribFilter extends Filter {
- private static final long serialVersionUID = 1L;
-
- private final IGASState<VS, ES, ST> gasState;
- private final URI linkAttribType;
-
- public LinkAttribFilter(final IGASContext<VS, ES, ST> ctx,
- final URI linkAttribType) {
- if (linkAttribType == null)
- throw new IllegalArgumentException();
- this.gasState = ctx.getGASState();
- this.linkAttribType = linkAttribType;
- }
-
- @Override
- public boolean isValid(final Object e) {
- return gasState.isLinkAttrib((Statement) e, linkAttribType);
- }
- }
-
// /**
// * If the vertex is actually an edge, then return the decoded edge.
// *
@@ -229,9 +131,9 @@
* The default implementation is a NOP.
*/
@Override
- public void after(final IGASContext<VS, ES, ST> ctx) {
+ public <T> IReducer<VS, ES, ST, T> getDefaultAfterOp() {
- // NOP
+ return null; // NOP
}
@@ -319,4 +221,49 @@
}
+ /**
+ * Return an {@link IBinder} that binds the visited vertex itself (index zero).
+ */
+ private IBinder<VS, ES, ST> getBinder0() {
+
+ return new IBinder<VS, ES, ST>() {
+
+ @Override
+ public int getIndex() {
+
+ return 0;
+
+ }
+
+ @Override
+ public Value bind(final ValueFactory vf,
+ final IGASState<VS, ES, ST> state, final Value u) {
+
+ return u;
+
+ }
+
+ };
+
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * <dl>
+ * <dt>0</dt>
+ * <dd>The visited vertex itself.</dd>
+ * </dl>
+ */
+ @Override
+ public List<IBinder<VS, ES, ST>> getBinderList() {
+
+ final List<IBinder<VS, ES, ST>> tmp = new LinkedList<IBinder<VS, ES, ST>>();
+
+ tmp.add(getBinder0());
+
+ return tmp;
+
+ }
+
}
Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASContext.java
===================================================================
--- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASContext.java 2014-02-23 23:24:57 UTC (rev 7877)
+++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASContext.java 2014-02-24 01:52:13 UTC (rev 7878)
@@ -20,9 +20,11 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.log4j.Logger;
import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
import org.openrdf.model.Value;
import com.bigdata.rdf.graph.EdgesEnum;
@@ -36,6 +38,10 @@
import com.bigdata.rdf.graph.IStaticFrontier;
import com.bigdata.rdf.graph.util.GASUtil;
+import cutthecrap.utils.striterators.Filter;
+import cutthecrap.utils.striterators.IFilter;
+import cutthecrap.utils.striterators.IStriterator;
+
public class GASContext<VS, ES, ST> implements IGASContext<VS, ES, ST> {
private static final Logger log = Logger.getLogger(GASContext.class);
@@ -70,6 +76,18 @@
Integer.MAX_VALUE);
/**
+ * An optional constraint on the type of the visited links.
+ */
+ private final AtomicReference<URI> linkType = new AtomicReference<URI>(null);
+
+ /**
+ * An optional {@link IReducer} that will be executed after the
+ * {@link IGASProgram}.
+ */
+ private final AtomicReference<IReducer<VS, ES, ST, ?>> afterOp = new AtomicReference<IReducer<VS, ES, ST, ?>>(
+ null);
+
+ /**
*
* @param namespace
* The namespace of the graph (KB instance).
@@ -168,8 +186,19 @@
gasState.traceState();
- program.after(this);
-
+ // Optional post-reduction.
+ {
+
+ final IReducer<VS, ES, ST, ?> op = getRunAfterOp();
+
+ if (op != null) {
+
+ gasState.reduce(op);
+
+ }
+
+ }
+
// Done
return total;
@@ -374,26 +403,93 @@
/**
* Do APPLY.
*
- * TODO The apply() should be parallelized. For some algorithms, there is a
- * moderate amount of work per vertex in apply(). Use {@link #nthreads} to
- * set the parallelism.
- * <p>
- * Note: This is very similar to the {@link IGASState#reduce(IReducer)}
- * operation. This operates over the frontier. reduce() operates over the
- * activated vertices. Both need fine grained parallelism. Both can have
- * either light or moderately heavy operations (a dot product would be an
- * example of a heavier operation).
+ * Note: Nothing is returned; the result of the {@link ApplyReducer} is ignored.
+ *
+ * @throws Exception
*/
- private void apply(final IStaticFrontier f) {
+ private void apply(final IStaticFrontier f) throws Exception {
- for (Value u : f) {
+// for (Value u : f) {
+//
+// program.apply(gasState, u, null/* sum */);
+//
+// }
- program.apply(gasState, u, null/* sum */);
+ // Note: Return value of ApplyReducer is currently ignored.
+ reduceOverFrontier(f, new ApplyReducer<Void>());
+
+ }
+ private class ApplyReducer<T> implements IReducer<VS, ES, ST, T> {
+
+ @Override
+ public void visit(final IGASState<VS, ES, ST> state, final Value u) {
+
+ program.apply(state, u, null/* sum */);
+
}
+ @Override
+ public T get() {
+
+ // Note: Nothing returned right now.
+ return null;
+
+ }
+
}
+
+ /**
+ * Reduce over the frontier (used for apply()).
+ *
+ * @param f
+ * The frontier.
+ * @param op
+ * The {@link IReducer}.
+ *
+ * @return The {@link IReducer#get() result}.
+ *
+ * @throws Exception
+ */
+ public <T> T reduceOverFrontier(final IStaticFrontier f,
+ final IReducer<VS, ES, ST, T> op) throws Exception {
+ if (f == null)
+ throw new IllegalArgumentException();
+
+ if (op == null)
+ throw new IllegalArgumentException();
+
+ class ReduceVertexTaskFactory implements VertexTaskFactory<Long> {
+
+ @Override
+ public Callable<Long> newVertexTask(final Value u) {
+
+ return new Callable<Long>() {
+
+ @Override
+ public Long call() {
+
+ // program.apply(gasState, u, null/* sum */);
+ op.visit(gasState, u);
+
+ // Nothing returned by visit().
+ return ONE;
+
+ };
+ };
+
+ };
+ }
+
+ gasEngine.newFrontierStrategy(new ReduceVertexTaskFactory(), f).call();
+
+ // Return reduction.
+ return op.get();
+
+ }
+ private static final Long ONE = Long.valueOf(1L);
+
/**
* @param inEdges
* when <code>true</code> the GATHER is over the in-edges.
@@ -728,4 +824,122 @@
}
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The default implementation does not restrict the visitation to a
+ * connectivity matrix (returns <code>null</code>).
+ */
+ @Override
+ public URI getLinkType() {
+
+ return linkType.get();
+
+ }
+
+ @Override
+ public void setLinkType(final URI linkType) {
+
+ this.linkType.set(linkType);
+
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The default implementation only visits the edges.
+ */
+ @Override
+ public IStriterator constrainFilter(final IStriterator itr) {
+
+ return itr.addFilter(getEdgeOnlyFilter());
+
+ }
+
+ /**
+ * Return an {@link IFilter} that will only visit the edges of the graph.
+ *
+ * @see IGASState#isEdge(Statement)
+ */
+ protected IFilter getEdgeOnlyFilter() {
+
+ return new EdgeOnlyFilter(this);
+
+ }
+
+ /**
+ * Filter visits only edges (filters out attribute values).
+ * <p>
+ * Note: This filter is pushed down onto the AP and evaluated close to the
+ * data.
+ */
+ private class EdgeOnlyFilter extends Filter {
+ private static final long serialVersionUID = 1L;
+ private final IGASState<VS, ES, ST> gasState;
+ private EdgeOnlyFilter(final IGASContext<VS, ES, ST> ctx) {
+ this.gasState = ctx.getGASState();
+ }
+ @Override
+ public boolean isValid(final Object e) {
+ return gasState.isEdge((Statement) e);
+ }
+ };
+
+ /**
+ * Return a filter that only visits the edges of the graph that are instances of
+ * the specified link attribute type.
+ * <p>
+ * Note: For bigdata, the visited edges can be decoded to recover the
+ * original link as well.
+ *
+ * @see IGASState#isLinkAttrib(Statement, URI)
+ * @see IGASState#decodeStatement(Value)
+ */
+ protected IFilter getLinkAttribFilter(final IGASContext<VS, ES, ST> ctx,
+ final URI linkAttribType) {
+
+ return new LinkAttribFilter(ctx, linkAttribType);
+
+ }
+
+ /**
+ * Filter visits only edges where the {@link Statement} is an instance of
+ * the specified link attribute type. For bigdata, the visited edges can be
+ * decoded to recover the original link as well.
+ */
+ private class LinkAttribFilter extends Filter {
+ private static final long serialVersionUID = 1L;
+
+ private final IGASState<VS, ES, ST> gasState;
+ private final URI linkAttribType;
+
+ public LinkAttribFilter(final IGASContext<VS, ES, ST> ctx,
+ final URI linkAttribType) {
+ if (linkAttribType == null)
+ throw new IllegalArgumentException();
+ this.gasState = ctx.getGASState();
+ this.linkAttribType = linkAttribType;
+ }
+
+ @Override
+ public boolean isValid(final Object e) {
+ return gasState.isLinkAttrib((Statement) e, linkAttribType);
+ }
+ }
+
+ @Override
+ public <T> void setRunAfterOp(final IReducer<VS, ES, ST, T> afterOp) {
+
+ this.afterOp.set(afterOp);
+
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T> IReducer<VS, ES, ST, T> getRunAfterOp() {
+
+ return (IReducer<VS, ES, ST, T>) afterOp.get();
+
+ }
+
} // GASContext
Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASEngine.java
===================================================================
--- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASEngine.java 2014-02-23 23:24:57 UTC (rev 7877)
+++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASEngine.java 2014-02-24 01:52:13 UTC (rev 7878)
@@ -210,6 +210,7 @@
}
+ @Override
public Long call() throws Exception {
long nedges = 0L;
Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASState.java
===================================================================
--- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASState.java 2014-02-23 23:24:57 UTC (rev 7877)
+++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASState.java 2014-02-24 01:52:13 UTC (rev 7878)
@@ -320,6 +320,10 @@
*
* TODO REDUCE : parallelize with nthreads. The reduce operations are often
* lightweight, so maybe a fork/join pool would work better?
+ * <p>
+ * Note: We can not do a parallel reduction right now because the backing
+ * class does not expose a parallel iterator, e.g., a segment-wise iterator.
+ * The reduction over the {@link #vertexState} is quite slow as a result.
*/
@Override
public <T> T reduce(final IReducer<VS, ES, ST, T> op) {
Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/ram/RAMGASEngine.java
===================================================================
--- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/ram/RAMGASEngine.java 2014-02-23 23:24:57 UTC (rev 7877)
+++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/ram/RAMGASEngine.java 2014-02-24 01:52:13 UTC (rev 7878)
@@ -32,7 +32,6 @@
import com.bigdata.rdf.graph.EdgesEnum;
import com.bigdata.rdf.graph.IGASContext;
-import com.bigdata.rdf.graph.IGASProgram;
import com.bigdata.rdf.graph.IGraphAccessor;
import com.bigdata.rdf.graph.impl.GASEngine;
import com.bigdata.rdf.graph.impl.util.VertexDistribution;
@@ -325,12 +324,11 @@
}
- @SuppressWarnings({ "unchecked", "rawtypes" })
private IStriterator getEdges(final boolean inEdges,
final IGASContext<?, ?, ?> ctx, final Value u)
throws SailException {
- final URI linkTypeIV = (URI) ctx.getGASProgram().getLinkType();
+ final URI linkTypeIV = (URI) ctx.getLinkType();
if(linkTypeIV != null) {
/*
* FIXME RDR: We need to use a union of access paths for link
@@ -351,8 +349,7 @@
/*
* Optionally wrap the program specified filter.
*/
- return ((IGASProgram) ctx.getGASProgram()).constrainFilter(ctx,
- sitr);
+ return ctx.constrainFilter(sitr);
}
Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/sail/SAILGASEngine.java
===================================================================
--- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/sail/SAILGASEngine.java 2014-02-23 23:24:57 UTC (rev 7877)
+++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/sail/SAILGASEngine.java 2014-02-24 01:52:13 UTC (rev 7878)
@@ -31,7 +31,6 @@
import com.bigdata.rdf.graph.EdgesEnum;
import com.bigdata.rdf.graph.IGASContext;
-import com.bigdata.rdf.graph.IGASProgram;
import com.bigdata.rdf.graph.IGraphAccessor;
import com.bigdata.rdf.graph.impl.GASEngine;
import com.bigdata.rdf.graph.impl.util.VertexDistribution;
@@ -148,7 +147,7 @@
final IGASContext<?, ?, ?> ctx, final Value u)
throws SailException {
- final URI linkTypeIV = (URI) ctx.getGASProgram().getLinkType();
+ final URI linkTypeIV = (URI) ctx.getLinkType();
if(linkTypeIV != null) {
/*
* FIXME RDR: We need to use a union of access paths for link
@@ -176,7 +175,7 @@
* since only one is optimized.
*/
final boolean posOptimization = linkTypeIV != null
- && !inEdges;
+ && inEdges;
final CloseableIteration<? extends Statement, SailException> citr;
if (posOptimization) {
@@ -238,9 +237,9 @@
* much more efficient. (If the index is local, then simply stacking
* striterators is just as efficient.)
*/
- return ((IGASProgram) ctx.getGASProgram()).constrainFilter(ctx,
- sitr);
+ return ctx.constrainFilter(sitr);
+
}
@Override
Modified: branches/RDR/bigdata-gas/src/test/com/bigdata/rdf/graph/impl/ram/TestGather.java
===================================================================
--- branches/RDR/bigdata-gas/src/test/com/bigdata/rdf/graph/impl/ram/TestGather.java 2014-02-23 23:24:57 UTC (rev 7877)
+++ branches/RDR/bigdata-gas/src/test/com/bigdata/rdf/graph/impl/ram/TestGather.java 2014-02-24 01:52:13 UTC (rev 7878)
@@ -36,8 +36,6 @@
import com.bigdata.rdf.graph.impl.ram.RAMGASEngine.RAMGraph;
import com.bigdata.rdf.graph.impl.ram.RAMGASEngine.RAMGraphAccessor;
-import cutthecrap.utils.striterators.IStriterator;
-
/**
* Test class for GATHER.
*
@@ -89,21 +87,7 @@
return EdgesEnum.NoEdges;
}
- /**
- * {@inheritDoc}
- * <p>
- * Overridden to only visit the edges of the graph.
- */
@Override
- public IStriterator constrainFilter(
- final IGASContext<Set<Statement>, Set<Statement>, Set<Statement>> ctx,
- final IStriterator itr) {
-
- return itr.addFilter(getEdgeOnlyFilter(ctx));
-
- }
-
- @Override
public Factory<Value, Set<Statement>> getVertexStateFactory() {
return new Factory<Value, Set<Statement>>() {
@Override
Modified: branches/RDR/bigdata-gas/src/test/com/bigdata/rdf/graph/impl/sail/TestGather.java
===================================================================
--- branches/RDR/bigdata-gas/src/test/com/bigdata/rdf/graph/impl/sail/TestGather.java 2014-02-23 23:24:57 UTC (rev 7877)
+++ branches/RDR/bigdata-gas/src/test/com/bigdata/rdf/graph/impl/sail/TestGather.java 2014-02-24 01:52:13 UTC (rev 7878)
@@ -35,8 +35,6 @@
import com.bigdata.rdf.graph.impl.BaseGASProgram;
import com.bigdata.rdf.graph.impl.GASStats;
-import cutthecrap.utils.striterators.IStriterator;
-
/**
* Test class for GATHER.
*
@@ -87,22 +85,8 @@
public EdgesEnum getScatterEdges() {
return EdgesEnum.NoEdges;
}
-
- /**
- * {@inheritDoc}
- * <p>
- * Overridden to only visit the edges of the graph.
- */
+
@Override
- public IStriterator constrainFilter(
- final IGASContext<Set<Statement>, Set<Statement>, Set<Statement>> ctx,
- final IStriterator itr) {
-
- return itr.addFilter(getEdgeOnlyFilter(ctx));
-
- }
-
- @Override
public Factory<Value, Set<Statement>> getVertexStateFactory() {
return new Factory<Value, Set<Statement>>() {
@Override
Modified: branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/BigdataGASEngine.java
===================================================================
--- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/BigdataGASEngine.java 2014-02-23 23:24:57 UTC (rev 7877)
+++ branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/BigdataGASEngine.java 2014-02-24 01:52:13 UTC (rev 7878)
@@ -35,6 +35,8 @@
import com.bigdata.rdf.graph.impl.util.VertexDistribution;
import com.bigdata.rdf.internal.IV;
import com.bigdata.rdf.internal.IVUtility;
+import com.bigdata.rdf.internal.NotMaterializedException;
+import com.bigdata.rdf.model.BigdataValue;
import com.bigdata.rdf.sail.BigdataSail;
import com.bigdata.rdf.spo.ISPO;
import com.bigdata.rdf.spo.SPOKeyOrder;
@@ -361,7 +363,7 @@
this.ctx = ctx;
this.u = u;
- linkTypeIV = (IV) ctx.getGASProgram().getLinkType();
+ linkTypeIV = getIV(ctx.getLinkType());
final IKeyBuilder keyBuilder;
/*
@@ -371,7 +373,7 @@
*
* [u] gets bound on O.
*
- * We use...
[truncated message content] |