From: <tho...@us...> - 2013-04-26 20:07:21
|
Revision: 7087 http://bigdata.svn.sourceforge.net/bigdata/?rev=7087&view=rev Author: thompsonbry Date: 2013-04-26 20:07:14 +0000 (Fri, 26 Apr 2013) Log Message: ----------- The preceding commit broke one unit test. If we retain that feature, then the test needs to be modified to predict the new annotation. {{{ TestASTSubGroupJoinVarOptimizer.test_govtrack_21 }}} ---- I refactored the JVMDistinctBindingSetsOp operator to extract its DISTINCT implementation as a JVMDistinctFilter, and I added support for this filter to the JVMHashJoinUtility implementation. The feature MUST be enabled by hand in JVMHashJoinUtility.outputSolutions() by setting distinct := true. When this feature is NOT enabled, the following tests fail (these are known failures and are documented in the test cases). {{{ TestASTSparql11SubqueryOptimizer#test_subSelectWithNoJoinVars() TestASTHashJoinOptimizer#test_hashJoinOptimizer_BSBM_Q5() TestTCK#test_sparql11_order_02() TestTCK#test_sparql11_order_03() }}} If you enable DISTINCT on the solutions flowing into the child join group, then the following unit tests fail. I suspect that these failures are related to under-producing solutions, but some of them might also be related to a failure to compute the correct set of variables for [projectedInVars]. {{{ TestNamedGraphs#test_default_graph_joins_01f() TestOptionals#test_optionals_simplest() TestOptionals#test_optionals_simplestWithFilter() TestOptionals#test_double_optional_include() TestNegation#test_sparql11_minus_01() TestTCK#test_open_eq_12() }}} ---- Note: We need to add unit tests (SPARQL and HashJoinUtility) for this feature. Note: We need to add unit tests for DISTINCT when some of the variables on which the DISTINCT is imposed are null (unbound). 
---- @see https://sourceforge.net/apps/trac/bigdata/ticket/668 (JoinGroup optimizations) Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/join/JVMHashJoinUtility.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/solutions/JVMDistinctBindingSetsOp.java Added Paths: ----------- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/solutions/JVMDistinctFilter.java Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/join/JVMHashJoinUtility.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/join/JVMHashJoinUtility.java 2013-04-26 18:36:07 UTC (rev 7086) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/join/JVMHashJoinUtility.java 2013-04-26 20:07:14 UTC (rev 7087) @@ -49,6 +49,7 @@ import com.bigdata.bop.IVariable; import com.bigdata.bop.PipelineOp; import com.bigdata.bop.engine.BOpStats; +import com.bigdata.bop.solutions.JVMDistinctFilter; import com.bigdata.counters.CAT; import com.bigdata.htree.HTree; import com.bigdata.rdf.internal.impl.literal.XSDBooleanIV; @@ -394,6 +395,14 @@ private final AtomicBoolean open = new AtomicBoolean(true); /** + * The operator whose annotations are used to initialize this object. + * <p> + * Note: This was added to support the DISTINCT FILTER in + * {@link #outputSolutions(IBuffer)}. + */ + private final PipelineOp op; + + /** * The type of join to be performed. */ private final JoinTypeEnum joinType; @@ -419,12 +428,22 @@ private final IVariable<?>[] joinVars; /** - * The variables to be retained (optional, all variables are retained if - * not specified). + * The variables to be retained (aka projected out) (optional, all variables + * are retained if not specified). */ private final IVariable<?>[] selectVars; /** + * The variables to be projected into a join group. 
When non- + * <code>null</code> variables that are NOT in this array are NOT flowed + * into the join group. + * + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/668" > + * JoinGroup optimizations </a> + */ + private final IVariable<?>[] projectedInVars; + + /** * The join constraints (optional). */ private final IConstraint[] constraints; @@ -515,6 +534,7 @@ if(joinType == null) throw new IllegalArgumentException(); + this.op = op; this.joinType = joinType; this.optional = joinType == JoinTypeEnum.Optional; this.filter = joinType == JoinTypeEnum.Filter; @@ -527,11 +547,19 @@ this.joinVars = (IVariable<?>[]) op .getRequiredProperty(HashJoinAnnotations.JOIN_VARS); - // The projected variables (optional and equal to the join variables iff - // this is a DISTINCT filter). + /* + * The projected OUT variables (optional and equal to the join variables + * iff this is a DISTINCT filter). + */ this.selectVars = filter ? joinVars : (IVariable<?>[]) op .getProperty(JoinAnnotations.SELECT); + /* + * The variables that are projected IN to the join group. + */ + this.projectedInVars = (IVariable<?>[]) op + .getProperty(HashJoinAnnotations.PROJECT_IN_VARS); + // The join constraints (optional). this.constraints = (IConstraint[]) op .getProperty(JoinAnnotations.CONSTRAINTS); @@ -557,7 +585,8 @@ * keyword. This would give us what amounts to per-hash code striped * locks. Note: the JVMDistinctBindingSetsOp does not use this class * right now because it enjoys better concurrency than the - * JVMHashJoinUtility. + * JVMHashJoinUtility. Also see JVMDistinctFilter, which is the backing + * implementation for the JVMDistinctBindingSetsOp. 
*/ rightSolutionsRef.set(new LinkedHashMap<Key, Bucket>(// op.getProperty(HashMapAnnotations.INITIAL_CAPACITY, @@ -682,8 +711,8 @@ } @Override - public long filterSolutions(ICloseableIterator<IBindingSet[]> itr, - BOpStats stats, IBuffer<IBindingSet> sink) { + public long filterSolutions(final ICloseableIterator<IBindingSet[]> itr, + final BOpStats stats, final IBuffer<IBindingSet> sink) { try { @@ -1085,6 +1114,47 @@ @Override public void outputSolutions(final IBuffer<IBindingSet> out) { + /* + * FIXME Set this to enable "DISTINCT" on the solutions flowing into the + * join group. + * + * Note: This should be set by the HashIndexOp (or passed in through the + * interface). + * + * Note: Enabling this causes failures. See the ticket below. I suspect + * that these failures are related to underproducing solutions, but some + * of them might also be related to a failure to produce the correct set + * of variables for [projectedInVars]. + * + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/668" > + * JoinGroup optimizations </a> + */ + final boolean distinct = false; + + final JVMDistinctFilter distinctFilter; + + if (distinct && projectedInVars != null && projectedInVars.length > 0) { + + /* + * Note: We are single threaded here so we can use a lower + * concurrencyLevel value. + */ + final int concurrencyLevel = 1;//ConcurrentHashMapAnnotations.DEFAULT_CONCURRENCY_LEVEL; + + distinctFilter = new JVMDistinctFilter(projectedInVars, // + op.getProperty(HashMapAnnotations.INITIAL_CAPACITY, + HashMapAnnotations.DEFAULT_INITIAL_CAPACITY),// + op.getProperty(HashMapAnnotations.LOAD_FACTOR, + HashMapAnnotations.DEFAULT_LOAD_FACTOR),// + concurrencyLevel + ); + + } else { + + distinctFilter = null; + + } + try { // if (true) { @@ -1151,6 +1221,26 @@ IBindingSet bs = solutionHit.solution; + if (distinctFilter != null) { + + /* + * Note: The DISTINCT filter is based on the + * variables that are projected INTO the child + * join group. 
However, those are NOT always + * the same as the variables that are projected + * OUT of the child join group, so we need to + * + */ + + if (distinctFilter.accept2(bs) == null) { + + // Drop duplicate solutions. + continue; + + } + + } + if (selected != null) { // Drop variables which are not projected. Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/solutions/JVMDistinctBindingSetsOp.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/solutions/JVMDistinctBindingSetsOp.java 2013-04-26 18:36:07 UTC (rev 7086) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/solutions/JVMDistinctBindingSetsOp.java 2013-04-26 20:07:14 UTC (rev 7087) @@ -1,26 +1,43 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ package com.bigdata.bop.solutions; -import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.FutureTask; -import org.apache.log4j.Logger; - import com.bigdata.bop.BOp; import com.bigdata.bop.BOpContext; import com.bigdata.bop.ConcurrentHashMapAnnotations; import com.bigdata.bop.IBindingSet; -import com.bigdata.bop.IConstant; import com.bigdata.bop.IQueryAttributes; import com.bigdata.bop.IVariable; import com.bigdata.bop.NV; import com.bigdata.bop.PipelineOp; -import com.bigdata.bop.bindingSet.ListBindingSet; import com.bigdata.bop.engine.BOpStats; import com.bigdata.bop.join.JVMHashJoinUtility; import com.bigdata.relation.accesspath.IBlockingBuffer; @@ -31,22 +48,18 @@ * <p> * Note: This implementation is a pipelined operator which inspects each chunk * of solutions as they arrive and those solutions which are distinct for each - * chunk are passed on. It uses a {@link ConcurrentMap} and is thread-safe. + * chunk are passed on. It uses a {@link ConcurrentMap} and is thread-safe. It + * is significantly faster than the single-threaded hash index routines in the + * {@link JVMHashJoinUtility}. * - * TODO Look into reconciling this class with {@link JVMHashJoinUtility}. - * However, note that this implementation is thread-safe and uses a - * {@link ConcurrentMap}. It may be better to leave things as they are since - * this implementation may be more efficient for the special case which it - * handles. 
- * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id: DistinctElementFilter.java 3466 2010-08-27 14:28:04Z * thompsonbry $ */ public class JVMDistinctBindingSetsOp extends PipelineOp { - private final static transient Logger log = Logger - .getLogger(JVMDistinctBindingSetsOp.class); +// private final static transient Logger log = Logger +// .getLogger(JVMDistinctBindingSetsOp.class); /** * @@ -148,81 +161,21 @@ } /** - * Wrapper used for the as bound solutions in the {@link ConcurrentHashMap}. - */ - private static class Solution { - - private final int hash; - - private final IConstant<?>[] vals; - - public Solution(final IConstant<?>[] vals) { - this.vals = vals; - this.hash = java.util.Arrays.hashCode(vals); - } - - public int hashCode() { - return hash; - } - - public boolean equals(final Object o) { - if (this == o) - return true; - if (!(o instanceof Solution)) { - return false; - } - final Solution t = (Solution) o; - if (vals.length != t.vals.length) - return false; - for (int i = 0; i < vals.length; i++) { - // @todo verify that this allows for nulls with a unit test. - if (vals[i] == t.vals[i]) - continue; - if (vals[i] == null) - return false; - if (!vals[i].equals(t.vals[i])) - return false; - } - return true; - } - } - - /** * Task executing on the node. */ static private class DistinctTask implements Callable<Void> { private final BOpContext<IBindingSet> context; - /** - * A concurrent map whose keys are the bindings on the specified - * variables (the keys and the values are the same since the map - * implementation does not allow <code>null</code> values). - * <p> - * Note: The map is shared state and can not be discarded or cleared - * until the last invocation!!! - */ - private final ConcurrentHashMap<Solution, Solution> map; - - /** - * The variables used to impose a distinct constraint. 
- */ - private final IVariable<?>[] vars; + private final JVMDistinctFilter filter; - @SuppressWarnings("unchecked") DistinctTask(final JVMDistinctBindingSetsOp op, final BOpContext<IBindingSet> context) { this.context = context; - this.vars = op.getVariables(); + final IVariable<?>[] vars = op.getVariables(); - if (vars == null) - throw new IllegalArgumentException(); - - if (vars.length == 0) - throw new IllegalArgumentException(); - /* * The map is shared state across invocations of this operator task. */ @@ -232,68 +185,28 @@ final IQueryAttributes attribs = context.getRunningQuery() .getAttributes(); - ConcurrentHashMap<Solution, Solution> map = (ConcurrentHashMap<Solution, Solution>) attribs - .get(key); + JVMDistinctFilter filter = (JVMDistinctFilter) attribs.get(key); - if (map == null) { + if (filter == null) { - map = new ConcurrentHashMap<Solution, Solution>( + filter = new JVMDistinctFilter(vars, op.getInitialCapacity(), op.getLoadFactor(), op.getConcurrencyLevel()); - final ConcurrentHashMap<Solution, Solution> tmp = (ConcurrentHashMap<Solution, Solution>) attribs - .putIfAbsent(key, map); + final JVMDistinctFilter tmp = (JVMDistinctFilter) attribs + .putIfAbsent(key, filter); if (tmp != null) - map = tmp; + filter = tmp; } - this.map = map; + this.filter = filter; } } - /** - * If the bindings are distinct for the configured variables then return - * those bindings. - * - * @param bset - * The binding set to be filtered. - * - * @return The distinct as bound values -or- <code>null</code> if the - * binding set duplicates a solution which was already accepted. - */ - private IConstant<?>[] accept(final IBindingSet bset) { - - final IConstant<?>[] r = new IConstant<?>[vars.length]; - - for (int i = 0; i < vars.length; i++) { - - /* - * Note: This allows null's. - * - * @todo write a unit test when some variables are not bound. 
- */ - r[i] = bset.get(vars[i]); - - } - - final Solution s = new Solution(r); - - if (log.isTraceEnabled()) - log.trace("considering: " + Arrays.toString(r)); - - final boolean distinct = map.putIfAbsent(s, s) == null; - - if (distinct && log.isDebugEnabled()) - log.debug("accepted: " + Arrays.toString(r)); - - return distinct ? r : null; - - } - public Void call() throws Exception { final BOpStats stats = context.getStats(); @@ -323,27 +236,14 @@ * Test to see if this solution is distinct from those * already seen. */ - final IConstant<?>[] vals = accept(bset); + final IBindingSet tmp = filter.accept2(bset); - if (vals != null) { + if (tmp != null) { - /* - * This is a distinct solution. Copy only the - * variables used to select distinct solutions into - * a new binding set and add that to the set of - * [accepted] binding sets which will be emitted by - * this operator. - */ - - final ListBindingSet tmp = new ListBindingSet(); - - for (int i = 0; i < vars.length; i++) { - - if (vals[i] != null) - tmp.set(vars[i], vals[i]); - - } - + /* + * This is a distinct solution. + */ + accepted.add(tmp); naccepted++; @@ -391,7 +291,7 @@ * is not going to be cleared until the query goes out of * scope and is swept by GC. */ - map.clear(); + filter.clear(); } Added: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/solutions/JVMDistinctFilter.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/solutions/JVMDistinctFilter.java (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/solutions/JVMDistinctFilter.java 2013-04-26 20:07:14 UTC (rev 7087) @@ -0,0 +1,224 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... 
+ +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Apr 26, 2013 + */ +package com.bigdata.bop.solutions; + +import java.util.Arrays; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.log4j.Logger; + +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IConstant; +import com.bigdata.bop.IVariable; +import com.bigdata.bop.bindingSet.ListBindingSet; + +/** + * Utility class for imposing a DISTINCT filter on {@link IBindingSet}. This + * class is thread-safe. It is based on a {@link ConcurrentHashMap}. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public class JVMDistinctFilter { + + private static final Logger log = Logger.getLogger(JVMDistinctFilter.class); + + /** + * Wrapper used for the as bound solutions in the {@link ConcurrentHashMap}. 
+ */ + private static class Solution { + + private final int hash; + + private final IConstant<?>[] vals; + + public Solution(final IConstant<?>[] vals) { + this.vals = vals; + this.hash = java.util.Arrays.hashCode(vals); + } + + public int hashCode() { + return hash; + } + + public boolean equals(final Object o) { + if (this == o) + return true; + if (!(o instanceof Solution)) { + return false; + } + final Solution t = (Solution) o; + if (vals.length != t.vals.length) + return false; + for (int i = 0; i < vals.length; i++) { + // @todo verify that this allows for nulls with a unit test. + if (vals[i] == t.vals[i]) + continue; + if (vals[i] == null) + return false; + if (!vals[i].equals(t.vals[i])) + return false; + } + return true; + } + } + + /** + * The variables used to impose a distinct constraint. + */ + private final IVariable<?>[] vars; + + /** + * A concurrent map whose keys are the bindings on the specified variables + * (the keys and the values are the same since the map implementation does + * not allow <code>null</code> values). + * <p> + * Note: The map is shared state and can not be discarded or cleared until + * the last invocation!!! + */ + private final ConcurrentHashMap<Solution, Solution> map; + + /** + * + * @param vars + * The set of variables on which the DISTINCT filter will be + * imposed. Only these variables will be present in the + * "accepted" solutions. Any variable bindings not specified in + * this array will be dropped). 
+ * @param initialCapacity + * @param loadFactor + * @param concurrencyLevel + */ + public JVMDistinctFilter(final IVariable<?>[] vars, + final int initialCapacity, final float loadFactor, + final int concurrencyLevel) { + + if (vars == null) + throw new IllegalArgumentException(); + + if (vars.length == 0) + throw new IllegalArgumentException(); + + this.vars = vars; + + this.map = new ConcurrentHashMap<Solution, Solution>(initialCapacity, + loadFactor, concurrencyLevel); + + } + + /** + * If the bindings are distinct for the configured variables then return + * those bindings. + * + * @param bset + * The binding set to be filtered. + * + * @return The distinct as bound values -or- <code>null</code> if the + * binding set duplicates a solution which was already accepted. + */ + public IConstant<?>[] accept(final IBindingSet bset) { + + final IConstant<?>[] r = new IConstant<?>[vars.length]; + + for (int i = 0; i < vars.length; i++) { + + /* + * Note: This allows null's. + * + * @todo write a unit test when some variables are not bound. + */ + r[i] = bset.get(vars[i]); + + } + + final Solution s = new Solution(r); + + if (log.isTraceEnabled()) + log.trace("considering: " + Arrays.toString(r)); + + final boolean distinct = map.putIfAbsent(s, s) == null; + + if (distinct && log.isDebugEnabled()) + log.debug("accepted: " + Arrays.toString(r)); + + return distinct ? r : null; + + } + + /** + * If the bindings are distinct for the configured variables then return a + * new {@link IBindingSet} consisting of only the selected variables. + * + * @param bset + * The binding set to be filtered. + * + * @return A new {@link IBindingSet} containing only the distinct as bound + * values -or- <code>null</code> if the binding set duplicates a + * solution which was already accepted. + */ + public IBindingSet accept2(final IBindingSet bset) { + + final IConstant<?>[] vals = accept(bset); + + if (vals == null) { + + /* + * This is a duplicate solution. 
+ */ + + return null; + + } + + /* + * This is a distinct solution. Copy only the variables used to select + * distinct solutions into a new binding set and add that to the set of + * [accepted] binding sets which will be emitted by this operator. + */ + + final ListBindingSet tmp = new ListBindingSet(); + + for (int i = 0; i < vars.length; i++) { + + if (vals[i] != null) + tmp.set(vars[i], vals[i]); + + } + + return tmp; + + } + + /** + * Discard the map backing this filter. + */ + public void clear() { + + map.clear(); + + } + +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-04-28 15:03:50
|
Revision: 7088 http://bigdata.svn.sourceforge.net/bigdata/?rev=7088&view=rev Author: thompsonbry Date: 2013-04-28 15:03:41 +0000 (Sun, 28 Apr 2013) Log Message: ----------- Refactored the JVMHashJoinUtility to extract a JVMHashIndex class that encapsulates the Map/Bucket/SolutionHit pattern for reuse. In particular, this pattern allows us to track the number of occurrences of each distinct solution (a histogram). I identified and fixed 2 bugs in JVMHashJoinUtility.filterSolutions(). It appears that this method was not in use, which makes sense: we prefer the JVMDistinctBindingSetsOp, which is based on a ConcurrentHashMap and has significantly better performance for DISTINCT. I found a bug in my implementation of the "DISTINCT" filter on the solutions projected into the sub-group: I was failing to restrict the solutions passed into the sub-group to only the "projected-in-vars". This has been fixed, but there are still FIXMEs in the JVMHashJoinUtility.outputSolutions() method documenting things that need to be reviewed. The "distinct" filter is still disabled in the code at the head of the outputSolutions() method. There are 5 places where we use a hashIndexBuild => subGroup => hashJoin pattern. All of them need to be reviewed to verify that they correctly distinguish the projectedIn variables from the projectedOut (aka "selected") variables. The DISTINCT filter bug described above was related to the use of "selected" in outputSolutions(). The code assumes that the projectedInVars and projectedOutVars are the same, but that is not true: we want to project in only those variables that are visible to the sub-group, and we want to project out only those variables bound in the sub-group that are reused in the parent. We also need to carefully review the named solution set query patterns, since the source is not always the pipeline and this might change the effective semantics of the outputSolutions() method (code review required). The 5 places where the hashIndexBuild => subGroup => hashJoin pattern is used are: 
- addExistsSubquery(PipelineOp, SubqueryRoot, Set<IVariable<?>>, AST2BOpContext) - addNamedSubqueryInclude(PipelineOp, NamedSubqueryInclude, Set<IVariable<?>>, AST2BOpContext) - addSparql11Subquery(PipelineOp, SubqueryRoot, Set<IVariable<?>>, AST2BOpContext) - addSubgroup(PipelineOp, GraphPatternGroup<IGroupMemberNode>, Set<IVariable<?>>, AST2BOpContext) - doMergeJoin(PipelineOp, JoinGroupNode, Set<IVariable<?>>, AtomicInteger, AST2BOpContext) See https://sourceforge.net/apps/trac/bigdata/ticket/668 (JoinGroup Optimization) Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/join/JVMHashJoinUtility.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/solutions/JVMDistinctBindingSetsOp.java Added Paths: ----------- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/join/JVMDistinctFilter.java Removed Paths: ------------- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/solutions/JVMDistinctFilter.java Copied: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/join/JVMDistinctFilter.java (from rev 7087, branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/solutions/JVMDistinctFilter.java) =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/join/JVMDistinctFilter.java (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/join/JVMDistinctFilter.java 2013-04-28 15:03:41 UTC (rev 7088) @@ -0,0 +1,224 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. 
+ +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Apr 26, 2013 + */ +package com.bigdata.bop.join; + +import java.util.Arrays; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.log4j.Logger; + +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IConstant; +import com.bigdata.bop.IVariable; +import com.bigdata.bop.bindingSet.ListBindingSet; + +/** + * Utility class for imposing a DISTINCT filter on {@link IBindingSet}. This + * class is thread-safe. It is based on a {@link ConcurrentHashMap}. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public class JVMDistinctFilter { + + private static final Logger log = Logger.getLogger(JVMDistinctFilter.class); + + /** + * Wrapper used for the as bound solutions in the {@link ConcurrentHashMap}. + */ + private static class Solution { + + private final int hash; + + private final IConstant<?>[] vals; + + public Solution(final IConstant<?>[] vals) { + this.vals = vals; + this.hash = java.util.Arrays.hashCode(vals); + } + + public int hashCode() { + return hash; + } + + public boolean equals(final Object o) { + if (this == o) + return true; + if (!(o instanceof Solution)) { + return false; + } + final Solution t = (Solution) o; + if (vals.length != t.vals.length) + return false; + for (int i = 0; i < vals.length; i++) { + // @todo verify that this allows for nulls with a unit test. 
+ if (vals[i] == t.vals[i]) + continue; + if (vals[i] == null) + return false; + if (!vals[i].equals(t.vals[i])) + return false; + } + return true; + } + } + + /** + * The variables used to impose a distinct constraint. + */ + private final IVariable<?>[] vars; + + /** + * A concurrent map whose keys are the bindings on the specified variables + * (the keys and the values are the same since the map implementation does + * not allow <code>null</code> values). + * <p> + * Note: The map is shared state and can not be discarded or cleared until + * the last invocation!!! + */ + private final ConcurrentHashMap<Solution, Solution> map; + + /** + * + * @param vars + * The set of variables on which the DISTINCT filter will be + * imposed. Only these variables will be present in the + * "accepted" solutions. Any variable bindings not specified in + * this array will be dropped). + * @param initialCapacity + * @param loadFactor + * @param concurrencyLevel + */ + public JVMDistinctFilter(final IVariable<?>[] vars, + final int initialCapacity, final float loadFactor, + final int concurrencyLevel) { + + if (vars == null) + throw new IllegalArgumentException(); + + if (vars.length == 0) + throw new IllegalArgumentException(); + + this.vars = vars; + + this.map = new ConcurrentHashMap<Solution, Solution>(initialCapacity, + loadFactor, concurrencyLevel); + + } + + /** + * If the bindings are distinct for the configured variables then return + * those bindings. + * + * @param bset + * The binding set to be filtered. + * + * @return The distinct as bound values -or- <code>null</code> if the + * binding set duplicates a solution which was already accepted. + */ + public IConstant<?>[] accept(final IBindingSet bset) { + + final IConstant<?>[] r = new IConstant<?>[vars.length]; + + for (int i = 0; i < vars.length; i++) { + + /* + * Note: This allows null's. + * + * @todo write a unit test when some variables are not bound. 
+ */ + r[i] = bset.get(vars[i]); + + } + + final Solution s = new Solution(r); + + if (log.isTraceEnabled()) + log.trace("considering: " + Arrays.toString(r)); + + final boolean distinct = map.putIfAbsent(s, s) == null; + + if (distinct && log.isDebugEnabled()) + log.debug("accepted: " + Arrays.toString(r)); + + return distinct ? r : null; + + } + + /** + * If the bindings are distinct for the configured variables then return a + * new {@link IBindingSet} consisting of only the selected variables. + * + * @param bset + * The binding set to be filtered. + * + * @return A new {@link IBindingSet} containing only the distinct as bound + * values -or- <code>null</code> if the binding set duplicates a + * solution which was already accepted. + */ + public IBindingSet accept2(final IBindingSet bset) { + + final IConstant<?>[] vals = accept(bset); + + if (vals == null) { + + /* + * This is a duplicate solution. + */ + + return null; + + } + + /* + * This is a distinct solution. Copy only the variables used to select + * distinct solutions into a new binding set and add that to the set of + * [accepted] binding sets which will be emitted by this operator. + */ + + final ListBindingSet tmp = new ListBindingSet(); + + for (int i = 0; i < vars.length; i++) { + + if (vals[i] != null) + tmp.set(vars[i], vals[i]); + + } + + return tmp; + + } + + /** + * Discard the map backing this filter. 
+ */ + public void clear() { + + map.clear(); + + } + +} Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/join/JVMHashJoinUtility.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/join/JVMHashJoinUtility.java 2013-04-26 20:07:14 UTC (rev 7087) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/join/JVMHashJoinUtility.java 2013-04-28 15:03:41 UTC (rev 7088) @@ -49,7 +49,8 @@ import com.bigdata.bop.IVariable; import com.bigdata.bop.PipelineOp; import com.bigdata.bop.engine.BOpStats; -import com.bigdata.bop.solutions.JVMDistinctFilter; +import com.bigdata.bop.join.JVMHashJoinUtility.JVMHashIndex.Bucket; +import com.bigdata.bop.join.JVMHashJoinUtility.JVMHashIndex.SolutionHit; import com.bigdata.counters.CAT; import com.bigdata.htree.HTree; import com.bigdata.rdf.internal.impl.literal.XSDBooleanIV; @@ -74,321 +75,522 @@ private static final Logger log = Logger.getLogger(JVMHashJoinUtility.class); - /** - * Note: If joinVars is an empty array, then the solutions will all hash to - * ONE (1). - */ - private static final int ONE = 1; - - /** - * Return the hash code which will be used as the key given the ordered - * as-bound values for the join variables. - * - * @param joinVars - * The join variables. - * @param bset - * The bindings whose as-bound hash code for the join variables - * will be computed. - * @param ignoreUnboundVariables - * If a variable without a binding should be silently ignored. - * - * @return The hash code. - * - * @throws JoinVariableNotBoundException - * if there is no binding for a join variable. - */ - private static int hashCode(final IVariable<?>[] joinVars, - final IBindingSet bset, final boolean ignoreUnboundVariables) - throws JoinVariableNotBoundException { + public static class JVMHashIndex { - int h = ONE; + /** + * Note: If joinVars is an empty array, then the solutions will all hash to + * ONE (1). 
+ */ + private static final int ONE = 1; + + /** + * Return the hash code which will be used as the key given the ordered + * as-bound values for the join variables. + * + * @param joinVars + * The join variables. + * @param bset + * The bindings whose as-bound hash code for the join variables + * will be computed. + * @param ignoreUnboundVariables + * If a variable without a binding should be silently ignored. + * + * @return The hash code. + * + * @throws JoinVariableNotBoundException + * if there is no binding for a join variable. + */ + private static int hashCode(final IVariable<?>[] joinVars, + final IBindingSet bset, final boolean ignoreUnboundVariables) + throws JoinVariableNotBoundException { - for (IVariable<?> v : joinVars) { + int h = ONE; - final IConstant<?> c = bset.get(v); + for (IVariable<?> v : joinVars) { - if (c == null) { + final IConstant<?> c = bset.get(v); - if(ignoreUnboundVariables) - continue; + if (c == null) { - // Reject any solution which does not have a binding for a join - // variable. + if(ignoreUnboundVariables) + continue; - throw new JoinVariableNotBoundException(v.getName()); + // Reject any solution which does not have a binding for a join + // variable. + + throw new JoinVariableNotBoundException(v.getName()); + + } + + h = 31 * h + c.hashCode(); } - - h = 31 * h + c.hashCode(); - } - - if (log.isTraceEnabled()) - log.trace("hashCode=" + h + ", joinVars=" - + Arrays.toString(joinVars) + " : " + bset); + if (log.isTraceEnabled()) + log.trace("hashCode=" + h + ", joinVars=" + + Arrays.toString(joinVars) + " : " + bset); - return h; + return h; - } - - /** - * Return an array of constants corresponding to the as-bound values of the - * join variables for the given solution. - * - * @param joinVars - * The join variables. - * @param bset - * The solution. - * @param optional - * <code>true</code> iff the hash join is optional. - * - * @return The as-bound values for the join variables for that solution. 
- */ - static private Key makeKey(final IVariable<?>[] joinVars, - final IBindingSet bset, final boolean optional) { + } - final IConstant<?>[] vals = new IConstant<?>[joinVars.length]; + + /** + * Return an array of constants corresponding to the as-bound values of the + * join variables for the given solution. + * + * @param joinVars + * The join variables. + * @param bset + * The solution. + * @param optional + * <code>true</code> iff the hash join is optional. + * + * @return The as-bound values for the join variables for that solution. + */ + static private Key makeKey(final IVariable<?>[] joinVars, + final IBindingSet bset, final boolean optional) { - for (int i = 0; i < joinVars.length; i++) { + final IConstant<?>[] vals = new IConstant<?>[joinVars.length]; - final IVariable<?> v = joinVars[i]; + for (int i = 0; i < joinVars.length; i++) { - vals[i] = bset.get(v); + final IVariable<?> v = joinVars[i]; - } + vals[i] = bset.get(v); - int hashCode = ONE; - try { + } - /* - * Note: The original version of this class always throws an - * exception for an unbound join variable out of its hashCode() impl - * and then handles that exception here. - */ - - hashCode = hashCode(joinVars, bset, false/* ignoreUnboundVariables */); + int hashCode = ONE; + try { - } catch (JoinVariableNotBoundException ex) { - - if (!optional) { + /* + * Note: The original version of this class always throws an + * exception for an unbound join variable out of its hashCode() impl + * and then handles that exception here. + */ - // Drop solution; + hashCode = hashCode(joinVars, bset, false/* ignoreUnboundVariables */); + + } catch (JoinVariableNotBoundException ex) { - if (log.isDebugEnabled()) - log.debug(ex); + if (!optional) { + + // Drop solution; + + if (log.isDebugEnabled()) + log.debug(ex); - return null; + return null; + } + } + + return new Key(hashCode, vals); } - - return new Key(hashCode, vals); - } + /** + * Wrapper for the keys in the hash table. 
This is necessary for the hash + * table to compare the keys as equal and also provides a efficiencies in + * the hash code and equals() methods. + */ + static class Key { + + private final int hash; - /** - * Wrapper for the keys in the hash table. This is necessary for the hash - * table to compare the keys as equal and also provides a efficiencies in - * the hash code and equals() methods. - */ - private static class Key { - - private final int hash; + private final IConstant<?>[] vals; - private final IConstant<?>[] vals; + private Key(final int hashCode, final IConstant<?>[] vals) { + this.vals = vals; + this.hash = hashCode; + } - private Key(final int hashCode, final IConstant<?>[] vals) { - this.vals = vals; - this.hash = hashCode; - } + public int hashCode() { + return hash; + } - public int hashCode() { - return hash; - } - - public boolean equals(final Object o) { - if (this == o) - return true; - if (!(o instanceof Key)) { - return false; - } - final Key t = (Key) o; - if (vals.length != t.vals.length) - return false; - for (int i = 0; i < vals.length; i++) { - if (vals[i] == t.vals[i]) - continue; - if (vals[i] == null) + public boolean equals(final Object o) { + if (this == o) + return true; + if (!(o instanceof Key)) { return false; - if (!vals[i].equals(t.vals[i])) + } + final Key t = (Key) o; + if (vals.length != t.vals.length) return false; + for (int i = 0; i < vals.length; i++) { + if (vals[i] == t.vals[i]) + continue; + if (vals[i] == null) + return false; + if (!vals[i].equals(t.vals[i])) + return false; + } + return true; } - return true; } - } - - /** - * An input solution and a hit counter. - */ - private static class SolutionHit { - + /** - * The input solution. + * An input solution and a hit counter. */ - final public IBindingSet solution; + public static class SolutionHit { + /** + * The input solution. 
+ */ + final public IBindingSet solution; + + /** + * The #of hits on that input solution when processing the join against + * the subquery. + */ + public final CAT nhits = new CAT(); + + private SolutionHit(final IBindingSet solution) { + + if(solution == null) + throw new IllegalArgumentException(); + + this.solution = solution; + + } + + public String toString() { + + return getClass().getName() + "{nhits=" + nhits + ",solution=" + + solution + "}"; + + } + + } // class SolutionHit + /** - * The #of hits on that input solution when processing the join against - * the subquery. + * A group of solutions having the same as-bound values for their join vars. + * Each solution is paired with a hit counter so we can support OPTIONAL + * semantics for the join. */ - public final CAT nhits = new CAT(); - - private SolutionHit(final IBindingSet solution) { + public static class Bucket implements Iterable<SolutionHit>, + Comparable<Bucket> { + + /** The hash code for this collision bucket. */ + private final int hashCode; - if(solution == null) - throw new IllegalArgumentException(); + /** + * A set of solutions (and their hit counters) which have the same + * as-bound values for the join variables. + */ + private final List<SolutionHit> solutions = new LinkedList<SolutionHit>(); + + public String toString() { + return super.toString() + + // + "{hashCode=" + hashCode + ",#solutions=" + solutions.size() + + "}"; + } - this.solution = solution; + public Bucket(final int hashCode, final IBindingSet solution) { + + this.hashCode = hashCode; + + add(solution); + + } + + public void add(final IBindingSet solution) { + + if (solution == null) + throw new IllegalArgumentException(); + + solutions.add(new SolutionHit(solution)); + + } - } - - public String toString() { + /** + * Add the solution to the bucket iff the solutions is not already + * present in the bucket. 
+ * <p> + * Note: There is already a hash index in place on the join variables + * when we are doing a DISTINCT filter. Further, only the "join" + * variables are "selected" and participate in a DISTINCT filter. + * Therefore, if we have a hash collision such that two solutions would + * be directed into the same {@link Bucket} then we can not improve + * matters but must simply scan the solutions in the bucket to decide + * whether the new solution duplicates a solution which is already + * present. + * + * @param solution + * The solution. + * + * @return <code>true</code> iff the bucket was modified by this + * operation. + */ + public boolean addDistinct(final IBindingSet solution) { - return getClass().getName() + "{nhits=" + nhits + ",solution=" - + solution + "}"; + if(solutions.isEmpty()) { - } - - } // class SolutionHit + // First solution. + solutions.add(new SolutionHit(solution)); + + return true; + + } - /** - * A group of solutions having the same as-bound values for their join vars. - * Each solution is paired with a hit counter so we can support OPTIONAL - * semantics for the join. - */ - private static class Bucket implements Iterable<SolutionHit>, - Comparable<Bucket> { + final Iterator<SolutionHit> itr = solutions.iterator(); + + while(itr.hasNext()) { + + final SolutionHit aSolution = itr.next(); + + if(aSolution.solution.equals(solution)) { + + // Solution already in this bucket. + return false; + + } + + } + + // This is a distinct solution. + solutions.add(new SolutionHit(solution)); + + return true; + + } + + public Iterator<SolutionHit> iterator() { + +// return Collections.unmodifiableList(solutions).iterator(); + return solutions.iterator(); + + } - /** The hash code for this collision bucket. 
*/ - private final int hashCode; - +// @SuppressWarnings("unchecked") +// public Iterator<IBindingSet> bindingSetIterator() { +// +// return new Striterator(solutions.iterator()).addFilter(new Resolver() { +// +// @Override +// protected Object resolve(Object obj) { +// return ((SolutionHit)obj).solution; +// } +// }); +// +// } + + /** + * Orders the buckets based on their hash codes. + */ + @Override + public int compareTo(final Bucket o) { + if (hashCode > o.hashCode) + return 1; + if (hashCode < o.hashCode) + return -1; + return 0; + } + + } // Bucket + /** - * A set of solutions (and their hit counters) which have the same - * as-bound values for the join variables. + * The backing map - this is NOT thread safe. */ - private final List<SolutionHit> solutions = new LinkedList<SolutionHit>(); + private final Map<Key, Bucket> map; + private final IVariable<?>[] joinVars; +// private final boolean optional; - public String toString() { - return super.toString() - + // - "{hashCode=" + hashCode + ",#solutions=" + solutions.size() - + "}"; - } - - public Bucket(final int hashCode, final IBindingSet solution) { + public JVMHashIndex(final int initialCapacity, final float loadFactor, + final IVariable<?>[] joinVars) {//, final boolean optional) { - this.hashCode = hashCode; + if (joinVars == null) { + /* + * A ZERO LENGTH joinVars[] means that all solutions will be in + * the same hash bucket. This can arise due to poor assignment + * of join variables or simply because there are no available + * join variables (full cross product join). Such joins are very + * expensive. + */ + throw new IllegalArgumentException(); + } - add(solution); - + this.map = new LinkedHashMap<Key, Bucket>(initialCapacity, + loadFactor); + + this.joinVars = joinVars; + + /* + * TOOD Can we pass this in and remove it from the API? But see + * filterDistinct(). 
+ */ +// this.optional = optional; + } - public void add(final IBindingSet solution) { - - if (solution == null) - throw new IllegalArgumentException(); + /** + * Add the solution to the index. + * + * @param bset + * The {@link IBindingSet}. + * @param optional + * + * @return The {@link Key} iff the solution was added to the index and + * <code>null</code> iff the solution was not added (because a + * {@link Key} could not be formed for the solution given the + * specified {@link #joinVars}). + * + * TODO javadoc on OPTIONAL + */ + public Key add(final IBindingSet bset,final boolean optional) { + + final Key key = makeKey(joinVars, bset, optional); + + if (key == null) { + + // Drop solution. + return null; + + } + + Bucket b = map.get(key); + + if (b == null) { + + map.put(key, b = new Bucket(key.hash, bset)); + + } else { + + b.add(bset); + + } + + return key; - solutions.add(new SolutionHit(solution)); - } /** - * Add the solution to the bucket iff the solutions is not already - * present in the bucket. - * <p> - * Note: There is already a hash index in place on the join variables - * when we are doing a DISTINCT filter. Further, only the "join" - * variables are "selected" and participate in a DISTINCT filter. - * Therefore, if we have a hash collision such that two solutions would - * be directed into the same {@link Bucket} then we can not improve - * matters but must simply scan the solutions in the bucket to decide - * whether the new solution duplicates a solution which is already - * present. + * Add the solution to the index iff the solution is not already present + * in the index. * * @param solution * The solution. * - * @return <code>true</code> iff the bucket was modified by this + * @return <code>true</code> iff the index was modified by this * operation. */ - public boolean addDistinct(final IBindingSet solution) { + public boolean addDistinct(final IBindingSet bset) { - if(solutions.isEmpty()) { + // TODO Review why optional:=true here. 
+ final Key key = makeKey(joinVars, bset, true/* optional */); - // First solution. - solutions.add(new SolutionHit(solution)); - + assert key != null; + + Bucket b = map.get(key); + + if (b == null) { + + // New bucket holding just this solution. + map.put(key, b = new Bucket(key.hash, bset)); + return true; - - } - final Iterator<SolutionHit> itr = solutions.iterator(); - - while(itr.hasNext()) { - - final SolutionHit aSolution = itr.next(); - - if(aSolution.solution.equals(solution)) { - - // Solution already in this bucket. - return false; - + } else { + + if (b.addDistinct(bset)) { + + // Existing bucket not having this solution. + return true; + } - + + // Existing bucket with duplicate solution. + return false; + } + + } + + /** + * Return the hash {@link Bucket} into which the given solution is mapped. + * <p> + * Note: The caller must apply an appropriate join constraint in order + * to correctly reject solutions that (a) violate the join contract; and + * (b) that are present in the hash bucket due to a hash collection + * rather than because they have the same bindings for the join + * variables. + * + * @param left + * The probe. + * @param optional + * + * @return The hash {@link Bucket} into which the given solution is + * mapped -or- <code>null</code> if there is no such hash + * bucket. + * + * TODO javadoc [optional]. + */ + public Bucket getBucket(final IBindingSet left, final boolean optional) { + + final Key key = makeKey(joinVars, left, optional); + + if (key == null) { + + return null; + + } + + // Probe the hash map : May return [null]! + return map.get(key); + + } + + /** + * Visit all buckets in the hash index. + */ + public Iterator<Bucket> buckets() { - // This is a distinct solution. - solutions.add(new SolutionHit(solution)); + return map.values().iterator(); - return true; - } - public Iterator<SolutionHit> iterator() { + /** + * The #of buckets in the hash index. Each bucket has a distinct hash + * code. 
Hash collisions can cause solutions that are distinct in their + * {@link #joinVars} to nevertheless be mapped into the same hash + * bucket. + * + * @return The #of buckets in the hash index. + */ + public int bucketCount() { -// return Collections.unmodifiableList(solutions).iterator(); - return solutions.iterator(); + return map.size(); } + + /** + * Export the {@link Bucket}s as an array. + */ + public Bucket[] toArray() { -// @SuppressWarnings("unchecked") -// public Iterator<IBindingSet> bindingSetIterator() { -// -// return new Striterator(solutions.iterator()).addFilter(new Resolver() { -// -// @Override -// protected Object resolve(Object obj) { -// return ((SolutionHit)obj).solution; -// } -// }); -// -// } + // source. + final Iterator<Bucket> bucketIterator = map.values().iterator(); - /** - * Orders the buckets based on their hash codes. - */ - @Override - public int compareTo(final Bucket o) { - if (hashCode > o.hashCode) - return 1; - if (hashCode < o.hashCode) - return -1; - return 0; + final Bucket[] a = new Bucket[map.size()]; + + int i = 0; + + while (bucketIterator.hasNext()) { + + a[i++] = bucketIterator.next(); + + } + + return a; + } - } // Bucket - + } + /** * <code>true</code> until the state is discarded by {@link #release()}. */ @@ -454,7 +656,7 @@ * Note: There is no separate "joinSet". Instead, the {@link SolutionHit} * class provides a join hit counter. */ - private final AtomicReference<Map<Key, Bucket>> rightSolutionsRef = new AtomicReference<Map<Key, Bucket>>(); + private final AtomicReference<JVMHashIndex> rightSolutionsRef = new AtomicReference<JVMHashIndex>(); /** * The #of solutions accepted into the hash index. @@ -583,16 +785,20 @@ * do this with the DISTINCT SOLUTIONS filter we would have to make the * mutation operations on a Bucket atomic. E.g., using the synchronized * keyword. This would give us what amounts to per-hash code striped - * locks. 
Note: the JVMDistinctBindingSetsOp does not use this class - * right now because it enjoys better concurrency than the - * JVMHashJoinUtility. Also see JVMDistinctFilter, which is the backing - * implementation for the JVMDistinctBindingSetsOp. + * locks. + * + * Note: the JVMDistinctBindingSetsOp does not use this class right now + * because it enjoys better concurrency than the JVMHashJoinUtility. + * Also see JVMDistinctFilter, which is the backing implementation for + * the JVMDistinctBindingSetsOp. */ - rightSolutionsRef.set(new LinkedHashMap<Key, Bucket>(// + rightSolutionsRef.set(new JVMHashIndex(// op.getProperty(HashMapAnnotations.INITIAL_CAPACITY, HashMapAnnotations.DEFAULT_INITIAL_CAPACITY),// op.getProperty(HashMapAnnotations.LOAD_FACTOR, - HashMapAnnotations.DEFAULT_LOAD_FACTOR)// + HashMapAnnotations.DEFAULT_LOAD_FACTOR),// + joinVars// +// optional// )); } @@ -624,7 +830,7 @@ } - private Map<Key,Bucket> getRightSolutions() { + private JVMHashIndex getRightSolutions() { return rightSolutionsRef.get(); @@ -659,123 +865,108 @@ final BOpStats stats) { try { - - final Map<Key,Bucket> map = getRightSolutions(); - - final IBindingSet[] all = BOpUtility.toArray(itr, stats); - if (log.isDebugEnabled()) - log.debug("Materialized: " + all.length - + " source solutions."); + final JVMHashIndex index = getRightSolutions(); - long naccepted = 0; - - for (IBindingSet bset : all) { + final IBindingSet[] all = BOpUtility.toArray(itr, stats); - final Key key = makeKey(joinVars, bset, optional); + if (log.isDebugEnabled()) + log.debug("Materialized: " + all.length + " source solutions."); - if (key == null) { - // Drop solution. 
- continue; + long naccepted = 0; + + for (IBindingSet bset : all) { + + if (index.add(bset, optional) == null) { + + continue; + + } + + naccepted++; + } - - Bucket b = map.get(key); - - if(b == null) { - - map.put(key, b = new Bucket(key.hash, bset)); - - } else { - - b.add(bset); - - } - - naccepted++; - } + if (log.isDebugEnabled()) + log.debug("There are " + index.bucketCount() + + " hash buckets, joinVars=" + + Arrays.toString(joinVars)); - if (log.isDebugEnabled()) - log.debug("There are : " + map.size() - + " distinct combinations of the join vars: " - + Arrays.toString(joinVars)); + rightSolutionCount.add(naccepted); - rightSolutionCount.add(naccepted); - - return naccepted; + return naccepted; - } catch(Throwable t) { + } catch (Throwable t) { + throw launderThrowable(t); + } } + /* + * FIXME I have observed two apparent bugs in this class. First, it was not + * assigning the output of [bset.copy(joinVars)] back to bset. Second, it + * was failing to output the first solution in a given bucket. I suspect + * that nobody is calling this code and that the JVMDistinctBindingSetOp is + * being used instead (which is a better choice since it allows full + * concurrency) - I have verified this. This method is not called. We might + * use the method by the same name on the HTreeHashJoinUtility, but not this + * version. 
+ */ @Override public long filterSolutions(final ICloseableIterator<IBindingSet[]> itr, final BOpStats stats, final IBuffer<IBindingSet> sink) { - + try { - final Map<Key, Bucket> map = getRightSolutions(); - - final IBindingSet[] all = BOpUtility.toArray(itr, stats); + final JVMHashIndex index = getRightSolutions(); - if (log.isDebugEnabled()) - log.debug("Materialized: " + all.length - + " source solutions."); + final IBindingSet[] all = BOpUtility.toArray(itr, stats); - for (IBindingSet bset : all) { + if (log.isDebugEnabled()) + log.debug("Materialized: " + all.length + " source solutions."); - /* - * Note: For a DISTINCT SOLUTIONS filter, we only consider the - * variables that are being projected. Further, all variables are - * used when computing the hash code. Therefore "joinVars" == - * "selectedVars" for a DISTINCT SOLUTIONS filter. - */ - bset.copy(joinVars); // only consider the selected variables. + for (IBindingSet bset : all) { - /* - * Note: Solutions are NOT dropped if a variable is not bound in a - * given solution. The variable is simply not used when computing - * the hash code. Specifying optional:=true here causes makeKey() to - * have this behavior. - */ - final Key key = makeKey(joinVars, bset, true/* optional */); + /* + * Note: For a DISTINCT SOLUTIONS filter, we only consider the + * variables that are being projected. Further, all variables + * are used when computing the hash code. Therefore "joinVars" + * == "selectedVars" for a DISTINCT SOLUTIONS filter. + */ + bset = bset.copy(joinVars); // only consider the selected variables. - assert key != null; + /* + * Note: Solutions are NOT dropped if a variable is not bound in + * a given solution. The variable is simply not used when + * computing the hash code. Specifying optional:=true here + * causes makeKey() to have this behavior. 
+ */ + if (index.addDistinct(bset)) { - Bucket b = map.get(key); - - if(b == null) { - - map.put(key, b = new Bucket(key.hash, bset)); - - } else { - - if(b.addDistinct(bset)) { - // Write on the output sink. sink.add(bset); - + } - + } - } + if (log.isDebugEnabled()) + log.debug("There are " + index.bucketCount() + + " hash buckets, joinVars=" + + Arrays.toString(joinVars)); - if (log.isDebugEnabled()) - log.debug("There are : " + map.size() - + " distinct combinations of the join vars: " - + Arrays.toString(joinVars)); + final long naccepted = all.length; - final long naccepted = all.length; - - rightSolutionCount.add(naccepted); + rightSolutionCount.add(naccepted); - return naccepted; + return naccepted; - } catch(Throwable t) { + } catch (Throwable t) { + throw launderThrowable(t); + } } @@ -805,10 +996,10 @@ final IConstraint[] constraints// ) { - final Map<Key,Bucket> rightSolutions = getRightSolutions(); + final JVMHashIndex rightSolutions = getRightSolutions(); if (log.isInfoEnabled()) { - log.info("rightSolutions: #buckets=" + rightSolutions.size() + log.info("rightSolutions: #buckets=" + rightSolutions.bucketCount() + ",#solutions=" + getRightSolutionCount()); } @@ -826,22 +1017,15 @@ if (log.isDebugEnabled()) log.debug("Considering " + left); - final Key key = JVMHashJoinUtility.makeKey(joinVars, left, - optional); + final Bucket bucket = rightSolutions.getBucket( + left, optional); - if (key == null) { - // Drop solution. + if (bucket == null) continue; - } + + final Iterator<SolutionHit> ritr = bucket + .iterator(); - // Probe the hash map. - final Bucket b = rightSolutions.get(key); - - if (b == null) - continue; - - final Iterator<SolutionHit> ritr = b.iterator(); - while (ritr.hasNext()) { final SolutionHit right = ritr.next(); @@ -983,20 +1167,24 @@ final Constant f = askVar == null ? 
null : new Constant( XSDBooleanIV.FALSE); - final Map<Key, Bucket> rightSolutions = getRightSolutions(); + final JVMHashIndex rightSolutions = getRightSolutions(); final IVariable<?>[] selected = getSelectVars(); if (log.isInfoEnabled()) - log.info("rightSolutions: #buckets=" + rightSolutions.size()); + log.info("rightSolutions: #buckets=" + + rightSolutions.bucketCount()); /* * Note: when NO solutions joined for a given source binding set AND * the join is OPTIONAL then we output the _original_ binding set to * the sink join task(s) and DO NOT apply the CONSTRAINT(s). */ + final Iterator<Bucket> bitr = rightSolutions.buckets(); + + while (bitr.hasNext()) { - for (Bucket b : rightSolutions.values()) { + final Bucket b = bitr.next(); for (SolutionHit hit : b) { @@ -1050,14 +1238,13 @@ // */ // final IVariable<?>[] selected = getSelectVars(); - final Map<Key, Bucket> rightSolutions = getRightSolutions(); + final JVMHashIndex rightSolutions = getRightSolutions(); if (log.isInfoEnabled()) - log.info("rightSolutions: #buckets=" + rightSolutions.size()); + log.info("rightSolutions: #buckets=" + rightSolutions.bucketCount()); // Visit the buckets. - IStriterator itr = new Striterator(rightSolutions.values() - .iterator()); + IStriterator itr = new Striterator(rightSolutions.buckets()); itr = itr.addFilter(new Expander() { @@ -1068,7 +1255,7 @@ */ @SuppressWarnings("rawtypes") @Override - protected Iterator expand(Object obj) { + protected Iterator expand(final Object obj) { final Bucket b = (Bucket) obj; @@ -1085,7 +1272,7 @@ private static final long serialVersionUID = 1L; @Override - protected Object resolve(Object obj) { + protected Object resolve(final Object obj) { final IBindingSet bs = ((SolutionHit) obj).solution; @@ -1126,6 +1313,10 @@ * of them might also be related to a failure to produce the correct set * of variables for [projectedInVars]. * + * TODO I have factored out the JVMHashIndex class. This class tracks + * the #of hits for each distinct solution. 
We can use this to correct + * the output cardinality. + * * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/668" > * JoinGroup optimizations </a> */ @@ -1138,6 +1329,10 @@ /* * Note: We are single threaded here so we can use a lower * concurrencyLevel value. + * + * Note: If necessary, this could be replaced with JVMHashIndex so + * we get the #of occurrences of each distinct combination of + * bindings that is projected into the sub-group/-query. */ final int concurrencyLevel = 1;//ConcurrentHashMapAnnotations.DEFAULT_CONCURRENCY_LEVEL; @@ -1201,17 +1396,17 @@ * Code works, uses nested iterators pattern. */ - final Map<Key, Bucket> rightSolutions = getRightSolutions(); + final JVMHashIndex rightSolutions = getRightSolutions(); final IVariable<?>[] selected = getSelectVars(); if (log.isInfoEnabled()) log.info("rightSolutions: #buckets=" - + rightSolutions.size()); + + rightSolutions.bucketCount()); // source. - final Iterator<Bucket> bucketIterator = rightSolutions.values() - .iterator(); + final Iterator<Bucket> bucketIterator = rightSolutions. + buckets(); while (bucketIterator.hasNext()) { @@ -1232,17 +1427,23 @@ * */ - if (distinctFilter.accept2(bs) == null) { + if ((bs = distinctFilter.accept2(bs)) == null) { // Drop duplicate solutions. continue; } - - } - - if (selected != null) { + } else if (selected != null) { + + /* + * FIXME We should be using projectedInVars here since + * outputSolutions() is used to stream solutions into + * the child join group (at least for some kinds of + * joins, but there might be exceptions for joining with + * a named solution set). + */ + // Drop variables which are not projected. bs = bs.copy(selected); @@ -1276,15 +1477,19 @@ final Constant t = askVar == null ? 
null : new Constant( XSDBooleanIV.TRUE); - final Map<Key, Bucket> rightSolutions = getRightSolutions(); + final JVMHashIndex rightSolutions = getRightSolutions(); final IVariable<?>[] selected = getSelectVars(); if (log.isInfoEnabled()) - log.info("rightSolutions: #buckets=" + rightSolutions.size()); + log.info("rightSolutions: #buckets=" + rightSolutions.bucketCount()); - for (Bucket b : rightSolutions.values()) { + final Iterator<Bucket> bitr = rightSolutions.buckets(); + while(bitr.hasNext()) { + + final Bucket b = bitr.next(); + for (SolutionHit hit : b) { if (hit.nhits.get() == 0) @@ -1367,29 +1572,6 @@ } /** - * Export the {@link Bucket}s as an array. - */ - static private Bucket[] toArray(final Map<Key,Bucket> rightSolutions) { - - // source. - final Iterator<Bucket> bucketIterator = rightSolutions.values() - .iterator(); - - final Bucket[] a = new Bucket[rightSolutions.size()]; - - int i = 0; - - while (bucketIterator.hasNext()) { - - a[i++] = bucketIterator.next(); - - } - - return a; - - } - - /** * Advance each other source to the first hash code GTE the hashCode for the * first source. * <p> @@ -1550,8 +1732,8 @@ for (int i = 0; i < all.length; i++) { // Fully materialize the solution set as a Bucket[]. - final Bucket[] t = toArray(all[i].getRightSolutions()); - + final Bucket[] t = all[i].getRightSolutions().toArray(); + /* * Sort the array. It's natural sort order is by the hash code * of the join variables. 
Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/solutions/JVMDistinctBindingSetsOp.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/solutions/JVMDistinctBindingSetsOp.java 2013-04-26 20:07:14 UTC (rev 7087) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/solutions/JVMDistinctBindingSetsOp.java 2013-04-28 15:03:41 UTC (rev 7088) @@ -39,6 +39,7 @@ import com.bigdata.bop.NV; import com.bigdata.bop.PipelineOp; import com.bigdata.bop.engine.BOpStats; +import com.bigdata.bop.join.JVMDistinctFilter; import com.bigdata.bop.join.JVMHashJoinUtility; import com.bigdata.relation.accesspath.IBlockingBuffer; import com.bigdata.striterator.ICloseableIterator; Deleted: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/solutions/JVMDistinctFilter.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/solutions/JVMDistinctFilter.java 2013-04-26 20:07:14 UTC (rev 7087) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/solutions/JVMDistinctFilter.java 2013-04-28 15:03:41 UTC (rev 7088) @@ -1,224 +0,0 @@ -/** - -Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. 
- -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -*/ -/* - * Created on Apr 26, 2013 - */ -package com.bigdata.bop.solutions; - -import java.util.Arrays; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.log4j.Logger; - -import com.bigdata.bop.IBindingSet; -import com.bigdata.bop.IConstant; -import com.bigdata.bop.IVariable; -import com.bigdata.bop.bindingSet.ListBindingSet; - -/** - * Utility class for imposing a DISTINCT filter on {@link IBindingSet}. This - * class is thread-safe. It is based on a {@link ConcurrentHashMap}. - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - */ -public class JVMDistinctFilter { - - private static final Logger log = Logger.getLogger(JVMDistinctFilter.class); - - /** - * Wrapper used for the as bound solutions in the {@link ConcurrentHashMap}. - */ - private static class Solution { - - private final int hash; - - private final IConstant<?>[] vals; - - public Solution(final IConstant<?>[] vals) { - this.vals = vals; - this.hash = java.util.Arrays.hashCode(vals); - } - - public int hashCode() { - return hash; - } - - public boolean equals(final Object o) { - if (this == o) - return true; - if (!(o instanceof Solution)) { - return false; - } - final Solution t = (Solution) o; - if (vals.length != t.vals.length) - return false; - for (int i = 0; i < vals.length; i++) { - // @todo verify that this allows for nulls with a unit test. - if (vals[i] == t.vals[i]) - continue; - if (vals[i] == null) - return false; - if (!vals[i].equals(t.vals[i])) - return false; - } - return true; - } - } - - /** - * The variables used to impose a distinct constraint. 
- */ - private final IVariable<?>[] vars; - - /** - * A concurrent map whose keys are the bindings on the specified variables - * (the keys and the values are the same since the map implementation does - * not allow <code>null</code> values). - * <p> - * Note: The map is shared state and can not be discarded or cleared until - * the last invocation!!! - */ - private final ConcurrentHashMap<Solution, Solution> map; - - /** - * - * @param vars - * The set of variables on which the DISTINCT filter will be - * imposed. Only these variables will be present in the - * "accepted" solutions. Any variable bindings not specified in - * this array will be dropped). - * @param initialCapacity - * @param loadFactor - * @param concurrencyLevel - */ - public JVMDistinctFilter(final IVariable<?>[] vars, - final int initialCapacity, final float loadFactor, - final int concurrencyLevel) { - - if (vars == null) - throw new IllegalArgumentException(); - - if (vars.length == 0) - throw new IllegalArgumentException(); - - this.vars = vars; - - this.map = new ConcurrentHashMap<Solution, Solution>(initialCapacity, - loadFactor, concurrencyLevel); - - } - - /** - * If the bindings are distinct for the configured variables then return - * those bindings. - * - * @param bset - * The binding set to be filtered. - * - * @return The distinct as bound values -or- <code>null</code> if the - * binding set duplicates a solution which was already accepted. - */ - public IConstant<?>[] accept(final IBindingSet bset) { - - final IConstant<?>[] r = new IConstant<?>[vars.length]; - - for (int i = 0; i < vars.length; i++) { - - /* - * Note: This allows null's. - * - * @todo write a unit test when some variables are not bound. 
- */ - r[i] = bset.get(vars[i]); - - } - - final Solution s = new Solution(r); - - if (log.isTraceEnabled()) - log.trace("considering: " + Arrays.toString(r)); - - final boolean distinct = map.putIfAbsent(s, s) == null; - - if (distinct && log.isDebugEnabled()) - log.debug("accepted: " + Arrays.toString(r)); - - return distinct ? r : null; - - } - - /** - * If the bindings are distinct for the configured variables then return a - * new {@link IBin... [truncated message content] |