From: <tho...@us...> - 2010-09-01 22:49:05
|
Revision: 3493 http://bigdata.svn.sourceforge.net/bigdata/?rev=3493&view=rev Author: thompsonbry Date: 2010-09-01 22:48:57 +0000 (Wed, 01 Sep 2010) Log Message: ----------- Fixed bug in test harness for verifying solutions. Added a hash-map based distinct solutions operator. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/HashBindingSet.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/DistinctElementFilter.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/aggregation/TestDistinctBindingSets.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/RunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/DistinctBindingSetOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/ThreadLocalBufferFactory.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/HashBindingSet.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/HashBindingSet.java 2010-09-01 21:16:52 UTC (rev 3492) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/HashBindingSet.java 2010-09-01 22:48:57 UTC (rev 3493) @@ -66,11 +66,32 @@ * * @param src */ - protected HashBindingSet(HashBindingSet src) { + protected HashBindingSet(final HashBindingSet src) { map = new LinkedHashMap<IVariable, IConstant>(src.map); } + + /** + * Copy constructor. 
+ * + * @param src + */ + public HashBindingSet(final IBindingSet src) { + + map = new LinkedHashMap<IVariable, IConstant>(src.size()); + + final Iterator<Map.Entry<IVariable, IConstant>> itr = src.iterator(); + + while (itr.hasNext()) { + + final Map.Entry<IVariable, IConstant> e = itr.next(); + + map.put(e.getKey(), e.getValue()); + + } + + } public boolean isBound(IVariable var) { @@ -119,7 +140,7 @@ public String toString() { - StringBuilder sb = new StringBuilder(); + final StringBuilder sb = new StringBuilder(); sb.append("{ "); Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/DistinctBindingSetOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/DistinctBindingSetOp.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/DistinctBindingSetOp.java 2010-09-01 22:48:57 UTC (rev 3493) @@ -0,0 +1,335 @@ +package com.bigdata.bop.aggregation; + +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.FutureTask; + +import com.bigdata.bop.AbstractPipelineOp; +import com.bigdata.bop.ArrayBindingSet; +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpContext; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IConstant; +import com.bigdata.bop.IVariable; +import com.bigdata.bop.engine.BOpStats; +import com.bigdata.relation.accesspath.IAsynchronousIterator; +import com.bigdata.relation.accesspath.IBlockingBuffer; + +/** + * A pipelined DISTINCT operator based on a hash table. 
+ * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id: DistinctElementFilter.java 3466 2010-08-27 14:28:04Z + * thompsonbry $ + */ +public class DistinctBindingSetOp extends AbstractPipelineOp<IBindingSet>{ + + /** + * + */ + private static final long serialVersionUID = 1L; + + public interface Annotations extends BOp.Annotations { + + /** + * The initial capacity of the {@link ConcurrentHashMap} used to impose + * the distinct constraint. + * + * @see #DEFAULT_INITIAL_CAPACITY + */ + String INITIAL_CAPACITY = "initialCapacity"; + + int DEFAULT_INITIAL_CAPACITY = 16; + + /** + * The load factor of the {@link ConcurrentHashMap} used to impose + * the distinct constraint. + * + * @see #DEFAULT_LOAD_FACTOR + */ + String LOAD_FACTOR = "loadFactor"; + + float DEFAULT_LOAD_FACTOR = .75f; + + /** + * The concurrency level of the {@link ConcurrentHashMap} used to impose + * the distinct constraint. + * + * @see #DEFAULT_CONCURRENCY_LEVEL + */ + String CONCURRENCY_LEVEL = "concurrencyLevel"; + + int DEFAULT_CONCURRENCY_LEVEL = 16; + + /** + * The variables on which the distinct constraint will be imposed. + * Binding sets with distinct values for the specified variables will be + * passed on. + */ + String VARIABLES = DistinctBindingSetOp.class.getName() + ".variables"; + + } + + /** + * Required deep copy constructor. + */ + public DistinctBindingSetOp(final DistinctBindingSetOp op) { + super(op); + } + + /** + * Required shallow copy constructor. 
+ */ + public DistinctBindingSetOp(final BOp[] args, + final Map<String, Object> annotations) { + + super(args, annotations); + + final IVariable<?>[] vars = getVariables(); + + if (vars == null) + throw new IllegalArgumentException(); + + if (vars.length == 0) + throw new IllegalArgumentException(); + + } + + /** + * @see Annotations#INITIAL_CAPACITY + */ + public int getInitialCapacity() { + + return getProperty(Annotations.INITIAL_CAPACITY, + Annotations.DEFAULT_INITIAL_CAPACITY); + + } + + /** + * @see Annotations#LOAD_FACTOR + */ + public float getLoadFactor() { + + return getProperty(Annotations.LOAD_FACTOR, + Annotations.DEFAULT_LOAD_FACTOR); + + } + + /** + * @see Annotations#CONCURRENCY_LEVEL + */ + public int getConcurrencyLevel() { + + return getProperty(Annotations.CONCURRENCY_LEVEL, + Annotations.DEFAULT_CONCURRENCY_LEVEL); + + } + + /** + * @see Annotations#VARIABLES + */ + public IVariable<?>[] getVariables() { + + return (IVariable<?>[]) annotations.get(Annotations.VARIABLES); + + } + + public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { + + return new FutureTask<Void>(new DistinctTask(this, context)); + + } + + /** + * Wrapper used for the as bound solutions in the {@link ConcurrentHashMap}. + */ + private static class Solution { + private final int hash; + + private final IConstant<?>[] vals; + + public Solution(final IConstant<?>[] vals) { + this.vals = vals; + this.hash = java.util.Arrays.hashCode(vals); + } + + public int hashCode() { + return hash; + } + + public boolean equals(final Object o) { + if (this == o) + return true; + if (!(o instanceof Solution)) { + return false; + } + final Solution t = (Solution) o; + if (vals.length != t.vals.length) + return false; + for (int i = 0; i < vals.length; i++) { + // @todo allow for nulls. + if (vals[i] == t.vals[i]) + continue; + if (vals[i] == null) + return false; + if (!vals[i].equals(t.vals[i])) + return false; + } + return true; + } + } + + /** + * Task executing on the node. 
+ */ + private class DistinctTask implements Callable<Void> { + + private final BOpContext<IBindingSet> context; + + /** + * A concurrent map whose keys are the bindings on the specified + * variables (the keys and the values are the same since the map + * implementation does not allow <code>null</code> values). + */ + private /*final*/ ConcurrentHashMap<Solution, Solution> map; + + /** + * The variables used to impose a distinct constraint. + */ + private final IVariable<?>[] vars; + + DistinctTask(final DistinctBindingSetOp op, + final BOpContext<IBindingSet> context) { + + this.context = context; + + this.vars = op.getVariables(); + + this.map = new ConcurrentHashMap<Solution, Solution>( + getInitialCapacity(), getLoadFactor(), + getConcurrencyLevel()); + + } + + /** + * If the bindings are distinct for the configured variables then return + * those bindings. + * + * @param bset + * The binding set to be filtered. + * + * @return The distinct as bound values -or- <code>null</code> if the + * binding set duplicates a solution which was already accepted. + */ + private IConstant<?>[] accept(final IBindingSet bset) { + + final IConstant<?>[] r = new IConstant<?>[vars.length]; + + for (int i = 0; i < vars.length; i++) { + + if ((r[i] = bset.get(vars[i])) == null) { + + /* + * @todo probably allow nulls, but write a unit test for it. + */ + + throw new RuntimeException("Not bound: " + vars[i]); + + } + + } + + final Solution s = new Solution(r); + + final boolean distinct = map.putIfAbsent(s, s) == null; + + return distinct ? 
r : null; + + } + + public Void call() throws Exception { + + final BOpStats stats = context.getStats(); + + final IAsynchronousIterator<IBindingSet[]> itr = context + .getSource(); + + final IBlockingBuffer<IBindingSet[]> sink = context.getSink(); + + try { + + while (itr.hasNext()) { + + final IBindingSet[] a = itr.next(); + + stats.chunksIn.increment(); + stats.unitsIn.add(a.length); + + final List<IBindingSet> accepted = new LinkedList<IBindingSet>(); + + int naccepted = 0; + + for (IBindingSet bset : a) { + +// System.err.println("considering: " + bset); + + final IConstant<?>[] vals = accept(bset); + + if (vals != null) { + +// System.err.println("accepted: " +// + Arrays.toString(vals)); + + /* + * @todo This may cause problems since the + * ArrayBindingSet does not allow mutation with + * variables not declared up front. In that case use + * new HashBindingSet( new ArrayBindingSet(...)). + */ + + accepted.add(new ArrayBindingSet(vars, vals)); + + naccepted++; + + } + + } + + if (naccepted > 0) { + + final IBindingSet[] b = accepted + .toArray(new IBindingSet[naccepted]); + +// System.err.println("output: " +// + Arrays.toString(b)); + + sink.add(b); + + stats.unitsOut.add(naccepted); + stats.chunksOut.increment(); + + } + + } + + // done. + return null; + + } finally { + + sink.flush(); + sink.close(); + + // discard the map. 
+ map = null; + + } + + } + + } + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregation/DistinctBindingSetOp.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/DistinctElementFilter.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/DistinctElementFilter.java 2010-09-01 21:16:52 UTC (rev 3492) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/DistinctElementFilter.java 2010-09-01 22:48:57 UTC (rev 3493) @@ -10,8 +10,10 @@ import com.bigdata.bop.IBindingSet; import com.bigdata.bop.IVariable; import com.bigdata.bop.NV; +import com.bigdata.bop.aggregation.DistinctBindingSetOp; import com.bigdata.btree.keys.KeyBuilder; import com.bigdata.rdf.relation.rule.BindingSetSortKeyBuilder; +import com.bigdata.rdf.spo.DistinctSPOIterator; import com.bigdata.relation.accesspath.IBlockingBuffer; import com.bigdata.relation.rule.eval.IJoinNexus; import com.bigdata.relation.rule.eval.ISolution; @@ -23,7 +25,8 @@ * A DISTINCT operator based on a hash table. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ + * @version $Id: DistinctElementFilter.java 3466 2010-08-27 14:28:04Z + * thompsonbry $ * @param <E> * * @todo could have an implementation backed by a persistent hash map using an @@ -40,7 +43,8 @@ * increase the map concurrency level, etc. * * @todo Reconcile with {@link IChunkConverter}, {@link DistinctFilter} (handles - * solutions) and {@link MergeFilter} (handles comparables). + * solutions) and {@link MergeFilter} (handles comparables), + * {@link DistinctSPOIterator}, {@link DistinctBindingSetOp}, etc. 
*/ public class DistinctElementFilter<E> extends BOpBase @@ -62,14 +66,15 @@ String LOAD_FACTOR = "loadFactor"; String CONCURRENCY_LEVEL = "concurrencyLevel"; - + } - public DistinctElementFilter(final IVariable<?>[] distinctList, final UUID masterUUID) { + public DistinctElementFilter(final IVariable<?>[] distinctList, + final UUID masterUUID) { super(distinctList, NV.asMap(new NV[] { -// new NV(Annotations.QUERY_ID, masterUUID), - // new NV(Annotations.BOP_ID, bopId) + // new NV(Annotations.QUERY_ID, masterUUID), + // new NV(Annotations.BOP_ID, bopId) })); if (masterUUID == null) Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2010-09-01 21:16:52 UTC (rev 3492) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2010-09-01 22:48:57 UTC (rev 3493) @@ -35,7 +35,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; @@ -54,7 +53,6 @@ import com.bigdata.bop.IVariable; import com.bigdata.bop.engine.BOpStats; import com.bigdata.bop.engine.Haltable; -import com.bigdata.btree.AbstractBTree; import com.bigdata.btree.BytesUtil; import com.bigdata.btree.keys.IKeyBuilder; import com.bigdata.counters.CAT; @@ -66,7 +64,6 @@ import com.bigdata.relation.accesspath.IAccessPath; import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.IBlockingBuffer; -import com.bigdata.relation.accesspath.IBuffer; import com.bigdata.relation.rule.IRule; import com.bigdata.relation.rule.IStarJoin; import com.bigdata.relation.rule.IStarJoin.IStarConstraint; @@ -385,193 +382,8 @@ */ final PipelineJoinStats stats; 
- /** - * A factory pattern for per-thread objects whose life cycle is tied to - * some container. For example, there may be an instance of this pool - * for a {@link JoinTask} or an {@link AbstractBTree}. The pool can be - * torn down when the container is torn down, which prevents its - * thread-local references from escaping. - * - * @author tho...@us... - * @param <T> - * The generic type of the thread-local object. - * - * @todo There should be two implementations of a common interface or - * abstract base class: one based on a private - * {@link ConcurrentHashMap} and the other on striped locks. The - * advantage of the {@link ConcurrentHashMap} is approximately 3x - * higher concurrency. The advantage of striped locks is that you - * can directly manage the #of buffers when when the threads using - * those buffers is unbounded. However, doing so could lead to - * deadlock since two threads can be hashed onto the same buffer - * object. - */ - abstract public class ThreadLocalFactory<T extends IBuffer<E>, E> { + final private ThreadLocalBufferFactory<AbstractUnsynchronizedArrayBuffer<IBindingSet>, IBindingSet> threadLocalBufferFactory = new ThreadLocalBufferFactory<AbstractUnsynchronizedArrayBuffer<IBindingSet>, IBindingSet>() { - /** - * The thread-local queues. - */ - private final ConcurrentHashMap<Thread, T> map; - - /** - * A list of all objects visible to the caller. This is used to - * ensure that any objects allocated by the factory are visited. - * - * <p> - * Note: Since the collection is not thread-safe, synchronization is - * required when adding to the collection and when visiting the - * elements of the collection. 
- */ - private final LinkedList<T> list = new LinkedList<T>(); - - protected ThreadLocalFactory() { - - this(16/* initialCapacity */, .75f/* loadFactor */, 16/* concurrencyLevel */); - - } - - protected ThreadLocalFactory(final int initialCapacity, - final float loadFactor, final int concurrencyLevel) { - - map = new ConcurrentHashMap<Thread, T>(initialCapacity, - loadFactor, concurrencyLevel); - - } - - /** - * Return the #of thread-local objects. - */ - final public int size() { - - return map.size(); - - } - - /** - * Add the element to the thread-local buffer. - * - * @param e - * An element. - * - * @throws IllegalStateException - * if the factory is asynchronously closed. - */ - public void add(final E e) { - - get().add(e); - - } - - /** - * Return a thread-local buffer - * - * @return The thread-local buffer. - * - * @throws RuntimeException - * if the join is halted. - */ - final private T get() { - final Thread t = Thread.currentThread(); - T tmp = map.get(t); - if (tmp == null) { - if (map.put(t, tmp = initialValue()) != null) { - /* - * Note: Since the key is the thread it is not possible - * for there to be a concurrent put of an entry under - * the same key so we do not have to use putIfAbsent(). - */ - throw new AssertionError(); - } - // Add to list. - synchronized (list) { - list.add(tmp); - } - } - halted(); - return tmp; - } - - /** - * Flush each of the unsynchronized buffers onto their backing - * synchronized buffer. - * - * @throws RuntimeException - * if the join is halted. - */ - public void flush() { - synchronized (list) { - int n = 0; - long m = 0L; - for (T b : list) { - halted(); - // #of elements to be flushed. - final int size = b.size(); - // flush, returning total #of elements written onto this - // buffer. 
- final long counter = b.flush(); - m += counter; - if (log.isDebugEnabled()) - log.debug("Flushed buffer: size=" + size - + ", counter=" + counter); - } - if (log.isInfoEnabled()) - log.info("Flushed " + n - + " unsynchronized buffers totalling " + m - + " elements"); - } - } - - /** - * Reset each of the synchronized buffers, discarding their buffered - * writes. - * <p> - * Note: This method is used during error processing, therefore it - * DOES NOT check {@link JoinTask#halt}. - */ - public void reset() { - synchronized (list) { - int n = 0; - for (T b : list) { - // #of elements in the buffer before reset(). - final int size = b.size(); - // reset the buffer. - b.reset(); - if (log.isDebugEnabled()) - log.debug("Reset buffer: size=" + size); - } - if (log.isInfoEnabled()) - log.info("Reset " + n + " unsynchronized buffers"); - } - } - - // /** - // * Reset the per-{@link Thread} unsynchronized output buffers - // (used as - // * part of error handling for the {@link JoinTask}). - // */ - // final protected void resetUnsyncBuffers() throws Exception { - // - // final int n = threadLocalBufferFactory.reset(); - // .close(new - // Visitor<AbstractUnsynchronizedArrayBuffer<IBindingSet>>() { - // - // @Override - // public void meet( - // final AbstractUnsynchronizedArrayBuffer<IBindingSet> b) - // throws Exception { - // - // - // } - - /** - * Create and return a new object. 
- */ - abstract protected T initialValue(); - - } - - final private ThreadLocalFactory<AbstractUnsynchronizedArrayBuffer<IBindingSet>, IBindingSet> threadLocalBufferFactory = new ThreadLocalFactory<AbstractUnsynchronizedArrayBuffer<IBindingSet>, IBindingSet>() { - @Override protected AbstractUnsynchronizedArrayBuffer<IBindingSet> initialValue() { @@ -579,6 +391,14 @@ return newUnsyncOutputBuffer(); } + + @Override + protected void halted() { + + JoinTask.this.halted(); + + } + }; public String toString() { Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/ThreadLocalBufferFactory.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/ThreadLocalBufferFactory.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/ThreadLocalBufferFactory.java 2010-09-01 22:48:57 UTC (rev 3493) @@ -0,0 +1,232 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Sep 1, 2010 + */ + +package com.bigdata.bop.join; + +import java.util.LinkedList; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.log4j.Logger; + +import com.bigdata.bop.engine.Haltable; +import com.bigdata.btree.AbstractBTree; +import com.bigdata.relation.accesspath.IBuffer; +import com.bigdata.relation.rule.eval.pipeline.JoinTask; + +/** + * A factory pattern for per-thread objects whose life cycle is tied to some + * container. For example, there may be an instance of this pool for a + * {@link JoinTask} or an {@link AbstractBTree}. The pool can be torn down when + * the container is torn down, which prevents its thread-local references from + * escaping. + * + * @author tho...@us... + * @version $Id$ + * @param <T> + * The generic type of the thread-local object. + * + * @todo There should be two implementations of a common interface or abstract + * base class: one based on a private {@link ConcurrentHashMap} and the + * other on striped locks. The advantage of the {@link ConcurrentHashMap} + * is approximately 3x higher concurrency. The advantage of striped locks + * is that you can directly manage the #of buffers when the #of threads + * using those buffers is unbounded. However, doing so could lead to + * deadlock since two threads can be hashed onto the same buffer object. + * + * @todo refactor into our concurrency package? + */ +abstract public class ThreadLocalBufferFactory<T extends IBuffer<E>, E> { + + static private final Logger log = Logger + .getLogger(ThreadLocalBufferFactory.class); + + /** + * The thread-local queues. + */ + private final ConcurrentHashMap<Thread, T> map; + + /** + * A list of all objects visible to the caller. 
This is used to ensure that + * any objects allocated by the factory are visited. + * + * <p> + * Note: Since the collection is not thread-safe, synchronization is + * required when adding to the collection and when visiting the elements of + * the collection. + */ + private final LinkedList<T> list = new LinkedList<T>(); + + protected ThreadLocalBufferFactory() { + + this(16/* initialCapacity */, .75f/* loadFactor */, 16/* concurrencyLevel */); + + } + + protected ThreadLocalBufferFactory(final int initialCapacity, + final float loadFactor, final int concurrencyLevel) { + + map = new ConcurrentHashMap<Thread, T>(initialCapacity, loadFactor, + concurrencyLevel); + + } + + /** + * Return the #of thread-local objects. + */ + final public int size() { + + return map.size(); + + } + + /** + * Add the element to the thread-local buffer. + * + * @param e + * An element. + * + * @throws IllegalStateException + * if the factory is asynchronously closed. + */ + public void add(final E e) { + + get().add(e); + + } + + /** + * Return a thread-local buffer + * + * @return The thread-local buffer. + * + * @throws RuntimeException + * if the join is halted. + */ + final public T get() { + final Thread t = Thread.currentThread(); + T tmp = map.get(t); + if (tmp == null) { + if (map.put(t, tmp = initialValue()) != null) { + /* + * Note: Since the key is the thread it is not possible for + * there to be a concurrent put of an entry under the same key + * so we do not have to use putIfAbsent(). + */ + throw new AssertionError(); + } + // Add to list. + synchronized (list) { + list.add(tmp); + } + } + halted(); + return tmp; + } + + /** + * Flush each of the unsynchronized buffers onto their backing synchronized + * buffer. + * + * @throws RuntimeException + * if the join is halted. + */ + public void flush() { + synchronized (list) { + int n = 0; + long m = 0L; + for (T b : list) { + halted(); + // #of elements to be flushed. 
+ final int size = b.size(); + // flush, returning total #of elements written onto this + // buffer. + final long counter = b.flush(); + m += counter; + if (log.isDebugEnabled()) + log.debug("Flushed buffer: size=" + size + ", counter=" + + counter); + } + if (log.isInfoEnabled()) + log.info("Flushed " + n + " unsynchronized buffers totalling " + + m + " elements"); + } + } + + /** + * Reset each of the unsynchronized buffers, discarding their buffered writes. + * <p> + * Note: This method is used during error processing, therefore it DOES NOT + * check {@link JoinTask#halt}. + */ + public void reset() { + synchronized (list) { + int n = 0; + for (T b : list) { + // #of elements in the buffer before reset(). + final int size = b.size(); + // reset the buffer. + b.reset(); + if (log.isDebugEnabled()) + log.debug("Reset buffer: size=" + size); + } + if (log.isInfoEnabled()) + log.info("Reset " + n + " unsynchronized buffers"); + } + } + + // /** + // * Reset the per-{@link Thread} unsynchronized output buffers + // (used as + // * part of error handling for the {@link JoinTask}). + // */ + // final protected void resetUnsyncBuffers() throws Exception { + // + // final int n = threadLocalBufferFactory.reset(); + // .close(new + // Visitor<AbstractUnsynchronizedArrayBuffer<IBindingSet>>() { + // + // @Override + // public void meet( + // final AbstractUnsynchronizedArrayBuffer<IBindingSet> b) + // throws Exception { + // + // + // } + + /** + * Create and return a new object. + */ + abstract protected T initialValue(); + + /** + * Test to see if the process has been halted. 
+ * + * @see Haltable#halted() + */ + abstract protected void halted(); + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/ThreadLocalBufferFactory.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/aggregation/TestDistinctBindingSets.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/aggregation/TestDistinctBindingSets.java 2010-09-01 21:16:52 UTC (rev 3492) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/aggregation/TestDistinctBindingSets.java 2010-09-01 22:48:57 UTC (rev 3493) @@ -27,12 +27,35 @@ package com.bigdata.bop.aggregation; -import com.bigdata.bop.ap.DistinctElementFilter; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.FutureTask; import junit.framework.TestCase2; +import com.bigdata.bop.ArrayBindingSet; +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpContext; +import com.bigdata.bop.Constant; +import com.bigdata.bop.HashBindingSet; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IConstant; +import com.bigdata.bop.IVariable; +import com.bigdata.bop.NV; +import com.bigdata.bop.Var; +import com.bigdata.bop.engine.BOpStats; +import com.bigdata.bop.engine.TestQueryEngine; +import com.bigdata.journal.BufferMode; +import com.bigdata.journal.ITx; +import com.bigdata.journal.Journal; +import com.bigdata.relation.accesspath.IAsynchronousIterator; +import com.bigdata.relation.accesspath.IBlockingBuffer; +import com.bigdata.relation.accesspath.ThickAsynchronousIterator; + /** - * Unit tests for {@link DistinctElementFilter}. + * Unit tests for {@link DistinctBindingSetOp}. 
* * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ @@ -52,14 +75,157 @@ super(name); } + @Override + public Properties getProperties() { + + final Properties p = new Properties(super.getProperties()); + + p.setProperty(Journal.Options.BUFFER_MODE, BufferMode.Transient + .toString()); + + return p; + + } + + Journal jnl = null; + + List<IBindingSet> data = null; + + public void setUp() throws Exception { + + jnl = new Journal(getProperties()); + + setUpData(); + + } + /** - * @todo write unit tests for distinct based on purely local evaluation. + * Setup the data. + */ + private void setUpData() { + + final Var<?> x = Var.var("x"); + final Var<?> y = Var.var("y"); + + data = new LinkedList<IBindingSet>(); + IBindingSet bset = null; + { + bset = new HashBindingSet(); + bset.set(x, new Constant<String>("John")); + bset.set(y, new Constant<String>("Mary")); + data.add(bset); + } + { + bset = new HashBindingSet(); + bset.set(x, new Constant<String>("Mary")); + bset.set(y, new Constant<String>("Paul")); + data.add(bset); + } + { + bset = new HashBindingSet(); + bset.set(x, new Constant<String>("Mary")); + bset.set(y, new Constant<String>("Jane")); + data.add(bset); + } + { + bset = new HashBindingSet(); + bset.set(x, new Constant<String>("Paul")); + bset.set(y, new Constant<String>("Leon")); + data.add(bset); + } + { + bset = new HashBindingSet(); + bset.set(x, new Constant<String>("Paul")); + bset.set(y, new Constant<String>("John")); + data.add(bset); + } + { + bset = new HashBindingSet(); + bset.set(x, new Constant<String>("Leon")); + bset.set(y, new Constant<String>("Paul")); + data.add(bset); + } + + } + + public void tearDown() throws Exception { + + if (jnl != null) { + jnl.destroy(); + jnl = null; + } + + // clear reference. + data = null; + + } + + /** + * Unit test for distinct. * - * @todo write unit tests for distinct based on a hash partitioned DISTINCT - * filter. 
+ * @throws ExecutionException + * @throws InterruptedException */ - public void test_something() { - fail("write tests"); + public void test_distinct() throws InterruptedException, ExecutionException { + + final Var<?> x = Var.var("x"); +// final Var<?> y = Var.var("y"); + + final int distinctId = 1; + + final DistinctBindingSetOp query = new DistinctBindingSetOp(new BOp[]{}, + NV.asMap(new NV[]{// + new NV(DistinctBindingSetOp.Annotations.BOP_ID,distinctId),// + new NV(DistinctBindingSetOp.Annotations.VARIABLES,new IVariable[]{x}),// + })); + + // the expected solutions + final IBindingSet[] expected = new IBindingSet[] {// + new ArrayBindingSet(// + new IVariable[] { x },// + new IConstant[] { new Constant<String>("John") }// + ),// + new ArrayBindingSet(// + new IVariable[] { x },// + new IConstant[] { new Constant<String>("Mary") }// + ), new ArrayBindingSet(// + new IVariable[] { x },// + new IConstant[] { new Constant<String>("Paul") }// + ), new ArrayBindingSet(// + new IVariable[] { x },// + new IConstant[] { new Constant<String>("Leon") }// + ), }; + + final BOpStats stats = query.newStats(); + + final IAsynchronousIterator<IBindingSet[]> source = new ThickAsynchronousIterator<IBindingSet[]>( + new IBindingSet[][] { data.toArray(new IBindingSet[0]) }); + + final IBlockingBuffer<IBindingSet[]> sink = query.newBuffer(); + + final BOpContext<IBindingSet> context = new BOpContext<IBindingSet>( + null/* fed */, jnl/* indexManager */, + ITx.READ_COMMITTED/* readTimestamp */, + ITx.UNISOLATED/* writeTimestamp */, -1/* partitionId */, stats, + source, sink, null/* sink2 */); + + // get task. + final FutureTask<Void> ft = query.eval(context); + + // execute task. + jnl.getExecutorService().execute(ft); + + TestQueryEngine.assertSolutions(expected, sink.iterator()); + + assertTrue(ft.isDone()); + assertFalse(ft.isCancelled()); + ft.get(); // verify nothing thrown. 
+ + assertEquals(1L, stats.chunksIn.get()); + assertEquals(6L, stats.unitsIn.get()); + assertEquals(4L, stats.unitsOut.get()); + assertEquals(1L, stats.chunksOut.get()); + } } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/RunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/RunningQuery.java 2010-09-01 21:16:52 UTC (rev 3492) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/RunningQuery.java 2010-09-01 22:48:57 UTC (rev 3493) @@ -56,6 +56,7 @@ import com.bigdata.bop.IConstraint; import com.bigdata.bop.IPredicate; import com.bigdata.bop.NoSuchBOpException; +import com.bigdata.bop.aggregation.DistinctBindingSetOp; import com.bigdata.bop.ap.Predicate; import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.IBlockingBuffer; @@ -187,10 +188,12 @@ * <p> * This is guarded by the {@link #runningStateLock}. * - * FIXME {@link IConstraint}s for {@link PipelineJoin}, distinct elements - * and other filters for {@link IPredicate}s, conditional routing for - * binding sets in the pipeline (to route around an optional join group - * based on an {@link IConstraint}), and then buffer management for s/o. + * FIXME {@link IConstraint}s for {@link PipelineJoin}, non-distributed + * filters for {@link IPredicate}s, distinct element filter for + * {@link IPredicate} which is capable of distributed operations, + * conditional routing for binding sets in the pipeline (to route around an + * optional join group based on an {@link IConstraint}), SPARQL to BOP + * translation, and then buffer management for s/o. 
* * @todo SCALEOUT: Life cycle management of the operators and the query * implies both a per-query bop:NodeList map on the query coordinator Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java 2010-09-01 21:16:52 UTC (rev 3492) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java 2010-09-01 22:48:57 UTC (rev 3493) @@ -153,14 +153,20 @@ public void tearDown() throws Exception { - if (queryEngine != null) + if (queryEngine != null) { queryEngine.shutdownNow(); + queryEngine = null; + } - if (bufferService != null) + if (bufferService != null) { bufferService.shutdownNow(); + bufferService = null; + } - if (jnl != null) + if (jnl != null) { jnl.destroy(); + jnl = null; + } } @@ -573,7 +579,7 @@ * @param expected * @param itr */ - protected void assertSolutions(final IBindingSet[] expected, + static public void assertSolutions(final IBindingSet[] expected, final IAsynchronousIterator<IBindingSet[]> itr) { try { int n = 0; @@ -588,9 +594,10 @@ fail("n=" + n + ", expected=" + expected[n] + ", actual=" + e[i]); } + n++; } - n++; } + assertEquals("Wrong number of solutions", expected.length, n); } finally { itr.close(); } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java 2010-09-01 21:16:52 UTC (rev 3492) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java 2010-09-01 22:48:57 UTC (rev 3493) @@ -50,6 +50,7 @@ import com.bigdata.bop.ap.E; import com.bigdata.bop.ap.Predicate; import com.bigdata.bop.ap.R; +import com.bigdata.bop.engine.TestQueryEngine; import 
com.bigdata.bop.join.PipelineJoin.PipelineJoinStats; import com.bigdata.journal.BufferMode; import com.bigdata.journal.ITx; @@ -143,8 +144,13 @@ public void tearDown() throws Exception { - if (jnl != null) + if (jnl != null) { + jnl.destroy(); + + jnl = null; + + } } @@ -252,20 +258,22 @@ // execute task. jnl.getExecutorService().execute(ft); - final IAsynchronousIterator<IBindingSet[]> itr = sink.iterator(); - try { - int n = 0; - while (itr.hasNext()) { - final IBindingSet[] chunk = itr.next(); - if (log.isInfoEnabled()) - log.info(n + " : chunkSize=" + chunk.length); - for (int i = 0; i < chunk.length; i++) { - assertTrue(expected[n++].equals(chunk[i])); - } - } - } finally { - itr.close(); - } + TestQueryEngine.assertSolutions(expected, sink.iterator()); +// final IAsynchronousIterator<IBindingSet[]> itr = sink.iterator(); +// try { +// int n = 0; +// while (itr.hasNext()) { +// final IBindingSet[] chunk = itr.next(); +// if (log.isInfoEnabled()) +// log.info(n + " : chunkSize=" + chunk.length); +// for (int i = 0; i < chunk.length; i++) { +// assertTrue(expected[n++].equals(chunk[i])); +// } +// } +// assertEquals(n, expected.length); +// } finally { +// itr.close(); +// } // join task assertEquals(1L, stats.chunksIn.get()); @@ -366,6 +374,7 @@ // assertTrue(expected[n++].equals(chunk[i])); // } // } +// assertEquals(n, expected.length); // } finally { // itr.close(); // } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |