From: <tho...@us...> - 2010-09-16 21:09:48

Revision: 3575
          http://bigdata.svn.sourceforge.net/bigdata/?rev=3575&view=rev
Author:   thompsonbry
Date:     2010-09-16 21:09:42 +0000 (Thu, 16 Sep 2010)

Log Message:
-----------
Modified SliceOp to be thread safe. It may be a CAS hotspot, but that can be addressed by batching each chunk through a lock.

Modified Paths:
--------------
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java

Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java 2010-09-16 20:29:51 UTC (rev 3574)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java 2010-09-16 21:09:42 UTC (rev 3575)
@@ -63,12 +63,6 @@
 // */
 // private final long startTime;

-// /**
-// * The index partition for which these statistics were collected or -1
-// * if the statistics are aggregated across index partitions.
-// */
-// public final int partitionId;
-
 /**
 * #of chunks in.
*/ @@ -116,7 +110,7 @@ sb.append(",unitsIn=" + unitsIn.estimate_get()); sb.append(",chunksOut=" + chunksOut.estimate_get()); sb.append(",unitsOut=" + unitsOut.estimate_get()); - toString(sb); + toString(sb); // extension hook sb.append("}"); return sb.toString(); } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java 2010-09-16 20:29:51 UTC (rev 3574) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java 2010-09-16 21:09:42 UTC (rev 3575) @@ -30,6 +30,7 @@ import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; +import java.util.concurrent.atomic.AtomicLong; import org.apache.log4j.Logger; @@ -55,11 +56,18 @@ * <p> * Note: When running on an {@link IBigdataFederation}, this operator must be * imposed on the query controller so it can count the solutions as they flow - * through. + * through - see {@link #getEvaluationContext()}. + * <p> + * Note: {@link SliceOp} is safe for concurrent invocations for the same query. + * Multiple chunks may flow through multiple invocations of the operator so long + * as they use the same {@link BOpStats} object. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ * + * @todo unit test with stress test for concurrent {@link SliceOp} invocations + * against a streaming chunk producer. + * * @todo If this operator is invoked for each chunk output by a query onto the * pipeline then it will over produce unless (A) it is given the same * {@link BOpStats} each time; and (B) it is not invoked for two chunks @@ -156,7 +164,55 @@ return getProperty(Annotations.LIMIT, Annotations.DEFAULT_LIMIT); } + + /** + * Extends {@link BOpStats} to capture the state of the {@link SliceOp}. 
+ */ + public static class SliceStats extends BOpStats { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public final AtomicLong nseen = new AtomicLong(); + + public final AtomicLong naccepted = new AtomicLong(); + + @Override + public void add(final BOpStats o) { + + super.add(o); + + if (o instanceof SliceStats) { + + final SliceStats t = (SliceStats) o; + + nseen.addAndGet(t.nseen.get()); + + naccepted.addAndGet(t.naccepted.get()); + + } + + } + + @Override + protected void toString(final StringBuilder sb) { + + sb.append(",nseed=" + nseen); + + sb.append(",naccepted=" + naccepted); + + } + + } + public SliceStats newStats() { + + return new SliceStats(); + + } + public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { return new FutureTask<Void>(new SliceTask(this, context)); @@ -179,12 +235,14 @@ /** #of solutions to accept. */ private final long limit; - /** #of solutions visited. */ - private long nseen; - - /** #of solutions accepted. */ - private long naccepted; - +// /** #of solutions visited. */ +// private long nseen; +// +// /** #of solutions accepted. */ +// private long naccepted; +// + private final SliceStats stats; + SliceTask(final SliceOp op, final BOpContext<IBindingSet> context) { this.op = op; @@ -201,6 +259,8 @@ if (limit <= 0) throw new IllegalArgumentException(Annotations.LIMIT); + this.stats = (SliceStats) context.getStats(); + } public Void call() throws Exception { @@ -211,14 +271,9 @@ final IAsynchronousIterator<IBindingSet[]> source = context .getSource(); - /* - * @todo This needs to be wrapping to automatically update the #of - * chunks actually output in order to have correct reporting. Review - * all of the other operators for this same issue. 
- */ final IBlockingBuffer<IBindingSet[]> sink = context.getSink(); - final BOpStats stats = context.getStats(); +// final BOpStats stats = context.getStats(); try { @@ -230,17 +285,20 @@ while (source.hasNext()) { + /* + * @todo batch each chunk through a lock for better + * concurrency (avoids CAS contention). + */ final IBindingSet[] chunk = source.next(); - + stats.chunksIn.increment(); for (int i = 0; i < chunk.length; i++) { stats.unitsIn.increment(); - if (nseen < offset) { + if (stats.nseen.incrementAndGet() <= offset) { // skip solution. - nseen++; if(log.isTraceEnabled()) log.trace(toString()); continue; @@ -258,9 +316,7 @@ // stats.unitsOut.increment(); - naccepted++; - nseen++; - if (naccepted >= limit) { + if (stats.naccepted.incrementAndGet() >= limit) { if (!out.isEmpty()) { out.flush(); // stats.chunksOut.increment(); @@ -306,8 +362,8 @@ public String toString() { return getClass().getName() + "{offset=" + offset + ",limit=" - + limit + ",nseen=" + nseen + ",naccepted=" + naccepted - + "}"; + + limit + ",nseen=" + stats.nseen + ",naccepted=" + + stats.naccepted + "}"; } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java 2010-09-16 20:29:51 UTC (rev 3574) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java 2010-09-16 21:09:42 UTC (rev 3575) @@ -643,7 +643,9 @@ * state across distinct invocations and is cancelling the query as soon as * it exhausts its input. In order to have correct decision boundaries, * slice needs to be invoked either once, concurrently if using {@link CAT} - * s, or in a series of presentations otherwise. 
+ * s, or in a series of presentations otherwise (single-threaded operator or
+ * internal locking in the operator implementation on its {@link SliceOp} to
+ * achieve chunk-wise serialization of processing).
 * <p>
 * The easiest way to fix this is to have {@link SliceOp} specialize the
 * {@link BOpStats}s and carry its state there. That will also make it safe

This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.