From: <mrp...@us...> - 2010-08-19 20:55:40
Revision: 3449 http://bigdata.svn.sourceforge.net/bigdata/?rev=3449&view=rev Author: mrpersonick Date: 2010-08-19 20:55:32 +0000 (Thu, 19 Aug 2010) Log Message: ----------- renamed the evaluation strategy class Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl.java Removed Paths: ------------- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl2.java Copied: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl.java (from rev 3405, branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl2.java) =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl.java 2010-08-19 20:55:32 UTC (rev 3449) @@ -0,0 +1,2078 @@ +package com.bigdata.rdf.sail; + +import info.aduna.iteration.CloseableIteration; +import info.aduna.iteration.EmptyIteration; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.log4j.Logger; +import org.openrdf.model.Literal; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.query.BindingSet; +import org.openrdf.query.Dataset; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.Compare; +import org.openrdf.query.algebra.Filter; +import org.openrdf.query.algebra.Group; +import org.openrdf.query.algebra.Join; +import org.openrdf.query.algebra.LeftJoin; +import org.openrdf.query.algebra.MultiProjection; +import org.openrdf.query.algebra.Or; +import org.openrdf.query.algebra.Order; +import org.openrdf.query.algebra.Projection; +import org.openrdf.query.algebra.ProjectionElem; +import org.openrdf.query.algebra.ProjectionElemList; +import org.openrdf.query.algebra.QueryModelNode; +import org.openrdf.query.algebra.QueryRoot; +import org.openrdf.query.algebra.SameTerm; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.algebra.UnaryTupleOperator; +import org.openrdf.query.algebra.Union; +import org.openrdf.query.algebra.ValueConstant; +import org.openrdf.query.algebra.ValueExpr; +import org.openrdf.query.algebra.Var; +import org.openrdf.query.algebra.Compare.CompareOp; +import org.openrdf.query.algebra.StatementPattern.Scope; +import org.openrdf.query.algebra.evaluation.impl.EvaluationStrategyImpl; +import org.openrdf.query.algebra.evaluation.iterator.FilterIterator; +import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; +import com.bigdata.BigdataStatics; +import com.bigdata.bop.Constant; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IConstraint; +import com.bigdata.bop.IPredicate; +import com.bigdata.bop.IVariable; +import com.bigdata.bop.IVariableOrConstant; +import com.bigdata.bop.ap.Predicate; +import com.bigdata.bop.constraint.EQ; +import 
com.bigdata.bop.constraint.EQConstant; +import com.bigdata.bop.constraint.IN; +import com.bigdata.bop.constraint.NE; +import com.bigdata.bop.constraint.NEConstant; +import com.bigdata.bop.constraint.OR; +import com.bigdata.btree.keys.IKeyBuilderFactory; +import com.bigdata.rdf.internal.DummyIV; +import com.bigdata.rdf.internal.IV; +import com.bigdata.rdf.internal.IVUtility; +import com.bigdata.rdf.internal.constraints.InlineEQ; +import com.bigdata.rdf.internal.constraints.InlineGE; +import com.bigdata.rdf.internal.constraints.InlineGT; +import com.bigdata.rdf.internal.constraints.InlineLE; +import com.bigdata.rdf.internal.constraints.InlineLT; +import com.bigdata.rdf.internal.constraints.InlineNE; +import com.bigdata.rdf.lexicon.LexiconRelation; +import com.bigdata.rdf.model.BigdataValue; +import com.bigdata.rdf.rules.RuleContextEnum; +import com.bigdata.rdf.sail.BigdataSail.Options; +import com.bigdata.rdf.spo.DefaultGraphSolutionExpander; +import com.bigdata.rdf.spo.ExplicitSPOFilter; +import com.bigdata.rdf.spo.ISPO; +import com.bigdata.rdf.spo.NamedGraphSolutionExpander; +import com.bigdata.rdf.spo.SPOPredicate; +import com.bigdata.rdf.spo.SPOStarJoin; +import com.bigdata.rdf.store.AbstractTripleStore; +import com.bigdata.rdf.store.BD; +import com.bigdata.rdf.store.BigdataSolutionResolverator; +import com.bigdata.rdf.store.IRawTripleStore; +import com.bigdata.relation.accesspath.IAccessPath; +import com.bigdata.relation.accesspath.IBuffer; +import com.bigdata.relation.accesspath.IElementFilter; +import com.bigdata.relation.rule.IProgram; +import com.bigdata.relation.rule.IQueryOptions; +import com.bigdata.relation.rule.IRule; +import com.bigdata.relation.rule.ISolutionExpander; +import com.bigdata.relation.rule.ISortOrder; +import com.bigdata.relation.rule.IStep; +import com.bigdata.relation.rule.Program; +import com.bigdata.relation.rule.QueryOptions; +import com.bigdata.relation.rule.Rule; +import com.bigdata.relation.rule.eval.ActionEnum; +import com.bigdata.relation.rule.eval.DefaultEvaluationPlanFactory2; +import com.bigdata.relation.rule.eval.IEvaluationPlanFactory; +import com.bigdata.relation.rule.eval.IJoinNexus; +import com.bigdata.relation.rule.eval.IJoinNexusFactory; +import com.bigdata.relation.rule.eval.IRuleTaskFactory; +import com.bigdata.relation.rule.eval.ISolution; +import com.bigdata.relation.rule.eval.NestedSubqueryWithJoinThreadsTask; +import com.bigdata.relation.rule.eval.RuleStats; +import com.bigdata.search.FullTextIndex; +import com.bigdata.search.IHit; +import com.bigdata.striterator.DistinctFilter; +import com.bigdata.striterator.IChunkedOrderedIterator; + +/** + * Extended to rewrite Sesame {@link TupleExpr}s onto native {@link Rule}s and + * to evaluate magic predicates for full text search, etc. Query evaluation can + * proceed either by Sesame 2 evaluation or, if {@link Options#NATIVE_JOINS} is + * enabled, then by translation of Sesame 2 query expressions into native + * {@link IRule}s and native evaluation of those {@link IRule}s. + * + * <h2>Query options</h2> + * The following summarizes how various high-level query language feature are + * mapped onto native {@link IRule}s. + * <dl> + * <dt>DISTINCT</dt> + * <dd>{@link IQueryOptions#isDistinct()}, which is realized using + * {@link DistinctFilter}.</dd> + * <dt>ORDER BY</dt> + * <dd>{@link IQueryOptions#getOrderBy()} is effected by a custom + * {@link IKeyBuilderFactory} which generates sort keys that capture the desired + * sort order from the bindings in an {@link ISolution}. 
Unless DISTINCT is + * also specified, the generated sort keys are made unique by appending a one up + * long integer to the key - this prevents sort keys that otherwise compare as + * equals from dropping solutions. Note that the SORT is actually imposed by the + * {@link DistinctFilter} using an {@link IKeyBuilderFactory} assembled from the + * ORDER BY constraints. + * + * FIXME BryanT - implement the {@link IKeyBuilderFactory}. + * + * FIXME MikeP - assemble the {@link ISortOrder}[] from the query and set on + * the {@link IQueryOptions}.</dd> + * <dt>OFFSET and LIMIT</dt> + * <dd> + * <p> + * {@link IQueryOptions#getSlice()}, which is effected as a conditional in + * {@link NestedSubqueryWithJoinThreadsTask} based on the + * {@link RuleStats#solutionCount}. Query {@link ISolution}s are counted as + * they are generated, but they are only entered into the {@link ISolution} + * {@link IBuffer} when the solutionCount is GE the OFFSET and LT the LIMIT. + * Query evaluation halts once the LIMIT is reached. + * </p> + * <p> + * Note that when DISTINCT and either LIMIT and/or OFFSET are specified + * together, then the LIMIT and OFFSET <strong>MUST</strong> be applied after + * the solutions have been generated since we may have to generate more than + * LIMIT solutions in order to have LIMIT <em>DISTINCT</em> solutions. We + * handle this for now by NOT translating the LIMIT and OFFSET onto the + * {@link IRule} and instead let Sesame close the iterator once it has enough + * solutions. + * </p> + * <p> + * Note that LIMIT and SLICE requires an evaluation plan that provides stable + * results. For a simple query this is achieved by setting + * {@link IQueryOptions#isStable()} to <code>true</code>. + * <p> + * For a UNION query, you must also set {@link IProgram#isParallel()} to + * <code>false</code> to prevent parallelized execution of the {@link IRule}s + * in the {@link IProgram}. + * </p> + * </dd> + * <dt>UNION</dt> + * <dd>A UNION is translated into an {@link IProgram} consisting of one + * {@link IRule} for each clause in the UNION. + * + * FIXME MikeP - implement.</dd> + * </dl> + * <h2>Filters</h2> + * The following provides a summary of how various kinds of FILTER are handled. + * A filter that is not explicitly handled is left untranslated and will be + * applied by Sesame against the generated {@link ISolution}s. + * <p> + * Whenever possible, a FILTER is translated into an {@link IConstraint} on an + * {@link IPredicate} in the generated native {@link IRule}. Some filters are + * essentially JOINs against the {@link LexiconRelation}. Those can be handled + * either as JOINs (generating an additional {@link IPredicate} in the + * {@link IRule}) or as an {@link IN} constraint, where the inclusion set is + * pre-populated by some operation on the {@link LexiconRelation}. + * <dl> + * <dt>EQ</dt> + * <dd>Translated into an {@link EQ} constraint on an {@link IPredicate}.</dd> + * <dt>NE</dt> + * <dd>Translated into an {@link NE} constraint on an {@link IPredicate}.</dd> + * <dt>IN</dt> + * <dd>Translated into an {@link IN} constraint on an {@link IPredicate}.</dd> + * <dt>OR</dt> + * <dd>Translated into an {@link OR} constraint on an {@link IPredicate}.</dd> + * <dt></dt> + * <dd></dd> + * </dl> + * <h2>Magic predicates</h2> + * <p> + * {@link BD#SEARCH} is the only magic predicate at this time. 
When the object + * position is bound to a constant, the magic predicate is evaluated once and + * the result is used to generate a set of term identifiers that are matches for + * the token(s) extracted from the {@link Literal} in the object position. Those + * term identifiers are then used to populate an {@link IN} constraint. The + * object position in the {@link BD#SEARCH} MUST be bound to a constant. + * </p> + * + * FIXME We are not in fact rewriting the query operation at all, simply + * choosing a different evaluation path as we go. The rewrite should really be + * isolated from the execution, e.g., in its own class. That more correct + * approach is more than I want to get into right now as we will have to define + * variants on the various operators that let us model the native rule system + * directly, e.g., an n-ary IProgram, n-ary IRule operator, an IPredicate + * operator, etc. Then we can handle evaluation using their model with anything + * re-written to our custom operators being caught by our custom evaluate() + * methods and everything else running their default methods. Definitely the + * right approach, and much easier to write unit tests. + * + * @todo REGEX : if there is a "ˆ" literal followed by a wildcard + * AND there are no flags which would cause problems (case-folding, etc) + * then the REGEX can be rewritten as a prefix scan on the lexicon, which + * is very efficient, and converted to an IN filter. When the set size is + * huge we should rewrite it as another tail in the query instead. + * <p> + * Otherwise, regex filters are left outside of the rule. We can't + * optimize that until we generate rules that perform JOINs across the + * lexicon and the spo relations (which we could do, in which case it + * becomes a constraint on that join). + * <p> + * We don't have any indices that are designed to optimize regex scans, + * but we could process a regex scan as a parallel iterator scan against + * the lexicon. + * + * @todo Roll more kinds of filters into the native {@link IRule}s as + * {@link IConstraint}s on {@link IPredicate}s. + * <p> + * isURI(), etc. can be evaluated by testing a bit flag on the term + * identifier, which is very efficient. + * <p> + * + * @todo Verify handling of datatype operations. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id: BigdataEvaluationStrategyImpl.java 2272 2009-11-04 02:10:19Z + * mrpersonick $ + */ +public class BigdataEvaluationStrategyImpl extends EvaluationStrategyImpl { + + /** + * Logger. + */ + protected static final Logger log = + Logger.getLogger(BigdataEvaluationStrategyImpl.class); + +// protected static final boolean INFO = log.isInfoEnabled(); +// +// protected static final boolean DEBUG = log.isDebugEnabled(); + + protected final BigdataTripleSource tripleSource; + + protected final Dataset dataset; + + private final AbstractTripleStore database; + + private final boolean nativeJoins; + + private final boolean starJoins; + + private final boolean inlineTerms; + + // private boolean slice = false, distinct = false, union = false; + // + // // Note: defaults are illegal values. + // private long offset = -1L, limit = 0L; + // /** + // * @param tripleSource + // */ + // public BigdataEvaluationStrategyImpl(final BigdataTripleSource + // tripleSource) { + // + // this(tripleSource, null/* dataset */, false WHY FALSE? 
/* nativeJoins + // */); + // + // } + /** + * @param tripleSource + * @param dataset + */ + public BigdataEvaluationStrategyImpl( + final BigdataTripleSource tripleSource, final Dataset dataset, + final boolean nativeJoins, final boolean starJoins, + final boolean inlineTerms) { + + super(tripleSource, dataset); + + this.tripleSource = tripleSource; + this.dataset = dataset; + this.database = tripleSource.getDatabase(); + this.nativeJoins = nativeJoins; + this.starJoins = starJoins; + this.inlineTerms = inlineTerms; + + } + + // @Override + // public CloseableIteration<BindingSet, QueryEvaluationException> evaluate( + // org.openrdf.query.algebra.Slice slice, BindingSet bindings) + // throws QueryEvaluationException { + // /* + // * Note: Sesame has somewhat different semantics for offset and limit. + // * They are [int]s. -1 is used to indicate the the offset or limit was + // * not specified. you use hasFoo() to see if there is an offset or a + // * limit and then assign the value. For bigdata, the NOP offset is 0L + // * and the NOP limit is Long.MAX_VALUE. + // * + // * Note: We can't process the offset natively unless we remove the slice + // * from the Sesame operator tree. If we did then we would skip over the + // * first OFFSET solutions and Sesame would skip over the first OFFSET + // * solutions that we passed on, essentially doubling the offset. + // * + // * FIXME native rule slices work, but they can not be applied if there + // * is a non-native filter outside of the join. This code could be + // * modified to test for that using tuplExpr.visit(...), but really we + // * just need to do a proper rewrite of the query expressions that is + // * distinct from their evaluation! + // */ + // //// if (!slice.hasOffset()) { + // // this.slice = true; + // // this.offset = slice.hasOffset() ? slice.getOffset() : 0L; + // // this.limit = slice.hasLimit() ? slice.getLimit() : Long.MAX_VALUE; + // //// return evaluate(slice.getArg(), bindings); + // //// } + // return super.evaluate(slice, bindings); + // } + // + // @Override + // public CloseableIteration<BindingSet, QueryEvaluationException> evaluate( + // Union union, BindingSet bindings) throws QueryEvaluationException { + // this.union = true; + // return super.evaluate(union, bindings); + // } + // + // @Override + // public CloseableIteration<BindingSet, QueryEvaluationException> evaluate( + // Distinct distinct, BindingSet bindings) + // throws QueryEvaluationException { + // this.distinct = true; + // return super.evaluate(distinct, bindings); + // } + + /** + * A set of properties that act as query hints for the join nexus. + */ + private Properties queryHints; + + /** + * This is the top-level method called by the SAIL to evaluate a query. + * The TupleExpr parameter here is guaranteed to be the root of the operator + * tree for the query. Query hints are parsed by the SAIL from the + * namespaces in the original query. See {@link BD#QUERY_HINTS_NAMESPACE}. + */ + public CloseableIteration<BindingSet, QueryEvaluationException> evaluate( + TupleExpr expr, BindingSet bindings, Properties queryHints) + throws QueryEvaluationException { + + // spit out the whole operator tree + if (log.isInfoEnabled()) { + log.info("operator tree:\n" + expr); + } + + this.queryHints = queryHints; + + if (log.isInfoEnabled()) { + log.info("queryHints:\n" + queryHints); + } + + return super.evaluate(expr, bindings); + + } + + + + /** + * Eventually we will want to translate the entire operator tree into a + * native bigdata program. 
For now this is just a means of inspecting it. + */ + @Override + public CloseableIteration<BindingSet, QueryEvaluationException> evaluate( + TupleExpr expr, BindingSet bindings) + throws QueryEvaluationException { + + if (log.isDebugEnabled()) { + log.debug("tuple expr:\n" + expr); + } + + return super.evaluate(expr, bindings); + + } + + /** + * Translate top-level UNIONs into native bigdata programs for execution. + * This will attempt to look down the operator tree from this point and turn + * the Sesame operators into a set of native rules within a single program. + * <p> + * FIXME A Union is a BinaryTupleOperator composed of two expressions. This + * native evaluation only handles the special case where the left and right + * args are one of: {Join, LeftJoin, StatementPattern, Union}. It's + * possible that the left or right arg is something other than one of those + * operators, in which case we punt to the Sesame evaluation, which + * degrades performance. + * <p> + * FIXME Also, even if the left or right arg is one of the cases we handle, + * it's possible that the translation of that arg into a native rule will + * fail because of an unsupported SPARQL language feature, such as an + * embedded UNION or an unsupported filter type. + */ + @Override + public CloseableIteration<BindingSet, QueryEvaluationException> evaluate( + Union union, BindingSet bindings) throws QueryEvaluationException { + + if (!nativeJoins) { + // Use Sesame 2 evaluation + return super.evaluate(union, bindings); + } + + if (log.isDebugEnabled()) { + log.debug("union:\n" + union); + } + + /* + * FIXME Another deficiency in the native rule model. We can only handle + * top-level UNIONs for now. + */ + QueryModelNode operator = union; + while ((operator = operator.getParentNode()) != null) { + if (operator instanceof LeftJoin || operator instanceof Join) { + // Use Sesame 2 evaluation + + if (log.isInfoEnabled()) { + log.info("could not evaluate natively, punting to Sesame"); + } + if (log.isDebugEnabled()) { + log.debug(operator); + } + + return super.evaluate(union, bindings); + } + } + + + try { + + IStep query = createNativeQuery(union); + + if (query == null) { + return new EmptyIteration<BindingSet, QueryEvaluationException>(); + } + + return execute(query); + + } catch (UnknownOperatorException ex) { + + // Use Sesame 2 evaluation + + if (log.isInfoEnabled()) { + log.info("could not evaluate natively, punting to Sesame"); + } + if (log.isDebugEnabled()) { + log.debug(ex.getOperator()); + } + + return super.evaluate(union, bindings); + + } + + } + + /** + * Override evaluation of StatementPatterns to recognize magic search + * predicate. + @Override + public CloseableIteration<BindingSet, QueryEvaluationException> evaluate( + final StatementPattern sp, final BindingSet bindings) + throws QueryEvaluationException { + + // no check against the nativeJoins property here because we are simply + // using the native execution model to take care of magic searches. + + if (log.isDebugEnabled()) { + log.debug("evaluating statement pattern:\n" + sp); + } + + IStep query = createNativeQuery(sp); + + if (query == null) { + return new EmptyIteration<BindingSet, QueryEvaluationException>(); + } + + return execute(query); + + } + */ + + /** + * Translate top-level JOINs into native bigdata programs for execution. + * This will attempt to look down the operator tree from this point and turn + * the Sesame operators into a native rule. 
+ * <p> + * FIXME It's possible that the translation of the left or right arg into a + * native rule will fail because of an unsupported SPARQL language feature, + * such as an embedded UNION or an unsupported filter type. + */ + @Override + public CloseableIteration<BindingSet, QueryEvaluationException> evaluate( + Join join, BindingSet bindings) throws QueryEvaluationException { + + if (!nativeJoins) { + // Use Sesame 2 evaluation + return super.evaluate(join, bindings); + } + + if (log.isDebugEnabled()) { + log.debug("join:\n" + join); + } + + /* + * FIXME Another deficiency in the native rule model. If we are doing + * a join that is nested inside an optional, we don't have the + * appropriate variable bindings to arrive at the correct answer. + * Example: + * select * + * { + * :x1 :p ?v . + * OPTIONAL { :x3 :q ?w } + * OPTIONAL { :x3 :q ?w . :x2 :p ?v } + * } + * + * 1. LeftJoin + * 2. LeftJoin + * 3. StatementPattern + * 4. StatementPattern + * 5. Join + * 6. StatementPattern + * 7. StatementPattern + * + * (1) punts, because the right arg is a Join and we can't mark an + * entire Join as optional. Then, (5) makes it here, to the evaluate + * method. But we can't evaluate it in isolation, we need to pump + * the bindings in from the stuff above it. + */ + QueryModelNode operator = join; + while ((operator = operator.getParentNode()) != null) { + if (operator instanceof LeftJoin) { + + // Use Sesame 2 evaluation + + if (log.isInfoEnabled()) { + log.info("could not evaluate natively, punting to Sesame"); + } + if (log.isDebugEnabled()) { + log.debug(operator); + } + + return super.evaluate(join, bindings); + } + } + + try { + + IStep query = createNativeQuery(join); + + if (query == null) { + return new EmptyIteration<BindingSet, QueryEvaluationException>(); + } + + return execute(query); + + } catch (UnknownOperatorException ex) { + + // Use Sesame 2 evaluation + + if (log.isInfoEnabled()) { + log.info("could not evaluate natively, punting to Sesame"); + } + if (log.isDebugEnabled()) { + log.debug(ex.getOperator()); + } + + return super.evaluate(join, bindings); + + } + + } + + /** + * Translate top-level LEFTJOINs into native bigdata programs for execution. + * This will attempt to look down the operator tree from this point and turn + * the Sesame operators into a native rule. + * <p> + * FIXME It's possible that the translation of the left or right arg into a + * native rule will fail because of an unsupported SPARQL language feature, + * such as an embedded UNION or an unsupported filter type. + */ + @Override + public CloseableIteration<BindingSet, QueryEvaluationException> evaluate( + LeftJoin join, BindingSet bindings) throws QueryEvaluationException { + + if (!nativeJoins) { + // Use Sesame 2 evaluation + return super.evaluate(join, bindings); + } + + if (log.isDebugEnabled()) { + log.debug("left join:\n" + join); + } + + /* + * FIXME Another deficiency in the native rule model. If we are doing + * a left join that is nested inside an optional, we don't have the + * appropriate variable bindings to arrive at the correct answer. + * Example: + * SELECT * + * { + * :x1 :p ?v . + * OPTIONAL + * { + * :x3 :q ?w . + * OPTIONAL { :x2 :p ?v } + * } + * } + * + * 1. LeftJoin + * 2. StatementPattern + * 3. LeftJoin + * 4. StatementPattern + * 5. StatementPattern + * + * (1) punts, because the right arg is a LeftJoin and we can't mark an + * entire Join as optional. Then, (3) makes it here, to the evaluate + * method. 
But we can't evaluate it in isolation, we need to pump + * the bindings in from the LeftJoin above it. + */ + QueryModelNode operator = join; + while ((operator = operator.getParentNode()) != null) { + if (operator instanceof LeftJoin) { + + // Use Sesame 2 evaluation + + if (log.isInfoEnabled()) { + log.info("could not evaluate natively, punting to Sesame"); + } + if (log.isDebugEnabled()) { + log.debug(operator); + } + + return super.evaluate(join, bindings); + } + } + + try { + + IStep query = createNativeQuery(join); + + if (query == null) { + return new EmptyIteration<BindingSet, QueryEvaluationException>(); + } + + return execute(query); + + } catch (UnknownOperatorException ex) { + + // Use Sesame 2 evaluation + + if (log.isInfoEnabled()) { + log.info("could not evaluate natively, punting to Sesame"); + } + if (log.isDebugEnabled()) { + log.debug(ex.getOperator()); + } + + return super.evaluate(join, bindings); + + } + + } + + /** + * This is the method that will attempt to take a top-level join or left + * join and turn it into a native bigdata rule. The Sesame operators Join + * and LeftJoin share only the common base class BinaryTupleOperator, but + * other BinaryTupleOperators are not supported by this method. Other + * specific types of BinaryTupleOperators will cause this method to throw + * an exception. + * <p> + * This method will also turn a single top-level StatementPattern into a + * rule with one predicate in it. + * <p> + * Note: As a pre-condition, the {@link Value}s in the query expression + * MUST have been rewritten as {@link BigdataValue}s and their term + * identifiers MUST have been resolved. Any term identifier that remains + * {@link IRawTripleStore#NULL} is an indication that there is no entry for + * that {@link Value} in the database. Since the JOINs are required (vs + * OPTIONALs), that means that there is no solution for the JOINs and an + * {@link EmptyIteration} is returned rather than evaluating the query. 
+ * + * @param join + * @return native bigdata rule + * @throws UnknownOperatorException + * this exception will be thrown if the Sesame join contains any + * SPARQL language constructs that cannot be converted into + * the bigdata native rule model + * @throws QueryEvaluationException + */ + private IRule createNativeQuery(final TupleExpr join) + throws UnknownOperatorException, + QueryEvaluationException { + + if (!(join instanceof StatementPattern || + join instanceof Join || join instanceof LeftJoin || + join instanceof Filter)) { + throw new AssertionError( + "only StatementPattern, Join, and LeftJoin supported"); + } + + // flattened collection of statement patterns nested within this join, + // along with whether or not each one is optional + final Map<StatementPattern, Boolean> stmtPatterns = + new LinkedHashMap<StatementPattern, Boolean>(); + // flattened collection of filters nested within this join + final Collection<Filter> filters = new LinkedList<Filter>(); + + // will throw EncounteredUnknownTupleExprException if the join + // contains something we don't handle yet + collectStatementPatterns(join, stmtPatterns, filters); + + if (false) { + for (Map.Entry<StatementPattern, Boolean> entry : + stmtPatterns.entrySet()) { + log.debug(entry.getKey() + ", optional=" + entry.getValue()); + } + for (Filter filter : filters) { + log.debug(filter.getCondition()); + } + } + + // generate tails + Collection<IPredicate> tails = new LinkedList<IPredicate>(); + // keep a list of free text searches for later to solve a named graphs + // problem + final Map<IPredicate, StatementPattern> searches = + new HashMap<IPredicate, StatementPattern>(); + for (Map.Entry<StatementPattern, Boolean> entry : stmtPatterns + .entrySet()) { + StatementPattern sp = entry.getKey(); + boolean optional = entry.getValue(); + IPredicate tail = generateTail(sp, optional); + // encountered a value not in the database lexicon + if (tail == null) { + if (log.isDebugEnabled()) { + log.debug("could not generate tail for: " + sp); + } + if (optional) { + // for optionals, just skip the tail + continue; + } else { + // for non-optionals, skip the entire rule + return null; + } + } + if (tail.getSolutionExpander() instanceof FreeTextSearchExpander) { + searches.put(tail, sp); + } + tails.add(tail); + } + + /* + * When in quads mode, we need to go through the free text searches and + * make sure that they are properly filtered for the dataset where + * needed. Joins will take care of this, so we only need to add a filter + * when a search variable does not appear in any other tails that are + * non-optional. + * + * @todo Bryan seems to think this can be fixed with a DISTINCT JOIN + * mechanism in the rule evaluation. + */ + if (database.isQuads() && dataset != null) { + for (IPredicate search : searches.keySet()) { + final Set<URI> graphs; + StatementPattern sp = searches.get(search); + switch (sp.getScope()) { + case DEFAULT_CONTEXTS: { + /* + * Query against the RDF merge of zero or more source + * graphs. + */ + graphs = dataset.getDefaultGraphs(); + break; + } + case NAMED_CONTEXTS: { + /* + * Query against zero or more named graphs. + */ + graphs = dataset.getNamedGraphs(); + break; + } + default: + throw new AssertionError(); + } + if (graphs == null) { + continue; + } + // why would we use a constant with a free text search??? 
+ if (search.get(0).isConstant()) { + throw new AssertionError(); + } + // get ahold of the search variable + com.bigdata.bop.Var searchVar = + (com.bigdata.bop.Var) search.get(0); + if (log.isDebugEnabled()) { + log.debug(searchVar); + } + // start by assuming it needs filtering, guilty until proven + // innocent + boolean needsFilter = true; + // check the other tails one by one + for (IPredicate<ISPO> tail : tails) { + ISolutionExpander<ISPO> expander = + tail.getSolutionExpander(); + // only concerned with non-optional tails that are not + // themselves magic searches + if (expander instanceof FreeTextSearchExpander + || tail.isOptional()) { + continue; + } + // see if the search variable appears in this tail + boolean appears = false; + for (int i = 0; i < tail.arity(); i++) { + IVariableOrConstant term = tail.get(i); + if (log.isDebugEnabled()) { + log.debug(term); + } + if (term.equals(searchVar)) { + appears = true; + break; + } + } + // if it appears, we don't need a filter + if (appears) { + needsFilter = false; + break; + } + } + // if it needs a filter, add it to the expander + if (needsFilter) { + if (log.isDebugEnabled()) { + log.debug("needs filter: " + searchVar); + } + FreeTextSearchExpander expander = (FreeTextSearchExpander) + search.getSolutionExpander(); + expander.addNamedGraphsFilter(graphs); + } + } + } + + // generate constraints + final Collection<IConstraint> constraints = + new LinkedList<IConstraint>(); + final Iterator<Filter> filterIt = filters.iterator(); + while (filterIt.hasNext()) { + final Filter filter = filterIt.next(); + final IConstraint constraint = generateConstraint(filter); + if (constraint != null) { + // remove if we are able to generate a native constraint for it + if (log.isDebugEnabled()) { + log.debug("able to generate a constraint: " + constraint); + } + filterIt.remove(); + constraints.add(constraint); + } + } + + /* + * FIXME Native slice, DISTINCT, etc. are all commented out for now. + * Except for ORDER_BY, support exists for all of these features in the + * native rules, but we need to separate the rewrite of the tupleExpr + * and its evaluation in order to properly handle this stuff. + */ + IQueryOptions queryOptions = QueryOptions.NONE; + // if (slice) { + // if (!distinct && !union) { + // final ISlice slice = new Slice(offset, limit); + // queryOptions = new QueryOptions(false/* distinct */, + // true/* stable */, null/* orderBy */, slice); + // } + // } else { + // if (distinct && !union) { + // queryOptions = QueryOptions.DISTINCT; + // } + // } + + if (log.isDebugEnabled()) { + for (IPredicate<ISPO> tail : tails) { + ISolutionExpander<ISPO> expander = tail.getSolutionExpander(); + if (expander != null) { + IAccessPath<ISPO> accessPath = database.getSPORelation() + .getAccessPath(tail); + accessPath = expander.getAccessPath(accessPath); + IChunkedOrderedIterator<ISPO> it = accessPath.iterator(); + while (it.hasNext()) { + log.debug(it.next().toString(database)); + } + } + } + } + + /* + * Collect a set of variables required beyond just the join (i.e. + * aggregation, projection, filters, etc.) 
+ */ + Set<String> required = new HashSet<String>(); + + try { + + QueryModelNode p = join; + while (true) { + p = p.getParentNode(); + if (log.isDebugEnabled()) { + log.debug(p.getClass()); + } + if (p instanceof UnaryTupleOperator) { + required.addAll(collectVariables((UnaryTupleOperator) p)); + } + if (p instanceof QueryRoot) { + break; + } + } + + if (filters.size() > 0) { + for (Filter filter : filters) { + required.addAll(collectVariables((UnaryTupleOperator) filter)); + } + } + + } catch (Exception ex) { + throw new QueryEvaluationException(ex); + } + + IVariable[] requiredVars = new IVariable[required.size()]; + int i = 0; + for (String v : required) { + requiredVars[i++] = com.bigdata.bop.Var.var(v); + } + + if (log.isDebugEnabled()) { + log.debug("required binding names: " + Arrays.toString(requiredVars)); + } + + if (starJoins) { // database.isQuads() == false) { + if (log.isDebugEnabled()) { + log.debug("generating star joins"); + } + tails = generateStarJoins(tails); + } + + // generate native rule + IRule rule = new Rule("nativeJoin", + // @todo should serialize the query string here for the logs. + null, // head + tails.toArray(new IPredicate[tails.size()]), queryOptions, + // constraints on the rule. + constraints.size() > 0 ? constraints + .toArray(new IConstraint[constraints.size()]) : null, + null/* constants */, null/* taskFactory */, requiredVars); + + if (BigdataStatics.debug) { + System.err.println(join.toString()); + System.err.println(rule.toString()); + } + + // we have filters that we could not translate natively + if (filters.size() > 0) { + if (log.isDebugEnabled()) { + log.debug("could not translate " + filters.size() + + " filters into native constraints:"); + for (Filter filter : filters) { + log.debug("\n" + filter.getCondition()); + } + } + // use the basic filter iterator for remaining filters + rule = new ProxyRuleWithSesameFilters(rule, filters); + } + + return rule; + + } + + /** + * Collect the variables used by this <code>UnaryTupleOperator</code> so + * they can be added to the list of required variables in the query for + * correct binding set pruning. 
+ * + * @param uto + * the <code>UnaryTupleOperator</code> + * @return + * the variables it uses + */ + protected Set<String> collectVariables(UnaryTupleOperator uto) + throws Exception { + + final Set<String> vars = new HashSet<String>(); + if (uto instanceof Projection) { + List<ProjectionElem> elems = + ((Projection) uto).getProjectionElemList().getElements(); + for (ProjectionElem elem : elems) { + vars.add(elem.getSourceName()); + } + } else if (uto instanceof MultiProjection) { + List<ProjectionElemList> elemLists = + ((MultiProjection) uto).getProjections(); + for (ProjectionElemList list : elemLists) { + List<ProjectionElem> elems = list.getElements(); + for (ProjectionElem elem : elems) { + vars.add(elem.getSourceName()); + } + } + } else if (uto instanceof Filter) { + Filter f = (Filter) uto; + ValueExpr ve = f.getCondition(); + ve.visit(new QueryModelVisitorBase<Exception>() { + @Override + public void meet(Var v) throws Exception { + vars.add(v.getName()); + } + }); + } else if (uto instanceof Group) { + Group g = (Group) uto; + g.visit(new QueryModelVisitorBase<Exception>() { + @Override + public void meet(Var v) { + vars.add(v.getName()); + } + }); + } else if (uto instanceof Order) { + Order o = (Order) uto; + o.visit(new QueryModelVisitorBase<Exception>() { + @Override + public void meet(Var v) { + vars.add(v.getName()); + } + }); + } + return vars; + + } + + /** + * This method will take a Union and attempt to turn it into a native + * bigdata program. If either the left or right arg is a Union, the method + * will act recursively to flatten the nested Unions into a single program. + * <p> + * See comments for {@link #evaluate(Union, BindingSet)}. + * + * @param union + * @return native bigdata program + * @throws UnknownOperatorException + * this exception will be thrown if the Sesame join contains any + * SPARQL language constructs that cannot be converted into the + * bigdata native rule model + * @throws QueryEvaluationException + */ + private IProgram createNativeQuery(Union union) + throws UnknownOperatorException, + QueryEvaluationException { + + // create a new program that can run in parallel + Program program = new Program("union", true); + + TupleExpr left = union.getLeftArg(); + // if the left arg is a union, create a program for it and merge it + if (left instanceof Union) { + Program p2 = (Program) createNativeQuery((Union) left); + program.addSteps(p2.steps()); + } else if (left instanceof Join || left instanceof LeftJoin || + left instanceof Filter) { + IRule rule = createNativeQuery(left); + if (rule != null) { + if (rule instanceof ProxyRuleWithSesameFilters) { + // unfortunately I think we just have to punt to be super safe + Collection<Filter> filters = + ((ProxyRuleWithSesameFilters) rule).getSesameFilters(); + if (log.isDebugEnabled()) { + log.debug("could not translate " + filters.size() + + " filters into native constraints:"); + for (Filter filter : filters) { + log.debug("\n" + filter.getCondition()); + } + } + throw new UnknownOperatorException(filters.iterator().next()); + } + program.addStep(rule); + } + } else if (left instanceof StatementPattern) { + IRule rule = createNativeQuery((StatementPattern) left); + if (rule != null) { + program.addStep(rule); + } + } else { + throw new UnknownOperatorException(left); + } + + TupleExpr right = union.getRightArg(); + // if the right arg is a union, create a program for it and merge it + if (right instanceof Union) { + Program p2 = (Program) createNativeQuery((Union) right); + 
program.addSteps(p2.steps()); + } else if (right instanceof Join || right instanceof LeftJoin || + right instanceof Filter) { + IRule rule = createNativeQuery(right); + if (rule != null) { + if (rule instanceof ProxyRuleWithSesameFilters) { + // unfortunately I think we just have to punt to be super safe + Collection<Filter> filters = + ((ProxyRuleWithSesameFilters) rule).getSesameFilters(); + if (log.isDebugEnabled()) { + log.debug("could not translate " + filters.size() + + " filters into native constraints:"); + for (Filter filter : filters) { + log.debug("\n" + filter.getCondition()); + } + } + throw new UnknownOperatorException(filters.iterator().next()); + } + program.addStep(rule); + } + } else if (right instanceof StatementPattern) { + IRule rule = createNativeQuery((StatementPattern) right); + if (rule != null) { + program.addStep(rule); + } + } else { + throw new UnknownOperatorException(right); + } + + return program; + + } + + /** + * Take the supplied tuple expression and flatten all the statement patterns + * into a collection that can then be fed into a bigdata rule. So if the + * tuple expression is itself a statement pattern or a filter, simply cast + * and add it to the appropriate collection. If the tuple expression is a + * join or left join, use recursion on the left and right argument of the + * join. If the tuple expression is anything else, for example a Union, + * this method will throw an exception. Currently Unions nested inside + * of joins is not supported due to deficiencies in the native bigdata + * rule model. + * <p> + * @todo support nested Unions + * + * @param tupleExpr + * @param stmtPatterns + * @param filters + */ + private void collectStatementPatterns(final TupleExpr tupleExpr, + final Map<StatementPattern, Boolean> stmtPatterns, + final Collection<Filter> filters) + throws UnknownOperatorException { + + if (tupleExpr instanceof StatementPattern) { + stmtPatterns.put((StatementPattern) tupleExpr, Boolean.FALSE); + } else if (tupleExpr instanceof Filter) { + final Filter filter = (Filter) tupleExpr; + filters.add(filter); + final TupleExpr arg = filter.getArg(); + collectStatementPatterns(arg, stmtPatterns, filters); + } else if (tupleExpr instanceof Join) { + final Join join = (Join) tupleExpr; + final TupleExpr left = join.getLeftArg(); + final TupleExpr right = join.getRightArg(); + collectStatementPatterns(left, stmtPatterns, filters); + collectStatementPatterns(right, stmtPatterns, filters); + } else if (tupleExpr instanceof LeftJoin) { + + final LeftJoin join = (LeftJoin) tupleExpr; + + /* + * FIXME Another deficiency in the native rule model. Incorrect + * scoping of join. + * Example: + * SELECT * + * { + * ?X :name "paul" + * {?Y :name "george" . OPTIONAL { ?X :email ?Z } } + * } + * + * 1. Join + * 2. StatementPattern + * 3. LeftJoin + * 4. StatementPattern + * 5. StatementPattern + * + * (1) starts collecting its child nodes and gets to (3), which + * puts us here in the code. But this is not a case where we + * can just flatten the whole tree. (3) needs to be evaluated + * independently, as a subprogram. + */ + QueryModelNode operator = join; + while ((operator = operator.getParentNode()) != null) { + if (operator instanceof Join) { + // Use Sesame 2 evaluation + throw new UnknownOperatorException(join); + } + } + + // FIXME is this right? what about multiple optionals - do they nest? 
+ final TupleExpr left = join.getLeftArg(); + final TupleExpr right = join.getRightArg(); + // all we know how to handle right now is a left join of: + // { StatementPattern || Join || LeftJoin } x { StatementPattern } + if (!(right instanceof StatementPattern)) { + throw new UnknownOperatorException(right); + } + final ValueExpr condition = join.getCondition(); + if (condition != null) { + /* + Filter filter = new Filter(right, condition); + // fake a filter, we just need the value expr later + filters.add(filter); + */ + // we have to punt on nested optional filters just to be safe + throw new UnknownOperatorException(join); + } + stmtPatterns.put((StatementPattern) right, Boolean.TRUE); + collectStatementPatterns(left, stmtPatterns, filters); + } else { + throw new UnknownOperatorException(tupleExpr); + } + + } + + /** + * Generate a bigdata {@link IPredicate} (tail) for the supplied + * StatementPattern. + * <p> + * As a shortcut, if the StatementPattern contains any bound values that + * are not in the database, this method will return null. + * + * @param stmtPattern + * @param optional + * @return the generated bigdata {@link Predicate} or <code>null</code> if + * the statement pattern contains bound values not in the database. + * @throws QueryEvaluationException + */ + private IPredicate generateTail(final StatementPattern stmtPattern, + final boolean optional) throws QueryEvaluationException { + + // create a solution expander for free text search if necessary + ISolutionExpander<ISPO> expander = null; + final Value predValue = stmtPattern.getPredicateVar().getValue(); + if (log.isDebugEnabled()) { + log.debug(predValue); + } + if (predValue != null && BD.SEARCH.equals(predValue)) { + final Value objValue = stmtPattern.getObjectVar().getValue(); + if (log.isDebugEnabled()) { + log.debug(objValue); + } + if (objValue != null && objValue instanceof Literal) { + expander = new FreeTextSearchExpander(database, + (Literal) objValue); + } + } + + // @todo why is [s] handled differently? + // because [s] is the variable in free text searches, no need to test + // to see if the free text search expander is in place + final IVariableOrConstant<IV> s = generateVariableOrConstant( + stmtPattern.getSubjectVar()); + if (s == null) { + return null; + } + + final IVariableOrConstant<IV> p; + if (expander == null) { + p = generateVariableOrConstant(stmtPattern.getPredicateVar()); + } else { + p = new Constant(DummyIV.INSTANCE); + } + if (p == null) { + return null; + } + + final IVariableOrConstant<IV> o; + if (expander == null) { + o = generateVariableOrConstant(stmtPattern.getObjectVar()); + } else { + o = new Constant(DummyIV.INSTANCE); + } + if (o == null) { + return null; + } + + final IVariableOrConstant<IV> c; + if (!database.isQuads()) { + /* + * Either triple store mode or provenance mode. + */ + final Var var = stmtPattern.getContextVar(); + if (var == null) { + // context position is not used. + c = null; + } else { + final Value val = var.getValue(); + if (val != null && database.isStatementIdentifiers()) { + /* + * Note: The context position is used as a statement + * identifier (SID). SIDs may be used to retrieve provenance + * statements (statements about statement) using high-level + * query. SIDs are represented as blank nodes and is not + * possible to have them bound in the original query. They + * only become bound during query evaluation. 
+ */ + throw new QueryEvaluationException( + "Context position is a statement identifier and may not be bound in the original query: " + + stmtPattern); + } + final String name = var.getName(); + c = com.bigdata.bop.Var.var(name); + } + } else { + /* + * Quad store mode. + * + * FIXME Scale-out joins depend on knowledge of the best access path + * and the index partitions (aka shards) which it will traverse. + * Review all of the new expanders and make sure that they do not + * violate this principle. Expanders tend to lazily determine the + * access path, and I believe that RDFJoinNexus#getTailAccessPath() + * may even refuse to operate with expanders. If this is the case, + * then the choice of the access path needs to be completely coded + * into the predicate as a combination of binding or clearing the + * context variable and setting an appropriate constraint (filter). + */ + if (BigdataStatics.debug) { + if (dataset == null) { + System.err.println("No dataset."); + } else { + final int defaultGraphSize = dataset.getDefaultGraphs() + .size(); + final int namedGraphSize = dataset.getNamedGraphs().size(); + if (defaultGraphSize > 10 || namedGraphSize > 10) { + System.err.println("large dataset: defaultGraphs=" + + defaultGraphSize + ", namedGraphs=" + + namedGraphSize); + } else { + System.err.println(dataset.toString()); + } + } + System.err.println(stmtPattern.toString()); + } + if (expander != null) { + /* + * @todo can this happen? If it does then we need to look at how + * to layer the expanders. + */ + // throw new AssertionError("expander already set"); + // we are doing a free text search, no need to do any named or + // default graph expansion work + c = null; + } else { + final Var cvar = stmtPattern.getContextVar(); + if (dataset == null) { + if (cvar == null) { + /* + * There is no dataset and there is no graph variable, + * so the default graph will be the RDF Merge of ALL + * graphs in the quad store. + * + * This code path uses an "expander" which strips off + * the context information and filters for the distinct + * (s,p,o) triples to realize the RDF Merge of the + * source graphs for the default graph. + */ + c = null; + expander = new DefaultGraphSolutionExpander(null/* ALL */); + } else { + /* + * There is no data set and there is a graph variable, + * so the query will run against all named graphs and + * [cvar] will be to the context of each (s,p,o,c) in + * turn. This handles constructions such as: + * + * "SELECT * WHERE {graph ?g {?g :p :o } }" + */ + expander = new NamedGraphSolutionExpander(null/* ALL */); + c = generateVariableOrConstant(cvar); + } + } else { // dataset != null + switch (stmtPattern.getScope()) { + case DEFAULT_CONTEXTS: { + /* + * Query against the RDF merge of zero or more source + * graphs. + */ + expander = new DefaultGraphSolutionExpander(dataset + .getDefaultGraphs()); + /* + * Note: cvar can not become bound since context is + * stripped for the default graph. + */ + if (cvar == null) + c = null; + el... [truncated message content] |
From: <tho...@us...> - 2011-01-15 21:23:29
Revision: 4103 http://bigdata.svn.sourceforge.net/bigdata/?rev=4103&view=rev Author: thompsonbry Date: 2011-01-15 21:23:23 +0000 (Sat, 15 Jan 2011) Log Message: ----------- Javadoc on QueryHints. Bugfix to the BigdataEvaluationStrategyImpl to address problems where a smaller buffer backing the BigdataBindingSetResolverator could result in a deadlock during query (e.g., LUBM U50 Q8). Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/QueryHints.java Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl.java 2011-01-15 14:59:52 UTC (rev 4102) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl.java 2011-01-15 21:23:23 UTC (rev 4103) @@ -1710,106 +1710,167 @@ // startId, -1/* partitionId */, // newBindingSetIterator(new HashBindingSet()))); - final IRunningQuery runningQuery = queryEngine.eval(query); + IRunningQuery runningQuery = null; + try { + + // Submit query for evaluation. + runningQuery = queryEngine.eval(query); + + // Iterator draining the query results. + final IAsynchronousIterator<IBindingSet[]> it1 = + runningQuery.iterator(); + + // De-chunk the IBindingSet[] visited by that iterator. + final IChunkedOrderedIterator<IBindingSet> it2 = + new ChunkedWrappedIterator<IBindingSet>( + new Dechunkerator<IBindingSet>(it1)); - final IAsynchronousIterator<IBindingSet[]> it1 = - runningQuery.iterator(); - -// final IChunkedOrderedIterator<IBindingSet> it2 = -// new ChunkedArraysIterator<IBindingSet>(it1); - final IChunkedOrderedIterator<IBindingSet> it2 = new ChunkedWrappedIterator<IBindingSet>( - new Dechunkerator<IBindingSet>(it1)); - - CloseableIteration<BindingSet, QueryEvaluationException> result = - new Bigdata2Sesame2BindingSetIterator<QueryEvaluationException>( - new BigdataBindingSetResolverator(database, it2).start(database - .getExecutorService())); + // Materialize IVs as RDF Values. + CloseableIteration<BindingSet, QueryEvaluationException> result = + // Monitor IRunningQuery and cancel if Sesame iterator is closed. + new RunningQueryCloseableIteration<BindingSet, QueryEvaluationException>(runningQuery, + // Convert bigdata binding sets to Sesame binding sets. + new Bigdata2Sesame2BindingSetIterator<QueryEvaluationException>( + // Materialize IVs as RDF Values. + new BigdataBindingSetResolverator(database, it2).start( + database.getExecutorService()))); + +// No - will deadlock if buffer fills up +// // Wait for the Future (checks for errors). 
+// runningQuery.get(); + + // use the basic filter iterator for remaining filters + if (step instanceof ProxyRuleWithSesameFilters) { + Collection<Filter> filters = + ((ProxyRuleWithSesameFilters) step).getSesameFilters(); + if (log.isInfoEnabled() && filters.size() > 0) { + log.info("could not translate " + filters.size() + + " filters into native constraints:"); + } + for (Filter filter : filters) { + if (log.isInfoEnabled()) + log.info("\n" + filter.getCondition()); + result = new FilterIterator(filter, result, this); + } + } +// // use the basic filter iterator for remaining filters +// if (sesameFilters != null) { +// for (Filter f : sesameFilters) { +// if (log.isDebugEnabled()) { +// log.debug("attaching sesame filter: " + f); +// } +// result = new FilterIterator(f, result, this); +// } +// } - /* - * FIXME This will deadlock in the buffer fills - see - * BigdataEvaluationStrategyImpl3 which contains a new code pattern for - * this. - */ - try { - // Wait for the Future (checks for errors). - runningQuery.get(); - } catch (Exception ex) { - throw new QueryEvaluationException(ex); - } - -// final boolean backchain = // -// tripleSource.getDatabase().getAxioms().isRdfSchema() -// && tripleSource.includeInferred -// && tripleSource.conn.isQueryTimeExpander(); + return result; + + } catch (Throwable t) { + if (runningQuery != null) + runningQuery.cancel(true/* mayInterruptIfRunning */); + throw new QueryEvaluationException(t); + } + +// final IRunningQuery runningQuery = queryEngine.eval(query); +// +// final IAsynchronousIterator<IBindingSet[]> it1 = +// runningQuery.iterator(); // -// if (log.isDebugEnabled()) { -// log.debug("Running tupleExpr as native rule:\n" + step); -// log.debug("backchain: " + backchain); -// } +//// final IChunkedOrderedIterator<IBindingSet> it2 = +//// new ChunkedArraysIterator<IBindingSet>(it1); +// final IChunkedOrderedIterator<IBindingSet> it2 = new ChunkedWrappedIterator<IBindingSet>( +// new Dechunkerator<IBindingSet>(it1)); // -// // run the query as a native rule. -// final IChunkedOrderedIterator<ISolution> itr1; +// CloseableIteration<BindingSet, QueryEvaluationException> result = +// new Bigdata2Sesame2BindingSetIterator<QueryEvaluationException>( +// new BigdataBindingSetResolverator(database, it2).start(database +// .getExecutorService())); +// +// /* +// * FIXME This will deadlock in the buffer fills - see +// * BigdataEvaluationStrategyImpl3 which contains a new code pattern for +// * this. +// */ // try { -// final IEvaluationPlanFactory planFactory = -// DefaultEvaluationPlanFactory2.INSTANCE; -// -// /* -// * alternative evaluation orders for LUBM Q9 (default is 1 4, 2, 3, -// * 0, 5). All three evaluation orders are roughly as good as one -// * another. Note that tail[2] (z rdf:type ...) is entailed by the -// * ontology and could be dropped from evaluation. -// */ -// // final IEvaluationPlanFactory planFactory = new -// // FixedEvaluationPlanFactory( -// // // new int[] { 1, 4, 3, 0, 5, 2 } good -// // // new int[] { 1, 3, 0, 4, 5, 2 } good -// // ); -// -// final IJoinNexusFactory joinNexusFactory = database -// .newJoinNexusFactory(RuleContextEnum.HighLevelQuery, -// ActionEnum.Query, IJoinNexus.BINDINGS, -// null, // filter -// false, // justify -// backchain, // -// planFactory, // -// queryHints -// ); -// -// final IJoinNexus joinNexus = joinNexusFactory.newInstance(database -// .getIndexManager()); -// itr1 = joinNexus.runQuery(step); -// +// // Wait for the Future (checks for errors). 
+// runningQuery.get(); // } catch (Exception ex) { // throw new QueryEvaluationException(ex); // } // -// /* -// * Efficiently resolve term identifiers in Bigdata ISolutions to RDF -// * Values in Sesame 2 BindingSets and align the resulting iterator with -// * the Sesame 2 API. -// */ -// CloseableIteration<BindingSet, QueryEvaluationException> result = -// new Bigdata2Sesame2BindingSetIterator<QueryEvaluationException>( -// new BigdataSolutionResolverator(database, itr1).start(database -// .getExecutorService())); +//// final boolean backchain = // +//// tripleSource.getDatabase().getAxioms().isRdfSchema() +//// && tripleSource.includeInferred +//// && tripleSource.conn.isQueryTimeExpander(); +//// +//// if (log.isDebugEnabled()) { +//// log.debug("Running tupleExpr as native rule:\n" + step); +//// log.debug("backchain: " + backchain); +//// } +//// +//// // run the query as a native rule. +//// final IChunkedOrderedIterator<ISolution> itr1; +//// try { +//// final IEvaluationPlanFactory planFactory = +//// DefaultEvaluationPlanFactory2.INSTANCE; +//// +//// /* +//// * alternative evaluation orders for LUBM Q9 (default is 1 4, 2, 3, +//// * 0, 5). All three evaluation orders are roughly as good as one +//// * another. Note that tail[2] (z rdf:type ...) is entailed by the +//// * ontology and could be dropped from evaluation. +//// */ +//// // final IEvaluationPlanFactory planFactory = new +//// // FixedEvaluationPlanFactory( +//// // // new int[] { 1, 4, 3, 0, 5, 2 } good +//// // // new int[] { 1, 3, 0, 4, 5, 2 } good +//// // ); +//// +//// final IJoinNexusFactory joinNexusFactory = database +//// .newJoinNexusFactory(RuleContextEnum.HighLevelQuery, +//// ActionEnum.Query, IJoinNexus.BINDINGS, +//// null, // filter +//// false, // justify +//// backchain, // +//// planFactory, // +//// queryHints +//// ); +//// +//// final IJoinNexus joinNexus = joinNexusFactory.newInstance(database +//// .getIndexManager()); +//// itr1 = joinNexus.runQuery(step); +//// +//// } catch (Exception ex) { +//// throw new QueryEvaluationException(ex); +//// } +//// +//// /* +//// * Efficiently resolve term identifiers in Bigdata ISolutions to RDF +//// * Values in Sesame 2 BindingSets and align the resulting iterator with +//// * the Sesame 2 API. 
+//// */ +//// CloseableIteration<BindingSet, QueryEvaluationException> result = +//// new Bigdata2Sesame2BindingSetIterator<QueryEvaluationException>( +//// new BigdataSolutionResolverator(database, itr1).start(database +//// .getExecutorService())); +// +// // use the basic filter iterator for remaining filters +// if (step instanceof ProxyRuleWithSesameFilters) { +// Collection<Filter> filters = +// ((ProxyRuleWithSesameFilters) step).getSesameFilters(); +// if (log.isInfoEnabled() && filters.size() > 0) { +// log.info("could not translate " + filters.size() +// + " filters into native constraints:"); +// } +// for (Filter filter : filters) { +// if (log.isInfoEnabled()) +// log.info("\n" + filter.getCondition()); +// result = new FilterIterator(filter, result, this); +// } +// } +// +// return result; - // use the basic filter iterator for remaining filters - if (step instanceof ProxyRuleWithSesameFilters) { - Collection<Filter> filters = - ((ProxyRuleWithSesameFilters) step).getSesameFilters(); - if (log.isInfoEnabled() && filters.size() > 0) { - log.info("could not translate " + filters.size() - + " filters into native constraints:"); - } - for (Filter filter : filters) { - if (log.isInfoEnabled()) - log.info("\n" + filter.getCondition()); - result = new FilterIterator(filter, result, this); - } - } - - return result; - } // /** Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/QueryHints.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/QueryHints.java 2011-01-15 14:59:52 UTC (rev 4102) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/QueryHints.java 2011-01-15 21:23:23 UTC (rev 4103) @@ -56,11 +56,22 @@ */ String NAMESPACE = "BIGDATA_QUERY_HINTS"; - /** - * Specify the query optimizer. - * - * @see QueryOptimizerEnum - */ + /** + * Specify the query optimizer. For example, you can disable the query + * optimizer using + * + * <pre> + * PREFIX BIGDATA_QUERY_HINTS: <http://www.bigdata.com/queryHints#com.bigdata.rdf.sail.QueryHints.optimizer=None> + * </pre> + * + * Disabling the query optimizer can be useful if you have a query for which + * the static query optimizer is producing a inefficient join ordering. With + * the query optimizer disabled for that query, the joins will be run in the + * order given. This makes it possible for you to decide on the right join + * ordering for that query. + * + * @see QueryOptimizerEnum + */ String OPTIMIZER = QueryHints.class.getName() + ".optimizer"; } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
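The optimizer hint documented in the QueryHints change above travels inside the query text itself as a PREFIX declaration, so no client API changes are needed. Below is a minimal sketch of issuing such a query through the Sesame connection API; the connection, the example predicates, and the helper class/method names are placeholders for illustration and are not part of the commit, while the hint URI is copied verbatim from the javadoc.

    import org.openrdf.query.QueryLanguage;
    import org.openrdf.query.TupleQueryResult;

    import com.bigdata.rdf.sail.BigdataSailRepositoryConnection;

    public class OptimizerHintExample {

        // Prepend the BIGDATA_QUERY_HINTS prefix so the static optimizer is
        // disabled and the joins run in the order given in the WHERE clause.
        static TupleQueryResult runWithFixedJoinOrder(
                final BigdataSailRepositoryConnection cxn) throws Exception {

            final String queryStr =
                "PREFIX BIGDATA_QUERY_HINTS: " +
                "<http://www.bigdata.com/queryHints#com.bigdata.rdf.sail.QueryHints.optimizer=None>\n" +
                "SELECT ?x ?y WHERE { ?x <http://example.org/p> ?y . " +
                "?y <http://example.org/q> ?x . }";

            return cxn.prepareTupleQuery(QueryLanguage.SPARQL, queryStr).evaluate();
        }
    }

With the hint present, the join order in the WHERE clause is executed as written, which is useful when the static optimizer picks a poor ordering for a particular query.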
From: <mar...@us...> - 2011-03-30 18:56:41
Revision: 4352 http://bigdata.svn.sourceforge.net/bigdata/?rev=4352&view=rev Author: martyncutcher Date: 2011-03-30 18:56:32 +0000 (Wed, 30 Mar 2011) Log Message: ----------- First commit of JettySparqlServer, a simple sparql REST interface is fronted by the RESTServlet which delegates to the implementation servlets Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataContext.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataServlet.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/DeleteServlet.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/JettySparqlServer.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/QueryServlet.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/RESTServlet.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/SparqlCommand.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/UpdateServlet.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/XMLBuilder.java Added: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataContext.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataContext.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataContext.java 2011-03-30 18:56:32 UTC (rev 4352) @@ -0,0 +1,616 @@ +package com.bigdata.rdf.sail.webapp; + +import info.aduna.xml.XMLWriter; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.StringWriter; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import javax.servlet.ServletContext; +import javax.servlet.http.HttpServletRequest; + +import org.apache.log4j.Logger; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.QueryLanguage; +import org.openrdf.query.parser.ParsedQuery; +import org.openrdf.query.parser.QueryParser; +import org.openrdf.query.parser.sparql.SPARQLParserFactory; +import org.openrdf.query.resultio.sparqlxml.SPARQLResultsXMLWriter; +import org.openrdf.repository.RepositoryException; +import org.openrdf.rio.rdfxml.RDFXMLWriter; +import org.openrdf.sail.SailException; + +import com.bigdata.bop.engine.QueryEngine; +import com.bigdata.journal.IIndexManager; +import com.bigdata.journal.ITx; +import com.bigdata.journal.TimestampUtility; +import com.bigdata.rawstore.Bytes; +import com.bigdata.rdf.sail.BigdataSail; +import com.bigdata.rdf.sail.BigdataSailGraphQuery; +import com.bigdata.rdf.sail.BigdataSailRepository; +import com.bigdata.rdf.sail.BigdataSailRepositoryConnection; +import com.bigdata.rdf.sail.BigdataSailTupleQuery; +import com.bigdata.rdf.sail.bench.NanoSparqlServer; +import com.bigdata.rdf.sail.webapp.BigdataServlet.QueryType; +import com.bigdata.rdf.store.AbstractTripleStore; +import 
com.bigdata.util.concurrent.ThreadPoolExecutorBaseStatisticsTask; +import com.bigdata.util.httpd.NanoHTTPD; + +/** + * + * @author Martyn Cutcher + * + */ +public class BigdataContext { + /** + * The logger for the concrete {@link NanoSparqlServer} class. The {@link NanoHTTPD} + * class has its own logger. + */ + static private final Logger log = Logger.getLogger(BigdataServlet.class); + + static private BigdataContext s_context; + + private final Config m_config; + private final IIndexManager m_indexManager; + private final QueryParser m_engine; + + private final ScheduledFuture<?> m_queueStatsFuture; + private final ThreadPoolExecutorBaseStatisticsTask m_queueSampleTask; + + /** + * The currently executing queries (does not include queries where a client + * has established a connection but the query is not running because the + * {@link #queryService} is blocking). + */ + protected final ConcurrentHashMap<Long/* queryId */, RunningQuery> m_queries = new ConcurrentHashMap<Long, RunningQuery>(); + + public Map<Long, RunningQuery> getQueries() { + return m_queries; + } + + /** + * Factory for the query identifiers. + */ + protected final AtomicLong m_queryIdFactory = new AtomicLong(); + + public AtomicLong getQueryIdFactory() { + return m_queryIdFactory; + } + + /** + * This call establishes the context to run the servlets that + * use it in an embedded server. + * + * @param config + * @param indexManager + * @return the BigdataContext + */ + synchronized static public BigdataContext establishContext(final Config config, final IIndexManager indexManager) + throws SailException, RepositoryException, IOException { + if (s_context == null) { + s_context = new BigdataContext(config, indexManager); + } + + return s_context; + } + + /** + * When a servlet starts up in a web container it establishes the BigdataContext + * that will be defined by the context parameters in the web.xml file. + * + * @param context + * @return the BigdataContext + */ + synchronized static BigdataContext establishContext(ServletContext context) { + if (s_context == null) { + // TODO get config info from servlet context + } + + return s_context; + } + + static BigdataContext getContext() { + return s_context; + } + + public BigdataContext(final Config config, final IIndexManager indexManager) throws IOException, SailException, + RepositoryException { + + if (config.namespace == null) + throw new IllegalArgumentException(); + + if (indexManager == null) + throw new IllegalArgumentException(); + + m_config = config; + + m_indexManager = indexManager; + + // used to parse qeries. + m_engine = new SPARQLParserFactory().getParser(); + + if (indexManager.getCollectQueueStatistics()) { + + final long initialDelay = 0; // initial delay in ms. + final long delay = 1000; // delay in ms. + final TimeUnit unit = TimeUnit.MILLISECONDS; + + // FIXME add mechanism for stats sampling + // queueSampleTask = new ThreadPoolExecutorBaseStatisticsTask( + // (ThreadPoolExecutor) queryService); + // + // queueStatsFuture = indexManager.addScheduledTask(queueSampleTask, + // initialDelay, delay, unit); + + m_queueSampleTask = null; + + m_queueStatsFuture = null; + } else { + + m_queueSampleTask = null; + + m_queueStatsFuture = null; + + } + + } + + public void shutdownNow() { + if(log.isInfoEnabled()) + log.info("Normal shutdown."); + + // Stop collecting queue statistics. 
+ if (m_queueStatsFuture != null) + m_queueStatsFuture.cancel(true/* mayInterruptIfRunning */); + + } + + public IIndexManager getIndexManager() { + return m_indexManager; + } + + /** + * Configuration object. + */ + public static class Config { + + /** + * When true, suppress various things otherwise written on stdout. + */ + public boolean quiet = false; + + /** + * The port on which the server will answer requests -or- ZERO to + * use any open port. + */ + public int port = 80; + + /** + * The default namespace. + */ + public String namespace; + + /** + * The default timestamp used to query the default namespace. The server + * will obtain a read only transaction which reads from the commit point + * associated with this timestamp. + */ + public long timestamp = ITx.UNISOLATED; + + /** + * The #of threads to use to handle SPARQL queries -or- ZERO (0) for an + * unbounded pool. + */ + public int queryThreadPoolSize = 8; + + /** + * The capacity of the buffers for the pipe connecting the running query + * to the HTTP response. + */ + public int bufferCapacity = Bytes.kilobyte32 * 1; + + public String resourceBase = "."; + + public Config() { + } + + } + + public Config getConfig() { + return m_config; + } + + public ThreadPoolExecutorBaseStatisticsTask getSampleTask() { + return m_queueSampleTask; + } + + public static void clear() { + if (s_context != null) { + s_context.shutdownNow(); + s_context = null; + } + } + + /** + * Abstract base class for running queries handles the timing, pipe, + * reporting, obtains the connection, and provides the finally {} semantics + * for each type of query task. + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + * @version $Id$ + */ + public abstract class AbstractQueryTask implements Callable<Void> { + + /** The namespace against which the query will be run. */ + private final String namespace; + + /** + * The timestamp of the view for that namespace against which the query + * will be run. + */ + private final long timestamp; + + /** The SPARQL query string. */ + protected final String queryStr; + + /** + * A symbolic constant indicating the type of query. + */ + protected final QueryType queryType; + + /** + * The negotiated MIME type to be used for the query response. + */ + protected final String mimeType; + + /** A pipe used to incrementally deliver the results to the client. */ + private final OutputStream os; + + /** + * Sesame has an option for a base URI during query evaluation. This + * provides a symbolic place holder for that URI in case we ever provide + * a hook to set it. + */ + protected final String baseURI = null; + + /** + * The queryId used by the {@link NanoSparqlServer}. + */ + protected final Long queryId; + + /** + * The queryId used by the {@link QueryEngine}. + */ + protected final UUID queryId2; + + /** + * + * @param namespace + * The namespace against which the query will be run. + * @param timestamp + * The timestamp of the view for that namespace against which + * the query will be run. + * @param queryStr + * The SPARQL query string. + * @param os + * A pipe used to incrementally deliver the results to the + * client. 
+ */ + protected AbstractQueryTask(final String namespace, + final long timestamp, final String queryStr, + final QueryType queryType, + final String mimeType, + final OutputStream os) { + + this.namespace = namespace; + this.timestamp = timestamp; + this.queryStr = queryStr; + this.queryType = queryType; + this.mimeType = mimeType; + this.os = os; + this.queryId = Long.valueOf(m_queryIdFactory.incrementAndGet()); + this.queryId2 = UUID.randomUUID(); + + } + + /** + * Execute the query. + * + * @param cxn + * The connection. + * @param os + * Where the write the query results. + * + * @throws Exception + */ + abstract protected void doQuery(BigdataSailRepositoryConnection cxn, + OutputStream os) throws Exception; + + final public Void call() throws Exception { + final long begin = System.nanoTime(); + BigdataSailRepositoryConnection cxn = null; + try { + cxn = getQueryConnection(namespace, timestamp); + m_queries.put(queryId, new RunningQuery(queryId.longValue(),queryId2, + queryStr, begin)); + if(log.isTraceEnabled()) + log.trace("Query running..."); +// try { + doQuery(cxn, os); +// } catch(Throwable t) { +// /* +// * Log the query and the exception together. +// */ +// log.error(t.getLocalizedMessage() + ":\n" + queryStr, t); +// } + if(log.isTraceEnabled()) + log.trace("Query done - flushing results."); + os.flush(); + os.close(); + if(log.isTraceEnabled()) + log.trace("Query done - output stream closed."); + return null; + } catch (Throwable t) { + // launder and rethrow the exception. + throw BigdataServlet.launderThrowable(t, os, queryStr); + } finally { + m_queries.remove(queryId); + try { + os.close(); + } catch (Throwable t) { + log.error(t, t); + } + try { + if (cxn != null) + cxn.close(); + } catch (Throwable t) { + log.error(t, t); + } + } + } + + } + + /** + * Executes a tuple query. + */ + private class TupleQueryTask extends AbstractQueryTask { + + public TupleQueryTask(final String namespace, final long timestamp, + final String queryStr, final QueryType queryType, + final String mimeType, final OutputStream os) { + + super(namespace, timestamp, queryStr, queryType, mimeType, os); + + } + + protected void doQuery(final BigdataSailRepositoryConnection cxn, + final OutputStream os) throws Exception { + + final BigdataSailTupleQuery query = cxn.prepareTupleQuery( + QueryLanguage.SPARQL, queryStr, baseURI); + + if (true) { + StringWriter strw = new StringWriter(); + + query.evaluate(new SPARQLResultsXMLWriter(new XMLWriter(strw))); + + OutputStreamWriter outstr = new OutputStreamWriter(os); + String res = strw.toString(); + outstr.write(res); + outstr.flush(); + outstr.close(); + } else { + query.evaluate(new SPARQLResultsXMLWriter(new XMLWriter(os))); + } + } + + } + + /** + * Executes a graph query. + */ + private class GraphQueryTask extends AbstractQueryTask { + + public GraphQueryTask(final String namespace, final long timestamp, + final String queryStr, final QueryType queryType, + final String mimeType, final OutputStream os) { + + super(namespace, timestamp, queryStr, queryType, mimeType, os); + + } + + @Override + protected void doQuery(final BigdataSailRepositoryConnection cxn, + final OutputStream os) throws Exception { + + final BigdataSailGraphQuery query = cxn.prepareGraphQuery( + QueryLanguage.SPARQL, queryStr, baseURI); + + query.evaluate(new RDFXMLWriter(os)); + + } + + } + + /** + * Return the task which will execute the query. + * + * @param queryStr + * The query. + * @param os + * Where the task will write its output. + * + * @return The task. 
+ * + * @throws MalformedQueryException + */ + public AbstractQueryTask getQueryTask(final String namespace, + final long timestamp, final String queryStr, + final HttpServletRequest req, + final OutputStream os) throws MalformedQueryException { + + /* + * Parse the query so we can figure out how it will need to be executed. + * + * Note: This will fail a query on its syntax. However, the logic used + * in the tasks to execute a query will not fail a bad query for some + * reason which I have not figured out yet. Therefore, we are in the + * position of having to parse the query here and then again when it is + * executed. + */ + final ParsedQuery q = m_engine.parseQuery(queryStr, null/*baseURI*/); + + if(log.isDebugEnabled()) + log.debug(q.toString()); + + final QueryType queryType = QueryType + .fromQuery(queryStr); + + final String mimeType; + switch (queryType) { + case ASK: + /* + * FIXME handle ASK. + */ + break; + case DESCRIBE: + case CONSTRUCT: + // FIXME Conneg for the mime type for construct/describe! + mimeType = BigdataServlet.MIME_RDF_XML; + return new GraphQueryTask(namespace, timestamp, queryStr, + queryType, mimeType, os); + case SELECT: + mimeType = BigdataServlet.MIME_SPARQL_RESULTS_XML; + return new TupleQueryTask(namespace, timestamp, queryStr, + queryType, mimeType, os); + } + + throw new RuntimeException("Unknown query type: " + queryType); + + } + + /** + * Metadata about running queries. + */ + static class RunningQuery { + + /** + * The unique identifier for this query for the {@link NanoSparqlServer}. + */ + final long queryId; + + /** + * The unique identifier for this query for the {@link QueryEngine}. + * + * @see QueryEngine#getRunningQuery(UUID) + */ + final UUID queryId2; + + /** The query. */ + final String query; + + /** The timestamp when the query was accepted (ns). */ + final long begin; + + public RunningQuery(final long queryId, final UUID queryId2, + final String query, final long begin) { + + this.queryId = queryId; + + this.queryId2 = queryId2; + + this.query = query; + + this.begin = begin; + + } + + } + + /** + * Return a read-only transaction which will read from the commit point + * associated with the given timestamp. + * + * @param namespace + * The namespace. + * @param timestamp + * The timestamp. + * + * @throws RepositoryException + * + * @todo enforce historical query by making sure timestamps conform (we do + * not want to allow read/write tx queries unless update semantics are + * introduced ala SPARQL 1.1). + * + * @todo Use a distributed read-only tx for queries (it would be nice if a + * tx used 2PL to specify which namespaces it could touch). + */ + public BigdataSailRepositoryConnection getQueryConnection( + final String namespace, final long timestamp) + throws RepositoryException { + + // resolve the default namespace. + final AbstractTripleStore tripleStore = (AbstractTripleStore) getIndexManager() + .getResourceLocator().locate(namespace, timestamp); + + if (tripleStore == null) { + + throw new RuntimeException("Not found: namespace=" + namespace + + ", timestamp=" + TimestampUtility.toString(timestamp)); + + } + + /* + * Since the kb exists, wrap it as a sail. + * + * @todo cache? close when not in use any more? 
+ */ + final BigdataSail sail = new BigdataSail(tripleStore); + + final BigdataSailRepository repo = new BigdataSailRepository(sail); + + repo.initialize(); + + return (BigdataSailRepositoryConnection) repo + .getReadOnlyConnection(timestamp); + + } + + QueryServlet m_queryServlet; + public void registerServlet(QueryServlet queryServlet) { + m_queryServlet = queryServlet; + } + + public QueryServlet getQueryServlet() { + return m_queryServlet; + } + + DeleteServlet m_deleteServlet; + public void registerServlet(DeleteServlet deleteServlet) { + m_deleteServlet = deleteServlet; + } + + public DeleteServlet getDeleteServlet() { + return m_deleteServlet; + } + + UpdateServlet m_updateServlet; + public void registerServlet(UpdateServlet updateServlet) { + m_updateServlet = updateServlet; + } + public UpdateServlet getUpdateServlet() { + return m_updateServlet; + } + + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataContext.java ___________________________________________________________________ Added: svn:mime-type + text/plain Added: svn:keywords + Id Date Revision Author HeadURL Added: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataServlet.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataServlet.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataServlet.java 2011-03-30 18:56:32 UTC (rev 4352) @@ -0,0 +1,663 @@ +package com.bigdata.rdf.sail.webapp; + +import info.aduna.xml.XMLWriter; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicLong; + +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.log4j.Logger; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.QueryLanguage; +import org.openrdf.query.parser.ParsedQuery; +import org.openrdf.query.resultio.sparqlxml.SPARQLResultsXMLWriter; +import org.openrdf.repository.RepositoryException; +import org.openrdf.rio.rdfxml.RDFXMLWriter; + +import com.bigdata.bop.BufferAnnotations; +import com.bigdata.bop.IPredicate; +import com.bigdata.bop.engine.QueryEngine; +import com.bigdata.bop.join.PipelineJoin; +import com.bigdata.btree.IndexMetadata; +import com.bigdata.journal.IAtomicStore; +import com.bigdata.journal.IBufferStrategy; +import com.bigdata.journal.IIndexManager; +import com.bigdata.journal.Journal; +import com.bigdata.journal.RWStrategy; +import com.bigdata.journal.TimestampUtility; +import com.bigdata.rdf.sail.BigdataSail; +import com.bigdata.rdf.sail.BigdataSailGraphQuery; +import com.bigdata.rdf.sail.BigdataSailRepository; +import com.bigdata.rdf.sail.BigdataSailRepositoryConnection; +import com.bigdata.rdf.sail.BigdataSailTupleQuery; +import com.bigdata.rdf.sail.webapp.BigdataContext.Config; +import com.bigdata.rdf.sail.webapp.BigdataContext.RunningQuery; +import com.bigdata.rdf.store.AbstractTripleStore; +import 
com.bigdata.relation.AbstractResource; +import com.bigdata.relation.RelationSchema; +import com.bigdata.rwstore.RWStore; +import com.bigdata.sparse.ITPS; +import com.bigdata.util.concurrent.ThreadPoolExecutorBaseStatisticsTask; +import com.bigdata.util.httpd.NanoHTTPD; + +public class BigdataServlet extends HttpServlet { + + /** + * The logger for the concrete {@link NanoSparqlServer} class. The {@link NanoHTTPD} + * class has its own logger. + */ + static private final Logger log = Logger.getLogger(BigdataServlet.class); + + /** + * The character set used for the response (not negotiated). + */ + static protected final String charset = "UTF-8"; + + /** + * Some HTTP response status codes + */ + public static final int + HTTP_OK = HttpServletResponse.SC_ACCEPTED, + HTTP_REDIRECT = HttpServletResponse.SC_TEMPORARY_REDIRECT, + HTTP_FORBIDDEN = HttpServletResponse.SC_FORBIDDEN, + HTTP_NOTFOUND = HttpServletResponse.SC_NOT_FOUND, + HTTP_BADREQUEST = HttpServletResponse.SC_BAD_REQUEST, + HTTP_METHOD_NOT_ALLOWED = HttpServletResponse.SC_METHOD_NOT_ALLOWED, + HTTP_INTERNALERROR = HttpServletResponse.SC_INTERNAL_SERVER_ERROR, + HTTP_NOTIMPLEMENTED = HttpServletResponse.SC_NOT_IMPLEMENTED; + + /** + * Common mime types for dynamic content + */ + public static final String + MIME_TEXT_PLAIN = "text/plain", + MIME_TEXT_HTML = "text/html", + MIME_TEXT_XML = "text/xml", + MIME_DEFAULT_BINARY = "application/octet-stream", + MIME_APPLICATION_XML = "application/xml", + MIME_TEXT_JAVASCRIPT = "text/javascript", + /** + * The traditional encoding of URL query parameters within a POST + * message body. + */ + MIME_APPLICATION_URL_ENCODED = "application/x-www-form-urlencoded"; + + /** + * A SPARQL results set in XML. + */ + static public final String MIME_SPARQL_RESULTS_XML = "application/sparql-results+xml"; + + /** + * RDF/XML. + */ + static public final String MIME_RDF_XML = "application/rdf+xml"; + + /** + * Helper class to figure out the type of a query. + */ + public static enum QueryType { + + ASK(0), + DESCRIBE(1), + CONSTRUCT(2), + SELECT(3); + + private final int order; + + private QueryType(final int order) { + this.order = order; + } + + private static QueryType getQueryType(final int order) { + switch (order) { + case 0: + return ASK; + case 1: + return DESCRIBE; + case 2: + return CONSTRUCT; + case 3: + return SELECT; + default: + throw new IllegalArgumentException("order=" + order); + } + } + + /** + * Used to note the offset at which a keyword was found. + */ + static private class P implements Comparable<P> { + + final int offset; + final QueryType queryType; + + public P(final int offset, final QueryType queryType) { + this.offset = offset; + this.queryType = queryType; + } + /** Sort into descending offset. */ + public int compareTo(final P o) { + return o.offset - offset; + } + } + + /** + * Hack returns the query type based on the first occurrence of the + * keyword for any known query type in the query. + * + * @param queryStr + * The query. + * + * @return The query type. + */ + static public QueryType fromQuery(final String queryStr) { + + // force all to lower case. 
+ final String s = queryStr.toUpperCase(); + + final int ntypes = QueryType.values().length; + + final P[] p = new P[ntypes]; + + int nmatch = 0; + for (int i = 0; i < ntypes; i++) { + + final QueryType queryType = getQueryType(i); + + final int offset = s.indexOf(queryType.toString()); + + if (offset == -1) + continue; + + p[nmatch++] = new P(offset, queryType); + + } + + if (nmatch == 0) { + + throw new RuntimeException( + "Could not determine the query type: " + queryStr); + + } + + Arrays.sort(p, 0/* fromIndex */, nmatch/* toIndex */); + + final P tmp = p[0]; + +// System.out.println("QueryType: offset=" + tmp.offset + ", type=" +// + tmp.queryType); + + return tmp.queryType; + + } + + } + + + + IIndexManager getIndexManager() { + return getContext().getIndexManager(); + } + + protected BigdataSailRepositoryConnection getQueryConnection( + final String namespace, final long timestamp) + throws RepositoryException { + return getContext().getQueryConnection(namespace, timestamp); + } + + protected AtomicLong getQueryIdFactory() { + return getContext().getQueryIdFactory(); + } + + protected Map<Long, RunningQuery> getQueries() { + return getContext().getQueries(); + } + /** + * Return a list of the registered {@link AbstractTripleStore}s. + */ + protected List<String> getNamespaces() { + + // the triple store namespaces. + final List<String> namespaces = new LinkedList<String>(); + + // scan the relation schema in the global row store. + final Iterator<ITPS> itr = (Iterator<ITPS>) getIndexManager() + .getGlobalRowStore().rangeIterator(RelationSchema.INSTANCE); + + while (itr.hasNext()) { + + // A timestamped property value set is a logical row with + // timestamped property values. + final ITPS tps = itr.next(); + + // If you want to see what is in the TPS, uncomment this. +// System.err.println(tps.toString()); + + // The namespace is the primary key of the logical row for the + // relation schema. + final String namespace = (String) tps.getPrimaryKey(); + + // Get the name of the implementation class + // (AbstractTripleStore, SPORelation, LexiconRelation, etc.) + final String className = (String) tps.get(RelationSchema.CLASS) + .getValue(); + + try { + final Class<?> cls = Class.forName(className); + if (AbstractTripleStore.class.isAssignableFrom(cls)) { + // this is a triple store (vs something else). + namespaces.add(namespace); + } + } catch (ClassNotFoundException e) { + log.error(e,e); + } + + } + + return namespaces; + + } + + /** + * Return the namespace which will be used to execute the query. The + * namespace is represented by the first component of the URI. If there is + * no namespace, then return the configured default namespace. + * + * @param uri + * The URI path string. + * + * @return The namespace. + */ + protected String getNamespace(final String uri) { + +// // locate the "//" after the protocol. +// final int index = uri.indexOf("//"); + + int snmsp = uri.indexOf("/namespace/"); + + if (snmsp == -1) { + // use the default namespace. + return getConfig().namespace; + } + + // locate the next "/" in the URI path. + final int beginIndex = uri.indexOf('/', snmsp + 1/* fromIndex */); + + // locate the next "/" in the URI path. + int endIndex = uri.indexOf('/', beginIndex + 1/* fromIndex */); + + if (endIndex == -1) { + // use the rest of the URI. + endIndex = uri.length(); + } + + // return the namespace. 
+ return uri.substring(beginIndex + 1, endIndex); + + } + + Config getConfig() { + return getContext().getConfig(); + } + + BigdataContext getContext() { + return BigdataContext.getContext(); + } + + /** + * Return the timestamp which will be used to execute the query. The uri + * query parameter <code>timestamp</code> may be used to communicate the + * desired commit time against which the query will be issued. If that uri + * query parameter is not given then the default configured commit time will + * be used. Applications may create protocols for sharing interesting commit + * times as reported by {@link IAtomicStore#commit()} or by a distributed + * data loader (for scale-out). + * + * @todo the configured timestamp should only be used for the default + * namespace (or it should be configured for each graph explicitly, or + * we should bundle the (namespace,timestamp) together as a single + * object). + */ + protected long getTimestamp(final String uri, + final HttpServletRequest req) { + final String timestamp = req.getParameter("timestamp"); + + if (timestamp == null) { + return getConfig().timestamp; + } + + return Long.valueOf(timestamp); + + } + + void buildResponse(HttpServletResponse resp, int status, String mimeType) throws IOException { + resp.setStatus(status); + resp.setContentType(mimeType); + } + + void buildResponse(HttpServletResponse resp, int status, String mimeType, String content) throws IOException { + buildResponse(resp, status, mimeType); + + resp.getWriter().print(content); + } + + void buildResponse(HttpServletResponse resp, int status, String mimeType, InputStream content) throws IOException { + buildResponse(resp, status, mimeType); + + final OutputStream outstr = resp.getOutputStream(); + final byte[] buf = new byte[1024]; + while (true) { + int rdlen = content.read(buf); + if (rdlen <= 0) { + break; + } + outstr.write(buf, 0, rdlen); + } + } + + /** + * Write the stack trace onto the output stream. This will show up in the + * client's response. This code path should be used iff we have already + * begun writing the response. Otherwise, an HTTP error status should be + * used instead. + * + * @param t + * The thrown error. + * @param os + * The stream on which the response will be written. + * @param queryStr + * The query string (if available). + * + * @return The laundered exception. + * + * @throws Exception + */ + public static RuntimeException launderThrowable(final Throwable t, + final OutputStream os, final String queryStr) throws Exception { + try { + // log an error for the service. + log.error(t, t); + } finally { + // ignore any problems here. + } + if (os != null) { + try { + final PrintWriter w = new PrintWriter(os); + if (queryStr != null) { + /* + * Write the query onto the output stream. + */ + w.write(queryStr); + w.write("\n"); + } + /* + * Write the stack trace onto the output stream. + */ + t.printStackTrace(w); + w.flush(); + // flush the output stream. + os.flush(); + } finally { + // ignore any problems here. + } + try { + // ensure output stream is closed. + os.close(); + } catch (Throwable t2) { + // ignore any problems here. + } + } + if (t instanceof RuntimeException) { + return (RuntimeException) t; + } else if (t instanceof Error) { + throw (Error) t; + } else if (t instanceof Exception) { + throw (Exception) t; + } else + throw new RuntimeException(t); + } + + /** + * Return various interesting metadata about the KB state. 
+ * + * @todo The range counts can take some time if the cluster is heavily + * loaded since they must query each shard for the primary statement + * index and the TERM2ID index. + */ + protected StringBuilder getKBInfo(final String namespace, + final long timestamp) { + + final StringBuilder sb = new StringBuilder(); + + BigdataSailRepositoryConnection conn = null; + + try { + + conn = getQueryConnection(namespace, timestamp); + + final AbstractTripleStore tripleStore = conn.getTripleStore(); + + sb.append("class\t = " + tripleStore.getClass().getName() + "\n"); + + sb + .append("indexManager\t = " + + tripleStore.getIndexManager().getClass() + .getName() + "\n"); + + sb.append("namespace\t = " + tripleStore.getNamespace() + "\n"); + + sb.append("timestamp\t = " + + TimestampUtility.toString(tripleStore.getTimestamp()) + + "\n"); + + sb.append("statementCount\t = " + tripleStore.getStatementCount() + + "\n"); + + sb.append("termCount\t = " + tripleStore.getTermCount() + "\n"); + + sb.append("uriCount\t = " + tripleStore.getURICount() + "\n"); + + sb.append("literalCount\t = " + tripleStore.getLiteralCount() + "\n"); + + /* + * Note: The blank node count is only available when using the told + * bnodes mode. + */ + sb + .append("bnodeCount\t = " + + (tripleStore.getLexiconRelation() + .isStoreBlankNodes() ? "" + + tripleStore.getBNodeCount() : "N/A") + + "\n"); + + sb.append(IndexMetadata.Options.BTREE_BRANCHING_FACTOR + + "=" + + tripleStore.getSPORelation().getPrimaryIndex() + .getIndexMetadata().getBranchingFactor() + "\n"); + + sb.append(IndexMetadata.Options.WRITE_RETENTION_QUEUE_CAPACITY + + "=" + + tripleStore.getSPORelation().getPrimaryIndex() + .getIndexMetadata() + .getWriteRetentionQueueCapacity() + "\n"); + + sb.append(BigdataSail.Options.STAR_JOINS + "=" + + conn.getRepository().getSail().isStarJoins() + "\n"); + + sb.append("-- All properties.--\n"); + + // get the triple store's properties from the global row store. + final Map<String, Object> properties = getIndexManager() + .getGlobalRowStore().read(RelationSchema.INSTANCE, + namespace); + + // write them out, + for (String key : properties.keySet()) { + sb.append(key + "=" + properties.get(key)+"\n"); + } + + /* + * And show some properties which can be inherited from + * AbstractResource. These have been mainly phased out in favor of + * BOP annotations, but there are a few places where they are still + * in use. + */ + + sb.append("-- Interesting AbstractResource effective properties --\n"); + + sb.append(AbstractResource.Options.CHUNK_CAPACITY + "=" + + tripleStore.getChunkCapacity() + "\n"); + + sb.append(AbstractResource.Options.CHUNK_OF_CHUNKS_CAPACITY + "=" + + tripleStore.getChunkOfChunksCapacity() + "\n"); + + sb.append(AbstractResource.Options.CHUNK_TIMEOUT + "=" + + tripleStore.getChunkTimeout() + "\n"); + + sb.append(AbstractResource.Options.FULLY_BUFFERED_READ_THRESHOLD + "=" + + tripleStore.getFullyBufferedReadThreshold() + "\n"); + + sb.append(AbstractResource.Options.MAX_PARALLEL_SUBQUERIES + "=" + + tripleStore.getMaxParallelSubqueries() + "\n"); + + /* + * And show some interesting effective properties for the KB, SPO + * relation, and lexicon relation. 
+ */ + sb.append("-- Interesting KB effective properties --\n"); + + sb + .append(AbstractTripleStore.Options.TERM_CACHE_CAPACITY + + "=" + + tripleStore + .getLexiconRelation() + .getProperties() + .getProperty( + AbstractTripleStore.Options.TERM_CACHE_CAPACITY, + AbstractTripleStore.Options.DEFAULT_TERM_CACHE_CAPACITY) + "\n"); + + /* + * And show several interesting properties with their effective + * defaults. + */ + + sb.append("-- Interesting Effective BOP Annotations --\n"); + + sb.append(BufferAnnotations.CHUNK_CAPACITY + + "=" + + tripleStore.getProperties().getProperty( + BufferAnnotations.CHUNK_CAPACITY, + "" + BufferAnnotations.DEFAULT_CHUNK_CAPACITY) + + "\n"); + + sb + .append(BufferAnnotations.CHUNK_OF_CHUNKS_CAPACITY + + "=" + + tripleStore + .getProperties() + .getProperty( + BufferAnnotations.CHUNK_OF_CHUNKS_CAPACITY, + "" + + BufferAnnotations.DEFAULT_CHUNK_OF_CHUNKS_CAPACITY) + + "\n"); + + sb.append(BufferAnnotations.CHUNK_TIMEOUT + + "=" + + tripleStore.getProperties().getProperty( + BufferAnnotations.CHUNK_TIMEOUT, + "" + BufferAnnotations.DEFAULT_CHUNK_TIMEOUT) + + "\n"); + + sb.append(PipelineJoin.Annotations.MAX_PARALLEL_CHUNKS + + "=" + + tripleStore.getProperties().getProperty( + PipelineJoin.Annotations.MAX_PARALLEL_CHUNKS, + "" + PipelineJoin.Annotations.DEFAULT_MAX_PARALLEL_CHUNKS) + "\n"); + + sb + .append(IPredicate.Annotations.FULLY_BUFFERED_READ_THRESHOLD + + "=" + + tripleStore + .getProperties() + .getProperty( + IPredicate.Annotations.FULLY_BUFFERED_READ_THRESHOLD, + "" + + IPredicate.Annotations.DEFAULT_FULLY_BUFFERED_READ_THRESHOLD) + + "\n"); + + // sb.append(tripleStore.predicateUsage()); + + if (tripleStore.getIndexManager() instanceof Journal) { + + final Journal journal = (Journal) tripleStore.getIndexManager(); + + final IBufferStrategy strategy = journal.getBufferStrategy(); + + if (strategy instanceof RWStrategy) { + + final RWStore store = ((RWStrategy) strategy).getRWStore(); + + store.showAllocators(sb); + + } + + } + + } catch (Throwable t) { + + log.warn(t.getMessage(), t); + + } finally { + + if(conn != null) { + try { + conn.close(); + } catch (RepositoryException e) { + log.error(e, e); + } + + } + + } + + return sb; + + } + + protected ThreadPoolExecutorBaseStatisticsTask getSampleTask() { + return getContext().getSampleTask(); + } + + <T> TreeMap<Long, T> getQueryMap() { + return new TreeMap<Long, T>(new Comparator<Long>() { + /** + * Comparator puts the entries into descending order by the query + * execution time (longest running queries are first). 
+ */ + public int compare(final Long o1, final Long o2) { + if(o1.longValue()<o2.longValue()) return 1; + if(o1.longValue()>o2.longValue()) return -1; + return 0; + } + }); + } + + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataServlet.java ___________________________________________________________________ Added: svn:mime-type + text/plain Added: svn:keywords + Id Date Revision Author HeadURL Added: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/DeleteServlet.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/DeleteServlet.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/DeleteServlet.java 2011-03-30 18:56:32 UTC (rev 4352) @@ -0,0 +1,380 @@ +package com.bigdata.rdf.sail.webapp; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.OutputStream; +import java.util.concurrent.atomic.AtomicLong; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.log4j.Logger; +import org.openrdf.model.Resource; +import org.openrdf.model.Statement; +import org.openrdf.query.parser.sparql.SPARQLParserFactory; +import org.openrdf.rio.RDFFormat; +import org.openrdf.rio.RDFHandlerException; +import org.openrdf.rio.RDFParser; +import org.openrdf.rio.RDFParserFactory; +import org.openrdf.rio.RDFParserRegistry; +import org.openrdf.rio.helpers.RDFHandlerBase; +import org.openrdf.rio.rdfxml.RDFXMLParser; +import org.openrdf.sail.SailException; + +import com.bigdata.journal.ITx; +import com.bigdata.rdf.sail.BigdataSail; +import com.bigdata.rdf.sail.BigdataSail.BigdataSailConnection; +import com.bigdata.rdf.sail.webapp.BigdataContext.AbstractQueryTask; +import com.bigdata.rdf.store.AbstractTripleStore; + +public class DeleteServlet extends BigdataServlet { + /** + * The logger for the concrete {@link BigdataServlet} class. + */ + static private final Logger log = Logger.getLogger(BigdataServlet.class); + + public DeleteServlet() { + + getContext().registerServlet(this); + } + + /** + * REST DELETE. There are two forms for this operation. + * + * <pre> + * DELETE [/namespace/NAMESPACE] + * ... + * Content-Type + * ... + * + * BODY + * + * </pre> + * <p> + * BODY contains RDF statements according to the specified Content-Type. + * Statements parsed from the BODY are deleted from the addressed namespace. + * </p> + * <p> + * -OR- + * </p> + * + * <pre> + * DELETE [/namespace/NAMESPACE] ?query=... + * </pre> + * <p> + * Where <code>query</code> is a CONSTRUCT or DESCRIBE query. Statements are + * materialized using the query from the addressed namespace are deleted + * from that namespace. + * </p> + */ + public void doDelete(final HttpServletRequest req, final HttpServletResponse resp) { + + final String contentType = req.getContentType(); + + final String queryStr = req.getRequestURI(); + + if (contentType != null) { + + doDeleteWithBody(req, resp); + + } else if (queryStr != null) { + + doDeleteWithQuery(req, resp); + + } else { + resp.setStatus(HttpServletResponse.SC_BAD_REQUEST); + } + + } + + /** + * Delete all statements materialized by a DESCRIBE or CONSTRUCT query. + * <p> + * Note: To avoid materializing the statements, this runs the query against + * the last commit time. 
This is done while it is holding the unisolated + * connection which prevents concurrent modifications. Therefore the entire + * SELECT + DELETE operation is ACID. + */ + private void doDeleteWithQuery(final HttpServletRequest req, final HttpServletResponse resp) { + + final String baseURI = "";// @todo baseURI query parameter? + + final String namespace = getNamespace(req.getRequestURI()); + + final String queryStr = req.getParameter("query"); + + if(queryStr == null) + throw new UnsupportedOperationException(); + + if (log.isInfoEnabled()) + log.info("delete with query: "+queryStr); + + try { + + // resolve the default namespace. + final AbstractTripleStore tripleStore = (AbstractTripleStore) getIndexManager() + .getResourceLocator().locate(namespace, ITx.UNISOLATED); + + if (tripleStore == null) { + buildResponse(resp, HTTP_BADREQUEST, MIME_TEXT_PLAIN, + "Not found: namespace=" + namespace); + return; + } + + /* + * Note: pipe is drained by this thread to consume the query + * results, which are the statements to be deleted. + */ + final ByteArrayOutputStream bos = new ByteArrayOutputStream(); + try { + + final AbstractQueryTask queryTask = BigdataContext.getContext().getQueryTask(namespace, + ITx.READ_COMMITTED, queryStr, req, + bos); + + switch (queryTask.queryType) { + case DESCRIBE: + case CONSTRUCT: + break; + default: + buildResponse(resp, HTTP_BADREQUEST, MIME_TEXT_PLAIN, + "Must be DESCRIBE or CONSTRUCT query."); + return; + } + + // invoke query, writing statements into temporary OS + queryTask.call(); + + final AtomicLong nmodified = new AtomicLong(0L); + + // Wrap with SAIL. + final BigdataSail sail = new BigdataSail(tripleStore); + BigdataSailConnection conn = null; + try { + + sail.initialize(); + + // get the unisolated connection. + conn = sail.getConnection(); + + final RDFXMLParser rdfParser = new RDFXMLParser( + tripleStore.getValueFactory()); + + rdfParser.setVerifyData(false); + + rdfParser.setStopAtFirstError(true); + + rdfParser + .setDatatypeHandling(RDFParser.DatatypeHandling.IGNORE); + + rdfParser.setRDFHandler(new RemoveStatementHandler(conn, nmodified)); + + /* + * Run the parser, which will cause statements to be + * deleted. + */ + rdfParser.parse(new ByteArrayInputStream(bos.toByteArray()), baseURI); + + // Commit the mutation. + conn.commit(); + + } finally { + + if (conn != null) + conn.close(); + +// sail.shutDown(); + + } + + buildResponse(resp, HTTP_OK, MIME_TEXT_PLAIN, nmodified.get() + + " statements modified."); + + } catch (Throwable t) { + + throw launderThrowable(t, resp.getOutputStream(), queryStr); + + } + + } catch (Exception ex) { + + // Will be rendered as an INTERNAL_ERROR. + throw new RuntimeException(ex); + + } + + } + + /** + * The delete servlet can delete statements posted in the body of + * the request. Note that the DELETE method cannot be used directly for + * this since it is not permitted to open an OutputStream for a DELETE + * request. + */ + public void doPost(final HttpServletRequest req, final HttpServletResponse resp) { + + final String contentType = req.getContentType(); + + final String queryStr = req.getRequestURI(); + + if (contentType != null) { + + doDeleteWithBody(req, resp); + + } else if (queryStr != null) { + + doDeleteWithQuery(req, resp); + + } else { + resp.setStatus(HttpServletResponse.SC_BAD_REQUEST); + } + + } + + /** + * DELETE request with a request body containing the statements to be + * removed. 
+ */ + private void doDeleteWithBody(final HttpServletRequest req, final HttpServletResponse resp) { + + final String baseURI = "";// @todo baseURI query parameter? + + final String namespace = getNamespace(req.getRequestURI()); + + final String contentType = req.getContentType(); + + if (contentType == null) + throw new UnsupportedOperationException(); + + if (log.isInfoEnabled()) + log.info("Request body: " + contentType); + + try { + + // resolve the default namespace. + final AbstractTripleStore tripleStore = (AbstractTripleStore) getIndexManager() + .getResourceLocator().locate(namespace, ITx.UNISOLATED); + + if (tripleStore == null) { + buildResponse(resp, HTTP_BADREQUEST, MIME_TEXT_PLAIN, + "Not found: namespace=" + namespace); + return; + } + + final AtomicLong nmodified = new AtomicLong(0L); + + // Wrap with SAIL. + final BigdataSail sail = new BigdataSail(tripleStore); + BigdataSailConnection conn = null; + try { + + sail.initialize(); + conn = sail.getConnection(); + + /* + * There is a request body, so let's try and parse it. + */ + + final RDFFormat format = RDFFormat.forMIMEType(contentType); + + if (format == null) { + buildResponse(resp, HTTP_BADREQUEST, MIME_TEXT_PLAIN, + "Content-Type not recognized as RDF: " + + contentType); + + return; + } + + final RDFParserFactory rdfParserFactory = RDFParserRegistry + .getInstance().get(format); + + if (rdfParserFactory == null) { + buildResponse(resp, HTTP_INTERNALERROR, MIME_TEXT_PLAIN, + "Parser not found: Content-Type=" + contentType); + return; + } + + final RDFParser rdfParser = rdfParserFactory.getParser(); + + rdfParser.setValueFactory(tripleStore.getValueFactory()); + + rdfParser.setVerifyData(true); + + rdfParser.setStopAtFirstError(true); + + rdfParser + .setDatatypeHandling(RDFParser.DatatypeHandling.IGNORE); + + rdfParser.setRDFHandler(new RemoveStatementHandler(conn, + nmodified)); + + /* + * Run the parser, which will cause statements to be deleted. + */ + rdfParser.parse(req.getInputStream(), baseURI); + + // Commit the mutation. + conn.commit(); + + buildResponse(resp, HTTP_OK, MIME_TEXT_PLAIN, nmodified.get() + + " statements modified."); + + } finally { + + if (conn != null) + conn.close(); + +// sail.shutDown(); + + } + + } catch (Exception ex) { + + // Will be rendered as an INTERNAL_ERROR. + throw new RuntimeException(ex); + + } + + } + /** + * Helper class removes statements from the sail as they are visited by a parser. + */ + private static class RemoveStatementHandler extends RDFHandlerBase { + + private final BigdataSailConnection conn; + private final AtomicLong nmodified; + + public RemoveStatementHandler(final BigdataSailConnection conn, + final AtomicLong nmodified) { + this.conn = conn; + this.nmodified = nmodified; + } + + Resource[] nullArray = new Resource[]{}; + public void handleStatement(Statement stmt) throws RDFHandlerException { + + try { + Resource context = stmt.getContext(); + + conn.removeStatements(// + stmt.getSubject(), // + stmt.getPredicate(), // + stmt.getObject(), // + (Resource[]) (context == null ? 
nullArray + : new Resource[] { context })// + ); + + } catch (SailException e) { + + throw new RDFHandlerException(e); + + } + + nmodified.incrementAndGet(); + + } + + } + + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/DeleteServlet.java ___________________________________________________________________ Added: svn:mime-type + text/plain Added: svn:keywords + Id Date Revision Author HeadURL Added: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/JettySparqlServer.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/JettySparqlServer.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/JettySparqlServer.java 2011-03-30 18:56:32 UTC (rev 4352) @@ -0,0 +1,188 @@ +package com.bigdata.rdf.sail.webapp; + +import java.io.IOException; +import java.util.TreeMap; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.log4j.Logger; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.server.handler.ContextHandler; +import org.eclipse.jetty.server.handler.DefaultHandler; +import org.eclipse.jetty.server.handler.HandlerList; +import org.eclipse.jetty.server.handler.ResourceHandler; +import org.openrdf.repository.RepositoryException; +import org.openrdf.sail.SailException; + +import com.bigdata.journal.IIndexManager; +import com.bigdata.rdf.sail.webapp.BigdataContext.Config; + +/** + * The JettySparqlServer enables easy embedding of the SparqlServlet. + * + * To invoke from the commandline use the JettySparqlCommand. + * + * The server provides an embeddable alternative to a standard web application + * deployment. + * + * @author martyn Cutcher + * + */ +public class JettySparqlServer extends Server { + + static private final Logger log = Logger.getLogger(JettySparqlServer.class); + + int m_port = -1; // allow package visibility from JettySparqlCommand + + final TreeMap<String, Handler> m_handlerMap = new TreeMap<String, Handler>(); + + public JettySparqlServer(int port) { + super(port); // creates Connector for specified port + } + + public void handle(String target, Request baseRequest, HttpServletRequest req, HttpServletResponse resp) { + try { + if (log.isDebugEnabled()) + log.debug("Lookingup " + target); + + Handler handler = m_handlerMap.get(lookup(target)); + if (handler != null) { + handler.handle(target, baseRequest, req, resp); + baseRequest.setHandled(true); + } else { + + if (log.isDebugEnabled()) + log.debug("Calling default handler"); + + super.handle(target, baseRequest, req, resp); + } + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (ServletException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + private Object lookup(String target) { + int spi = target.indexOf("/", 1); + + String ret = spi == -1 ? 
target : target.substring(0, spi); + + return ret; + } + + public void setupHandlers(final Config config, final IIndexManager indexManager) throws SailException, + RepositoryException, IOException { + ResourceHandler resource_handler = new ResourceHandler(); + resource_handler.setDirectoriesListed(true); + resource_handler.setWelcomeFiles(new String[] { "index.html" }); + + resource_handler.setResourceBase(config.resourceBase); + + BigdataContext.establishContext(config, indexManager); + + // embedded setup + m_handlerMap.put("/status", new ServletHandler(new StatusServlet())); + m_handlerMap.put("/query", new ServletHandler(new QueryServlet())); + m_handlerMap.put("/update", new ServletHandler(new UpdateServlet())); + m_handlerMap.put("/delete", new ServletHandler(new DeleteServlet())); + m_handlerMap.put("/REST", new ServletHandler(new RESTServlet())); + + // the "stop" handler is only relevant for the embedded server + m_handlerMap.put("/stop", new AbstractHandler() { + public void handle(String arg0, Request arg1, HttpServletRequest arg2, HttpServletResponse resp) + throws IOException, ServletException { + try { + resp.getWriter().println("Server Stop request received"); + shutdownNow(); + } catch (InterruptedException e) { + // okay + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + + HandlerList handlers = new HandlerList(); + handlers.setHandlers(new Handler[] { resource_handler, new DefaultHandler() }); + + setHandler(handlers); + } + + public void startup(final Config config, final IIndexManager indexManager) throws Exception { + setupHandlers(config, indexManager); + + log.info("Calling start"); + + start(); + + m_port = getConnectors()[0].getLocalPort(); + + m_running = true; + + log.info("Running on port: " + m_port); + } + + static Handler makeContextHandler(Handler handler, String spec) { + ContextHandler context = new ContextHandler(); + context.setContextPath(spec); + context.setResourceBase("."); + + context.setClassLoader(Thread.currentThread().getContextClassLoader()); + + context.setHandler(handler); + + return context; + + } + + static class ServletHandler extends AbstractHandler { + final HttpServlet m_servlet; + + ServletHandler(HttpServlet servlet) { + m_servlet = servlet; + } + + public void handle(String arg0, Request jettyRequest, HttpServletRequest req, HttpServletResponse resp) + throws IOException, ServletException { + + m_servlet.service(req, resp); + + jettyRequest.setHandled(true); + } + + } + + boolean m_running = false; + + public boolean isOpen() { + return m_running; + } + + /** + * Shutdown the context releasing any resources and stop the server. + * @throws Exception + */ + public void shutdownNow() throws Exception { + BigdataContext.clear(); + + stop(); + + + + m_port = -1; + m_running = false; + } + + public int getPort() { + return m_port; + } +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bi... [truncated message content] |
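For the embedded deployment described in the log message above, the sketch below shows one way to start the server against an existing IIndexManager (for example a Journal). The helper class/method names and the assumption that a kb namespace already exists are illustrative; only the JettySparqlServer and Config calls come from the committed code.

    import com.bigdata.journal.IIndexManager;
    import com.bigdata.rdf.sail.webapp.BigdataContext.Config;
    import com.bigdata.rdf.sail.webapp.JettySparqlServer;

    public class EmbeddedSparqlServerExample {

        // Start the embedded server for the given namespace; port 0 lets the
        // server pick any open port, which is reported afterwards by getPort().
        static JettySparqlServer startEmbedded(final IIndexManager indexManager,
                final String namespace) throws Exception {

            final Config config = new Config();
            config.namespace = namespace; // the triple store to expose
            config.port = 0;              // ZERO to use any open port

            final JettySparqlServer server = new JettySparqlServer(config.port);
            server.startup(config, indexManager);

            System.out.println("SPARQL endpoint listening on port " + server.getPort());
            return server;
        }
    }

The embedded server can later be stopped with server.shutdownNow(), which also clears the shared BigdataContext established during startup().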
From: <tho...@us...> - 2011-05-16 19:48:00
Revision: 4510 http://bigdata.svn.sourceforge.net/bigdata/?rev=4510&view=rev Author: thompsonbry Date: 2011-05-16 19:47:53 +0000 (Mon, 16 May 2011) Log Message: ----------- Resolution for https://sourceforge.net/apps/trac/bigdata/ticket/283 (query hint to provide access to the IRunningQuery interface by assigning the UUID of the query). Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl3.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/QueryHints.java Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl3.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl3.java 2011-05-16 16:56:16 UTC (rev 4509) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl3.java 2011-05-16 19:47:53 UTC (rev 4510) @@ -14,6 +14,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -29,7 +30,6 @@ import org.openrdf.query.algebra.And; import org.openrdf.query.algebra.Bound; import org.openrdf.query.algebra.Compare; -import org.openrdf.query.algebra.Compare.CompareOp; import org.openrdf.query.algebra.Filter; import org.openrdf.query.algebra.Group; import org.openrdf.query.algebra.IsBNode; @@ -51,17 +51,17 @@ import org.openrdf.query.algebra.Regex; import org.openrdf.query.algebra.SameTerm; import org.openrdf.query.algebra.StatementPattern; -import org.openrdf.query.algebra.StatementPattern.Scope; import org.openrdf.query.algebra.TupleExpr; import org.openrdf.query.algebra.UnaryTupleOperator; import org.openrdf.query.algebra.Union; import org.openrdf.query.algebra.ValueConstant; import org.openrdf.query.algebra.ValueExpr; import org.openrdf.query.algebra.Var; +import org.openrdf.query.algebra.Compare.CompareOp; +import org.openrdf.query.algebra.StatementPattern.Scope; import org.openrdf.query.algebra.evaluation.impl.EvaluationStrategyImpl; import org.openrdf.query.algebra.evaluation.iterator.FilterIterator; import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; -import org.openrdf.query.impl.MapBindingSet; import com.bigdata.bop.BOp; import com.bigdata.bop.BOpUtility; @@ -70,16 +70,17 @@ import com.bigdata.bop.IConstant; import com.bigdata.bop.IConstraint; import com.bigdata.bop.IPredicate; -import com.bigdata.bop.IPredicate.Annotations; import com.bigdata.bop.IValueExpression; import com.bigdata.bop.IVariable; import com.bigdata.bop.IVariableOrConstant; import com.bigdata.bop.NV; import com.bigdata.bop.PipelineOp; +import com.bigdata.bop.IPredicate.Annotations; import com.bigdata.bop.ap.Predicate; import com.bigdata.bop.bindingSet.ListBindingSet; import com.bigdata.bop.constraint.INBinarySearch; import com.bigdata.bop.engine.IRunningQuery; +import com.bigdata.bop.engine.LocalChunkMessage; import com.bigdata.bop.engine.QueryEngine; import com.bigdata.bop.solutions.ISortOrder; import com.bigdata.btree.IRangeQuery; @@ -95,21 +96,21 @@ import com.bigdata.rdf.internal.constraints.IsLiteralBOp; import com.bigdata.rdf.internal.constraints.IsURIBOp; import com.bigdata.rdf.internal.constraints.MathBOp; -import com.bigdata.rdf.internal.constraints.MathBOp.MathOp; import com.bigdata.rdf.internal.constraints.NotBOp; import 
com.bigdata.rdf.internal.constraints.OrBOp; import com.bigdata.rdf.internal.constraints.RangeBOp; import com.bigdata.rdf.internal.constraints.SPARQLConstraint; import com.bigdata.rdf.internal.constraints.SameTermBOp; +import com.bigdata.rdf.internal.constraints.MathBOp.MathOp; import com.bigdata.rdf.lexicon.LexiconRelation; import com.bigdata.rdf.model.BigdataValue; import com.bigdata.rdf.sail.BigdataSail.Options; import com.bigdata.rdf.sail.sop.SOp; import com.bigdata.rdf.sail.sop.SOp2BOpUtility; import com.bigdata.rdf.sail.sop.SOpTree; -import com.bigdata.rdf.sail.sop.SOpTree.SOpGroup; import com.bigdata.rdf.sail.sop.SOpTreeBuilder; import com.bigdata.rdf.sail.sop.UnsupportedOperatorException; +import com.bigdata.rdf.sail.sop.SOpTree.SOpGroup; import com.bigdata.rdf.spo.DefaultGraphSolutionExpander; import com.bigdata.rdf.spo.ExplicitSPOFilter; import com.bigdata.rdf.spo.ISPO; @@ -122,6 +123,7 @@ import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.IBuffer; import com.bigdata.relation.accesspath.IElementFilter; +import com.bigdata.relation.accesspath.ThickAsynchronousIterator; import com.bigdata.relation.rule.IAccessPathExpander; import com.bigdata.relation.rule.IProgram; import com.bigdata.relation.rule.IQueryOptions; @@ -892,21 +894,45 @@ } + /** + * Return an {@link IAsynchronousIterator} that will read a single, empty + * {@link IBindingSet}. + * + * @param bindingSet + * the binding set. + */ + private ThickAsynchronousIterator<IBindingSet[]> newBindingSetIterator( + final IBindingSet bindingSet) { + + return new ThickAsynchronousIterator<IBindingSet[]>( + new IBindingSet[][] { new IBindingSet[] { bindingSet } }); + + } + CloseableIteration<BindingSet, QueryEvaluationException> doEvaluateNatively(final PipelineOp query, final BindingSet bs, final QueryEngine queryEngine, final IVariable[] required ) throws QueryEvaluationException { + // Use either the caller's UUID or a random UUID. + final String queryIdStr = queryHints == null ? null : queryHints + .getProperty(QueryHints.QUERYID); + + final UUID queryId = queryIdStr == null ? UUID.randomUUID() : UUID + .fromString(queryIdStr); + + // Wrap the input binding sets (or an empty binding set if there is no + // input). + final IAsynchronousIterator<IBindingSet[]> source = newBindingSetIterator(bs != null ? toBindingSet(bs) + : new ListBindingSet()); + IRunningQuery runningQuery = null; try { - - // Submit query for evaluation. - if (bs != null) - runningQuery = queryEngine.eval(query, toBindingSet(bs)); - else - runningQuery = queryEngine.eval(query); + // Submit query for evaluation. + runningQuery = queryEngine.eval(queryId, query, source); + /* * Wrap up the native bigdata query solution iterator as Sesame * compatible iteration with materialized RDF Values. 
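The substance of the change to doEvaluateNatively is that the query is no longer submitted with only a binding set: it is now submitted under an explicit UUID (taken from the QueryHints.QUERYID property when present, otherwise freshly generated) together with an explicit source iterator. The following is a condensed sketch of that new flow, not part of the commit itself; the method name submitUnderQueryId is hypothetical, queryHints and toBindingSet(...) are assumed to be the members of the surrounding class, and the com.bigdata types are those already imported above.

// Sketch only: mirrors the new submission path in doEvaluateNatively.
private IRunningQuery submitUnderQueryId(final QueryEngine queryEngine,
        final PipelineOp query, final BindingSet bs) throws Exception {

    // Use the caller's UUID from the query hints, else a random UUID.
    final String queryIdStr = queryHints == null ? null
            : queryHints.getProperty(QueryHints.QUERYID);
    final UUID queryId = queryIdStr == null ? UUID.randomUUID()
            : UUID.fromString(queryIdStr);

    // Wrap the input binding set (or an empty one) as the pipeline source.
    final IBindingSet initialBindings = bs != null ? toBindingSet(bs)
            : new ListBindingSet();
    final IAsynchronousIterator<IBindingSet[]> source =
            new ThickAsynchronousIterator<IBindingSet[]>(
                    new IBindingSet[][] { new IBindingSet[] { initialBindings } });

    // Submit under the known UUID so the application can locate the query.
    return queryEngine.eval(queryId, query, source);
}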
Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/QueryHints.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/QueryHints.java	2011-05-16 16:56:16 UTC (rev 4509)
+++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/QueryHints.java	2011-05-16 19:47:53 UTC (rev 4510)
@@ -27,8 +27,13 @@
 
 package com.bigdata.rdf.sail;
 
+import java.util.UUID;
+
 import com.bigdata.bop.BOp;
 import com.bigdata.bop.controller.SubqueryHashJoinOp;
+import com.bigdata.bop.engine.IRunningQuery;
+import com.bigdata.bop.engine.QueryEngine;
+import com.bigdata.bop.fed.QueryEngineFactory;
 
 /**
  * Query hint directives understood by a bigdata SPARQL end point.
@@ -116,4 +121,36 @@
      */
     String DEFAULT_HASH_JOIN = "false";
 
+    /**
+     * The {@link UUID} to be assigned to the {@link IRunningQuery} (optional).
+     * This query hint makes it possible for the application to assign the
+     * {@link UUID} under which the query will run. This can be used to locate
+     * the {@link IRunningQuery} using its {@link UUID} and gather metadata
+     * about the query during its evaluation. The {@link IRunningQuery} may be
+     * used to monitor the query or even cancel a query.
+     * <p>
+     * The {@link UUID} of each query MUST be distinct. When using this query
+     * hint the application assumes responsibility for applying
+     * {@link UUID#randomUUID()} to generate a unique {@link UUID} for the
+     * query. The application may then discover the {@link IRunningQuery} using
+     * {@link QueryEngineFactory#getQueryController(com.bigdata.journal.IIndexManager)}
+     * and {@link QueryEngine#getQuery(UUID)}.
+     * <p>
+     * Note: The openrdf iteration interface has a close() method, but this can
+     * not be invoked until hasNext() has run and the first solution has been
+     * materialized. For queries which use an "at-once" operator, such as ORDER
+     * BY, the query will run to completion before hasNext() returns. This means
+     * that it is effectively impossible to interrupt a running query which uses
+     * an ORDER BY clause from the SAIL. However, applications MAY use this
+     * query hint to discover the {@link IRunningQuery} interface and cancel
+     * the query.
+     *
+     * <pre>
+     * PREFIX BIGDATA_QUERY_HINTS: <http://www.bigdata.com/queryHints#com.bigdata.rdf.sail.QueryHints.queryId=36cff615-aaea-418a-bb47-006699702e45>
+     * </pre>
+     *
+     * @see https://sourceforge.net/apps/trac/bigdata/ticket/283
+     */
+    String QUERYID = QueryHints.class.getName() + ".queryId";
+
 }

This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
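Put together with the change to doEvaluateNatively, the hint gives an application a handle on the evaluation: it picks the UUID, embeds it in the query text via the BIGDATA_QUERY_HINTS prefix, and can later look the query up while it runs. The sketch below is illustrative only, under stated assumptions: indexManager is the application's IIndexManager, QueryEngineFactory.getQueryController(...) is assumed to be a static lookup as named in the javadoc, and cancellation is written Future-style because the exact termination method of IRunningQuery is not shown in this patch.

// The UUID the application chose and also embedded in the SPARQL text via
// the BIGDATA_QUERY_HINTS prefix shown in the javadoc above.
final UUID queryId = UUID.fromString("36cff615-aaea-418a-bb47-006699702e45");

// Locate the query controller for the index manager backing the SAIL
// (indexManager is assumed to be in scope).
final QueryEngine queryEngine =
        QueryEngineFactory.getQueryController(indexManager);

// Look up the running query by its UUID; it may no longer be registered
// once evaluation has finished (assumption).
final IRunningQuery runningQuery = queryEngine.getQuery(queryId);

if (runningQuery != null) {
    // Assumed Future-style cancellation; substitute whatever termination
    // method IRunningQuery actually exposes in this code base.
    runningQuery.cancel(true/* mayInterruptIfRunning */);
}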
From: <mrp...@us...> - 2011-06-20 17:30:30
|
Revision: 4743 http://bigdata.svn.sourceforge.net/bigdata/?rev=4743&view=rev Author: mrpersonick Date: 2011-06-20 17:30:23 +0000 (Mon, 20 Jun 2011) Log Message: ----------- fixed the bottom-up test failures Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl3.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sop/SOp2BOpUtility.java Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl3.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl3.java 2011-06-20 14:41:55 UTC (rev 4742) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl3.java 2011-06-20 17:30:23 UTC (rev 4743) @@ -446,6 +446,17 @@ } + } catch (BadlyDesignedLeftJoinIteratorException ex) { + + log.warn("badly designed left join"); + + // turn off native joins for the remainder, we can't do + // partial execution + nativeJoins = false; + + // defer to Sesame + return super.evaluate(union, bs); + } } @@ -503,6 +514,17 @@ } + } catch (BadlyDesignedLeftJoinIteratorException ex) { + + log.warn("badly designed left join"); + + // turn off native joins for the remainder, we can't do + // partial execution + nativeJoins = false; + + // defer to Sesame + return super.evaluate(join, bs); + } } @@ -560,6 +582,17 @@ } + } catch (BadlyDesignedLeftJoinIteratorException ex) { + + log.warn("badly designed left join"); + + // turn off native joins for the remainder, we can't do + // partial execution + nativeJoins = false; + + // defer to Sesame + return super.evaluate(leftJoin, bs); + } } @@ -613,6 +646,17 @@ } + } catch (BadlyDesignedLeftJoinIteratorException ex) { + + log.warn("badly designed left join"); + + // turn off native joins for the remainder, we can't do + // partial execution + nativeJoins = false; + + // defer to Sesame + return super.evaluate(filter, bs); + } } @@ -632,7 +676,13 @@ * Note: Do not wrap as a different exception type. The caller is * looking for this. */ - throw new UnsupportedOperatorException(ex); + throw ex; + } catch(BadlyDesignedLeftJoinIteratorException ex) { + /* + * Note: Do not wrap as a different exception type. The caller is + * looking for this. + */ + throw ex; } catch (Throwable ex) { throw new QueryEvaluationException(ex); } @@ -655,7 +705,7 @@ * to work with. */ sopTree = stb.collectSOps(root); - + /* * We need to prune groups that contain terms that do not appear in * our lexicon. @@ -807,10 +857,25 @@ sop.setBOp(bop); } else if (op instanceof Filter) { final Filter filter = (Filter) op; - final ValueExpr ve = filter.getCondition(); -// try { + + /* + * If the scope binding names are empty we can definitely + * always fail the filter (since the filter's variables + * cannot be bound). 
+ */ + if (filter.getBindingNames().isEmpty()) { + final IConstraint bop = new SPARQLConstraint(SparqlTypeErrorBOp.INSTANCE); + sop.setBOp(bop); + } else { + final ValueExpr ve = filter.getCondition(); final IConstraint bop = toConstraint(ve); sop.setBOp(bop); + } + +// try { +// final ValueExpr ve = filter.getCondition(); +// final IConstraint bop = toConstraint(ve); +// sop.setBOp(bop); // } catch (UnsupportedOperatorException ex) { // /* // * If we encounter a sesame filter (ValueExpr) that we @@ -833,6 +898,7 @@ // throw new UnsupportedOperatorException(ex); // } // } + } } @@ -858,7 +924,17 @@ } } + if (log.isDebugEnabled()) { + log.debug("\n"+sopTree); + } + /* + * Check whether optional join is "well designed" as defined in section + * 4.2 of "Semantics and Complexity of SPARQL", 2006, Jorge Pérez et al. + */ + checkForBadlyDesignedLeftJoin(sopTree); + + /* * Gather variables required by Sesame outside of the query * evaluation (projection and global sesame filters). */ @@ -2582,4 +2658,124 @@ } + /** + * We are looking for queries of the form: + * + * P = ((?X, name, paul) OPT ((?Y, name, george) OPT (?X, email, ?Z))) + * + * i.e. variables used by the right side of a left join that are not bound + * in the parent group but are bound in groups above the parent group. + */ + private static void checkForBadlyDesignedLeftJoin(final SOpTree tree) { + + final Set<com.bigdata.bop.Var> bindings = new LinkedHashSet<com.bigdata.bop.Var>(); + + final SOpGroup root = tree.getRoot(); + + addVars(bindings, root, false); + + if (root.getChildren() != null) { + + for (SOpGroup child : root.getChildren()) { + + checkForBadlyDesignedLeftJoin(bindings, child); + + } + + } + + } + + private static void checkForBadlyDesignedLeftJoin( + final Set<com.bigdata.bop.Var> bindings, final SOpGroup group) { + + /* + * Identify problem vars: + * + * 1. Add all vars used in the group (statement patterns and filters) + * 2. Remove all vars bound by the parent group (statement patterns) + * 3. Retain all vars from the grandparent groups (statement patterns) + */ + if (SOp2BOpUtility.isOptional(group)) { + + final Set<com.bigdata.bop.Var> problemVars = + new LinkedHashSet<com.bigdata.bop.Var>(); + + addVars(problemVars, group, true); + + final Set<com.bigdata.bop.Var> parentVars = + new LinkedHashSet<com.bigdata.bop.Var>(); + + SOpGroup parent = group.getParent(); + while(SOp2BOpUtility.isUnion(parent)) + parent = parent.getParent(); + + addVars(parentVars, parent, false); + + problemVars.removeAll(parentVars); + + problemVars.retainAll(bindings); + + if (!problemVars.isEmpty()) { + + throw new BadlyDesignedLeftJoinIteratorException(); + + } + + } + + /* + * Recursively check the children. 
+ */ + if (group.getChildren() != null) { + + addVars(bindings, group, false); + + for (SOpGroup child : group.getChildren()) { + + checkForBadlyDesignedLeftJoin(bindings, child); + + } + + } + + } + + private static void addVars(final Set<com.bigdata.bop.Var> bindings, + final SOpGroup group, final boolean includeFilters) { + + for (SOp sop : group) { + + final QueryModelNode op = sop.getOperator(); + + if (!(op instanceof StatementPattern) && !includeFilters) + continue; + + final BOp bop = sop.getBOp(); + + if (bop != null) { + + final Iterator<IVariable<?>> it = + BOpUtility.getSpannedVariables(bop); + + while (it.hasNext()) { + bindings.add((com.bigdata.bop.Var) it.next()); + } + + } + + } + + } + + public static final class BadlyDesignedLeftJoinIteratorException + extends RuntimeException { + + /** + * + */ + private static final long serialVersionUID = 902970986041878456L; + + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sop/SOp2BOpUtility.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sop/SOp2BOpUtility.java 2011-06-20 14:41:55 UTC (rev 4742) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sop/SOp2BOpUtility.java 2011-06-20 17:30:23 UTC (rev 4743) @@ -105,7 +105,7 @@ * A Union always appears as a single Sesame operator within a group. Its * children are the things being Unioned together. */ - private static boolean isUnion(final SOpGroup sopGroup) { + public static boolean isUnion(final SOpGroup sopGroup) { if (sopGroup.size() == 1) { final SOp sop = sopGroup.getSingletonSOp(); @@ -173,7 +173,7 @@ } - private static boolean isOptional(final SOpGroup sopGroup) { + public static boolean isOptional(final SOpGroup sopGroup) { if (sopGroup.size() == 0) { throw new IllegalArgumentException(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
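The new check is the "well designed" pattern test from section 4.2 of Pérez et al.: an OPTIONAL group is badly designed when it uses a variable that is not bound by its parent group but is bound by some group above the parent, which the pipelined native left join cannot evaluate with correct bottom-up semantics. When the walk over the SOpTree finds such a group it throws BadlyDesignedLeftJoinIteratorException; the catch blocks added above then disable native joins and hand the operator back to Sesame's evaluation. The core of the test, condensed from the patch (same helpers as in the patch, not a new API; ancestorBindings stands for the variables accumulated from the groups above the parent while walking down the tree):

// Condensed restatement of checkForBadlyDesignedLeftJoin; Var is
// com.bigdata.bop.Var.
private static void checkGroup(
        final Set<com.bigdata.bop.Var> ancestorBindings, final SOpGroup group) {

    if (SOp2BOpUtility.isOptional(group)) {

        // 1. All vars used in this optional group (patterns and filters).
        final Set<com.bigdata.bop.Var> problemVars =
                new LinkedHashSet<com.bigdata.bop.Var>();
        addVars(problemVars, group, true/* includeFilters */);

        // 2. Remove the vars bound by the nearest non-UNION parent group.
        SOpGroup parent = group.getParent();
        while (SOp2BOpUtility.isUnion(parent))
            parent = parent.getParent();
        final Set<com.bigdata.bop.Var> parentVars =
                new LinkedHashSet<com.bigdata.bop.Var>();
        addVars(parentVars, parent, false/* includeFilters */);
        problemVars.removeAll(parentVars);

        // 3. Keep only the vars bound above the parent: if any remain, the
        // optional uses a variable bound "too early" and is badly designed.
        problemVars.retainAll(ancestorBindings);
        if (!problemVars.isEmpty())
            throw new BadlyDesignedLeftJoinIteratorException();
    }
}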
From: <tho...@us...> - 2011-06-24 15:18:04
|
Revision: 4791 http://bigdata.svn.sourceforge.net/bigdata/?rev=4791&view=rev Author: thompsonbry Date: 2011-06-24 15:17:57 +0000 (Fri, 24 Jun 2011) Log Message: ----------- Backed out the change which allowed queryHints to be null. We sometimes need to set the query hints dynamically (for example, the queryId from the NanoSparqlServer), and we can't do that if there is a null reference rather than an empty Properties object. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSailQuery.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sparql/BigdataSPARQLParser.java

Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSailQuery.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSailQuery.java	2011-06-24 15:10:39 UTC (rev 4790)
+++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSailQuery.java	2011-06-24 15:17:57 UTC (rev 4791)
@@ -49,8 +49,11 @@
 
     /**
      * Return query hints associated with this query. Query hints are embedded
-     * in query strings as namespaces. See {@link QueryHints#PREFIX} for more
-     * information.
+     * in query strings as namespaces.
+     *
+     * @return The query hints (never <code>null</code>).
+     *
+     * @see QueryHints
      */
     Properties getQueryHints();
 
Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sparql/BigdataSPARQLParser.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sparql/BigdataSPARQLParser.java	2011-06-24 15:10:39 UTC (rev 4790)
+++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sparql/BigdataSPARQLParser.java	2011-06-24 15:17:57 UTC (rev 4791)
@@ -146,7 +146,7 @@
     static private Properties getQueryHints(final ASTQueryContainer qc)
             throws MalformedQueryException {
 
-        Properties queryHints = null;
+        final Properties queryHints = new Properties();
 
         final Map<String, String> prefixes = PrefixDeclProcessor.process(qc);
 
@@ -174,10 +174,6 @@
                 }
                 final String key = hint.substring(0, i);
                 final String val = hint.substring(i + 1);
-                if (queryHints == null) {
-                    // Lazily instantiate.
-                    queryHints = new Properties();
-                }
                 queryHints.put(key, val);
             }
         }

This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
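With the parser always materializing a Properties object, callers such as the NanoSparqlServer can attach per-request hints without a null guard. A minimal caller-side sketch, assuming a hypothetical sailQuery variable whose getQueryHints() is the accessor shown above:

// getQueryHints() now always returns a Properties object, never null, so a
// per-request query UUID can be injected without a defensive null check.
final Properties queryHints = sailQuery.getQueryHints();
queryHints.setProperty(QueryHints.QUERYID, UUID.randomUUID().toString());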