From: <tho...@us...> - 2010-09-15 19:52:07
|
Revision: 3558 http://bigdata.svn.sourceforge.net/bigdata/?rev=3558&view=rev Author: thompsonbry Date: 2010-09-15 19:52:00 +0000 (Wed, 15 Sep 2010) Log Message: ----------- Added stress tests and tracked down the query termination problem. The cause was an assertion on totalAvailableChunkCount in RunState that could fail on a legal state: the total number of available chunks can transiently become negative during query evaluation due to the interleaving of operators for a given query. The assertion has been conditionally disabled (it is now guarded by a flag which permits transiently negative counts). Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/HaltOpMessage.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StartOpMessage.java branches/QUADS_QUERY_BRANCH/bigdata/src/resources/logging/log4j.properties branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/HaltOpMessage.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/HaltOpMessage.java 2010-09-15 15:54:52 UTC (rev 3557) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/HaltOpMessage.java 2010-09-15 19:52:00 UTC (rev 3558) @@ -72,7 +72,7 @@ * operator. */ final public int altSinkChunksOut; - + /** * The statistics for the execution of the bop against the partition on the * service. 
@@ -119,4 +119,21 @@ this.taskStats = taskStats; } + public String toString() { + final StringBuilder sb = new StringBuilder(getClass().getName()); + sb.append("{queryId=" + queryId); + sb.append(",bopId=" + bopId); + sb.append(",partitionId=" + partitionId); + sb.append(",serviceId=" + serviceId); + if (cause != null) + sb.append(",cause=" + cause); + sb.append(",sinkId=" + sinkId); + sb.append(",sinkChunksOut=" + sinkChunksOut); + sb.append(",altSinkId=" + altSinkId); + sb.append(",altSinkChunksOut=" + altSinkChunksOut); + sb.append(",stats=" + taskStats); + sb.append("}"); + return sb.toString(); + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java 2010-09-15 15:54:52 UTC (rev 3557) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java 2010-09-15 19:52:00 UTC (rev 3558) @@ -42,7 +42,9 @@ import com.bigdata.bop.BOp; /** - * The run state for a {@link RunningQuery}. + * The run state for a {@link RunningQuery}. This class is NOT thread-safe. + * {@link RunningQuery} uses an internal lock to serialize requests against the + * public methods of this class. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ @@ -61,6 +63,16 @@ } /** + * Note: Due to concurrency, it is possible for an {@link IChunkMessage} to + * be accepted and the corresponding chunk task started, before a + * {@link RunState#startOp(StartOpMessage)} transition has been fully + * processed. This means that the {@link RunState#totalAvailableChunkCount} + * can become transiently negative. This flag disables asserts which would + * otherwise fail on legal transient negatives. + */ + static private boolean availableChunkCountMayBeNegative = true; + + /** * The query. 
*/ private final RunningQuery query; @@ -76,19 +88,13 @@ private long nsteps = 0; /** - * The #of tasks for this query which have started but not yet halted and - * ZERO (0) if this is not the query coordinator. - * <p> - * This is guarded by the {@link #runningStateLock}. + * The #of tasks for this query which have started but not yet halted. */ private long totalRunningTaskCount = 0; /** * The #of chunks for this query of which a running task has made available - * but which have not yet been accepted for processing by another task and - * ZERO (0) if this is not the query coordinator. - * <p> - * This is guarded by the {@link #runningStateLock}. + * but which have not yet been accepted for processing by another task. */ private long totalAvailableChunkCount = 0; @@ -100,27 +106,22 @@ * <p> * The movement of the intermediate binding set chunks forms an acyclic * directed graph. This map is used to track the #of chunks available for - * each bop in the pipeline. When a bop has no more incoming chunks, we send - * an asynchronous message to all nodes on which that bop had executed - * informing the {@link QueryEngine} on that node that it should immediately - * release all resources associated with that bop. - * <p> - * This is guarded by the {@link #runningStateLock}. + * each {@link BOp} in the pipeline. When a {@link BOp} has no more incoming + * chunks, we send an asynchronous message to all nodes on which that + * {@link BOp} had executed informing the {@link QueryEngine} on that node + * that it should immediately release all resources associated with that + * {@link BOp}. */ private final Map<Integer/* bopId */, AtomicLong/* availableChunkCount */> availableChunkCountMap = new LinkedHashMap<Integer, AtomicLong>(); /** * A collection reporting on the #of instances of a given {@link BOp} which * are concurrently executing. - * <p> - * This is guarded by the {@link #runningStateLock}. 
*/ private final Map<Integer/* bopId */, AtomicLong/* runningCount */> runningTaskCountMap = new LinkedHashMap<Integer, AtomicLong>(); /** * A collection of the operators which have executed at least once. - * <p> - * This is guarded by the {@link #runningStateLock}. */ private final Set<Integer/* bopId */> startedSet = new LinkedHashSet<Integer>(); @@ -140,6 +141,9 @@ // query.lifeCycleSetUpQuery(); + if (log.isInfoEnabled()) + log.info(msg.toString()); + final Integer bopId = Integer.valueOf(msg.getBOpId()); totalAvailableChunkCount++; @@ -161,11 +165,6 @@ } - if (log.isInfoEnabled()) - log.info("queryId=" + queryId + ",totalRunningTaskCount=" - + totalRunningTaskCount + ",totalAvailableChunkCount=" - + totalAvailableChunkCount); - if (TableLog.tableLog.isInfoEnabled()) { /* * Note: RunState is only used by the query controller so this will @@ -180,13 +179,14 @@ TableLog.tableLog.info("\n\nqueryId=" + queryId + "\n"); // TableLog.tableLog.info(query.getQuery().toString()+"\n"); TableLog.tableLog.info(getTableHeader()); - TableLog.tableLog.info(getTableRow("startQ", serviceId, - -1/* shardId */, 1/* fanIn */)); + TableLog.tableLog + .info(getTableRow("startQ", serviceId, msg.getBOpId(), + -1/* shardId */, 1/* fanIn */, null/* stats */)); } - System.err.println("startQ : nstep="+nsteps+", bopId=" + bopId - + ",totalRunningTaskCount=" + totalRunningTaskCount - + ",totalAvailableTaskCount=" + totalAvailableChunkCount); +// System.err.println("startQ : nstep="+nsteps+", bopId=" + bopId +// + ",totalRunningTaskCount=" + totalRunningTaskCount +// + ",totalAvailableTaskCount=" + totalAvailableChunkCount); } @@ -206,7 +206,9 @@ totalRunningTaskCount++; assert totalRunningTaskCount >= 1 : "runningTaskCount=" - + totalRunningTaskCount + " :: msg=" + msg; + + totalRunningTaskCount + " :: runState=" + this + ", msg=" + + msg; + final boolean firstTime; { @@ -218,7 +220,7 @@ final long tmp = n.incrementAndGet(); assert tmp >= 0 : "runningTaskCount=" + tmp + " for bopId=" - + 
msg.bopId + " :: msg=" + msg; + + msg.bopId + " :: runState=" + this + ", msg=" + msg; firstTime = startedSet.add(bopId); // @@ -231,35 +233,38 @@ totalAvailableChunkCount -= msg.nchunks; - assert totalAvailableChunkCount >= 0 : "totalAvailableChunkCount=" - + totalAvailableChunkCount + " :: msg=" + msg; + assert availableChunkCountMayBeNegative || totalAvailableChunkCount >= 0 : "totalAvailableChunkCount=" + + totalAvailableChunkCount + " :: runState=" + this + ", msg=" + + msg; { AtomicLong n = availableChunkCountMap.get(bopId); if (n == null) - throw new AssertionError(); + availableChunkCountMap.put(bopId, n = new AtomicLong()); final long tmp = n.addAndGet(-msg.nchunks); - assert tmp >= 0 : "availableChunkCount=" + tmp + " for bopId=" - + msg.bopId + " :: msg=" + msg; + assert availableChunkCountMayBeNegative || tmp >= 0 : "availableChunkCount=" + tmp + " for bopId=" + + msg.bopId + " :: runState=" + this + ", msg=" + msg; } - System.err.println("startOp: nstep="+nsteps+", bopId=" + bopId - + ",totalRunningTaskCount=" + totalRunningTaskCount - + ",totalAvailableChunkCount=" + totalAvailableChunkCount - + ",fanIn=" + msg.nchunks); +// System.err.println("startOp: nstep=" + nsteps + ", bopId=" + bopId +// + ",totalRunningTaskCount=" + totalRunningTaskCount +// + ",totalAvailableChunkCount=" + totalAvailableChunkCount +// + ",fanIn=" + msg.nchunks); if (TableLog.tableLog.isInfoEnabled()) { - TableLog.tableLog.info(getTableRow("startOp", msg.serviceId, - msg.partitionId, msg.nchunks/* fanIn */)); + TableLog.tableLog + .info(getTableRow("startOp", msg.serviceId, msg.bopId, + msg.partitionId, msg.nchunks/* fanIn */, null/* stats */)); } // check deadline. 
final long deadline = query.getDeadline(); + if (deadline < System.currentTimeMillis()) { if (log.isTraceEnabled()) @@ -271,6 +276,7 @@ query.cancel(true/* mayInterruptIfRunning */); } + return firstTime; } @@ -291,8 +297,9 @@ totalAvailableChunkCount += fanOut; - assert totalAvailableChunkCount >= 0 : "totalAvailableChunkCount=" - + totalAvailableChunkCount + " :: msg=" + msg; + assert availableChunkCountMayBeNegative || totalAvailableChunkCount >= 0 : "totalAvailableChunkCount=" + + totalAvailableChunkCount + " :: runState=" + this + + ", msg=" + msg; if (msg.sinkId != null) { AtomicLong n = availableChunkCountMap.get(msg.sinkId); @@ -302,8 +309,8 @@ final long tmp = n.addAndGet(msg.sinkChunksOut); - assert tmp >= 0 : "availableChunkCount=" + tmp + " for bopId=" - + msg.sinkId + " :: msg=" + msg; + assert availableChunkCountMayBeNegative || tmp >= 0 : "availableChunkCount=" + tmp + " for bopId=" + + msg.sinkId + " :: runState=" + this + ", msg=" + msg; } @@ -317,8 +324,9 @@ final long tmp = n.addAndGet(msg.altSinkChunksOut); - assert tmp >= 0 : "availableChunkCount=" + tmp + " for bopId=" - + msg.altSinkId + " :: msg=" + msg; + assert availableChunkCountMayBeNegative || tmp >= 0 : "availableChunkCount=" + tmp + " for bopId=" + + msg.altSinkId + " :: runState=" + this + ", msg=" + + msg; } @@ -328,7 +336,8 @@ totalRunningTaskCount--; assert totalRunningTaskCount >= 0 : "runningTaskCount=" - + totalRunningTaskCount + " :: msg=" + msg; + + totalRunningTaskCount + " :: runState=" + this + ", msg=" + + msg; { @@ -340,32 +349,35 @@ final long tmp = n.decrementAndGet(); assert tmp >= 0 : "runningTaskCount=" + tmp + " for bopId=" - + msg.bopId + " :: msg=" + msg; + + msg.bopId + " :: runState=" + this + ", msg=" + msg; } // Figure out if this operator is done. 
final boolean isDone = isOperatorDone(msg.bopId); - System.err.println("haltOp : nstep=" + nsteps + ", bopId=" + msg.bopId - + ",totalRunningTaskCount=" + totalRunningTaskCount - + ",totalAvailableTaskCount=" + totalAvailableChunkCount - + ",fanOut=" + fanOut); +// System.err.println("haltOp : nstep=" + nsteps + ", bopId=" + msg.bopId +// + ",totalRunningTaskCount=" + totalRunningTaskCount +// + ",totalAvailableTaskCount=" + totalAvailableChunkCount +// + ",fanOut=" + fanOut); if (TableLog.tableLog.isInfoEnabled()) { TableLog.tableLog.info(getTableRow("haltOp", msg.serviceId, - msg.partitionId, fanOut)); + msg.bopId, msg.partitionId, fanOut, msg.taskStats)); } - if (log.isTraceEnabled()) - log.trace("bopId=" + msg.bopId + ",partitionId=" + msg.partitionId - + ",serviceId=" + query.getQueryEngine().getServiceUUID() - + ", nchunks=" + fanOut + " : totalRunningTaskCount=" - + totalRunningTaskCount + ", totalAvailableChunkCount=" - + totalAvailableChunkCount); +// if (log.isTraceEnabled()) +// log.trace("bopId=" + msg.bopId + ",partitionId=" + msg.partitionId +// + ",serviceId=" + query.getQueryEngine().getServiceUUID() +// + ", nchunks=" + fanOut + " : totalRunningTaskCount=" +// + totalRunningTaskCount + ", totalAvailableChunkCount=" +// + totalAvailableChunkCount); - // test termination criteria + /* + * Test termination criteria + */ final long deadline = query.getDeadline(); + if (msg.cause != null) { // operator failed on this chunk. @@ -458,7 +470,7 @@ Arrays.sort(bopIds); // header 2. - sb.append("step\tlabel\tshardId\tfanIO\tavail\trun"); + sb.append("step\tlabel\tbopId\tshardId\tfanIO\tavail\trun"); for (int i = 0; i < bopIds.length; i++) { @@ -470,6 +482,10 @@ sb.append("\tserviceId"); + sb.append("\tbop"); + + sb.append("\tstats"); + sb.append('\n'); return sb.toString(); @@ -485,17 +501,23 @@ * @param label * The state change level (startQ, startOp, haltOp). * @param serviceId - * The node on which the operator is/was executed. 
+ * The node on which the operator will be / was executed. + * @param bopId + * The identifier for the bop which will be / was executed. * @param shardId * The index partition against which the operator was running and * <code>-1</code> if the operator was not evaluated against a * specific index partition. - * @param * @param fanIO * The fanIn (startQ,startOp) or fanOut (haltOp). + * @param stats + * The statistics from the operator evaluation and + * <code>null</code> unless {@link #haltOp(HaltOpMessage)} is + * the invoker. */ private String getTableRow(final String label, final UUID serviceId, - final int shardId, final int fanIO) { + final int bopId, final int shardId, final int fanIO, + final BOpStats stats) { final StringBuilder sb = new StringBuilder(); @@ -503,6 +525,8 @@ sb.append('\t'); sb.append(label); sb.append('\t'); + sb.append(Integer.toString(bopId)); + sb.append('\t'); sb.append(Integer.toString(shardId)); sb.append('\t'); sb.append(Integer.toString(fanIO)); @@ -534,6 +558,15 @@ sb.append('\t'); sb.append(serviceId == null ? "N/A" : serviceId.toString()); + sb.append('\t'); + sb.append(query.bopIndex.get(bopId)); + + if (stats != null) { + // @todo use a multi-column version of stats. + sb.append('\t'); + sb.append(stats.toString()); + } + sb.append('\n'); return sb.toString(); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-09-15 15:54:52 UTC (rev 3557) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-09-15 19:52:00 UTC (rev 3558) @@ -776,7 +776,7 @@ if (altSink != null && altSink != queryBuffer && !altSink.isEmpty()) { /* - * Handle alt sink output , sending appropriate chunk + * Handle alt sink output, sending appropriate chunk * message(s). 
* * Note: This maps output over shards/nodes in s/o. Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StartOpMessage.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StartOpMessage.java 2010-09-15 15:54:52 UTC (rev 3557) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StartOpMessage.java 2010-09-15 19:52:00 UTC (rev 3558) @@ -48,4 +48,10 @@ this.nchunks = nchunks; } + public String toString() { + return getClass().getName() + "{queryId=" + queryId + ",bopId=" + bopId + + ",partitionId=" + partitionId + ",serviceId=" + serviceId + + ",nchunks=" + nchunks + "}"; + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/resources/logging/log4j.properties =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/resources/logging/log4j.properties 2010-09-15 15:54:52 UTC (rev 3557) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/resources/logging/log4j.properties 2010-09-15 19:52:00 UTC (rev 3558) @@ -211,8 +211,8 @@ log4j.appender.dest2.layout.ConversionPattern=%-5p: %r %X{hostname} %X{serviceUUID} %X{taskname} %X{timestamp} %X{resources} %t %l: %m%n ## -# Rule execution log. This is a formatted log file (comma delimited). -log4j.logger.com.bigdata.bop.engine.RunState$TableLog=INFO,queryRunStateLog +# BOp run state trace (tab delimited file). Uncomment the next line to enable. 
+#log4j.logger.com.bigdata.bop.engine.RunState$TableLog=INFO,queryRunStateLog log4j.additivity.com.bigdata.bop.engine.RunState$TableLog=false log4j.appender.queryRunStateLog=org.apache.log4j.FileAppender log4j.appender.queryRunStateLog.Threshold=ALL Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java 2010-09-15 15:54:52 UTC (rev 3557) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java 2010-09-15 19:52:00 UTC (rev 3558) @@ -38,6 +38,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import junit.framework.TestCase2; @@ -639,6 +640,12 @@ protected int doStressTest(final long timeout, final int ntrials, final int poolSize) throws Exception { + // start time in nanos. + final long begin = System.nanoTime(); + + // timeout in nanos. + final long nanos = TimeUnit.MILLISECONDS.toNanos(timeout); + final Executor service = new LatchedExecutor(jnl.getExecutorService(), poolSize); @@ -646,9 +653,12 @@ for (int i = 0; i < ntrials; i++) { + final int trial = i; final FutureTask<Void> ft = new FutureTask<Void>(new Runnable() { public void run() { try { + if (log.isInfoEnabled()) + log.info("trial=" + trial); test_query_join2(); } catch (Exception e) { // wrap exception. @@ -662,26 +672,30 @@ service.execute(ft); } - - Thread.sleep(timeout); - + int nerror = 0; int ncancel = 0; + int ntimeout = 0; int nsuccess = 0; for (FutureTask<Void> ft : futures) { - ft.cancel(true/* mayInterruptIfRunning */); + // remaining nanoseconds. 
+ final long remaining = nanos - (System.nanoTime() - begin); + if (remaining <= 0) + ft.cancel(true/* mayInterruptIfRunning */); try { - ft.get(); + ft.get(remaining, TimeUnit.NANOSECONDS); nsuccess++; } catch (CancellationException ex) { ncancel++; + } catch (TimeoutException ex) { + ntimeout++; } catch (ExecutionException ex) { nerror++; } } final String msg = "nerror=" + nerror + ", ncancel=" + ncancel - + ", nsuccess=" + nsuccess; + + ", ntimeout=" + ntimeout + ", nsuccess=" + nsuccess; if(log.isInfoEnabled()) log.info(msg); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |