From: <mrp...@us...> - 2011-04-06 21:28:36
Revision: 4376
          http://bigdata.svn.sourceforge.net/bigdata/?rev=4376&view=rev
Author:   mrpersonick
Date:     2011-04-06 21:28:29 +0000 (Wed, 06 Apr 2011)

Log Message:
-----------
added support for using hash joins for optional join groups

Modified Paths:
--------------
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryHashJoinOp.java
    branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sop/SOp2BOpUtility.java
    branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sop/SOpTree.java
    branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestBOps.java

Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryHashJoinOp.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryHashJoinOp.java	2011-04-06 21:27:45 UTC (rev 4375)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryHashJoinOp.java	2011-04-06 21:28:29 UTC (rev 4376)
@@ -508,6 +508,11 @@
         final UnsyncLocalOutputBuffer<IBindingSet> unsyncBuffer = new UnsyncLocalOutputBuffer<IBindingSet>(
                 joinOp.getChunkCapacity(), sink);
 
+        // Thread-local buffer iff optional sink is in use.
+        final AbstractUnsynchronizedArrayBuffer<IBindingSet> unsyncBuffer2 = sink2 == null ? null
+                : new UnsyncLocalOutputBuffer<IBindingSet>(
+                        joinOp.getChunkCapacity(), sink2);
+
         // The iterator draining the subquery
         final IAsynchronousIterator<IBindingSet[]> subquerySolutionItr = runningSubquery
                 .iterator();
@@ -554,7 +559,7 @@
                          * solutions produced by the join are rejected
                          * by the filter.
                          */
-                        src.nhits++;
+//                        src.nhits++;
 
                         if (log.isDebugEnabled())
                             log.debug("Join with " + src);
@@ -582,11 +587,13 @@
                             if (log.isDebugEnabled())
                                 log.debug("Join fails constraint(s): "
                                         + bset);
-
+
                             continue;
 
                         }
 
+                        src.nhits++;
+
                         // strip off unnecessary variables.
                         bset = selectVars == null ? bset : bset
                                 .copy(selectVars);
@@ -616,11 +623,6 @@
              * any constraint on the join.
              */
 
-            // Thread-local buffer iff optional sink is in use.
-            final AbstractUnsynchronizedArrayBuffer<IBindingSet> unsyncBuffer2 = sink2 == null ? null
-                    : new UnsyncLocalOutputBuffer<IBindingSet>(
-                            joinOp.getChunkCapacity(), sink2);
-
             for(Bucket b : map.values()) {
 
                 for(SolutionHit hit : b.solutions) {
@@ -633,21 +635,21 @@
                     if (log.isDebugEnabled())
                         log.debug("Optional solution: " + bs);
 
-                    if (constraints != null) {
-                        if (!BOpUtility.isConsistent(constraints,
-                                bs)) {
+//                    if (constraints != null) {
+//                        if (!BOpUtility.isConsistent(constraints,
+//                                bs)) {
+//
+//                            // Failed by the constraint on the join.
+//
+//                            if (log.isDebugEnabled())
+//                                log
+//                                        .debug("Optional solution failed by constraints: "
+//                                                + hit);
+//
+//                            continue;
+//                        }
+//                    }
 
-                            // Failed by the constraint on the join.
-
-                            if (log.isDebugEnabled())
-                                log
-                                        .debug("Optional solution failed by constraints: "
-                                                + hit);
-
-                            continue;
-                        }
-                    }
-
                     if (log.isTraceEnabled())
                         log.trace("Output optional solution: " + bs);
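The two changes in this file adjust the optional (left-join) semantics of the hash join: a source solution is now counted as a hit (src.nhits++) only after the join constraints have passed, and the constraints are no longer re-applied to the pass-through solutions on the optional path. For readers following along, here is a minimal, self-contained sketch of that technique in plain Java. It illustrates the semantics only; it is not the SubqueryHashJoinOp implementation, and every name in it is invented for the example.

    import java.util.*;
    import java.util.function.Predicate;

    /**
     * Minimal sketch of a hash join with optional (left-join) semantics over
     * variable-binding maps. Illustrative only.
     */
    public final class OptionalHashJoinSketch {

        /** Project a solution onto the join variables to form its hash key. */
        private static List<String> key(Map<String, String> bset, List<String> joinVars) {
            final List<String> k = new ArrayList<String>();
            for (String v : joinVars)
                k.add(bset.get(v)); // null when the variable is unbound
            return k;
        }

        public static List<Map<String, String>> join(
                final List<Map<String, String>> left,  // pipeline solutions (hashed)
                final List<Map<String, String>> right, // subquery solutions (probe)
                final List<String> joinVars,
                final Predicate<Map<String, String>> constraint,
                final boolean optional) {

            // 1. Bucket the left solutions by their join key.
            final Map<List<String>, List<Integer>> index = new HashMap<>();
            for (int i = 0; i < left.size(); i++)
                index.computeIfAbsent(key(left.get(i), joinVars),
                        k -> new ArrayList<>()).add(i);

            final List<Map<String, String>> out = new ArrayList<>();

            // A left solution counts as a hit only when a join result survives
            // the constraint -- the ordering this revision fixes by moving
            // src.nhits++ after the constraint check.
            final Set<Integer> hits = new HashSet<>();

            // 2. Probe with each right solution; emit merges that pass the filter.
            for (Map<String, String> r : right) {
                for (int i : index.getOrDefault(key(r, joinVars),
                        Collections.<Integer>emptyList())) {
                    final Map<String, String> merged = new HashMap<>(left.get(i));
                    merged.putAll(r);
                    if (constraint != null && !constraint.test(merged))
                        continue; // rejected joins do not count as hits
                    hits.add(i);
                    out.add(merged);
                }
            }

            // 3. Optional semantics: left solutions with no surviving match pass
            //    through unchanged, and the constraint is not re-applied to them.
            if (optional)
                for (int i = 0; i < left.size(); i++)
                    if (!hits.contains(i))
                        out.add(left.get(i));

            return out;
        }
    }

Counting hits only after the constraint check is what lets a left solution whose every candidate match fails the filter still emerge as an optional solution instead of being silently dropped.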
Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sop/SOp2BOpUtility.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sop/SOp2BOpUtility.java	2011-04-06 21:27:45 UTC (rev 4375)
+++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sop/SOp2BOpUtility.java	2011-04-06 21:28:29 UTC (rev 4376)
@@ -62,6 +62,7 @@
 import com.bigdata.bop.joinGraph.PartitionedJoinGroup;
 import com.bigdata.bop.solutions.SliceOp;
 import com.bigdata.rdf.sail.FreeTextSearchExpander;
+import com.bigdata.rdf.sail.QueryHints;
 import com.bigdata.rdf.sail.Rule2BOpUtility;
 import com.bigdata.rdf.sail.sop.SOpTree.SOpGroup;
 import com.bigdata.rdf.sail.sop.SOpTree.SOpGroups;
@@ -262,17 +263,44 @@
                         idFactory, queryHints);
 
             } else {
 
-                final PipelineOp subquery = convert(
-                        child, idFactory, db, queryEngine, queryHints);
-                final boolean optional = isOptional(child);
-                final int subqueryId = idFactory.incrementAndGet();
-                left = new SubqueryOp(new BOp[]{left},
-                        new NV(Predicate.Annotations.BOP_ID, subqueryId),//
-                        new NV(SubqueryOp.Annotations.SUBQUERY, subquery),//
-                        new NV(SubqueryOp.Annotations.OPTIONAL,optional)//
-                        );
+                if (useHashJoin(queryHints)) {
+
+                    final IVariable<?>[] joinVars =
+                        gatherHashJoinVars(join, child);
+                    final IConstraint[] joinConstraints =
+                        gatherHashJoinConstraints(join, child);
+
+                    final PipelineOp subquery = convert(
+                            child, idFactory, db, queryEngine, queryHints);
+                    final boolean optional = isOptional(child);
+                    final int subqueryId = idFactory.incrementAndGet();
+
+                    if (log.isInfoEnabled()) {
+                        log.info("join vars: " + Arrays.toString(joinVars));
+                        log.info("join constraints: " + Arrays.toString(joinConstraints));
+                    }
+
+                    left = new SubqueryHashJoinOp(new BOp[]{left},
+                            new NV(Predicate.Annotations.BOP_ID, subqueryId),//
+                            new NV(SubqueryOp.Annotations.SUBQUERY, subquery),//
+                            new NV(SubqueryOp.Annotations.OPTIONAL,optional),//
+                            new NV(SubqueryHashJoinOp.Annotations.PIPELINED, false),//
+                            new NV(SubqueryHashJoinOp.Annotations.JOIN_VARS, joinVars),
+                            new NV(SubqueryHashJoinOp.Annotations.CONSTRAINTS, joinConstraints)
+                            );
+                } else {
+                    final PipelineOp subquery = convert(
+                            child, idFactory, db, queryEngine, queryHints);
+                    final boolean optional = isOptional(child);
+                    final int subqueryId = idFactory.incrementAndGet();
+                    left = new SubqueryOp(new BOp[]{left},
+                            new NV(Predicate.Annotations.BOP_ID, subqueryId),//
+                            new NV(SubqueryOp.Annotations.SUBQUERY, subquery),//
+                            new NV(SubqueryOp.Annotations.OPTIONAL,optional)//
+                            );
+                }
                 if (log.isInfoEnabled()) {
-                    log.info("adding a subquery: " + subqueryId + "\n" + left);
+                    log.info("adding a subquery:\n" + BOpUtility.toString2(left));
                 }
             }
@@ -488,66 +516,8 @@
             new LinkedHashMap<IVariable<?>, Collection<Predicate>>();
         final Collection<IVariable<?>> boundByHashJoins =
             new LinkedList<IVariable<?>>();
 
-        if (true) { // maybe check query hints for this?
-
-            int numSearches = 0;
-            { // first count the searches
-                for (IPredicate pred : preds) {
-                    if (isFreeTextSearch(pred))
-                        numSearches++;
-                }
-            }
-            if (numSearches > 1) {
-                { // collect them up
-                    final Iterator<Predicate> it = preds.iterator();
-                    while (it.hasNext()) {
-                        final Predicate pred = it.next();
-                        if (isFreeTextSearch(pred)) {
-                            // we're going to handle these separately
-                            it.remove();
-                            // create a hash group for this variable
-                            final IVariable v = (IVariable) pred.get(0);
-                            if (hashJoins.containsKey(v)) {
-                                throw new IllegalArgumentException(
-                                    "multiple free text searches using the same variable!!");
-                            }
-                            final Collection<Predicate> hashGroup =
-                                new LinkedList<Predicate>();
-                            hashGroup.add(pred);
-                            hashJoins.put(v, hashGroup);
-                            // add this search variables to the list of known
-                            // bound variables
-                            boundByHashJoins.add(v);
-                        }
-                    }
-                }
-                { // collect up other predicates that use the search vars
-                    final Iterator<Predicate> it = preds.iterator();
-                    while (it.hasNext()) {
-                        final Predicate pred = it.next();
-                        // search always binds to a literal, which can only be
-                        // used as the 2nd arg (the object)
-                        final BOp obj = pred.get(2);
-                        if (obj instanceof IVariable<?>) {
-                            final IVariable<?> v = (IVariable<?>) obj;
-                            if (hashJoins.containsKey(v)) {
-                                // we're going to handle these separately
-                                it.remove();
-                                // add this predicate to the hash group
-                                hashJoins.get(v).add(pred);
-                                // add any other variables used by this tail to
-                                // the list of known bound variables
-                                for (BOp arg : pred.args()) {
-                                    if (arg instanceof IVariable<?>) {
-                                        boundByHashJoins.add((IVariable<?>) arg);
-                                    }
-                                }
-                            }
-                        }
-                    }
-                }
-            }
-
+        if (useHashJoin(queryHints)) { // maybe check query hints for this?
+            gatherHashJoins(preds, hashJoins, boundByHashJoins);
         }
 
         final IVariable<?>[] required = group.getTree().getRequiredVars();
@@ -585,63 +555,32 @@
         }
 
         if (hashJoins.size() > 0) {
-            final Set<IVariable<?>> lastVars = new LinkedHashSet<IVariable<?>>();
-            final Set<IVariable<?>> joinVars = new LinkedHashSet<IVariable<?>>();
-            int i = 0;
+            Collection<Predicate> lastGroup = null;
             for (Collection<Predicate> hashGroup : hashJoins.values()) {
-                joinVars.clear();
-                if (lastVars.size() > 0) {
-                    for (Predicate pred : hashGroup) {
-                        for (BOp arg : pred.args()) {
-                            if (arg instanceof IVariable<?>) {
-                                final IVariable<?> v = (IVariable<?>) arg;
-                                if (lastVars.contains(v)) {
-                                    joinVars.add(v);
-                                }
-                            }
-                        }
-                    }
-                }
-                lastVars.clear();
-                for (Predicate pred : hashGroup) {
-                    for (BOp arg : pred.args()) {
-                        if (arg instanceof IVariable<?>) {
-                            final IVariable<?> v = (IVariable<?>) arg;
-                            lastVars.add(v);
-                        }
-                    }
-                }
-                if (i == 0) {
+                if (lastGroup == null) {
                     left = convert(hashGroup, constraints, left, knownBound,
                             idFactory, db, queryEngine, queryHints);
                 } else {
                     final PipelineOp subquery = convert(hashGroup, constraints,
                             null/*left*/, knownBound,
                             idFactory, db, queryEngine, queryHints);
-                    final PipelineOp slice = new SliceOp(new BOp[] { subquery }, NV.asMap(//
-                            new NV(BOp.Annotations.BOP_ID, idFactory.incrementAndGet()), //
-                            new NV(BOp.Annotations.EVALUATION_CONTEXT,
-                                    BOpEvaluationContext.CONTROLLER),//
-                            new NV(PipelineOp.Annotations.SHARED_STATE, true)//
-                            ));
+                    final IVariable<?>[] joinVars =
+                        gatherHashJoinVars(lastGroup, hashGroup);
 
-                    final IVariable<?>[] joinVarsArray =
-                        joinVars.toArray(new IVariable[joinVars.size()]);
-
                     if (log.isInfoEnabled()) {
-                        log.info(Arrays.toString(joinVarsArray));
+                        log.info(Arrays.toString(joinVars));
                         log.info(subquery);
                     }
 
                     left = new SubqueryHashJoinOp(new BOp[]{left},
                             new NV(Predicate.Annotations.BOP_ID, idFactory.incrementAndGet()),//
+                            new NV(SubqueryHashJoinOp.Annotations.SUBQUERY, subquery),//
                             new NV(SubqueryHashJoinOp.Annotations.PIPELINED, false),//
-                            new NV(SubqueryHashJoinOp.Annotations.SUBQUERY, slice),//
-                            new NV(SubqueryHashJoinOp.Annotations.JOIN_VARS, joinVarsArray));
+                            new NV(SubqueryHashJoinOp.Annotations.JOIN_VARS, joinVars));
                 }
-                i++;
+                lastGroup = hashGroup;
             }
         }
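The rewritten loop above chains one hash group against the next, keying each SubqueryHashJoinOp on the variables the two groups share (the computation now lives in gatherHashJoinVars, added further down in this file). As a toy illustration of that join-variable intersection, assuming variables are modeled as plain strings:

    import java.util.*;

    /**
     * Toy illustration of the join-variable computation when chaining hash
     * groups: the join key is the set of variables the two groups share
     * (cf. gatherHashJoinVars). Sketch only, not bigdata code.
     */
    final class JoinVarsSketch {

        static Set<String> joinVars(Set<String> prevGroupVars, Set<String> groupVars) {
            final Set<String> shared = new LinkedHashSet<String>(prevGroupVars);
            shared.retainAll(groupVars); // intersection = the hash-join variables
            return shared;
        }

        public static void main(String[] args) {
            // e.g. one group binds {s1, o1, name}, the next binds {s2, o2, name}:
            System.out.println(joinVars(
                    new LinkedHashSet<String>(Arrays.asList("s1", "o1", "name")),
                    new LinkedHashSet<String>(Arrays.asList("s2", "o2", "name"))));
            // prints [name] -- the shared variable becomes the join key
        }
    }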
@@ -673,7 +612,13 @@
         return pred.getAccessPathExpander() instanceof FreeTextSearchExpander;
 
     }
-
+
+    /**
+     * Used by hashJoins. Temporary measure. Have to do this because normal
+     * rule2BOp would attach all the constraints to the last tail, which would
+     * cause this subquery to fail. Need to be smarter about pruning the
+     * constraints here and then we could just run through normal rule2BOp.
+     */
     protected static final PipelineOp convert(
             final Collection<Predicate> preds,
             final Collection<IConstraint> constraints,
@@ -728,9 +673,194 @@
         }
 
-        return left;
+        final PipelineOp slice = new SliceOp(new BOp[] { left }, NV.asMap(//
+                new NV(BOp.Annotations.BOP_ID, idFactory.incrementAndGet()), //
+                new NV(BOp.Annotations.EVALUATION_CONTEXT,
+                        BOpEvaluationContext.CONTROLLER),//
+                new NV(PipelineOp.Annotations.SHARED_STATE, true)//
+                ));
+
+        return slice;
 
     }
 
+    protected static void gatherHashJoins(
+            final Collection<Predicate> preds,
+            final Map<IVariable<?>, Collection<Predicate>> hashJoins,
+            final Collection<IVariable<?>> boundByHashJoins) {
+
+        int numSearches = 0;
+        { // first count the searches
+            for (IPredicate pred : preds) {
+                if (isFreeTextSearch(pred))
+                    numSearches++;
+            }
+        }
+        if (numSearches > 1) {
+            { // collect them up
+                final Iterator<Predicate> it = preds.iterator();
+                while (it.hasNext()) {
+                    final Predicate pred = it.next();
+                    if (isFreeTextSearch(pred)) {
+                        // we're going to handle these separately
+                        it.remove();
+                        // create a hash group for this variable
+                        final IVariable v = (IVariable) pred.get(0);
+                        if (hashJoins.containsKey(v)) {
+                            throw new IllegalArgumentException(
+                                "multiple free text searches using the same variable!!");
+                        }
+                        final Collection<Predicate> hashGroup =
+                            new LinkedList<Predicate>();
+                        hashGroup.add(pred);
+                        hashJoins.put(v, hashGroup);
+                        // add this search variables to the list of known
+                        // bound variables
+                        boundByHashJoins.add(v);
+                    }
+                }
+            }
+            { // collect up other predicates that use the search vars
+                final Iterator<Predicate> it = preds.iterator();
+                while (it.hasNext()) {
+                    final Predicate pred = it.next();
+                    // search always binds to a literal, which can only be
+                    // used as the 2nd arg (the object)
+                    final BOp obj = pred.get(2);
+                    if (obj instanceof IVariable<?>) {
+                        final IVariable<?> v = (IVariable<?>) obj;
+                        if (hashJoins.containsKey(v)) {
+                            // we're going to handle these separately
+                            it.remove();
+                            // add this predicate to the hash group
+                            hashJoins.get(v).add(pred);
+                            // add any other variables used by this tail to
+                            // the list of known bound variables
+                            for (BOp arg : pred.args()) {
+                                if (arg instanceof IVariable<?>) {
+                                    boundByHashJoins.add((IVariable<?>) arg);
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+    }
+
+    protected static IVariable<?>[] gatherHashJoinVars(
+            final SOpGroup group1,
+            final SOpGroup group2) {
+
+        final Collection<Predicate> p1 = new LinkedList<Predicate>();
+        final Collection<Predicate> p2 = new LinkedList<Predicate>();
+
+        for (SOp sop : group1) {
+            final BOp bop = sop.getBOp();
+            if (bop instanceof Predicate)
+                p1.add((Predicate) bop);
+        }
+
+        for (SOp sop : group2) {
+            final BOp bop = sop.getBOp();
+            if (bop instanceof Predicate)
+                p2.add((Predicate) bop);
+        }
+
+        return gatherHashJoinVars(p1, p2);
+
+    }
+
+    protected static IVariable<?>[] gatherHashJoinVars(
+            final Collection<Predicate> group1,
+            final Collection<Predicate> group2) {
+
+        final Set<IVariable<?>> vars = new LinkedHashSet<IVariable<?>>();
+        final Set<IVariable<?>> joinVars = new LinkedHashSet<IVariable<?>>();
+
+        for (Predicate pred : group1) {
+            for (BOp arg : pred.args()) {
+                if (arg instanceof IVariable<?>) {
+                    final IVariable<?> v = (IVariable<?>) arg;
+                    vars.add(v);
+                }
+            }
+        }
+
+        if (vars.size() > 0) {
+            for (Predicate pred : group2) {
+                for (BOp arg : pred.args()) {
+                    if (arg instanceof IVariable<?>) {
+                        final IVariable<?> v = (IVariable<?>) arg;
+                        if (vars.contains(v)) {
+                            joinVars.add(v);
+                        }
+                    }
+                }
+            }
+        }
+
+        return joinVars.toArray(new IVariable<?>[joinVars.size()]);
+
+    }
+
+    protected static IConstraint[] gatherHashJoinConstraints(
+            final SOpGroup group1,
+            final SOpGroup group2) {
+
+        final Set<IVariable<?>> vars1 = new LinkedHashSet<IVariable<?>>();
+
+        for (SOp sop : group1) {
+            final BOp bop = sop.getBOp();
+            if (bop instanceof Predicate) {
+                final Predicate pred = (Predicate) bop;
+                for (BOp arg : pred.args()) {
+                    if (arg instanceof IVariable<?>) {
+                        final IVariable<?> v = (IVariable<?>) arg;
+                        vars1.add(v);
+                    }
+                }
+            }
+        }
+
+        // if the subquery has filters that use variables from the pipeline,
+        // we need to elevate those onto the HashJoin
+
+        final Collection<IConstraint> constraints = new LinkedList<IConstraint>();
+        final Collection<SOp> sopsToPrune = new LinkedList<SOp>();
+
+        for (SOp sop : group2) {
+            final BOp bop = sop.getBOp();
+            if (bop instanceof IConstraint) {
+                final IConstraint c = (IConstraint) bop;
+                final Iterator<IVariable<?>> vars = BOpUtility.getSpannedVariables(c);
+                while (vars.hasNext()) {
+                    final IVariable<?> v = vars.next();
+                    if (vars1.contains(v)) {
+                        constraints.add(c);
+                        sopsToPrune.add(sop);
+                    }
+                }
+            }
+        }
+
+        group2.pruneSOps(sopsToPrune);
+
+        return constraints.toArray(new IConstraint[constraints.size()]);
+
+    }
+
+    protected static boolean useHashJoin(final Properties queryHints) {
+        final boolean hashJoin = Boolean.valueOf(queryHints.getProperty(
+                QueryHints.HASH_JOIN, QueryHints.DEFAULT_HASH_JOIN));
+        if (log.isInfoEnabled()) {
+            log.info(queryHints);
+            log.info(queryHints.getProperty(QueryHints.HASH_JOIN));
+            log.info("use hash join = " + hashJoin);
+        }
+        return hashJoin;
+    }
+
 }
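Per the comments in gatherHashJoinConstraints above, a FILTER inside the subquery that references a variable bound on the pipeline side is lifted onto the hash join (and pruned from the subquery via SOpTree.pruneSOps, added below), presumably because such a filter cannot be evaluated inside the subquery where that variable is unbound. A schematic version of that decision, again with string-valued variables and filters modeled as the sets of variables they use (an illustration only, not bigdata code):

    import java.util.*;

    /**
     * Schematic version of the filter-lifting decision in
     * gatherHashJoinConstraints: a filter is lifted onto the hash join iff it
     * uses any variable that is bound on the pipeline side. Sketch only.
     */
    final class LiftFiltersSketch {

        /** Each filter is modeled as the set of variables it uses. */
        static List<Set<String>> lift(Set<String> pipelineVars,
                List<Set<String>> filters) {
            final List<Set<String>> lifted = new ArrayList<Set<String>>();
            for (Set<String> filterVars : filters) {
                for (String v : filterVars) {
                    if (pipelineVars.contains(v)) {
                        // evaluate on the join; prune from the subquery
                        lifted.add(filterVars);
                        break;
                    }
                }
            }
            return lifted;
        }

        public static void main(String[] args) {
            // filter(?s1 != ?s2) inside an OPTIONAL group uses ?s1, which is
            // bound by the pipeline, so it must be lifted onto the join:
            System.out.println(lift(
                    new LinkedHashSet<String>(Arrays.asList("s1", "name")),
                    Collections.<Set<String>>singletonList(
                            new LinkedHashSet<String>(Arrays.asList("s1", "s2")))));
            // prints [[s1, s2]]
        }
    }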
Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sop/SOpTree.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sop/SOpTree.java	2011-04-06 21:27:45 UTC (rev 4375)
+++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sop/SOpTree.java	2011-04-06 21:28:29 UTC (rev 4376)
@@ -165,6 +165,10 @@
             return group == 0;
         }
 
+        public void pruneSOps(final Collection<SOp> sopsToPrune) {
+            this.sops.removeAll(sopsToPrune);
+        }
+
     }
 
     public class SOpGroups implements Iterable<SOpGroup> {
Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestBOps.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestBOps.java	2011-04-06 21:27:45 UTC (rev 4375)
+++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestBOps.java	2011-04-06 21:28:29 UTC (rev 4376)
@@ -70,7 +70,7 @@
         props.setProperty(BigdataSail.Options.AXIOMS_CLASS, NoAxioms.class.getName());
         props.setProperty(BigdataSail.Options.VOCABULARY_CLASS, NoVocabulary.class.getName());
         props.setProperty(BigdataSail.Options.JUSTIFY, "false");
-        props.setProperty(BigdataSail.Options.TEXT_INDEX, "false");
+        props.setProperty(BigdataSail.Options.TEXT_INDEX, "true");
 
         return props;
 
@@ -452,4 +452,130 @@
 
     }
 
+    public void testHashJoin() throws Exception {
+
+        final BigdataSail sail = getSail();
+        sail.initialize();
+        final BigdataSailRepository repo = new BigdataSailRepository(sail);
+        final BigdataSailRepositoryConnection cxn =
+            (BigdataSailRepositoryConnection) repo.getConnection();
+        cxn.setAutoCommit(false);
+
+        try {
+
+            final ValueFactory vf = sail.getValueFactory();
+
+            final LexiconRelation lex = sail.getDatabase().getLexiconRelation();
+
+            final String ns = BD.NAMESPACE;
+
+            URI mikeA = new URIImpl(ns+"MikeA");
+            URI mikeB = new URIImpl(ns+"MikeB");
+            URI bryan = new URIImpl(ns+"Bryan");
+            URI martyn = new URIImpl(ns+"Martyn");
+            URI person = new URIImpl(ns+"Person");
+            URI name = new URIImpl(ns+"name");
+            Literal l1 = new LiteralImpl("Mike");
+            Literal l2 = new LiteralImpl("Bryan");
+            Literal l3 = new LiteralImpl("Martyn");
+/**/
+            cxn.setNamespace("ns", ns);
+
+            cxn.add(mikeA, RDF.TYPE, person);
+            cxn.add(mikeA, name, l1);
+            cxn.add(mikeB, RDF.TYPE, person);
+            cxn.add(mikeB, name, l1);
+            cxn.add(bryan, RDF.TYPE, person);
+            cxn.add(bryan, name, l2);
+            cxn.add(martyn, RDF.TYPE, person);
+            cxn.add(martyn, name, l3);
+
+            /*
+             * Note: The either flush() or commit() is required to flush the
+             * statement buffers to the database before executing any operations
+             * that go around the sail.
+             */
+            cxn.flush();//commit();
+            cxn.commit();//
+
+            if (log.isInfoEnabled()) {
+                log.info("\n" + sail.getDatabase().dumpStore());
+            }
+
+            {
+
+//                String query =
+//                    "PREFIX "+QueryHints.PREFIX+": <"+QueryHints.NAMESPACE+QueryHints.HASH_JOIN+"=true> " +
+//                    "PREFIX rdf: <"+RDF.NAMESPACE+"> " +
+//                    "PREFIX rdfs: <"+RDFS.NAMESPACE+"> " +
+//                    "PREFIX bds: <"+BD.SEARCH_NAMESPACE+"> " +
+//                    "PREFIX ns: <"+ns+"> " +
+//
+//                    "select distinct ?s1 ?s2 " +
+////                    "select distinct ?s1 " +
+//                    "WHERE { " +
+//                    " ?o1 bds:search \"m*\" ." +
+//                    " ?s1 ns:name ?o1 . " +
+//                    " ?s1 rdf:type ns:Person . " +
+//                    " ?s1 ns:name ?name . " +
+//                    " OPTIONAL { " +
+//                    "   ?o2 bds:search \"m*\" ." +
+//                    "   ?s2 ns:name ?o2 . " +
+//                    "   ?s2 rdf:type ns:Person . " +
+//                    "   ?s2 ns:name ?name . " +
+//                    "   filter(?s1 != ?s2) . " +
+//                    " } " +
////                    " filter(!bound(?s2) || ?s1 != ?s2) . " +
+//                    "}";
+
+                String query =
+                    "PREFIX "+QueryHints.PREFIX+": <"+QueryHints.NAMESPACE+QueryHints.HASH_JOIN+"=true> " +
+                    "PREFIX rdf: <"+RDF.NAMESPACE+"> " +
+                    "PREFIX rdfs: <"+RDFS.NAMESPACE+"> " +
+                    "PREFIX bds: <"+BD.SEARCH_NAMESPACE+"> " +
+                    "PREFIX ns: <"+ns+"> " +
+
+                    "select distinct ?s1 ?s2 " +
+                    "WHERE { " +
+                    " ?s1 rdf:type ns:Person . " +
+                    " ?s1 ns:name ?name . " +
+                    " OPTIONAL { " +
+                    "   ?s2 rdf:type ns:Person . " +
+                    "   ?s2 ns:name ?name . " +
+                    "   filter(?s1 != ?s2) . " +
+                    " } " +
+                    "}";
+
+                final TupleQuery tupleQuery =
+                    cxn.prepareTupleQuery(QueryLanguage.SPARQL, query);
+                TupleQueryResult result = tupleQuery.evaluate();
+
+                while (result.hasNext()) {
+                    System.err.println(result.next());
+                }
+
+//                Collection<BindingSet> solution = new LinkedList<BindingSet>();
+//                solution.add(createBindingSet(new Binding[] {
+//                    new BindingImpl("s", mike),
+//                    new BindingImpl("p", RDFS.LABEL),
+//                    new BindingImpl("label", l1)
+//                }));
+//                solution.add(createBindingSet(new Binding[] {
+//                    new BindingImpl("s", bryan),
+//                    new BindingImpl("p", RDFS.COMMENT),
+//                    new BindingImpl("label", l2)
+//                }));
+//
+//                compare(result, solution);
+
+            }
+
+        } finally {
+            cxn.close();
+            sail.__tearDownUnitTest();
+        }
+
+    }
+
 }
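For what it is worth, the commented-out comparison in the new test could plausibly be completed as follows for this query and data: MikeA and MikeB share the name "Mike", so each should turn up as the other's ?s2, while Bryan and Martyn have no OPTIONAL match and should come back with ?s2 unbound. This is a guess at the expected solutions (reusing the createBindingSet/compare helpers from the commented block), not part of the commit:

    // Hypothetical expected solutions, assuming standard OPTIONAL semantics.
    Collection<BindingSet> solution = new LinkedList<BindingSet>();
    solution.add(createBindingSet(new Binding[] {
        new BindingImpl("s1", mikeA),
        new BindingImpl("s2", mikeB)   // same name "Mike", passes ?s1 != ?s2
    }));
    solution.add(createBindingSet(new Binding[] {
        new BindingImpl("s1", mikeB),
        new BindingImpl("s2", mikeA)
    }));
    solution.add(createBindingSet(new Binding[] {
        new BindingImpl("s1", bryan)   // no match: ?s2 left unbound
    }));
    solution.add(createBindingSet(new Binding[] {
        new BindingImpl("s1", martyn)
    }));
    compare(result, solution);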
" + + " } " + + "}"; + + + final TupleQuery tupleQuery = + cxn.prepareTupleQuery(QueryLanguage.SPARQL, query); + TupleQueryResult result = tupleQuery.evaluate(); + + while (result.hasNext()) { + System.err.println(result.next()); + } + +// Collection<BindingSet> solution = new LinkedList<BindingSet>(); +// solution.add(createBindingSet(new Binding[] { +// new BindingImpl("s", mike), +// new BindingImpl("p", RDFS.LABEL), +// new BindingImpl("label", l1) +// })); +// solution.add(createBindingSet(new Binding[] { +// new BindingImpl("s", bryan), +// new BindingImpl("p", RDFS.COMMENT), +// new BindingImpl("label", l2) +// })); +// +// compare(result, solution); + + } + + } finally { + cxn.close(); + sail.__tearDownUnitTest(); + } + + } + } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |