|
From: <tho...@us...> - 2010-09-15 19:52:07
|
Revision: 3558
http://bigdata.svn.sourceforge.net/bigdata/?rev=3558&view=rev
Author: thompsonbry
Date: 2010-09-15 19:52:00 +0000 (Wed, 15 Sep 2010)
Log Message:
-----------
Added stress tests and tracked down the query termination problem, which turned out to be caused by an assertion on totalAvailableChunkCount in RunState. The total number of available chunks can transiently become negative during query evaluation due to the interleaving of operators for a given query. The assertion has therefore been effectively removed: it is now conditionally disabled via the availableChunkCountMayBeNegative flag.
Modified Paths:
--------------
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/HaltOpMessage.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StartOpMessage.java
branches/QUADS_QUERY_BRANCH/bigdata/src/resources/logging/log4j.properties
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/HaltOpMessage.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/HaltOpMessage.java 2010-09-15 15:54:52 UTC (rev 3557)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/HaltOpMessage.java 2010-09-15 19:52:00 UTC (rev 3558)
@@ -72,7 +72,7 @@
* operator.
*/
final public int altSinkChunksOut;
-
+
/**
* The statistics for the execution of the bop against the partition on the
* service.
@@ -119,4 +119,21 @@
this.taskStats = taskStats;
}
+ public String toString() {
+ final StringBuilder sb = new StringBuilder(getClass().getName());
+ sb.append("{queryId=" + queryId);
+ sb.append(",bopId=" + bopId);
+ sb.append(",partitionId=" + partitionId);
+ sb.append(",serviceId=" + serviceId);
+ if (cause != null)
+ sb.append(",cause=" + cause);
+ sb.append(",sinkId=" + sinkId);
+ sb.append(",sinkChunksOut=" + sinkChunksOut);
+ sb.append(",altSinkId=" + altSinkId);
+ sb.append(",altSinkChunksOut=" + altSinkChunksOut);
+ sb.append(",stats=" + taskStats);
+ sb.append("}");
+ return sb.toString();
+ }
+
}
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java 2010-09-15 15:54:52 UTC (rev 3557)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java 2010-09-15 19:52:00 UTC (rev 3558)
@@ -42,7 +42,9 @@
import com.bigdata.bop.BOp;
/**
- * The run state for a {@link RunningQuery}.
+ * The run state for a {@link RunningQuery}. This class is NOT thread-safe.
+ * {@link RunningQuery} uses an internal lock to serialize requests against the
+ * public methods of this class.
*
* @author <a href="mailto:tho...@us...">Bryan Thompson</a>
* @version $Id$
@@ -61,6 +63,16 @@
}
/**
+ * Note: Due to concurrency, it is possible for an {@link IChunkMessage} to
+ * be accepted and the corresponding chunk task started, before a
+ * {@link RunState#startOp(StartOpMessage)} transition has been fully
+ * processed. This means that the {@link RunState#totalAvailableChunkCount}
+ * can become transiently negative. This flag disables asserts which would
+ * otherwise fail on legal transient negatives.
+ */
+ static private boolean availableChunkCountMayBeNegative = true;
+
+ /**
* The query.
*/
private final RunningQuery query;
@@ -76,19 +88,13 @@
private long nsteps = 0;
/**
- * The #of tasks for this query which have started but not yet halted and
- * ZERO (0) if this is not the query coordinator.
- * <p>
- * This is guarded by the {@link #runningStateLock}.
+ * The #of tasks for this query which have started but not yet halted.
*/
private long totalRunningTaskCount = 0;
/**
* The #of chunks for this query of which a running task has made available
- * but which have not yet been accepted for processing by another task and
- * ZERO (0) if this is not the query coordinator.
- * <p>
- * This is guarded by the {@link #runningStateLock}.
+ * but which have not yet been accepted for processing by another task.
*/
private long totalAvailableChunkCount = 0;
@@ -100,27 +106,22 @@
* <p>
* The movement of the intermediate binding set chunks forms an acyclic
* directed graph. This map is used to track the #of chunks available for
- * each bop in the pipeline. When a bop has no more incoming chunks, we send
- * an asynchronous message to all nodes on which that bop had executed
- * informing the {@link QueryEngine} on that node that it should immediately
- * release all resources associated with that bop.
- * <p>
- * This is guarded by the {@link #runningStateLock}.
+ * each {@link BOp} in the pipeline. When a {@link BOp} has no more incoming
+ * chunks, we send an asynchronous message to all nodes on which that
+ * {@link BOp} had executed informing the {@link QueryEngine} on that node
+ * that it should immediately release all resources associated with that
+ * {@link BOp}.
*/
private final Map<Integer/* bopId */, AtomicLong/* availableChunkCount */> availableChunkCountMap = new LinkedHashMap<Integer, AtomicLong>();
/**
* A collection reporting on the #of instances of a given {@link BOp} which
* are concurrently executing.
- * <p>
- * This is guarded by the {@link #runningStateLock}.
*/
private final Map<Integer/* bopId */, AtomicLong/* runningCount */> runningTaskCountMap = new LinkedHashMap<Integer, AtomicLong>();
/**
* A collection of the operators which have executed at least once.
- * <p>
- * This is guarded by the {@link #runningStateLock}.
*/
private final Set<Integer/* bopId */> startedSet = new LinkedHashSet<Integer>();
@@ -140,6 +141,9 @@
// query.lifeCycleSetUpQuery();
+ if (log.isInfoEnabled())
+ log.info(msg.toString());
+
final Integer bopId = Integer.valueOf(msg.getBOpId());
totalAvailableChunkCount++;
@@ -161,11 +165,6 @@
}
- if (log.isInfoEnabled())
- log.info("queryId=" + queryId + ",totalRunningTaskCount="
- + totalRunningTaskCount + ",totalAvailableChunkCount="
- + totalAvailableChunkCount);
-
if (TableLog.tableLog.isInfoEnabled()) {
/*
* Note: RunState is only used by the query controller so this will
@@ -180,13 +179,14 @@
TableLog.tableLog.info("\n\nqueryId=" + queryId + "\n");
// TableLog.tableLog.info(query.getQuery().toString()+"\n");
TableLog.tableLog.info(getTableHeader());
- TableLog.tableLog.info(getTableRow("startQ", serviceId,
- -1/* shardId */, 1/* fanIn */));
+ TableLog.tableLog
+ .info(getTableRow("startQ", serviceId, msg.getBOpId(),
+ -1/* shardId */, 1/* fanIn */, null/* stats */));
}
- System.err.println("startQ : nstep="+nsteps+", bopId=" + bopId
- + ",totalRunningTaskCount=" + totalRunningTaskCount
- + ",totalAvailableTaskCount=" + totalAvailableChunkCount);
+// System.err.println("startQ : nstep="+nsteps+", bopId=" + bopId
+// + ",totalRunningTaskCount=" + totalRunningTaskCount
+// + ",totalAvailableTaskCount=" + totalAvailableChunkCount);
}
@@ -206,7 +206,9 @@
totalRunningTaskCount++;
assert totalRunningTaskCount >= 1 : "runningTaskCount="
- + totalRunningTaskCount + " :: msg=" + msg;
+ + totalRunningTaskCount + " :: runState=" + this + ", msg="
+ + msg;
+
final boolean firstTime;
{
@@ -218,7 +220,7 @@
final long tmp = n.incrementAndGet();
assert tmp >= 0 : "runningTaskCount=" + tmp + " for bopId="
- + msg.bopId + " :: msg=" + msg;
+ + msg.bopId + " :: runState=" + this + ", msg=" + msg;
firstTime = startedSet.add(bopId);
//
@@ -231,35 +233,38 @@
totalAvailableChunkCount -= msg.nchunks;
- assert totalAvailableChunkCount >= 0 : "totalAvailableChunkCount="
- + totalAvailableChunkCount + " :: msg=" + msg;
+ assert availableChunkCountMayBeNegative || totalAvailableChunkCount >= 0 : "totalAvailableChunkCount="
+ + totalAvailableChunkCount + " :: runState=" + this + ", msg="
+ + msg;
{
AtomicLong n = availableChunkCountMap.get(bopId);
if (n == null)
- throw new AssertionError();
+ availableChunkCountMap.put(bopId, n = new AtomicLong());
final long tmp = n.addAndGet(-msg.nchunks);
- assert tmp >= 0 : "availableChunkCount=" + tmp + " for bopId="
- + msg.bopId + " :: msg=" + msg;
+ assert availableChunkCountMayBeNegative || tmp >= 0 : "availableChunkCount=" + tmp + " for bopId="
+ + msg.bopId + " :: runState=" + this + ", msg=" + msg;
}
- System.err.println("startOp: nstep="+nsteps+", bopId=" + bopId
- + ",totalRunningTaskCount=" + totalRunningTaskCount
- + ",totalAvailableChunkCount=" + totalAvailableChunkCount
- + ",fanIn=" + msg.nchunks);
+// System.err.println("startOp: nstep=" + nsteps + ", bopId=" + bopId
+// + ",totalRunningTaskCount=" + totalRunningTaskCount
+// + ",totalAvailableChunkCount=" + totalAvailableChunkCount
+// + ",fanIn=" + msg.nchunks);
if (TableLog.tableLog.isInfoEnabled()) {
- TableLog.tableLog.info(getTableRow("startOp", msg.serviceId,
- msg.partitionId, msg.nchunks/* fanIn */));
+ TableLog.tableLog
+ .info(getTableRow("startOp", msg.serviceId, msg.bopId,
+ msg.partitionId, msg.nchunks/* fanIn */, null/* stats */));
}
// check deadline.
final long deadline = query.getDeadline();
+
if (deadline < System.currentTimeMillis()) {
if (log.isTraceEnabled())
@@ -271,6 +276,7 @@
query.cancel(true/* mayInterruptIfRunning */);
}
+
return firstTime;
}
@@ -291,8 +297,9 @@
totalAvailableChunkCount += fanOut;
- assert totalAvailableChunkCount >= 0 : "totalAvailableChunkCount="
- + totalAvailableChunkCount + " :: msg=" + msg;
+ assert availableChunkCountMayBeNegative || totalAvailableChunkCount >= 0 : "totalAvailableChunkCount="
+ + totalAvailableChunkCount + " :: runState=" + this
+ + ", msg=" + msg;
if (msg.sinkId != null) {
AtomicLong n = availableChunkCountMap.get(msg.sinkId);
@@ -302,8 +309,8 @@
final long tmp = n.addAndGet(msg.sinkChunksOut);
- assert tmp >= 0 : "availableChunkCount=" + tmp + " for bopId="
- + msg.sinkId + " :: msg=" + msg;
+ assert availableChunkCountMayBeNegative || tmp >= 0 : "availableChunkCount=" + tmp + " for bopId="
+ + msg.sinkId + " :: runState=" + this + ", msg=" + msg;
}
@@ -317,8 +324,9 @@
final long tmp = n.addAndGet(msg.altSinkChunksOut);
- assert tmp >= 0 : "availableChunkCount=" + tmp + " for bopId="
- + msg.altSinkId + " :: msg=" + msg;
+ assert availableChunkCountMayBeNegative || tmp >= 0 : "availableChunkCount=" + tmp + " for bopId="
+ + msg.altSinkId + " :: runState=" + this + ", msg="
+ + msg;
}
@@ -328,7 +336,8 @@
totalRunningTaskCount--;
assert totalRunningTaskCount >= 0 : "runningTaskCount="
- + totalRunningTaskCount + " :: msg=" + msg;
+ + totalRunningTaskCount + " :: runState=" + this + ", msg="
+ + msg;
{
@@ -340,32 +349,35 @@
final long tmp = n.decrementAndGet();
assert tmp >= 0 : "runningTaskCount=" + tmp + " for bopId="
- + msg.bopId + " :: msg=" + msg;
+ + msg.bopId + " :: runState=" + this + ", msg=" + msg;
}
// Figure out if this operator is done.
final boolean isDone = isOperatorDone(msg.bopId);
- System.err.println("haltOp : nstep=" + nsteps + ", bopId=" + msg.bopId
- + ",totalRunningTaskCount=" + totalRunningTaskCount
- + ",totalAvailableTaskCount=" + totalAvailableChunkCount
- + ",fanOut=" + fanOut);
+// System.err.println("haltOp : nstep=" + nsteps + ", bopId=" + msg.bopId
+// + ",totalRunningTaskCount=" + totalRunningTaskCount
+// + ",totalAvailableTaskCount=" + totalAvailableChunkCount
+// + ",fanOut=" + fanOut);
if (TableLog.tableLog.isInfoEnabled()) {
TableLog.tableLog.info(getTableRow("haltOp", msg.serviceId,
- msg.partitionId, fanOut));
+ msg.bopId, msg.partitionId, fanOut, msg.taskStats));
}
- if (log.isTraceEnabled())
- log.trace("bopId=" + msg.bopId + ",partitionId=" + msg.partitionId
- + ",serviceId=" + query.getQueryEngine().getServiceUUID()
- + ", nchunks=" + fanOut + " : totalRunningTaskCount="
- + totalRunningTaskCount + ", totalAvailableChunkCount="
- + totalAvailableChunkCount);
+// if (log.isTraceEnabled())
+// log.trace("bopId=" + msg.bopId + ",partitionId=" + msg.partitionId
+// + ",serviceId=" + query.getQueryEngine().getServiceUUID()
+// + ", nchunks=" + fanOut + " : totalRunningTaskCount="
+// + totalRunningTaskCount + ", totalAvailableChunkCount="
+// + totalAvailableChunkCount);
- // test termination criteria
+ /*
+ * Test termination criteria
+ */
final long deadline = query.getDeadline();
+
if (msg.cause != null) {
// operator failed on this chunk.
@@ -458,7 +470,7 @@
Arrays.sort(bopIds);
// header 2.
- sb.append("step\tlabel\tshardId\tfanIO\tavail\trun");
+ sb.append("step\tlabel\tbopId\tshardId\tfanIO\tavail\trun");
for (int i = 0; i < bopIds.length; i++) {
@@ -470,6 +482,10 @@
sb.append("\tserviceId");
+ sb.append("\tbop");
+
+ sb.append("\tstats");
+
sb.append('\n');
return sb.toString();
@@ -485,17 +501,23 @@
* @param label
* The state change level (startQ, startOp, haltOp).
* @param serviceId
- * The node on which the operator is/was executed.
+ * The node on which the operator will be / was executed.
+ * @param bopId
+ * The identifier for the bop which will be / was executed.
* @param shardId
* The index partition against which the operator was running and
* <code>-1</code> if the operator was not evaluated against a
* specific index partition.
- * @param
* @param fanIO
* The fanIn (startQ,startOp) or fanOut (haltOp).
+ * @param stats
+ * The statistics from the operator evaluation and
+ * <code>null</code> unless {@link #haltOp(HaltOpMessage)} is
+ * the invoker.
*/
private String getTableRow(final String label, final UUID serviceId,
- final int shardId, final int fanIO) {
+ final int bopId, final int shardId, final int fanIO,
+ final BOpStats stats) {
final StringBuilder sb = new StringBuilder();
@@ -503,6 +525,8 @@
sb.append('\t');
sb.append(label);
sb.append('\t');
+ sb.append(Integer.toString(bopId));
+ sb.append('\t');
sb.append(Integer.toString(shardId));
sb.append('\t');
sb.append(Integer.toString(fanIO));
@@ -534,6 +558,15 @@
sb.append('\t');
sb.append(serviceId == null ? "N/A" : serviceId.toString());
+ sb.append('\t');
+ sb.append(query.bopIndex.get(bopId));
+
+ if (stats != null) {
+ // @todo use a multi-column version of stats.
+ sb.append('\t');
+ sb.append(stats.toString());
+ }
+
sb.append('\n');
return sb.toString();
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-09-15 15:54:52 UTC (rev 3557)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-09-15 19:52:00 UTC (rev 3558)
@@ -776,7 +776,7 @@
if (altSink != null && altSink != queryBuffer
&& !altSink.isEmpty()) {
/*
- * Handle alt sink output , sending appropriate chunk
+ * Handle alt sink output, sending appropriate chunk
* message(s).
*
* Note: This maps output over shards/nodes in s/o.
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StartOpMessage.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StartOpMessage.java 2010-09-15 15:54:52 UTC (rev 3557)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StartOpMessage.java 2010-09-15 19:52:00 UTC (rev 3558)
@@ -48,4 +48,10 @@
this.nchunks = nchunks;
}
+ public String toString() {
+ return getClass().getName() + "{queryId=" + queryId + ",bopId=" + bopId
+ + ",partitionId=" + partitionId + ",serviceId=" + serviceId
+ + ",nchunks=" + nchunks + "}";
+ }
+
}
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/resources/logging/log4j.properties
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/resources/logging/log4j.properties 2010-09-15 15:54:52 UTC (rev 3557)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/resources/logging/log4j.properties 2010-09-15 19:52:00 UTC (rev 3558)
@@ -211,8 +211,8 @@
log4j.appender.dest2.layout.ConversionPattern=%-5p: %r %X{hostname} %X{serviceUUID} %X{taskname} %X{timestamp} %X{resources} %t %l: %m%n
##
-# Rule execution log. This is a formatted log file (comma delimited).
-log4j.logger.com.bigdata.bop.engine.RunState$TableLog=INFO,queryRunStateLog
+# BOp run state trace (tab delimited file). Uncomment the next line to enable.
+#log4j.logger.com.bigdata.bop.engine.RunState$TableLog=INFO,queryRunStateLog
log4j.additivity.com.bigdata.bop.engine.RunState$TableLog=false
log4j.appender.queryRunStateLog=org.apache.log4j.FileAppender
log4j.appender.queryRunStateLog.Threshold=ALL
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java 2010-09-15 15:54:52 UTC (rev 3557)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java 2010-09-15 19:52:00 UTC (rev 3558)
@@ -38,6 +38,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import junit.framework.TestCase2;
@@ -639,6 +640,12 @@
protected int doStressTest(final long timeout, final int ntrials,
final int poolSize) throws Exception {
+ // start time in nanos.
+ final long begin = System.nanoTime();
+
+ // timeout in nanos.
+ final long nanos = TimeUnit.MILLISECONDS.toNanos(timeout);
+
final Executor service = new LatchedExecutor(jnl.getExecutorService(),
poolSize);
@@ -646,9 +653,12 @@
for (int i = 0; i < ntrials; i++) {
+ final int trial = i;
final FutureTask<Void> ft = new FutureTask<Void>(new Runnable() {
public void run() {
try {
+ if (log.isInfoEnabled())
+ log.info("trial=" + trial);
test_query_join2();
} catch (Exception e) {
// wrap exception.
@@ -662,26 +672,30 @@
service.execute(ft);
}
-
- Thread.sleep(timeout);
-
+
int nerror = 0;
int ncancel = 0;
+ int ntimeout = 0;
int nsuccess = 0;
for (FutureTask<Void> ft : futures) {
- ft.cancel(true/* mayInterruptIfRunning */);
+ // remaining nanoseconds.
+ final long remaining = nanos - (System.nanoTime() - begin);
+ if (remaining <= 0)
+ ft.cancel(true/* mayInterruptIfRunning */);
try {
- ft.get();
+ ft.get(remaining, TimeUnit.NANOSECONDS);
nsuccess++;
} catch (CancellationException ex) {
ncancel++;
+ } catch (TimeoutException ex) {
+ ntimeout++;
} catch (ExecutionException ex) {
nerror++;
}
}
final String msg = "nerror=" + nerror + ", ncancel=" + ncancel
- + ", nsuccess=" + nsuccess;
+ + ", ntimeout=" + ntimeout + ", nsuccess=" + nsuccess;
if(log.isInfoEnabled())
log.info(msg);
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|