|
From: <tho...@us...> - 2014-02-23 00:14:32
|
Revision: 7873
http://sourceforge.net/p/bigdata/code/7873
Author: thompsonbry
Date: 2014-02-23 00:14:26 +0000 (Sun, 23 Feb 2014)
Log Message:
-----------
Integrated the maxIterations and maxVertices constraints into IGASContext, GASContext, and GASService. The algorithm now halts if those thresholds are reached.
We could also do this for #edges visited since that is tracked by IGASStats.
Modified Paths:
--------------
branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASContext.java
branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASContext.java
branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASService.java
Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASContext.java
===================================================================
--- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASContext.java 2014-02-22 22:38:43 UTC (rev 7872)
+++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASContext.java 2014-02-23 00:14:26 UTC (rev 7873)
@@ -12,7 +12,7 @@
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-*/
+ */
package com.bigdata.rdf.graph;
import java.util.concurrent.Callable;
@@ -48,13 +48,47 @@
* The computation state.
*/
IGASState<VS, ES, ST> getGASState();
-
+
/**
* The graph access object.
*/
IGraphAccessor getGraphAccessor();
-
+
/**
+ * Specify the maximum number of iterations for the algorithm.
+ *
+ * @param newValue
+ * The maximum number of iterations.
+ *
+ * @throws IllegalArgumentException
+ * if the new value is non-positive.
+ */
+ void setMaxIterations(int newValue);
+
+ /**
+ * Return the maximum number of iterations for the algorithm.
+ */
+ int getMaxIterations();
+
+ /**
+ * Specify the maximum number of vertices that may be visited. The algorithm
+ * will halt if this value is exceeded.
+ *
+ * @param newValue
+ * The maximum number of vertices that may be visited.
+ *
+ * @throws IllegalArgumentException
+ * if the new value is non-positive.
+ */
+ void setMaxVisited(int newValue);
+
+ /**
+ * Return the maximum number of vertices that may be visited. The algorithm
+ * will halt if this value is exceeded.
+ */
+ int getMaxVisited();
+
+ /**
* Execute one iteration.
*
* @param stats
@@ -65,11 +99,11 @@
*/
boolean doRound(IGASStats stats) throws Exception, ExecutionException,
InterruptedException;
-
+
/**
* Execute the associated {@link IGASProgram}.
*/
@Override
IGASStats call() throws Exception;
-
+
}
\ No newline at end of file
Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASContext.java
===================================================================
--- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASContext.java 2014-02-22 22:38:43 UTC (rev 7872)
+++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASContext.java 2014-02-23 00:14:26 UTC (rev 7873)
@@ -19,6 +19,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.Logger;
import org.openrdf.model.Statement;
@@ -57,6 +58,18 @@
private final IGASProgram<VS, ES, ST> program;
/**
+ * The maximum number of iterations (defaults to {@link Integer#MAX_VALUE}).
+ */
+ private final AtomicInteger maxIterations = new AtomicInteger(
+ Integer.MAX_VALUE);
+
+ /**
+ * The maximum number of vertices (defaults to {@link Integer#MAX_VALUE}).
+ */
+ private final AtomicInteger maxVertices = new AtomicInteger(
+ Integer.MAX_VALUE);
+
+ /**
*
* @param namespace
* The namespace of the graph (KB instance).
@@ -117,6 +130,31 @@
while (!gasState.frontier().isEmpty()) {
+ /*
+ * Check halting conditions.
+ *
+ * Note: We could also halt on maxEdges since that is tracked in the
+ * GASStats.
+ */
+
+ if (total.getNRounds() >= getMaxIterations()) {
+
+ log.warn("Halting: maxIterations=" + getMaxIterations()
+ + ", #rounds=" + total.getNRounds());
+
+ break;
+
+ }
+
+ if (total.getFrontierSize() >= getMaxVisited()) {
+
+ log.warn("Halting: maxVertices=" + getMaxVisited()
+ + ", frontierSize=" + total.getFrontierSize());
+
+ break;
+
+ }
+
final GASStats roundStats = new GASStats();
doRound(roundStats);
@@ -656,4 +694,38 @@
} // GatherTask
+ @Override
+ public void setMaxIterations(final int newValue) {
+
+ if (newValue <= 0)
+ throw new IllegalArgumentException();
+
+ this.maxIterations.set(newValue);
+
+ }
+
+ @Override
+ public int getMaxIterations() {
+
+ return maxIterations.get();
+
+ }
+
+ @Override
+ public void setMaxVisited(int newValue) {
+
+ if (newValue <= 0)
+ throw new IllegalArgumentException();
+
+ this.maxVertices.set(newValue);
+
+ }
+
+ @Override
+ public int getMaxVisited() {
+
+ return maxVertices.get();
+
+ }
+
} // GASContext
Modified: branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASService.java
===================================================================
--- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASService.java 2014-02-22 22:38:43 UTC (rev 7872)
+++ branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASService.java 2014-02-23 00:14:26 UTC (rev 7873)
@@ -53,10 +53,13 @@
import com.bigdata.rdf.graph.IGASStats;
import com.bigdata.rdf.graph.IGraphAccessor;
import com.bigdata.rdf.graph.IReducer;
+import com.bigdata.rdf.graph.analytics.BFS;
import com.bigdata.rdf.graph.impl.GASEngine;
import com.bigdata.rdf.graph.impl.GASState;
import com.bigdata.rdf.graph.impl.bd.BigdataGASEngine.BigdataGraphAccessor;
import com.bigdata.rdf.graph.impl.scheduler.CHMScheduler;
+import com.bigdata.rdf.internal.IV;
+import com.bigdata.rdf.internal.impl.literal.XSDNumericIV;
import com.bigdata.rdf.model.BigdataValue;
import com.bigdata.rdf.sail.BigdataSail.BigdataSailConnection;
import com.bigdata.rdf.sparql.ast.GraphPatternGroup;
@@ -223,6 +226,12 @@
*/
URI OUT = new URIImpl(NAMESPACE + "out");
+ /**
+ * The state of the visited vertex (algorithm dependent, but something
+ * like traversal depth is common).
+ */
+ URI STATE = new URIImpl(NAMESPACE + "state");
+
}
static private transient final Logger log = Logger
@@ -323,12 +332,13 @@
// options extracted from the SERVICE's graph pattern.
private final int nthreads;
- private final int maxIterations; // FIXME set as limit on GASState.
- private final int maxVisited; // FIXME set as limit on GASState.
+ private final int maxIterations;
+ private final int maxVisited;
private final Class<IGASProgram<VS, ES, ST>> gasClass;
private final Class<IGASSchedulerImpl> schedulerClass;
private final Value[] initialFrontier;
private final IVariable<?> outVar;
+ private final IVariable<?> stateVar;
public GASServiceCall(final AbstractTripleStore store,
final ServiceNode serviceNode,
@@ -434,6 +444,9 @@
// The output variable (bound to the visited set).
this.outVar = getVar(Options.PROGRAM, Options.OUT);
+ // The state variable (bound to the state associated with each visited vertex).
+ this.stateVar = getVar(Options.PROGRAM, Options.STATE);
+
}
/**
@@ -652,6 +665,10 @@
final IGASContext<VS, ES, ST> gasContext = gasEngine.newGASContext(
graphAccessor, gasProgram);
+ gasContext.setMaxIterations(maxIterations);
+
+ gasContext.setMaxVisited(maxVisited);
+
final IGASState<VS, ES, ST> gasState = gasContext.getGASState();
// TODO We should look at this when extracting the parameters from the SERVICE's graph pattern.
@@ -710,17 +727,48 @@
}
});
+ /*
+ * Bind output variables (if any).
+ */
final IBindingSet[] out = new IBindingSet[visitedSet.size()];
{
- final IVariable[] vars = new IVariable[] { outVar };
+
+ final List<IVariable> tmp = new LinkedList<IVariable>();
+
+ if (outVar != null)
+ tmp.add(outVar);
+
+ if (stateVar != null)
+ tmp.add(stateVar);
+
+ final IVariable[] vars = tmp.toArray(new IVariable[tmp
+ .size()]);
+
+ final IConstant[] vals = new IConstant[vars.length];
+
int i = 0;
+
for (Value v : visitedSet) {
- out[i++] = new ListBindingSet(vars,
- new IConstant[] { new Constant(v) });
+ int j = 0;
+ if (outVar != null) {
+ vals[j++] = new Constant(v);
+ }
+ if (stateVar != null) {
+ /*
+ * FIXME Need an API for self-reporting of an IV by
+ * the IGASProgram.
+ */
+ final int depth = ((BFS.VS)gasState.getState(v)).depth();
+ final IV depthIV = new XSDNumericIV(depth);
+ vals[j++] = new Constant(depthIV);
+ }
+ out[i++] = new ListBindingSet(vars, vals);
+
}
+
}
return new ChunkedArrayIterator<IBindingSet>(out);
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|