From: <tho...@us...> - 2014-02-23 00:14:32
|
Revision: 7873 http://sourceforge.net/p/bigdata/code/7873 Author: thompsonbry Date: 2014-02-23 00:14:26 +0000 (Sun, 23 Feb 2014) Log Message: ----------- Integrated the maxIterations and maxVertices constraints into IGASContext, GASContext, and GASService. The algorithm now halts if those thresholds are reached. We could also do this for #edges visited since that is tracked by IGASStats. Modified Paths: -------------- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASContext.java branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASContext.java branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASService.java Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASContext.java =================================================================== --- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASContext.java 2014-02-22 22:38:43 UTC (rev 7872) +++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASContext.java 2014-02-23 00:14:26 UTC (rev 7873) @@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -*/ + */ package com.bigdata.rdf.graph; import java.util.concurrent.Callable; @@ -48,13 +48,47 @@ * The computation state. */ IGASState<VS, ES, ST> getGASState(); - + /** * The graph access object. */ IGraphAccessor getGraphAccessor(); - + /** + * Specify the maximum number of iterations for the algorithm. + * + * @param newValue + * The maximum number of iterations. + * + * @throws IllegalArgumentException + * if the new value is non-positive. + */ + void setMaxIterations(int newValue); + + /** + * Return the maximum number iterations for the algorithm. + */ + int getMaxIterations(); + + /** + * Specify the maximum number of vertices that may be visited. The algorithm + * will halt if this value is exceeded. + * + * @param newValue + * The maximum number of vertices in the frontier. + * + * @throws IllegalArgumentException + * if the new value is non-positive. + */ + void setMaxVisited(int newValue); + + /** + * Return the maximum number of vertices that may be visited. The algorithm + * will halt if this value is exceeded. + */ + int getMaxVisited(); + + /** * Execute one iteration. * * @param stats @@ -65,11 +99,11 @@ */ boolean doRound(IGASStats stats) throws Exception, ExecutionException, InterruptedException; - + /** * Execute the associated {@link IGASProgram}. */ @Override IGASStats call() throws Exception; - + } \ No newline at end of file Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASContext.java =================================================================== --- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASContext.java 2014-02-22 22:38:43 UTC (rev 7872) +++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASContext.java 2014-02-23 00:14:26 UTC (rev 7873) @@ -19,6 +19,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.log4j.Logger; import org.openrdf.model.Statement; @@ -57,6 +58,18 @@ private final IGASProgram<VS, ES, ST> program; /** + * The maximum number of iterations (defaults to {@link Integer#MAX_VALUE}). + */ + private final AtomicInteger maxIterations = new AtomicInteger( + Integer.MAX_VALUE); + + /** + * The maximum number of vertices (defaults to {@link Integer#MAX_VALUE}). + */ + private final AtomicInteger maxVertices = new AtomicInteger( + Integer.MAX_VALUE); + + /** * * @param namespace * The namespace of the graph (KB instance). @@ -117,6 +130,31 @@ while (!gasState.frontier().isEmpty()) { + /* + * Check halting conditions. + * + * Note: We could also halt on maxEdges since that is tracked in the + * GASStats. + */ + + if (total.getNRounds() >= getMaxIterations()) { + + log.warn("Halting: maxIterations=" + getMaxIterations() + + ", #rounds=" + total.getNRounds()); + + break; + + } + + if (total.getFrontierSize() >= getMaxVisited()) { + + log.warn("Halting: maxVertices=" + getMaxVisited() + + ", frontierSize=" + total.getFrontierSize()); + + break; + + } + final GASStats roundStats = new GASStats(); doRound(roundStats); @@ -656,4 +694,38 @@ } // GatherTask + @Override + public void setMaxIterations(final int newValue) { + + if (newValue <= 0) + throw new IllegalArgumentException(); + + this.maxIterations.set(newValue); + + } + + @Override + public int getMaxIterations() { + + return maxIterations.get(); + + } + + @Override + public void setMaxVisited(int newValue) { + + if (newValue <= 0) + throw new IllegalArgumentException(); + + this.maxVertices.set(newValue); + + } + + @Override + public int getMaxVisited() { + + return maxVertices.get(); + + } + } // GASContext Modified: branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASService.java =================================================================== --- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASService.java 2014-02-22 22:38:43 UTC (rev 7872) +++ branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASService.java 2014-02-23 00:14:26 UTC (rev 7873) @@ -53,10 +53,13 @@ import com.bigdata.rdf.graph.IGASStats; import com.bigdata.rdf.graph.IGraphAccessor; import com.bigdata.rdf.graph.IReducer; +import com.bigdata.rdf.graph.analytics.BFS; import com.bigdata.rdf.graph.impl.GASEngine; import com.bigdata.rdf.graph.impl.GASState; import com.bigdata.rdf.graph.impl.bd.BigdataGASEngine.BigdataGraphAccessor; import com.bigdata.rdf.graph.impl.scheduler.CHMScheduler; +import com.bigdata.rdf.internal.IV; +import com.bigdata.rdf.internal.impl.literal.XSDNumericIV; import com.bigdata.rdf.model.BigdataValue; import com.bigdata.rdf.sail.BigdataSail.BigdataSailConnection; import com.bigdata.rdf.sparql.ast.GraphPatternGroup; @@ -223,6 +226,12 @@ */ URI OUT = new URIImpl(NAMESPACE + "out"); + /** + * The state of the visited vertex (algorithm dependent, but something + * like traversal depth is common). + */ + URI STATE = new URIImpl(NAMESPACE + "state"); + } static private transient final Logger log = Logger @@ -323,12 +332,13 @@ // options extracted from the SERVICE's graph pattern. private final int nthreads; - private final int maxIterations; // FIXME set as limit on GASState. - private final int maxVisited; // FIXME set as limit on GASState. + private final int maxIterations; + private final int maxVisited; private final Class<IGASProgram<VS, ES, ST>> gasClass; private final Class<IGASSchedulerImpl> schedulerClass; private final Value[] initialFrontier; private final IVariable<?> outVar; + private final IVariable<?> stateVar; public GASServiceCall(final AbstractTripleStore store, final ServiceNode serviceNode, @@ -434,6 +444,9 @@ // The output variable (bound to the visited set). this.outVar = getVar(Options.PROGRAM, Options.OUT); + // The state variable (bound to the state associated with each visited vertex). + this.stateVar = getVar(Options.PROGRAM, Options.STATE); + } /** @@ -652,6 +665,10 @@ final IGASContext<VS, ES, ST> gasContext = gasEngine.newGASContext( graphAccessor, gasProgram); + gasContext.setMaxIterations(maxIterations); + + gasContext.setMaxVisited(maxVisited); + final IGASState<VS, ES, ST> gasState = gasContext.getGASState(); // TODO We should look at this when extracting the parameters from the SERVICE's graph pattern. @@ -710,17 +727,48 @@ } }); + /* + * Bind output variables (if any). + */ final IBindingSet[] out = new IBindingSet[visitedSet.size()]; { - final IVariable[] vars = new IVariable[] { outVar }; + + final List<IVariable> tmp = new LinkedList<IVariable>(); + + if (outVar != null) + tmp.add(outVar); + + if (stateVar != null) + tmp.add(stateVar); + + final IVariable[] vars = tmp.toArray(new IVariable[tmp + .size()]); + + final IConstant[] vals = new IConstant[vars.length]; + int i = 0; + for (Value v : visitedSet) { - out[i++] = new ListBindingSet(vars, - new IConstant[] { new Constant(v) }); + int j = 0; + if (outVar != null) { + vals[j++] = new Constant(v); + } + if (stateVar != null) { + /* + * FIXME Need an API for self-reporting of an IV by + * the IGASProgram. + */ + final int depth = ((BFS.VS)gasState.getState(v)).depth(); + final IV depthIV = new XSDNumericIV(depth); + vals[j++] = new Constant(depthIV); + } + out[i++] = new ListBindingSet(vars, vals); + } + } return new ChunkedArrayIterator<IBindingSet>(out); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |