From: <sgo...@us...> - 2010-09-15 20:45:22
|
Revision: 3560 http://bigdata.svn.sourceforge.net/bigdata/?rev=3560&view=rev Author: sgossard Date: 2010-09-15 20:45:14 +0000 (Wed, 15 Sep 2010) Log Message: ----------- [maven_scaleout] : Breaking all direct dependency cycles with package 'com.bigdata.util.concurrent', but still appears to have transitive cycles via package 'com.bigdata.counters'. Modified Paths: -------------- branches/maven_scaleout/bigdata-core/src/main/java/com/bigdata/counters/ActiveProcess.java branches/maven_scaleout/bigdata-core/src/main/java/com/bigdata/journal/ConcurrencyManager.java branches/maven_scaleout/bigdata-core/src/main/java/com/bigdata/service/ndx/pipeline/AbstractMasterTask.java branches/maven_scaleout/bigdata-core/src/main/java/com/bigdata/util/concurrent/AbstractHaltableProcess.java branches/maven_scaleout/bigdata-core/src/main/java/com/bigdata/util/concurrent/ThreadPoolExecutorStatisticsTask.java Added Paths: ----------- branches/maven_scaleout/bigdata-core/src/main/java/com/bigdata/journal/WriteExecutorServiceStatisticsTask.java Modified: branches/maven_scaleout/bigdata-core/src/main/java/com/bigdata/counters/ActiveProcess.java =================================================================== --- branches/maven_scaleout/bigdata-core/src/main/java/com/bigdata/counters/ActiveProcess.java 2010-09-15 20:04:06 UTC (rev 3559) +++ branches/maven_scaleout/bigdata-core/src/main/java/com/bigdata/counters/ActiveProcess.java 2010-09-15 20:45:14 UTC (rev 3560) @@ -34,10 +34,11 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.log4j.Logger; -import com.bigdata.util.concurrent.DaemonThreadFactory; /** * Command manages the execution and termination of a native process and an @@ -55,9 +56,19 @@ * by the {@link #process}. */ protected final ExecutorService readService = Executors - .newSingleThreadExecutor(new DaemonThreadFactory(getClass() - .getName() - + ".readService")); + .newSingleThreadExecutor( + //Don't use com.bigdata.util.concurrent.DaemonThreadFactory here, trying to break a package loop. -gossard + new ThreadFactory() { + String prefix = getClass().getName()+ ".readService"; + AtomicInteger count = new AtomicInteger(0); + public Thread newThread(Runnable r) { + Thread t = Executors.defaultThreadFactory().newThread(r); + t.setName( prefix + count.incrementAndGet() ); + t.setDaemon(true); + return t; + } + } + ); protected Process process = null; Modified: branches/maven_scaleout/bigdata-core/src/main/java/com/bigdata/journal/ConcurrencyManager.java =================================================================== --- branches/maven_scaleout/bigdata-core/src/main/java/com/bigdata/journal/ConcurrencyManager.java 2010-09-15 20:04:06 UTC (rev 3559) +++ branches/maven_scaleout/bigdata-core/src/main/java/com/bigdata/journal/ConcurrencyManager.java 2010-09-15 20:45:14 UTC (rev 3560) @@ -879,7 +879,7 @@ final long delay = 1000; // delay in ms. final TimeUnit unit = TimeUnit.MILLISECONDS; - writeServiceQueueStatisticsTask = new ThreadPoolExecutorStatisticsTask("writeService", + writeServiceQueueStatisticsTask = new WriteExecutorServiceStatisticsTask("writeService", writeService, countersUN, w); txWriteServiceQueueStatisticsTask = new ThreadPoolExecutorStatisticsTask("txWriteService", @@ -934,7 +934,7 @@ * Sampling instruments for the various queues giving us the moving average * of the queue length. */ - private final ThreadPoolExecutorStatisticsTask writeServiceQueueStatisticsTask; + private final WriteExecutorServiceStatisticsTask writeServiceQueueStatisticsTask; private final ThreadPoolExecutorStatisticsTask txWriteServiceQueueStatisticsTask; private final ThreadPoolExecutorStatisticsTask readServiceQueueStatisticsTask; @@ -1066,7 +1066,7 @@ * (exceptions may be thrown if the task fails or the commit fails). The * purpose of group commits is to provide higher throughput for writes on * the store by only syncing the data to disk periodically rather than after - * every write. Group commits are scheduled by the {@link #commitService}. + * every write. Group commits are scheduled by the commitService. * The trigger conditions for group commits may be configured using * {@link ConcurrencyManager.Options}. If you are using the store in a * single threaded context then you may set Added: branches/maven_scaleout/bigdata-core/src/main/java/com/bigdata/journal/WriteExecutorServiceStatisticsTask.java =================================================================== --- branches/maven_scaleout/bigdata-core/src/main/java/com/bigdata/journal/WriteExecutorServiceStatisticsTask.java (rev 0) +++ branches/maven_scaleout/bigdata-core/src/main/java/com/bigdata/journal/WriteExecutorServiceStatisticsTask.java 2010-09-15 20:45:14 UTC (rev 3560) @@ -0,0 +1,326 @@ +/* + * User: gossard + * Date: Sep 15, 2010 + * Time: 1:44:48 PM + */ +package com.bigdata.journal; + +import com.bigdata.counters.CounterSet; +import com.bigdata.counters.Instrument; +import com.bigdata.util.concurrent.IQueueCounters; +import com.bigdata.util.concurrent.ThreadPoolExecutorStatisticsTask; +import com.bigdata.util.concurrent.WriteTaskCounters; + +/** + * Extension of {@link com.bigdata.util.concurrent.ThreadPoolExecutorStatisticsTask} that will collect additional + * information about {@link WriteExecutorService} pools. + * + */ +public class WriteExecutorServiceStatisticsTask extends ThreadPoolExecutorStatisticsTask<WriteExecutorService,WriteTaskCounters> { + //TODO: The chaining on the calculateXXX methods is probably too fragile. In order to guarantee the exact same behavior, + //TODO: I chained the calls off methods that executed directly before them. If there is no ordering dependency, the calculations + //TODO: could be done after the parents calculateAll(). -gossard + + protected double averageActiveCountWithLocksHeld = 0d; + /** time waiting for resource locks. */ + protected double averageLockWaitingTime = 0d; + protected double averageCommitWaitingTime = 0d; + protected double averageCommitServiceTime = 0d; + protected double averageCommitGroupSize = 0d; + protected double averageByteCountPerCommit = 0d; + protected long lockWaitingTime = 0L; + protected long commitWaitingTime = 0L; + protected long commitServiceTime = 0L; + protected double averageReadyCount; + + + public WriteExecutorServiceStatisticsTask(String serviceName, WriteExecutorService service) { + super(serviceName, service); + } + + public WriteExecutorServiceStatisticsTask(String serviceName, WriteExecutorService service, WriteTaskCounters taskCounters) { + super(serviceName, service, taskCounters); + } + + public WriteExecutorServiceStatisticsTask(String serviceName, WriteExecutorService service, WriteTaskCounters taskCounters, double w) { + super(serviceName, service, taskCounters, w); + } + + + @Override + protected void calculateBasicQueueInfo() { + super.calculateBasicQueueInfo(); + calculateActiveCountWithLocks(); + } + + @Override + protected void calculateAverageQueueWait() { + super.calculateAverageQueueWait(); + calculateAverageLockWaitTime(); + } + + @Override + protected void calculateAverageCheckpointTime() { + super.calculateAverageCheckpointTime(); + calculateWriteSpecificValues(); + } + + protected void calculateWriteSpecificValues() { + + final WriteExecutorService tmp = service; + + final WriteTaskCounters finalWriteTaskCounters = taskCounters; + + final long groupCommitCount = tmp.getGroupCommitCount(); + + if (groupCommitCount > 0) { + + // Time waiting for the commit. + { + + final long newValue = finalWriteTaskCounters.commitWaitingNanoTime + .get(); + + final long delta = newValue - commitWaitingTime; + + assert delta >= 0 : "" + delta; + + commitWaitingTime = newValue; + + averageCommitWaitingTime = getMovingAverage( + averageCommitWaitingTime, + (delta * scalingFactor / groupCommitCount), + w); + + } + + // Time servicing the commit. + { + + final long newValue = finalWriteTaskCounters.commitServiceNanoTime + .get(); + + final long delta = newValue - commitServiceTime; + + assert delta >= 0 : "" + delta; + + commitServiceTime = newValue; + + averageCommitServiceTime = getMovingAverage( + averageCommitServiceTime, + (delta * scalingFactor / groupCommitCount), + w); + + } + + } + + // moving average of the size nready. + averageReadyCount = getMovingAverage( + averageReadyCount, tmp.getReadyCount(), w); + + // moving average of the size of the commit groups. + averageCommitGroupSize = getMovingAverage( + averageCommitGroupSize, tmp.getCommitGroupSize(), w); + + // moving average of the #of bytes written since the + // previous commit. + averageByteCountPerCommit = getMovingAverage( + averageByteCountPerCommit, tmp + .getByteCountPerCommit(), w); + + } + + protected void calculateAverageLockWaitTime() { + /* + * Time waiting on resource lock(s). + */ + + final long newValue = taskCounters.lockWaitingNanoTime.get(); + + final long delta = newValue - lockWaitingTime; + + assert delta >= 0 : "" + delta; + + lockWaitingTime = newValue; + + averageLockWaitingTime = getMovingAverage( + averageLockWaitingTime, + (delta * scalingFactor / taskCounters.taskCompleteCount.get()), + w); + + } + + protected void calculateActiveCountWithLocks() { + /* + * Note: For the WriteExecutorService we compute a variant of + * [activeCount] the which only counts tasks that are currently + * holding their exclusive resource lock(s). This is the real + * concurrency of the write service since tasks without locks + * are waiting on other tasks so that they can obtain their + * lock(s) and "run". + */ + + final int activeCountWithLocksHeld = service.getActiveTaskCountWithLocksHeld(); + + averageActiveCountWithLocksHeld = getMovingAverage( + averageActiveCountWithLocksHeld, activeCountWithLocksHeld, w); + + } + + + + + @Override + protected void fillCounterSet(CounterSet counterSet) { + super.fillCounterSet(counterSet);//fills out all the basic executor queue info. + + final WriteExecutorService writeService = service; + + /* + * Simple counters. + */ + + counterSet.addCounter(IQueueCounters.IWriteServiceExecutorCounters.CommitCount, + new Instrument<Long>() { + public void sample() { + setValue(writeService.getGroupCommitCount()); + } + }); + + counterSet.addCounter(IQueueCounters.IWriteServiceExecutorCounters.AbortCount, + new Instrument<Long>() { + public void sample() { + setValue(writeService.getAbortCount()); + } + }); + + counterSet.addCounter(IQueueCounters.IWriteServiceExecutorCounters.OverflowCount, + new Instrument<Long>() { + public void sample() { + setValue(writeService.getOverflowCount()); + } + }); + + counterSet.addCounter(IQueueCounters.IWriteServiceExecutorCounters.RejectedExecutionCount, + new Instrument<Long>() { + public void sample() { + setValue(writeService + .getRejectedExecutionCount()); + } + }); + + /* + * Maximum observed values. + */ + + counterSet.addCounter(IQueueCounters.IWriteServiceExecutorCounters.MaxCommitWaitingTime, + new Instrument<Long>() { + public void sample() { + setValue(writeService.getMaxCommitWaitingTime()); + } + }); + + counterSet.addCounter(IQueueCounters.IWriteServiceExecutorCounters.MaxCommitServiceTime, + new Instrument<Long>() { + public void sample() { + setValue(writeService.getMaxCommitServiceTime()); + } + }); + + counterSet.addCounter(IQueueCounters.IWriteServiceExecutorCounters.MaxCommitGroupSize, + new Instrument<Long>() { + public void sample() { + setValue((long) writeService + .getMaxCommitGroupSize()); + } + }); + + counterSet.addCounter(IQueueCounters.IWriteServiceExecutorCounters.MaxRunning, + new Instrument<Long>() { + public void sample() { + setValue(writeService.getMaxRunning()); + } + }); + + /* + * Moving averages available only for the write executor + * service. + */ + + counterSet + .addCounter( + IQueueCounters.IWriteServiceExecutorCounters.AverageActiveCountWithLocksHeld, + new Instrument<Double>() { + @Override + protected void sample() { + setValue(averageActiveCountWithLocksHeld); + } + }); + + counterSet.addCounter( + IQueueCounters.IWriteServiceExecutorCounters.AverageReadyCount, + new Instrument<Double>() { + @Override + protected void sample() { + setValue(averageReadyCount); + } + }); + + counterSet.addCounter( + IQueueCounters.IWriteServiceExecutorCounters.AverageCommitGroupSize, + new Instrument<Double>() { + @Override + protected void sample() { + setValue(averageCommitGroupSize); + } + }); + + counterSet.addCounter( + IQueueCounters.IWriteServiceExecutorCounters.AverageLockWaitingTime, + new Instrument<Double>() { + @Override + protected void sample() { + setValue(averageLockWaitingTime); + } + }); + + counterSet.addCounter( + IQueueCounters.IWriteServiceExecutorCounters.AverageCheckpointTime, + new Instrument<Double>() { + @Override + protected void sample() { + setValue(averageCheckpointTime); + } + }); + + counterSet.addCounter( + IQueueCounters.IWriteServiceExecutorCounters.AverageCommitWaitingTime, + new Instrument<Double>() { + @Override + protected void sample() { + setValue(averageCommitWaitingTime); + } + }); + + counterSet.addCounter( + IQueueCounters.IWriteServiceExecutorCounters.AverageCommitServiceTime, + new Instrument<Double>() { + @Override + protected void sample() { + setValue(averageCommitServiceTime); + } + }); + + counterSet + .addCounter( + IQueueCounters.IWriteServiceExecutorCounters.AverageByteCountPerCommit, + new Instrument<Double>() { + @Override + protected void sample() { + setValue(averageByteCountPerCommit); + } + }); + + } +} Property changes on: branches/maven_scaleout/bigdata-core/src/main/java/com/bigdata/journal/WriteExecutorServiceStatisticsTask.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/maven_scaleout/bigdata-core/src/main/java/com/bigdata/service/ndx/pipeline/AbstractMasterTask.java =================================================================== --- branches/maven_scaleout/bigdata-core/src/main/java/com/bigdata/service/ndx/pipeline/AbstractMasterTask.java 2010-09-15 20:04:06 UTC (rev 3559) +++ branches/maven_scaleout/bigdata-core/src/main/java/com/bigdata/service/ndx/pipeline/AbstractMasterTask.java 2010-09-15 20:45:14 UTC (rev 3560) @@ -40,6 +40,7 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import com.bigdata.util.InnerCause; import org.apache.log4j.Logger; import com.bigdata.btree.keys.KVO; @@ -1222,6 +1223,13 @@ } + @Override + protected <T extends Throwable> void logInnerCause(T cause) { + //avoid logging warnings on BufferClosedException. Not done in AbstractHaltableProcess to avoid introducing a package cycle. -gossard + if (!InnerCause.isInnerCause(cause, BufferClosedException.class)) + super.logInnerCause(cause); + } + /** * This timeout is used to log warning messages when a sink is slow. */ Modified: branches/maven_scaleout/bigdata-core/src/main/java/com/bigdata/util/concurrent/AbstractHaltableProcess.java =================================================================== --- branches/maven_scaleout/bigdata-core/src/main/java/com/bigdata/util/concurrent/AbstractHaltableProcess.java 2010-09-15 20:04:06 UTC (rev 3559) +++ branches/maven_scaleout/bigdata-core/src/main/java/com/bigdata/util/concurrent/AbstractHaltableProcess.java 2010-09-15 20:45:14 UTC (rev 3560) @@ -36,7 +36,6 @@ import org.apache.log4j.Level; import org.apache.log4j.Logger; -import com.bigdata.relation.accesspath.BufferClosedException; import com.bigdata.util.InnerCause; /** @@ -101,36 +100,14 @@ halt = true; - final boolean isFirstCause = firstCause.compareAndSet( - null/* expect */, cause); + firstCause.compareAndSet( null/* expect */, cause); if (log.isEnabledFor(Level.WARN)) try { - if (!InnerCause.isInnerCause(cause, InterruptedException.class) - && !InnerCause.isInnerCause(cause, - CancellationException.class) - && !InnerCause.isInnerCause(cause, - ClosedByInterruptException.class) - && !InnerCause.isInnerCause(cause, - RejectedExecutionException.class) - && !InnerCause.isInnerCause(cause, - BufferClosedException.class)) { + logInnerCause(cause); - /* - * This logs all unexpected causes, not just the first one - * to be reported for this join task. - * - * Note: The master will log the firstCause that it receives - * as an error. - */ - - log.warn(this + " : isFirstCause=" + isFirstCause + " : " - + cause, cause); - - } - } catch (Throwable ex) { // error in logging system - ignore. @@ -141,4 +118,27 @@ } + protected <T extends Throwable> void logInnerCause(T cause) { + if (!InnerCause.isInnerCause(cause, InterruptedException.class) + && !InnerCause.isInnerCause(cause, + CancellationException.class) + && !InnerCause.isInnerCause(cause, + ClosedByInterruptException.class) + && !InnerCause.isInnerCause(cause, + RejectedExecutionException.class) ) + { + /* + * This logs all unexpected causes, not just the first one + * to be reported for this join task. + * + * Note: The master will log the firstCause that it receives + * as an error. + */ + + log.warn(this + " : isFirstCause=" + (firstCause.get() == cause) + " : " + + cause, cause); + + } + } + } Modified: branches/maven_scaleout/bigdata-core/src/main/java/com/bigdata/util/concurrent/ThreadPoolExecutorStatisticsTask.java =================================================================== --- branches/maven_scaleout/bigdata-core/src/main/java/com/bigdata/util/concurrent/ThreadPoolExecutorStatisticsTask.java 2010-09-15 20:04:06 UTC (rev 3559) +++ branches/maven_scaleout/bigdata-core/src/main/java/com/bigdata/util/concurrent/ThreadPoolExecutorStatisticsTask.java 2010-09-15 20:45:14 UTC (rev 3560) @@ -20,25 +20,24 @@ * including the moving average of its queue length, queuing times, etc. * * @todo refactor to layer {@link QueueSizeMovingAverageTask} then - * {@link ThreadPoolExecutorBaseStatisticsTask}, then this class, then a - * derived class for the {@link WriteServiceExecutor}. + * {@link ThreadPoolExecutorBaseStatisticsTask}, then this class, then sub-classes. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ -public class ThreadPoolExecutorStatisticsTask implements Runnable { +public class ThreadPoolExecutorStatisticsTask<EXEC extends ThreadPoolExecutor,COUNTERS extends TaskCounters> implements Runnable { protected static final Logger log = Logger.getLogger(ThreadPoolExecutorStatisticsTask.class); /** * The label for the executor service (used in log messages). */ - private final String serviceName; + protected final String serviceName; /** * The executor service that is being monitored. */ - private final ThreadPoolExecutor service; + protected final EXEC service; // /** // * The time when we started to collect data about the {@link #service} (set by the ctor). @@ -48,26 +47,25 @@ /** * The weight used to compute the moving average. */ - private final double w; + protected final double w; /** * #of samples taken so far. */ - private long nsamples = 0; + protected long nsamples = 0; /* * There are several different moving averages which are computed. */ // private double averageQueueSize = 0d; - private double averageActiveCount = 0d; - private double averageQueueLength = 0d; - private double averageActiveCountWithLocksHeld = 0d; + protected double averageActiveCount = 0d; + protected double averageQueueLength = 0d; /** * Data collected about {@link AbstractTask}s run on a service (optional). */ - private final TaskCounters taskCounters; + protected final COUNTERS taskCounters; /* * These are moving averages based on the optional TaskCounters. @@ -80,40 +78,29 @@ */ /** time waiting on the queue until the task begins to execute. */ - private double averageQueueWaitingTime = 0d; - /** time waiting for resource locks. */ - private double averageLockWaitingTime = 0d; + protected double averageQueueWaitingTime = 0d; + /** time doing work (does not include time to acquire resources locks or commit time). */ - private double averageServiceTime = 0d; + protected double averageServiceTime = 0d; /** time checkpointing indices (included in the {@link #averageServiceTime}). */ - private double averageCheckpointTime = 0d; + protected double averageCheckpointTime = 0d; + /** total time from submit to completion. */ - private double averageQueuingTime = 0d; + protected double averageQueuingTime = 0d; - private double averageCommitWaitingTime = 0d; - private double averageCommitServiceTime = 0d; - private double averageCommitGroupSize = 0d; - private double averageByteCountPerCommit = 0d; - /* * private variables used to compute the delta in various counters since * they were last sampled. */ - private long queueWaitingTime = 0L; - private long lockWaitingTime = 0L; - private long serviceTime = 0L; - private long checkpointTime = 0L; // Note: checkpointTime is included in the serviceTime. - private long queuingTime = 0L; + protected long queueWaitingTime = 0L; + protected long serviceTime = 0L; + protected long checkpointTime = 0L; // Note: checkpointTime is included in the serviceTime. + protected long queuingTime = 0L; - private long commitWaitingTime = 0L; - private long commitServiceTime = 0L; - - private double averageReadyCount; - /** * Scaling factor converts nanoseconds to milliseconds. */ - static final double scalingFactor = 1d / TimeUnit.NANOSECONDS.convert(1, + static final protected double scalingFactor = 1d / TimeUnit.NANOSECONDS.convert(1, TimeUnit.MILLISECONDS); /** @@ -150,7 +137,7 @@ * @param service * The service to be monitored. */ - public ThreadPoolExecutorStatisticsTask(String serviceName, ThreadPoolExecutor service) { + public ThreadPoolExecutorStatisticsTask(String serviceName, EXEC service) { this(serviceName, service, null/* taskCounters */, DEFAULT_WEIGHT); @@ -168,8 +155,8 @@ * The per-task counters used to compute the latency data for * tasks run on that service. */ - public ThreadPoolExecutorStatisticsTask(String serviceName, ThreadPoolExecutor service, - TaskCounters taskCounters) { + public ThreadPoolExecutorStatisticsTask(String serviceName, EXEC service, + COUNTERS taskCounters) { this(serviceName, service, taskCounters, DEFAULT_WEIGHT); @@ -189,8 +176,8 @@ * The weight to be used by * {@link #getMovingAverage(double, double, double)} */ - public ThreadPoolExecutorStatisticsTask(String serviceName, ThreadPoolExecutor service, - TaskCounters taskCounters, double w) { + public ThreadPoolExecutorStatisticsTask(String serviceName, EXEC service, + COUNTERS taskCounters, double w) { if (serviceName == null) throw new IllegalArgumentException(); @@ -240,7 +227,7 @@ /** * The moving average of the queue size. */ - private final MovingAverageTask queueSizeTask = new MovingAverageTask( + protected final MovingAverageTask queueSizeTask = new MovingAverageTask( "queueSize", new Callable<Integer>() { public Integer call() { return service.getQueue().size(); @@ -252,7 +239,7 @@ * * @see TaskCounters#interArrivalNanoTime */ - private DeltaMovingAverageTask interArrivalNanoTimeTask = new DeltaMovingAverageTask( + protected DeltaMovingAverageTask interArrivalNanoTimeTask = new DeltaMovingAverageTask( "interArrivalTime", new Callable<Long>() { public Long call() { return taskCounters.interArrivalNanoTime.get(); @@ -264,7 +251,7 @@ * * @see TaskCounters#serviceNanoTime */ - private DeltaMovingAverageTask serviceNanoTimeTask = new DeltaMovingAverageTask( + protected DeltaMovingAverageTask serviceNanoTimeTask = new DeltaMovingAverageTask( "serviceNanoTime", new Callable<Long>() { public Long call() { return taskCounters.serviceNanoTime.get(); @@ -282,281 +269,213 @@ try { - { + calculateAll(); + + nsamples++; - queueSizeTask.run(); - - // queueSize := #of tasks in the queue. - final int queueSize = service.getQueue().size(); + } catch (Exception ex) { - // activeCount := #of tasks assigned a worker thread - final int activeCount = service.getActiveCount(); + log.warn(serviceName, ex); -//// This is just the tasks that are currently waiting to run (not -//// assigned to any thread). -// averageQueueSize = getMovingAverage(averageQueueSize, -// queueSize, w); + } + + } - // This is just the tasks that are currently running (assigned - // to a worker thread). - averageActiveCount = getMovingAverage(averageActiveCount, - activeCount, w); + /** + * + * Calculates all averages and updates the task counters if provided. + * Currently this method calculates the basic executor queue info, and if task counters are present, + * will also calculate queue wait time and checkpoint times. All of these calculations are done in seperate methods + * so that subclasses can override them if needed. + * + */ + protected void calculateAll() { + calculateBasicQueueInfo(); + calculateTaskCountersIfPresent(); + } - /* - * Note: this is the primary average of interest - it includes - * both the tasks waiting to be run and those that are currently - * running in the definition of the "queue length". - */ - averageQueueLength = getMovingAverage(averageQueueLength, - (activeCount + queueSize), w); + /** + * Calculates average queue wait and checkpoint times if a task counter was provided. Sub-classes may override this + * method to introduce additional task counter calculations. + */ + protected void calculateTaskCountersIfPresent() { + if (taskCounters != null) { - } + /* + * Compute some latency data that relies on the task counters. + */ - if (service instanceof WriteExecutorService) { + // #of tasks that have been submitted so far. + final long taskCount = taskCounters.taskCompleteCount.get(); - /* - * Note: For the WriteExecutorService we compute a variant of - * [activeCount] the which only counts tasks that are currently - * holding their exclusive resource lock(s). This is the real - * concurrency of the write service since tasks without locks - * are waiting on other tasks so that they can obtain their - * lock(s) and "run". - */ - - final int activeCountWithLocksHeld = ((WriteExecutorService) service) - .getActiveTaskCountWithLocksHeld(); + if (taskCount > 0) { - averageActiveCountWithLocksHeld = getMovingAverage( - averageActiveCountWithLocksHeld, activeCountWithLocksHeld, w); + calculateAverageQueueWait(); + calculateAverageCheckpointTime(); + } - if (taskCounters != null) { - - /* - * Compute some latency data that relies on the task counters. - */ + } + } - // #of tasks that have been submitted so far. - final long taskCount = taskCounters.taskCompleteCount.get(); + protected void calculateAverageCheckpointTime() { + /* + * Time that the task is being serviced (after its obtained + * any locks). + */ + { - if (taskCount > 0) { + final long newValue = taskCounters.serviceNanoTime.get(); - /* - * Time waiting on the queue to begin execution. - */ - { + final long delta = newValue - serviceTime; - final long newValue = taskCounters.queueWaitingNanoTime.get(); + assert delta >= 0 : "" + delta; - final long delta = newValue - queueWaitingTime; + serviceTime = newValue; - assert delta >= 0 : "" + delta; + averageServiceTime = getMovingAverage( + averageServiceTime, + (delta * scalingFactor / taskCounters.taskCompleteCount.get()), + w); - queueWaitingTime = newValue; + } - averageQueueWaitingTime = getMovingAverage( - averageQueueWaitingTime, - (delta * scalingFactor / taskCounters.taskCompleteCount.get()), - w); + /* + * The moving average of the change in the cumulative + * inter-arrival time. + */ + interArrivalNanoTimeTask.run(); - } + /* + * The moving average of the change in the total task + * service time. + */ + serviceNanoTimeTask.run(); - /* - * Time waiting on resource lock(s). - */ - if(service instanceof WriteExecutorService) { - - final long newValue = ((WriteTaskCounters) taskCounters).lockWaitingNanoTime - .get(); + /* + * Time that the task is busy checkpoint its indices (this + * is already reported as part of the service time but which + * is broken out here as a detail). + */ + { - final long delta = newValue - lockWaitingTime; + final long newValue = taskCounters.checkpointNanoTime.get(); - assert delta >= 0 : "" + delta; + final long delta = newValue - checkpointTime; - lockWaitingTime = newValue; + assert delta >= 0 : "" + delta; - averageLockWaitingTime = getMovingAverage( - averageLockWaitingTime, - (delta * scalingFactor / taskCounters.taskCompleteCount.get()), - w); + checkpointTime = newValue; - } - - /* - * Time that the task is being serviced (after its obtained - * any locks). - */ - { + averageCheckpointTime = getMovingAverage( + averageCheckpointTime, + (delta * scalingFactor / taskCounters.taskCompleteCount.get()), + w); - final long newValue = taskCounters.serviceNanoTime.get(); + } - final long delta = newValue - serviceTime; + /* + * Queuing time (elapsed time from submit until completion). + */ + { - assert delta >= 0 : "" + delta; + final long newValue = taskCounters.queuingNanoTime.get(); - serviceTime = newValue; + final long delta = newValue - queuingTime; - averageServiceTime = getMovingAverage( - averageServiceTime, - (delta * scalingFactor / taskCounters.taskCompleteCount.get()), - w); + assert delta >= 0 : "" + delta; - } + queuingTime = newValue; - /* - * The moving average of the change in the cumulative - * inter-arrival time. - */ - interArrivalNanoTimeTask.run(); + averageQueuingTime = getMovingAverage( + averageQueuingTime, + (delta * scalingFactor / taskCounters.taskCompleteCount.get()), + w); - /* - * The moving average of the change in the total task - * service time. - */ - serviceNanoTimeTask.run(); - - /* - * Time that the task is busy checkpoint its indices (this - * is already reported as part of the service time but which - * is broken out here as a detail). - */ - { + } + } - final long newValue = taskCounters.checkpointNanoTime.get(); - final long delta = newValue - checkpointTime; - assert delta >= 0 : "" + delta; + protected void calculateAverageQueueWait() { + /* + * Time waiting on the queue to begin execution. + */ + { - checkpointTime = newValue; + final long newValue = taskCounters.queueWaitingNanoTime.get(); - averageCheckpointTime = getMovingAverage( - averageCheckpointTime, - (delta * scalingFactor / taskCounters.taskCompleteCount.get()), - w); + final long delta = newValue - queueWaitingTime; - } + assert delta >= 0 : "" + delta; - /* - * Queuing time (elapsed time from submit until completion). - */ - { + queueWaitingTime = newValue; - final long newValue = taskCounters.queuingNanoTime.get(); + averageQueueWaitingTime = getMovingAverage( + averageQueueWaitingTime, + (delta * scalingFactor / taskCounters.taskCompleteCount.get()), + w); - final long delta = newValue - queuingTime; + } + } - assert delta >= 0 : "" + delta; + protected void calculateBasicQueueInfo() { + queueSizeTask.run(); - queuingTime = newValue; + // queueSize := #of tasks in the queue. + final int queueSize = service.getQueue().size(); - averageQueuingTime = getMovingAverage( - averageQueuingTime, - (delta * scalingFactor / taskCounters.taskCompleteCount.get()), - w); + // activeCount := #of tasks assigned a worker thread + final int activeCount = service.getActiveCount(); - } +//// This is just the tasks that are currently waiting to run (not +//// assigned to any thread). +// averageQueueSize = getMovingAverage(averageQueueSize, +// queueSize, w); - } + // This is just the tasks that are currently running (assigned + // to a worker thread). + averageActiveCount = getMovingAverage(averageActiveCount, + activeCount, w); - if (service instanceof WriteExecutorService) { + /* + * Note: this is the primary average of interest - it includes + * both the tasks waiting to be run and those that are currently + * running in the definition of the "queue length". + */ + averageQueueLength = getMovingAverage(averageQueueLength, + (activeCount + queueSize), w); + } - final WriteExecutorService tmp = (WriteExecutorService) service; + /** + * Convenience call to generate a counter set. Currently creates a new CounterSet and calls fillCounterSet + * to populate the data. + * + * @return A newly created and filled <i>counterSet</i> + */ + public CounterSet getCounters() { - final WriteTaskCounters writeTaskCounters = (WriteTaskCounters) taskCounters; - - final long groupCommitCount = tmp.getGroupCommitCount(); + final CounterSet counterSet = new CounterSet(); + fillCounterSet(counterSet); - if (groupCommitCount > 0) { - // Time waiting for the commit. - { + return counterSet; - final long newValue = writeTaskCounters.commitWaitingNanoTime - .get(); +} - final long delta = newValue - commitWaitingTime; - - assert delta >= 0 : "" + delta; - - commitWaitingTime = newValue; - - averageCommitWaitingTime = getMovingAverage( - averageCommitWaitingTime, - (delta * scalingFactor / groupCommitCount), - w); - - } - - // Time servicing the commit. - { - - final long newValue = writeTaskCounters.commitServiceNanoTime - .get(); - - final long delta = newValue - commitServiceTime; - - assert delta >= 0 : "" + delta; - - commitServiceTime = newValue; - - averageCommitServiceTime = getMovingAverage( - averageCommitServiceTime, - (delta * scalingFactor / groupCommitCount), - w); - - } - - } - - // moving average of the size nready. - averageReadyCount = getMovingAverage( - averageReadyCount, tmp.getReadyCount(), w); - - // moving average of the size of the commit groups. - averageCommitGroupSize = getMovingAverage( - averageCommitGroupSize, tmp.getCommitGroupSize(), w); - - // moving average of the #of bytes written since the - // previous commit. - averageByteCountPerCommit = getMovingAverage( - averageByteCountPerCommit, tmp - .getByteCountPerCommit(), w); - - } // end (if service instanceof WriteExecutorService ) - - } - - nsamples++; - - } catch (Exception ex) { - - log.warn(serviceName, ex); - - } - - } - /** * Adds counters for all innate variables defined for a * {@link ThreadPoolExecutor} and for each of the variables computed by this - * class. Note that some variables (e.g., the lock waiting time) are only - * available when the <i>service</i> specified to the ctor is a - * {@link WriteExecutorService}. - * - * @param counterSet - * The counters will be added to this {@link CounterSet}. - * - * @return The caller's <i>counterSet</i> + * class. Sub-classes can override this method to fill in additional counters in the provided counter set. + * + * @param counterSet the set that will have the counters added to it. + * */ - public CounterSet getCounters() { - - final CounterSet counterSet = new CounterSet(); - + protected void fillCounterSet(CounterSet counterSet) { /* - * Defined for ThreadPoolExecutor. - */ + * Defined for ThreadPoolExecutor. + */ // Note: reported as moving average instead. // counterSet.addCounter("#active", @@ -565,7 +484,7 @@ // setValue(service.getActiveCount()); // } // }); -// +// // Note: reported as moving average instead. // counterSet.addCounter("#queued", // new Instrument<Integer>() { @@ -783,7 +702,7 @@ setValue(1d / t); } }); - + counterSet .addCounter( IThreadPoolExecutorTaskCounters.AverageQueueWaitingTime, @@ -813,163 +732,7 @@ }); } + } - /* - * These data are available only for the write service. - */ - if (service instanceof WriteExecutorService) { - final WriteExecutorService writeService = (WriteExecutorService) service; - - /* - * Simple counters. - */ - - counterSet.addCounter(IWriteServiceExecutorCounters.CommitCount, - new Instrument<Long>() { - public void sample() { - setValue(writeService.getGroupCommitCount()); - } - }); - - counterSet.addCounter(IWriteServiceExecutorCounters.AbortCount, - new Instrument<Long>() { - public void sample() { - setValue(writeService.getAbortCount()); - } - }); - - counterSet.addCounter(IWriteServiceExecutorCounters.OverflowCount, - new Instrument<Long>() { - public void sample() { - setValue(writeService.getOverflowCount()); - } - }); - - counterSet.addCounter(IWriteServiceExecutorCounters.RejectedExecutionCount, - new Instrument<Long>() { - public void sample() { - setValue(writeService - .getRejectedExecutionCount()); - } - }); - - /* - * Maximum observed values. - */ - - counterSet.addCounter(IWriteServiceExecutorCounters.MaxCommitWaitingTime, - new Instrument<Long>() { - public void sample() { - setValue(writeService.getMaxCommitWaitingTime()); - } - }); - - counterSet.addCounter(IWriteServiceExecutorCounters.MaxCommitServiceTime, - new Instrument<Long>() { - public void sample() { - setValue(writeService.getMaxCommitServiceTime()); - } - }); - - counterSet.addCounter(IWriteServiceExecutorCounters.MaxCommitGroupSize, - new Instrument<Long>() { - public void sample() { - setValue((long) writeService - .getMaxCommitGroupSize()); - } - }); - - counterSet.addCounter(IWriteServiceExecutorCounters.MaxRunning, - new Instrument<Long>() { - public void sample() { - setValue(writeService.getMaxRunning()); - } - }); - - /* - * Moving averages available only for the write executor - * service. - */ - - counterSet - .addCounter( - IWriteServiceExecutorCounters.AverageActiveCountWithLocksHeld, - new Instrument<Double>() { - @Override - protected void sample() { - setValue(averageActiveCountWithLocksHeld); - } - }); - - counterSet.addCounter( - IWriteServiceExecutorCounters.AverageReadyCount, - new Instrument<Double>() { - @Override - protected void sample() { - setValue(averageReadyCount); - } - }); - - counterSet.addCounter( - IWriteServiceExecutorCounters.AverageCommitGroupSize, - new Instrument<Double>() { - @Override - protected void sample() { - setValue(averageCommitGroupSize); - } - }); - - counterSet.addCounter( - IWriteServiceExecutorCounters.AverageLockWaitingTime, - new Instrument<Double>() { - @Override - protected void sample() { - setValue(averageLockWaitingTime); - } - }); - - counterSet.addCounter( - IWriteServiceExecutorCounters.AverageCheckpointTime, - new Instrument<Double>() { - @Override - protected void sample() { - setValue(averageCheckpointTime); - } - }); - - counterSet.addCounter( - IWriteServiceExecutorCounters.AverageCommitWaitingTime, - new Instrument<Double>() { - @Override - protected void sample() { - setValue(averageCommitWaitingTime); - } - }); - - counterSet.addCounter( - IWriteServiceExecutorCounters.AverageCommitServiceTime, - new Instrument<Double>() { - @Override - protected void sample() { - setValue(averageCommitServiceTime); - } - }); - - counterSet - .addCounter( - IWriteServiceExecutorCounters.AverageByteCountPerCommit, - new Instrument<Double>() { - @Override - protected void sample() { - setValue(averageByteCountPerCommit); - } - }); - - } - - return counterSet; - } - -} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |