From: <tho...@us...> - 2014-02-20 14:19:15
|
Revision: 7858 http://sourceforge.net/p/bigdata/code/7858 Author: thompsonbry Date: 2014-02-20 14:19:11 +0000 (Thu, 20 Feb 2014) Log Message: ----------- Basic implementation of the GASService sufficient to invoke BFS, SSSP, CC, or PR. This is not yet tested. It is not sufficient to invoke the FuzzySSSP algorithm since that does not implement the IGASProgram interface. Added javadoc to ServiceCall. Modified Paths: -------------- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/service/ServiceCall.java Added Paths: ----------- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASService.java Added: branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASService.java =================================================================== --- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASService.java (rev 0) +++ branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASService.java 2014-02-20 14:19:11 UTC (rev 7858) @@ -0,0 +1,774 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2011. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +package com.bigdata.rdf.graph.impl.bd; + +import java.lang.reflect.Constructor; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; + +import org.apache.log4j.Logger; +import org.openrdf.model.Literal; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.impl.URIImpl; + +import com.bigdata.bop.Constant; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IConstant; +import com.bigdata.bop.IVariable; +import com.bigdata.bop.bindingSet.ListBindingSet; +import com.bigdata.journal.IIndexManager; +import com.bigdata.rdf.graph.IGASContext; +import com.bigdata.rdf.graph.IGASEngine; +import com.bigdata.rdf.graph.IGASProgram; +import com.bigdata.rdf.graph.IGASScheduler; +import com.bigdata.rdf.graph.IGASSchedulerImpl; +import com.bigdata.rdf.graph.IGASState; +import com.bigdata.rdf.graph.IGASStats; +import com.bigdata.rdf.graph.IGraphAccessor; +import com.bigdata.rdf.graph.IReducer; +import com.bigdata.rdf.graph.impl.GASEngine; +import com.bigdata.rdf.graph.impl.GASState; +import com.bigdata.rdf.graph.impl.bd.BigdataGASEngine.BigdataGraphAccessor; +import com.bigdata.rdf.graph.impl.scheduler.CHMScheduler; +import com.bigdata.rdf.sail.BigdataSail.BigdataSailConnection; +import com.bigdata.rdf.sparql.ast.GraphPatternGroup; +import com.bigdata.rdf.sparql.ast.IGroupMemberNode; +import com.bigdata.rdf.sparql.ast.StatementPatternNode; +import com.bigdata.rdf.sparql.ast.service.BigdataNativeServiceOptions; +import com.bigdata.rdf.sparql.ast.service.BigdataServiceCall; +import com.bigdata.rdf.sparql.ast.service.CustomServiceFactory; +import 
com.bigdata.rdf.sparql.ast.service.IServiceOptions; +import com.bigdata.rdf.sparql.ast.service.ServiceCall; +import com.bigdata.rdf.sparql.ast.service.ServiceCallCreateParams; +import com.bigdata.rdf.sparql.ast.service.ServiceNode; +import com.bigdata.rdf.store.AbstractTripleStore; +import com.bigdata.striterator.ChunkedArrayIterator; + +import cutthecrap.utils.striterators.ICloseableIterator; + +/** + * A SERVICE that exposes {@link IGASProgram}s for SPARQL execution. + * <p> + * For example, the following would run a depth-limited BFS traversal: + * + * <pre> + * PREFIX gas <http://www.bigdata.com/rdf/gas#> + * #... + * SERVICE <GAS> { + * gas:program gas:gasClass "com.bigdata.rdf.graph.analytics.BFS" + * gas:program gas:in <IRI> # one or more times, specifies the initial frontier. + * gas:program gas:out ?out # exactly once - will be bound to the visited vertices. + * gas:program gas:maxIterations 4 # optional limit on breadth first expansion. + * gas:program gas:maxVisited 2000 # optional limit on the #of visited vertices. + * gas:program gas:nthreads 4 # specify the #of threads to use (optional) + * } + * </pre> + * + * Or the following would run the FuzzySSSP algorithm. + * + * <pre> + * PREFIX gas <http://www.bigdata.com/rdf/gas#> + * #... + * SERVICE <GAS> { + * gas:program gas:gasClass "com.bigdata.rdf.graph.analytics.FuzzySSSP" + * gas:program gas:in <IRI> # one or more times, specifies the initial frontier. + * gas:program gas:target <IRI> # one or more times, identifies the target vertices and hence the paths of interest. + * gas:program gas:out ?out # exactly once - will be bound to the visited vertices laying within N-hops of the shortest paths. + * gas:program gas:maxIterations 4 # optional limit on breadth first expansion. + * gas:program gas:maxVisited 2000 # optional limit on the #of visited vertices. + * } + * </pre> + * + * TODO Also allow the execution of gas workflows. 
A workflow would be more + * along the lines of a callable, but one where the initial source and/or target + * vertices could be identified. Or have an interface that wraps the analytics + * (including things like FuzzySSSP) so they can declare their own arguments for + * invocation as a SERVICE. + * + * TODO The input frontier could be a variable, in which case we would pull out + * the column for that variable rather than running the algorithm once per + * source binding set, right? Or maybe not. + * + * TODO Also support export. This could be easily done using a SPARQL SELECT + * + * <pre> + * SELECT ?src ?tgt ?edgeWeight { + * <<?src linkType ?tgt> propertyType ?edgeWeight> + * } + * </pre> + * + * or (if you have a simple topology without edge weights) + * + * <pre> + * SELECT ?src ?tgt bind(?edgeWeight,1) { + * ?src linkType ?tgt + * } + * </pre> + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public class GASService implements CustomServiceFactory { + + public interface Options { + + /** + * The namespace used for bigdata GAS API. + */ + String NAMESPACE = "http://www.bigdata.com/rdf/gas#"; + + /** + * Used as the subject in the GAS SERVICE invocation pattern. + */ + URI PROGRAM = new URIImpl(NAMESPACE + "program"); + + /** + * Magic predicate identifies the fully qualified class name of the + * {@link IGASProgram} to be executed. + */ + URI GAS_CLASS = new URIImpl(NAMESPACE + "gasClass"); + + /** + * The #of threads that will be used to expand the frontier in each + * iteration of the algorithm (optional, default + * {@value #DEFAULT_NTHREADS}). + * + * @see #DEFAULT_NTHREADS + */ + URI NTHREADS = new URIImpl(NAMESPACE + "nthreads"); + + int DEFAULT_NTHREADS = 4; + + /** + * The maximum #of iterations for the GAS program (optional, default + * {@value #DEFAULT_MAX_ITERATIONS}). 
+ * + * @see #DEFAULT_MAX_ITERATIONS + */ + URI MAX_ITERATIONS = new URIImpl(NAMESPACE + "maxIterations"); + + int DEFAULT_MAX_ITERATIONS = Integer.MAX_VALUE; + + /** + * The maximum #of vertices in the visited set for the GAS program + * (optional, default {@value #DEFAULT_MAX_VISITED}). + * + * @see #DEFAULT_MAX_VISITED + */ + URI MAX_VISITED = new URIImpl(NAMESPACE + "maxVisited"); + + int DEFAULT_MAX_VISITED = Integer.MAX_VALUE; + + /** + * The {@link IGASScheduler} (default is {@link #DEFAULT_SCHEDULER}). + * Class must implement {@link IGASSchedulerImpl}. + */ + URI SCHEDULER_CLASS = new URIImpl(NAMESPACE + "schedulerClass"); + + Class<? extends IGASSchedulerImpl> DEFAULT_SCHEDULER = CHMScheduler.class; + + /** + * Magic predicate used to specify a vertex in the initial frontier. + */ + URI IN = new URIImpl(NAMESPACE + "in"); + + /** + * Magic predicate used to specify a variable that will become bound to + * each vertex in the visited set for the analytic. + * + * TODO This is not always what we want to report. We really need to + * specify the {@link IReducer} to run and then get the output of that + * to the caller. + */ + URI OUT = new URIImpl(NAMESPACE + "out"); + + } + + static private transient final Logger log = Logger + .getLogger(GASService.class); + + private final BigdataNativeServiceOptions serviceOptions; + + public GASService() { + + serviceOptions = new BigdataNativeServiceOptions(); + + /* + * TODO Review decision to make this a runFirst service. The rational is + * that this service can only apply a very limited set of restrictions + * during query, therefore it will often make sense to run it first. + * However, the fromTime and toTime could be bound by the query and the + * service can filter some things more efficiently internally than if we + * generated a bunch of intermediate solutions for those things. + */ + serviceOptions.setRunFirst(true); + + } + + /** + * The known URIs. 
+ * <p> + * Note: We can recognize anything in {@link Options#NAMESPACE}, but the + * predicate still has to be something that we know how to interpret. + */ + static final Set<URI> gasUris; + + static { + + final Set<URI> set = new LinkedHashSet<URI>(); + + set.add(Options.PROGRAM); + + gasUris = Collections.unmodifiableSet(set); + + } + + @Override + public IServiceOptions getServiceOptions() { + + return serviceOptions; + + } + + /** + * NOP + * <p> + * {@inheritDoc} + */ + @Override + public void startConnection(BigdataSailConnection conn) { + // NOP + } + + @Override + public ServiceCall<?> create(final ServiceCallCreateParams params) { + + if (params == null) + throw new IllegalArgumentException(); + + final AbstractTripleStore store = params.getTripleStore(); + + if (store == null) + throw new IllegalArgumentException(); + + /* + * Create and return the ServiceCall object which will execute this + * query. + */ + + return new GASServiceCall(store, params.getServiceNode(), + getServiceOptions()); + + } + + /** + * Execute the service call (run the GAS program). + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + * + * TODO Validate the service call parameters, including whether they + * are understood by the specific algorithm. + * + * TODO Both maxIterations and maxVertexSetSize should be part of + * the GASState to be constraints that are applied for any + * algorithm. + */ + private static class GASServiceCall<VS, ES, ST> implements BigdataServiceCall { + + private final AbstractTripleStore store; + private final GraphPatternGroup<IGroupMemberNode> graphPattern; + private final IServiceOptions serviceOptions; + + // options extracted from the SERVICE's graph pattern. + private final int nthreads; + private final int maxIterations; // FIXME set as limit on GASState. + private final int maxVisited; // FIXME set as limit on GASState. 
+ private final Class<IGASProgram<VS, ES, ST>> gasClass; + private final Class<IGASSchedulerImpl> schedulerClass; + private final Value[] initialFrontier; + private final IVariable<?> outVar; + + public GASServiceCall(final AbstractTripleStore store, + final ServiceNode serviceNode, + final IServiceOptions serviceOptions) { + + if (store == null) + throw new IllegalArgumentException(); + + if (serviceNode == null) + throw new IllegalArgumentException(); + + if (serviceOptions == null) + throw new IllegalArgumentException(); + + this.store = store; + + this.graphPattern = serviceNode.getGraphPattern(); + + this.serviceOptions = serviceOptions; + + this.nthreads = ((Literal) getOnlyArg( + Options.PROGRAM, + Options.NTHREADS, + store.getValueFactory().createLiteral( + Options.DEFAULT_NTHREADS))).intValue(); + + this.maxIterations = ((Literal) getOnlyArg(Options.PROGRAM, + Options.MAX_ITERATIONS, store.getValueFactory() + .createLiteral(Options.DEFAULT_MAX_ITERATIONS))) + .intValue(); + + this.maxVisited = ((Literal) getOnlyArg( + Options.PROGRAM, + Options.MAX_VISITED, + store.getValueFactory().createLiteral( + Options.DEFAULT_MAX_VISITED))).intValue(); + + // GASProgram (required) + { + + final Literal tmp = (Literal) getOnlyArg(Options.PROGRAM, + Options.GAS_CLASS); + + if (tmp == null) + throw new IllegalArgumentException( + "Required predicate not specified: " + + Options.GAS_CLASS); + + final String className = tmp.stringValue(); + + final Class<?> cls; + try { + cls = Class.forName(className); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException("No such class: " + + className); + } + + if (!IGASProgram.class.isAssignableFrom(cls)) + throw new IllegalArgumentException(Options.GAS_CLASS + + " must extend " + IGASProgram.class.getName()); + + this.gasClass = (Class<IGASProgram<VS, ES, ST>>) cls; + + } + + // Scheduler (optional). 
+ { + + final Literal tmp = (Literal) getOnlyArg(Options.PROGRAM, + Options.SCHEDULER_CLASS); + + if (tmp == null) { + + this.schedulerClass = null; + + } else { + + final String className = tmp.stringValue(); + + final Class<?> cls; + try { + cls = Class.forName(className); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException("No such class: " + + className); + } + + if (!IGASSchedulerImpl.class.isAssignableFrom(cls)) + throw new IllegalArgumentException( + Options.SCHEDULER_CLASS + " must extend " + + IGASSchedulerImpl.class.getName()); + + this.schedulerClass = (Class<IGASSchedulerImpl>) cls; + + } + + } + + // Initial frontier. + this.initialFrontier = getArg(Options.PROGRAM, Options.IN); + + // The output variable (bound to the visited set). + this.outVar = getVar(Options.PROGRAM, Options.OUT); + + } + + /** + * Return the variable associated with the first instandce of the + * specified subject and predicate in the service's graph pattern. Only + * the simple {@link StatementPatternNode}s are visited. + * + * @param s + * The subject. + * @param p + * The predicate. + * + * @return The variable -or- <code>null</code> if the specified subject + * and predicate do not appear. + */ + private IVariable<?> getVar(final URI s, final URI p) { + + if (s == null) + throw new IllegalArgumentException(); + if (p == null) + throw new IllegalArgumentException(); + + List<Value> tmp = null; + + final Iterator<IGroupMemberNode> itr = graphPattern.getChildren() + .iterator(); + + while (itr.hasNext()) { + + final IGroupMemberNode child = itr.next(); + + if (!(child instanceof StatementPatternNode)) + continue; + + final StatementPatternNode sp = (StatementPatternNode) child; + + // s and p are constants. + if (!sp.s().isConstant()) + continue; + if (!sp.p().isConstant()) + continue; + + // constants match. 
+ if (!s.equals(sp.s().getValue())) + continue; + if (!p.equals(sp.p().getValue())) + continue; + + if (tmp == null) + tmp = new LinkedList<Value>(); + + // found an o. + return (IVariable<?>) sp.o(); + + } + + return null; // not found. + + } + + /** + * Return the object bindings from the service's graph pattern for the + * specified subject and predicate. Only the simple + * {@link StatementPatternNode}s are visited. + * + * @param s + * The subject. + * @param p + * The predicate. + * + * @return An array containing one or more bindings -or- + * <code>null</code> if the specified subject and predicate do + * not appear. + */ + private Value[] getArg(final URI s, final URI p) { + + if (s == null) + throw new IllegalArgumentException(); + if (p == null) + throw new IllegalArgumentException(); + + List<Value> tmp = null; + + final Iterator<IGroupMemberNode> itr = graphPattern.getChildren() + .iterator(); + + while (itr.hasNext()) { + + final IGroupMemberNode child = itr.next(); + + if (!(child instanceof StatementPatternNode)) + continue; + + final StatementPatternNode sp = (StatementPatternNode) child; + + // s and p are constants. + if (!sp.s().isConstant()) + continue; + if (!sp.p().isConstant()) + continue; + + // constants match. + if (!s.equals(sp.s().getValue())) + continue; + if (!p.equals(sp.p().getValue())) + continue; + + if (tmp == null) + tmp = new LinkedList<Value>(); + + // found an o. + tmp.add(sp.o().getValue()); + + } + + if (tmp == null) + return null; + + return tmp.toArray(new Value[tmp.size()]); + + } + + /** + * Return the sole {@link Value} for the given s and p. + * + * @param s + * The subject. + * @param p + * The predicate. + * + * @return The sole {@link Value} for that s and p -or- + * <code>null</code> if no value was given. + * + * @throws RuntimeException + * if there are multiple values. 
+ */ + private Value getOnlyArg(final URI s, final URI p) { + + final Value[] tmp = getArg(s, p); + + if (tmp == null) + return null; + + if (tmp.length > 1) + throw new IllegalArgumentException("Multiple values: s=" + s + + ", p=" + p); + + return tmp[0]; + + } + + /** + * Return the sole {@link Value} for the given s and p and the default + * value if no value was explicitly provided. + * + * @param s + * The subject. + * @param p + * The predicate. + * @param def + * The default value. + * + * @return The sole {@link Value} for that s and p -or- the default + * value if no value was given. + * + * @throws RuntimeException + * if there are multiple values. + */ + private Value getOnlyArg(final URI s, final URI p, final Value def) { + + final Value tmp = getOnlyArg(s, p); + + if (tmp == null) + return def; + + return tmp; + + } + + @Override + public IServiceOptions getServiceOptions() { + + return serviceOptions; + + } + + /** + * {@inheritDoc} + * + * TODO Join with the source solutions? Or is that handled by the + * caller? + */ + @Override + public ICloseableIterator<IBindingSet> call( + final IBindingSet[] bindingSets) throws Exception { + + /* + * Try/finally pattern to setup the BigdataGASEngine, execute the + * algorithm, and return the results. + */ + IGASEngine gasEngine = null; + + try { + + gasEngine = newGasEngine(store.getIndexManager(), nthreads); + + if (schedulerClass != null) { + + ((GASEngine) gasEngine).setSchedulerClass(schedulerClass); + + } + + final IGASProgram<VS, ES, ST> gasProgram = newGASProgram(gasClass); + + final IGraphAccessor graphAccessor = newGraphAccessor(store); + + final IGASContext<VS, ES, ST> gasContext = gasEngine.newGASContext( + graphAccessor, gasProgram); + + final IGASState<VS, ES, ST> gasState = gasContext.getGASState(); + + // TODO We should look at this when extracting the parameters from the SERVICE's graph pattern. 
+// final FrontierEnum frontierEnum = gasProgram +// .getInitialFrontierEnum(); + + if (initialFrontier != null) { + + // Setup the initial frontier. + for (Value startingVertex : initialFrontier) { + + gasState.setFrontier(gasContext, startingVertex); + + } + + } + + final IGASStats stats = (IGASStats) gasContext.call(); + + if (log.isInfoEnabled()) { + final StringBuilder sb = new StringBuilder(); + sb.append("GAS"); + sb.append(": analytic=" + gasProgram.getClass().getSimpleName()); + sb.append(", nthreads=" + nthreads); + sb.append(", scheduler=" + ((GASState<VS, ES, ST>)gasState).getScheduler().getClass().getSimpleName()); + sb.append(", gasEngine=" + gasEngine.getClass().getSimpleName()); + sb.append(", stats=" + stats); + log.info(sb.toString()); + } + + /* TODO We should be able to run a REDUCER here, not just + * report what is in the visited set. + */ + final Set<Value> visitedSet = new ConcurrentSkipListSet<Value>(); + + gasState.reduce(new IReducer<VS, ES, ST, Void>() { + + @Override + public void visit(IGASState<VS, ES, ST> state, Value u) { + visitedSet.add(u); + } + + @Override + public Void get() { + return null; + } + }); + + final IBindingSet[] out = new IBindingSet[visitedSet.size()]; + + { + final IVariable[] vars = new IVariable[] { outVar }; + int i = 0; + for (Value v : visitedSet) { + + out[i] = new ListBindingSet(vars, + new IConstant[] { new Constant(v) }); + + } + } + + return new ChunkedArrayIterator<IBindingSet>(out); + + } finally { + + if (gasEngine != null) { + + gasEngine.shutdownNow(); + + gasEngine = null; + + } + + } + + } + + /** + * Factory for the {@link IGASEngine}. + */ + private IGASEngine newGasEngine(final IIndexManager indexManager, + final int nthreads) { + + return new BigdataGASEngine(indexManager, nthreads); + + } + + /** + * Return an instance of the {@link IGASProgram} to be evaluated. 
+ */ + private IGASProgram<VS, ES, ST> newGASProgram( + final Class<IGASProgram<VS, ES, ST>> cls) { + + if (cls == null) + throw new IllegalArgumentException(); + + try { + + final Constructor<IGASProgram<VS, ES, ST>> ctor = cls + .getConstructor(new Class[] {}); + + final IGASProgram<VS, ES, ST> gasProgram = ctor + .newInstance(new Object[] {}); + + return gasProgram; + + } catch (Exception e) { + + throw new RuntimeException(e); + + } + + } + + /** + * Return the object used to access the as-configured graph. + */ + private IGraphAccessor newGraphAccessor(final AbstractTripleStore kb) { + + /* + * Use a read-only view (sampling depends on access to the BTree rather + * than the ReadCommittedIndex). + */ + final BigdataGraphAccessor graphAccessor = new BigdataGraphAccessor( + kb.getIndexManager(), kb.getNamespace(), kb + .getIndexManager().getLastCommitTime()); + + return graphAccessor; + + } + + } + +} Modified: branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/service/ServiceCall.java =================================================================== --- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/service/ServiceCall.java 2014-02-20 12:13:07 UTC (rev 7857) +++ branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/service/ServiceCall.java 2014-02-20 14:19:11 UTC (rev 7858) @@ -60,13 +60,16 @@ IServiceOptions getServiceOptions(); /** - * Invoke an service. + * Invoke an service. The caller will join the results from the service with + * the solutions in the context in which the service was invoked (using a + * solution set hash join pattern). * * @param bindingSets * The binding sets flowing into the service. * * @return An iterator from which the solutions can be drained. If the * iterator is closed, the service invocation must be cancelled. 
+ * * @throws Exception * * TODO RECHUNKING: This should probably return an This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <mrp...@us...> - 2014-02-22 18:34:02
|
Revision: 7869 http://sourceforge.net/p/bigdata/code/7869 Author: mrpersonick Date: 2014-02-22 18:33:55 +0000 (Sat, 22 Feb 2014) Log Message: ----------- get the asBound predicate right when the sidVar is bound in the incoming solution Modified Paths: -------------- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/optimizers/StaticOptimizer.java branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPOPredicate.java Modified: branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/optimizers/StaticOptimizer.java =================================================================== --- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/optimizers/StaticOptimizer.java 2014-02-22 18:22:38 UTC (rev 7868) +++ branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/optimizers/StaticOptimizer.java 2014-02-22 18:33:55 UTC (rev 7869) @@ -14,7 +14,9 @@ import com.bigdata.rdf.sparql.ast.IReorderableNode; import com.bigdata.rdf.sparql.ast.QueryHints; import com.bigdata.rdf.sparql.ast.QueryRoot; +import com.bigdata.rdf.sparql.ast.StatementPatternNode; import com.bigdata.rdf.sparql.ast.StaticAnalysis; +import com.bigdata.rdf.sparql.ast.VarNode; import com.bigdata.rdf.sparql.ast.eval.AST2BOpContext; import com.bigdata.rdf.sparql.ast.optimizers.ASTStaticJoinOptimizer.Annotations; @@ -231,8 +233,20 @@ order[0] = preferredFirstTail; order[1] = preferredFirstTail == 0 ? 1 : 0; } else { - order[0] = cardinality(0) <= cardinality(1) ? 0 : 1; - order[1] = cardinality(0) <= cardinality(1) ? 1 : 0; + if (cardinality(0) == cardinality(1) && + nodes.get(0) instanceof StatementPatternNode && + nodes.get(1) instanceof StatementPatternNode) { + final VarNode sid0 = ((StatementPatternNode) nodes.get(0)).sid(); + final VarNode sid1 = ((StatementPatternNode) nodes.get(1)).sid(); + if (sid0 != null && sid1 == null) { + order[0] = 1; + order[1] = 0; + } + } + if (order[0] == -1) { + order[0] = cardinality(0) <= cardinality(1) ? 
0 : 1; + order[1] = cardinality(0) <= cardinality(1) ? 1 : 0; + } } return computeJoinCardinality(getTail(0), getTail(1)); } Modified: branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPOPredicate.java =================================================================== --- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPOPredicate.java 2014-02-22 18:22:38 UTC (rev 7868) +++ branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPOPredicate.java 2014-02-22 18:33:55 UTC (rev 7869) @@ -26,6 +26,7 @@ import java.util.Map; import com.bigdata.bop.BOp; +import com.bigdata.bop.Constant; import com.bigdata.bop.IBindingSet; import com.bigdata.bop.IVariable; import com.bigdata.bop.IVariableOrConstant; @@ -33,6 +34,7 @@ import com.bigdata.bop.ap.Predicate; import com.bigdata.rdf.internal.IV; import com.bigdata.rdf.internal.impl.bnode.SidIV; +import com.bigdata.rdf.sparql.ast.FilterNode; import com.bigdata.relation.rule.IAccessPathExpander; /** @@ -353,6 +355,80 @@ @Override public SPOPredicate asBound(final IBindingSet bindingSet) { + final IVariable<?> sidVar = sid(); + + if (sidVar != null && bindingSet.isBound(sidVar)) { + + final Object obj = bindingSet.get(sidVar).get(); + + // prior predicate bound something other than a sid to a sid var + if (obj instanceof SidIV == false) { + + // inconsistent + return null; + + } + + final SidIV sidIV = (SidIV) obj; + + final ISPO spo = sidIV.getInlineValue(); + + final IV s = spo.s(); + + final IV p = spo.p(); + + final IV o = spo.o(); + + // TODO implement RDR in quads mode +// final IV c = spo.c(); + + if (this.s().isConstant() && !this.s().get().equals(s)) { + + // inconsistent + return null; + + } else { + + bindingSet.set((IVariable) this.s(), new Constant<IV>(s)); + + } + + if (this.p().isConstant() && !this.p().get().equals(p)) { + + // inconsistent + return null; + + } else { + + bindingSet.set((IVariable) this.p(), new Constant<IV>(p)); + + } + + if (this.o().isConstant() && !this.o().get().equals(o)) { + + // 
inconsistent + return null; + + } else { + + bindingSet.set((IVariable) this.o(), new Constant<IV>(o)); + + } + + // TODO implement RDR in quads mode +// if (this.c().isConstant() && !this.c().get().equals(c)) { +// +// // inconsistent +// return null; +// +// } else { +// +// bindingSet.set((IVariable) this.c(), new Constant<IV>(c)); +// +// } + + } + return (SPOPredicate) new SPOPredicate(argsCopy(), annotationsRef()) ._asBound(bindingSet); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2014-02-22 22:38:49
|
Revision: 7872 http://sourceforge.net/p/bigdata/code/7872 Author: thompsonbry Date: 2014-02-22 22:38:43 +0000 (Sat, 22 Feb 2014) Log Message: ----------- added GASService. Works. Sort of. The next steps are to interpret maxIterations and maxVertices as GASState constraints, not GASProgram options. The same for the link type URI and the access to the link weights. This will all make it much easier to parameterize the algorithms through the SPARQL SERVICE invocation. Modified Paths: -------------- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASService.java branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/service/ServiceRegistry.java Modified: branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASService.java =================================================================== --- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASService.java 2014-02-22 22:37:27 UTC (rev 7871) +++ branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/GASService.java 2014-02-22 22:38:43 UTC (rev 7872) @@ -57,10 +57,12 @@ import com.bigdata.rdf.graph.impl.GASState; import com.bigdata.rdf.graph.impl.bd.BigdataGASEngine.BigdataGraphAccessor; import com.bigdata.rdf.graph.impl.scheduler.CHMScheduler; +import com.bigdata.rdf.model.BigdataValue; import com.bigdata.rdf.sail.BigdataSail.BigdataSailConnection; import com.bigdata.rdf.sparql.ast.GraphPatternGroup; import com.bigdata.rdf.sparql.ast.IGroupMemberNode; import com.bigdata.rdf.sparql.ast.StatementPatternNode; +import com.bigdata.rdf.sparql.ast.VarNode; import com.bigdata.rdf.sparql.ast.service.BigdataNativeServiceOptions; import com.bigdata.rdf.sparql.ast.service.BigdataServiceCall; import com.bigdata.rdf.sparql.ast.service.CustomServiceFactory; @@ -79,9 +81,9 @@ * For example, the following would run a depth-limited BFS traversal: * * <pre> - * PREFIX gas <http://www.bigdata.com/rdf/gas#> + * PREFIX gas: <http://www.bigdata.com/rdf/gas#> * #... 
- * SERVICE <GAS> { + * SERVICE <gas#service> { * gas:program gas:gasClass "com.bigdata.rdf.graph.analytics.BFS" . * gas:program gas:in <IRI> . # one or more times, specifies the initial frontier. * gas:program gas:out ?out . # exactly once - will be bound to the visited vertices. @@ -94,9 +96,9 @@ * Or the following would run the FuzzySSSP algorithm. * * <pre> - * PREFIX gas <http://www.bigdata.com/rdf/gas#> + * PREFIX gas: <http://www.bigdata.com/rdf/gas#> * #... - * SERVICE <GAS> { + * SERVICE <gas:service> { * gas:program gas:gasClass "com.bigdata.rdf.graph.analytics.FuzzySSSP" . * gas:program gas:in <IRI> . # one or more times, specifies the initial frontier. * gas:program gas:target <IRI> . # one or more times, identifies the target vertices and hence the paths of interest. @@ -150,6 +152,11 @@ * The namespace used for bigdata GAS API. */ String NAMESPACE = "http://www.bigdata.com/rdf/gas#"; + + /** + * The URL at which the {@link GASService} will respond. + */ + URI SERVICE_KEY = new URIImpl(NAMESPACE + "service"); /** * Used as the subject in the GAS SERVICE invocation pattern. @@ -479,7 +486,7 @@ tmp = new LinkedList<Value>(); // found an o. - return (IVariable<?>) sp.o(); + return ((VarNode)sp.o()).getValueExpression(); } @@ -614,10 +621,9 @@ } /** + * Execute the GAS program. + * <p> * {@inheritDoc} - * - * TODO Join with the source solutions? Or is that handled by the - * caller? */ @Override public ICloseableIterator<IBindingSet> call( @@ -657,10 +663,19 @@ // Setup the initial frontier. for (Value startingVertex : initialFrontier) { - gasState.setFrontier(gasContext, startingVertex); + /* + * FIXME Why can't we pass in the Value (with a defined + * IV) and not the IV? This should work. Passing in the + * IV is against the grain of the API and the + * generalized abstraction as Values. Of course, having + * the IV is necessary since this is an internal, high + * performance, and close to the indices operation. 
+ */ + gasState.setFrontier(gasContext, + ((BigdataValue) startingVertex).getIV()); } - + } // Run the analytic. @@ -702,7 +717,7 @@ int i = 0; for (Value v : visitedSet) { - out[i] = new ListBindingSet(vars, + out[i++] = new ListBindingSet(vars, new IConstant[] { new Constant(v) }); } Modified: branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/service/ServiceRegistry.java =================================================================== --- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/service/ServiceRegistry.java 2014-02-22 22:37:27 UTC (rev 7871) +++ branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/service/ServiceRegistry.java 2014-02-22 22:38:43 UTC (rev 7872) @@ -10,6 +10,7 @@ import org.openrdf.model.URI; import org.openrdf.model.impl.URIImpl; +import com.bigdata.rdf.graph.impl.bd.GASService; import com.bigdata.rdf.sparql.ast.QueryHints; import com.bigdata.rdf.sparql.ast.cache.DescribeServiceFactory; import com.bigdata.rdf.sparql.ast.eval.SampleServiceFactory; @@ -112,6 +113,9 @@ } + // The Gather-Apply-Scatter RDF Graph Mining service. + add(GASService.Options.SERVICE_KEY, new GASService()); + } /** This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |