From: <tho...@us...> - 2010-08-01 18:53:27
Revision: 3382 http://bigdata.svn.sourceforge.net/bigdata/?rev=3382&view=rev Author: thompsonbry Date: 2010-08-01 18:53:21 +0000 (Sun, 01 Aug 2010) Log Message: ----------- Modified scale-out to use the WORMStrategy. Interned some class names in IndexMetadata which were being redundantly stored in the heap. Changed the index segment file naming convention to use enough digits (10) to represent an int32 index partition identifier. Fixed the DirectBufferPool statistics. Since these now represent a collection of pools, they have to be dynamically reattached in order to update correctly. Fixed the benchmark.txt queries for the DirectBufferPools. Restored the use of the parallel old generation GC mode to the bigdataCluster config files, but commented out the explicit assignment of a number of cores to be used for GC since the JVM default is the #of cores on the machine and this is otherwise machine specific. Turned down the BlockingBuffer logging level (to ERROR) since it was cluttering the detail.log file. Modified Paths: -------------- trunk/bigdata/src/java/com/bigdata/btree/IndexMetadata.java trunk/bigdata/src/java/com/bigdata/counters/AbstractStatisticsCollector.java trunk/bigdata/src/java/com/bigdata/io/WriteCache.java trunk/bigdata/src/java/com/bigdata/journal/AbstractJournal.java trunk/bigdata/src/java/com/bigdata/journal/DiskOnlyStrategy.java trunk/bigdata/src/java/com/bigdata/resources/OverflowManager.java trunk/bigdata/src/java/com/bigdata/resources/ResourceEvents.java trunk/bigdata/src/java/com/bigdata/resources/StoreManager.java trunk/bigdata/src/java/com/bigdata/service/DataService.java trunk/bigdata/src/java/com/bigdata/service/DefaultServiceFederationDelegate.java trunk/src/resources/analysis/queries/benchmark.txt trunk/src/resources/config/bigdataCluster.config trunk/src/resources/config/bigdataCluster16.config trunk/src/resources/config/log4j.properties Modified: trunk/bigdata/src/java/com/bigdata/btree/IndexMetadata.java =================================================================== --- trunk/bigdata/src/java/com/bigdata/btree/IndexMetadata.java 2010-07-31 00:52:15 UTC (rev 3381) +++ trunk/bigdata/src/java/com/bigdata/btree/IndexMetadata.java 2010-08-01 18:53:21 UTC (rev 3382) @@ -2049,10 +2049,14 @@ // Note: default assumes NOT an index partition. this.pmd = null; + /* Intern'd to reduce duplication on the heap. Will be com.bigdata.btree.BTree or + * com.bigdata.btree.IndexSegment and occasionally a class derived from BTree. + */ this.btreeClassName = getProperty(indexManager, properties, namespace, - Options.BTREE_CLASS_NAME, BTree.class.getName().toString()); + Options.BTREE_CLASS_NAME, BTree.class.getName()).intern(); - this.checkpointClassName = Checkpoint.class.getName(); + // Intern'd to reduce duplication on the heap. + this.checkpointClassName = Checkpoint.class.getName().intern(); // this.addrSer = AddressSerializer.INSTANCE; Modified: trunk/bigdata/src/java/com/bigdata/counters/AbstractStatisticsCollector.java =================================================================== --- trunk/bigdata/src/java/com/bigdata/counters/AbstractStatisticsCollector.java 2010-07-31 00:52:15 UTC (rev 3381) +++ trunk/bigdata/src/java/com/bigdata/counters/AbstractStatisticsCollector.java 2010-08-01 18:53:21 UTC (rev 3382) @@ -277,19 +277,19 @@ AbstractStatisticsCollector .addGarbageCollectorMXBeanCounters(serviceRoot .makePath(ICounterHierarchy.Memory_GarbageCollectors)); - - /* - * Add counters reporting on the various DirectBufferPools. - */ - { - // general purpose pool. 
- serviceRoot.makePath( - IProcessCounters.Memory + ICounterSet.pathSeparator - + "DirectBufferPool").attach( - DirectBufferPool.getCounters()); - - } + // Moved since counters must be dynamically reattached to reflect pool hierarchy. +// /* +// * Add counters reporting on the various DirectBufferPools. +// */ +// { +// +// serviceRoot.makePath( +// IProcessCounters.Memory + ICounterSet.pathSeparator +// + "DirectBufferPool").attach( +// DirectBufferPool.getCounters()); +// +// } if (LRUNexus.INSTANCE != null) { Modified: trunk/bigdata/src/java/com/bigdata/io/WriteCache.java =================================================================== --- trunk/bigdata/src/java/com/bigdata/io/WriteCache.java 2010-07-31 00:52:15 UTC (rev 3381) +++ trunk/bigdata/src/java/com/bigdata/io/WriteCache.java 2010-08-01 18:53:21 UTC (rev 3382) @@ -51,7 +51,7 @@ import com.bigdata.counters.Instrument; import com.bigdata.journal.AbstractBufferStrategy; import com.bigdata.journal.DiskOnlyStrategy; -import com.bigdata.journal.DiskOnlyStrategy.StoreCounters; +//import com.bigdata.journal.DiskOnlyStrategy.StoreCounters; import com.bigdata.rawstore.Bytes; import com.bigdata.rawstore.IRawStore; import com.bigdata.rwstore.RWStore; Modified: trunk/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- trunk/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2010-07-31 00:52:15 UTC (rev 3381) +++ trunk/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2010-08-01 18:53:21 UTC (rev 3382) @@ -1027,33 +1027,33 @@ } - case Disk: { +// case Disk: { +// +// /* +// * Setup the buffer strategy. +// */ +// +// fileMetadata = new FileMetadata(file, BufferMode.Disk, +// useDirectBuffers, initialExtent, maximumExtent, create, +// isEmptyFile, deleteOnExit, readOnly, forceWrites, +// offsetBits, //readCacheCapacity, readCacheMaxRecordSize, +// //readOnly ? null : writeCache, +// writeCacheEnabled, +// validateChecksum, +// createTime, checker, alternateRootBlock); +// +// _bufferStrategy = new DiskOnlyStrategy( +// 0L/* soft limit for maximumExtent */, +//// minimumExtension, +// fileMetadata); +// +// this._rootBlock = fileMetadata.rootBlock; +// +// break; +// +// } - /* - * Setup the buffer strategy. - */ - - fileMetadata = new FileMetadata(file, BufferMode.Disk, - useDirectBuffers, initialExtent, maximumExtent, create, - isEmptyFile, deleteOnExit, readOnly, forceWrites, - offsetBits, //readCacheCapacity, readCacheMaxRecordSize, - //readOnly ? 
null : writeCache, - writeCacheEnabled, - validateChecksum, - createTime, checker, alternateRootBlock); - - _bufferStrategy = new DiskOnlyStrategy( - 0L/* soft limit for maximumExtent */, -// minimumExtension, - fileMetadata); - - this._rootBlock = fileMetadata.rootBlock; - - break; - - } - -// case Disk: + case Disk: case DiskWORM: { /* Modified: trunk/bigdata/src/java/com/bigdata/journal/DiskOnlyStrategy.java =================================================================== --- trunk/bigdata/src/java/com/bigdata/journal/DiskOnlyStrategy.java 2010-07-31 00:52:15 UTC (rev 3381) +++ trunk/bigdata/src/java/com/bigdata/journal/DiskOnlyStrategy.java 2010-08-01 18:53:21 UTC (rev 3382) @@ -46,6 +46,7 @@ import com.bigdata.io.DirectBufferPool; import com.bigdata.io.FileChannelUtility; import com.bigdata.io.IReopenChannel; +import com.bigdata.journal.WORMStrategy.StoreCounters; import com.bigdata.rawstore.Bytes; import com.bigdata.rawstore.IRawStore; import com.bigdata.resources.StoreManager.ManagedJournal; @@ -501,7 +502,7 @@ writeCache.flush(); - storeCounters.ncacheFlush++; +// storeCounters.ncacheFlush++; } @@ -544,551 +545,551 @@ } - /** - * Counters for {@link IRawStore} access, including operations that read or - * write through to the underlying media. - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ - * - * @todo report elapsed time and average latency for force, reopen, and - * writeRootBlock. - * - * @todo counters need to be atomic if we want to avoid the possibility of - * concurrent <code>x++</code> operations failing to correctly - * increment <code>x</code> for each request. - */ - public static class StoreCounters { - - /** - * #of read requests. - */ - public long nreads; - - /** - * #of read requests that are satisfied by our write cache (vs the - * OS or disk level write cache). - */ - public long ncacheRead; - - /** - * #of read requests that read through to the backing file. - */ - public long ndiskRead; - - /** - * #of bytes read. - */ - public long bytesRead; - - /** - * #of bytes that have been read from the disk. - */ - public long bytesReadFromDisk; - - /** - * The size of the largest record read. - */ - public long maxReadSize; - - /** - * Total elapsed time for reads. - */ - public long elapsedReadNanos; - - /** - * Total elapsed time checking the disk write cache for records to be - * read. - */ - public long elapsedCacheReadNanos; - - /** - * Total elapsed time for reading on the disk. - */ - public long elapsedDiskReadNanos; - - /** - * #of write requests. - */ - public long nwrites; - - /** - * #of write requests that are absorbed by our write cache (vs the OS or - * disk level write cache). - */ - public long ncacheWrite; - - /** - * #of times the write cache was flushed to disk. - */ - public long ncacheFlush; - - /** - * #of write requests that write through to the backing file. - */ - public long ndiskWrite; - - /** - * The size of the largest record written. - */ - public long maxWriteSize; - - /** - * #of bytes written. - */ - public long bytesWritten; - - /** - * #of bytes that have been written on the disk. - */ - public long bytesWrittenOnDisk; - - /** - * Total elapsed time for writes. - */ - public long elapsedWriteNanos; - - /** - * Total elapsed time writing records into the cache (does not count - * time to flush the cache when it is full or to write records that do - * not fit in the cache directly to the disk). - */ - public long elapsedCacheWriteNanos; - - /** - * Total elapsed time for writing on the disk. 
- */ - public long elapsedDiskWriteNanos; - - /** - * #of times the data were forced to the disk. - */ - public long nforce; - - /** - * #of times the length of the file was changed (typically, extended). - */ - public long ntruncate; - - /** - * #of times the file has been reopened after it was closed by an - * interrupt. - */ - public long nreopen; - - /** - * #of times one of the root blocks has been written. - */ - public long nwriteRootBlock; - - /** - * Initialize a new set of counters. - */ - public StoreCounters() { - - } - - /** - * Copy ctor. - * @param o - */ - public StoreCounters(final StoreCounters o) { - - add( o ); - - } - - /** - * Adds counters to the current counters. - * - * @param o - */ - public void add(final StoreCounters o) { - - nreads += o.nreads; - ncacheRead += o.ncacheRead; - ndiskRead += o.ndiskRead; - bytesRead += o.bytesRead; - bytesReadFromDisk += o.bytesReadFromDisk; - maxReadSize += o.maxReadSize; - elapsedReadNanos += o.elapsedReadNanos; - elapsedCacheReadNanos += o.elapsedCacheReadNanos; - elapsedDiskReadNanos += o.elapsedDiskReadNanos; - - nwrites += o.nwrites; - ncacheWrite += o.ncacheWrite; - ncacheFlush += o.ncacheFlush; - ndiskWrite += o.ndiskWrite; - maxWriteSize += o.maxWriteSize; - bytesWritten += o.bytesWritten; - bytesWrittenOnDisk += o.bytesWrittenOnDisk; - elapsedWriteNanos += o.elapsedWriteNanos; - elapsedCacheWriteNanos += o.elapsedCacheWriteNanos; - elapsedDiskWriteNanos += o.elapsedDiskWriteNanos; - - nforce += o.nforce; - ntruncate += o.ntruncate; - nreopen += o.nreopen; - nwriteRootBlock += o.nwriteRootBlock; - - } - - /** - * Returns a new {@link StoreCounters} containing the current counter values - * minus the given counter values. - * - * @param o - * - * @return - */ - public StoreCounters subtract(final StoreCounters o) { - - // make a copy of the current counters. - final StoreCounters t = new StoreCounters(this); - - // subtract out the given counters. 
- t.nreads -= o.nreads; - t.ncacheRead -= o.ncacheRead; - t.ndiskRead -= o.ndiskRead; - t.bytesRead -= o.bytesRead; - t.bytesReadFromDisk -= o.bytesReadFromDisk; - t.maxReadSize -= o.maxReadSize; - t.elapsedReadNanos -= o.elapsedReadNanos; - t.elapsedCacheReadNanos -= o.elapsedCacheReadNanos; - t.elapsedDiskReadNanos -= o.elapsedDiskReadNanos; - - t.nwrites -= o.nwrites; - t.ncacheWrite -= o.ncacheWrite; - t.ncacheFlush -= o.ncacheFlush; - t.ndiskWrite -= o.ndiskWrite; - t.maxWriteSize -= o.maxWriteSize; - t.bytesWritten -= o.bytesWritten; - t.bytesWrittenOnDisk -= o.bytesWrittenOnDisk; - t.elapsedWriteNanos -= o.elapsedWriteNanos; - t.elapsedCacheWriteNanos -= o.elapsedCacheWriteNanos; - t.elapsedDiskWriteNanos -= o.elapsedDiskWriteNanos; - - t.nforce -= o.nforce; - t.ntruncate -= o.ntruncate; - t.nreopen -= o.nreopen; - t.nwriteRootBlock -= o.nwriteRootBlock; - - return t; - - } - - synchronized public CounterSet getCounters() { - - if (root == null) { - - root = new CounterSet(); - - // IRawStore API - { - - /* - * reads - */ - - root.addCounter("nreads", new Instrument<Long>() { - public void sample() { - setValue(nreads); - } - }); - - root.addCounter("bytesRead", new Instrument<Long>() { - public void sample() { - setValue(bytesRead); - } - }); - - root.addCounter("readSecs", new Instrument<Double>() { - public void sample() { - final double elapsedReadSecs = (elapsedReadNanos / 1000000000.); - setValue(elapsedReadSecs); - } - }); - - root.addCounter("bytesReadPerSec", - new Instrument<Double>() { - public void sample() { - final double readSecs = (elapsedReadNanos / 1000000000.); - final double bytesReadPerSec = (readSecs == 0L ? 0d - : (bytesRead / readSecs)); - setValue(bytesReadPerSec); - } - }); - - root.addCounter("maxReadSize", new Instrument<Long>() { - public void sample() { - setValue(maxReadSize); - } - }); - - /* - * writes - */ - - root.addCounter("nwrites", new Instrument<Long>() { - public void sample() { - setValue(nwrites); - } - }); - - root.addCounter("bytesWritten", new Instrument<Long>() { - public void sample() { - setValue(bytesWritten); - } - }); - - root.addCounter("writeSecs", new Instrument<Double>() { - public void sample() { - final double writeSecs = (elapsedWriteNanos / 1000000000.); - setValue(writeSecs); - } - }); - - root.addCounter("bytesWrittenPerSec", - new Instrument<Double>() { - public void sample() { - final double writeSecs = (elapsedWriteNanos / 1000000000.); - final double bytesWrittenPerSec = (writeSecs == 0L ? 0d - : (bytesWritten / writeSecs)); - setValue(bytesWrittenPerSec); - } - }); - - root.addCounter("maxWriteSize", new Instrument<Long>() { - public void sample() { - setValue(maxWriteSize); - } - }); - - } - - /* - * write cache statistics - */ - { - - final CounterSet writeCache = root.makePath("writeCache"); - - /* - * read - */ - writeCache.addCounter("nread", new Instrument<Long>() { - public void sample() { - setValue(ncacheRead); - } - }); - - writeCache.addCounter("readHitRate", new Instrument<Double>() { - public void sample() { - setValue(nreads == 0L ? 0d : (double) ncacheRead - / nreads); - } - }); - - writeCache.addCounter("readSecs", new Instrument<Double>() { - public void sample() { - setValue(elapsedCacheReadNanos / 1000000000.); - } - }); - - /* - * write - */ - - // #of writes on the write cache. - writeCache.addCounter("nwrite", new Instrument<Long>() { - public void sample() { - setValue(ncacheWrite); - } - }); - - /* - * % of writes that are buffered vs writing through to the - * disk. 
- * - * Note: This will be 1.0 unless you are writing large - * records. Large records are written directly to the disk - * rather than first into the write cache. When this happens - * the writeHitRate on the cache can be less than one. - */ - writeCache.addCounter("writeHitRate", new Instrument<Double>() { - public void sample() { - setValue(nwrites == 0L ? 0d : (double) ncacheWrite - / nwrites); - } - }); - - writeCache.addCounter("writeSecs", new Instrument<Double>() { - public void sample() { - setValue(elapsedCacheWriteNanos / 1000000000.); - } - }); - - // #of times the write cache was flushed to the disk. - writeCache.addCounter("nflush", new Instrument<Long>() { - public void sample() { - setValue(ncacheFlush); - } - }); - - } - - // disk statistics - { - final CounterSet disk = root.makePath("disk"); - - /* - * read - */ - - disk.addCounter("nreads", new Instrument<Long>() { - public void sample() { - setValue(ndiskRead); - } - }); - - disk.addCounter("bytesRead", new Instrument<Long>() { - public void sample() { - setValue(bytesReadFromDisk); - } - }); - - disk.addCounter("bytesPerRead", new Instrument<Double>() { - public void sample() { - final double bytesPerDiskRead = (ndiskRead == 0 ? 0d - : (bytesReadFromDisk / (double)ndiskRead)); - setValue(bytesPerDiskRead); - } - }); - - disk.addCounter("readSecs", new Instrument<Double>() { - public void sample() { - final double diskReadSecs = (elapsedDiskReadNanos / 1000000000.); - setValue(diskReadSecs); - } - }); - - disk.addCounter("bytesReadPerSec", - new Instrument<Double>() { - public void sample() { - final double diskReadSecs = (elapsedDiskReadNanos / 1000000000.); - final double bytesReadPerSec = (diskReadSecs == 0L ? 0d - : bytesReadFromDisk / diskReadSecs); - setValue(bytesReadPerSec); - } - }); - - disk.addCounter("secsPerRead", new Instrument<Double>() { - public void sample() { - final double diskReadSecs = (elapsedDiskReadNanos / 1000000000.); - final double readLatency = (diskReadSecs == 0 ? 0d - : diskReadSecs / ndiskRead); - setValue(readLatency); - } - }); - - /* - * write - */ - - disk.addCounter("nwrites", new Instrument<Long>() { - public void sample() { - setValue(ndiskWrite); - } - }); - - disk.addCounter("bytesWritten", new Instrument<Long>() { - public void sample() { - setValue(bytesWrittenOnDisk); - } - }); - - disk.addCounter("bytesPerWrite", new Instrument<Double>() { - public void sample() { - final double bytesPerDiskWrite = (ndiskWrite == 0 ? 0d - : (bytesWrittenOnDisk / (double)ndiskWrite)); - setValue(bytesPerDiskWrite); - } - }); - - disk.addCounter("writeSecs", new Instrument<Double>() { - public void sample() { - final double diskWriteSecs = (elapsedDiskWriteNanos / 1000000000.); - setValue(diskWriteSecs); - } - }); - - disk.addCounter("bytesWrittenPerSec", - new Instrument<Double>() { - public void sample() { - final double diskWriteSecs = (elapsedDiskWriteNanos / 1000000000.); - final double bytesWrittenPerSec = (diskWriteSecs == 0L ? 0d - : bytesWrittenOnDisk - / diskWriteSecs); - setValue(bytesWrittenPerSec); - } - }); - - disk.addCounter("secsPerWrite", new Instrument<Double>() { - public void sample() { - final double diskWriteSecs = (elapsedDiskWriteNanos / 1000000000.); - final double writeLatency = (diskWriteSecs == 0 ? 
0d - : diskWriteSecs / ndiskWrite); - setValue(writeLatency); - } - }); - - /* - * other - */ - - disk.addCounter("nforce", new Instrument<Long>() { - public void sample() { - setValue(nforce); - } - }); - - disk.addCounter("nextend", new Instrument<Long>() { - public void sample() { - setValue(ntruncate); - } - }); - - disk.addCounter("nreopen", new Instrument<Long>() { - public void sample() { - setValue(nreopen); - } - }); - - disk.addCounter("rootBlockWrites", new Instrument<Long>() { - public void sample() { - setValue(nwriteRootBlock); - } - }); - - } - - } - - return root; - - } - private CounterSet root; - - /** - * Human readable representation of the counters. - */ - public String toString() { - - return getCounters().toString(); - - } - - } +// /** +// * Counters for {@link IRawStore} access, including operations that read or +// * write through to the underlying media. +// * +// * @author <a href="mailto:tho...@us...">Bryan Thompson</a> +// * @version $Id$ +// * +// * @todo report elapsed time and average latency for force, reopen, and +// * writeRootBlock. +// * +// * @todo counters need to be atomic if we want to avoid the possibility of +// * concurrent <code>x++</code> operations failing to correctly +// * increment <code>x</code> for each request. +// */ +// public static class StoreCounters { +// +// /** +// * #of read requests. +// */ +// public long nreads; +// +// /** +// * #of read requests that are satisfied by our write cache (vs the +// * OS or disk level write cache). +// */ +// public long ncacheRead; +// +// /** +// * #of read requests that read through to the backing file. +// */ +// public long ndiskRead; +// +// /** +// * #of bytes read. +// */ +// public long bytesRead; +// +// /** +// * #of bytes that have been read from the disk. +// */ +// public long bytesReadFromDisk; +// +// /** +// * The size of the largest record read. +// */ +// public long maxReadSize; +// +// /** +// * Total elapsed time for reads. +// */ +// public long elapsedReadNanos; +// +// /** +// * Total elapsed time checking the disk write cache for records to be +// * read. +// */ +// public long elapsedCacheReadNanos; +// +// /** +// * Total elapsed time for reading on the disk. +// */ +// public long elapsedDiskReadNanos; +// +// /** +// * #of write requests. +// */ +// public long nwrites; +// +// /** +// * #of write requests that are absorbed by our write cache (vs the OS or +// * disk level write cache). +// */ +// public long ncacheWrite; +// +// /** +// * #of times the write cache was flushed to disk. +// */ +// public long ncacheFlush; +// +// /** +// * #of write requests that write through to the backing file. +// */ +// public long ndiskWrite; +// +// /** +// * The size of the largest record written. +// */ +// public long maxWriteSize; +// +// /** +// * #of bytes written. +// */ +// public long bytesWritten; +// +// /** +// * #of bytes that have been written on the disk. +// */ +// public long bytesWrittenOnDisk; +// +// /** +// * Total elapsed time for writes. +// */ +// public long elapsedWriteNanos; +// +// /** +// * Total elapsed time writing records into the cache (does not count +// * time to flush the cache when it is full or to write records that do +// * not fit in the cache directly to the disk). +// */ +// public long elapsedCacheWriteNanos; +// +// /** +// * Total elapsed time for writing on the disk. +// */ +// public long elapsedDiskWriteNanos; +// +// /** +// * #of times the data were forced to the disk. 
+// */ +// public long nforce; +// +// /** +// * #of times the length of the file was changed (typically, extended). +// */ +// public long ntruncate; +// +// /** +// * #of times the file has been reopened after it was closed by an +// * interrupt. +// */ +// public long nreopen; +// +// /** +// * #of times one of the root blocks has been written. +// */ +// public long nwriteRootBlock; +// +// /** +// * Initialize a new set of counters. +// */ +// public StoreCounters() { +// +// } +// +// /** +// * Copy ctor. +// * @param o +// */ +// public StoreCounters(final StoreCounters o) { +// +// add( o ); +// +// } +// +// /** +// * Adds counters to the current counters. +// * +// * @param o +// */ +// public void add(final StoreCounters o) { +// +// nreads += o.nreads; +// ncacheRead += o.ncacheRead; +// ndiskRead += o.ndiskRead; +// bytesRead += o.bytesRead; +// bytesReadFromDisk += o.bytesReadFromDisk; +// maxReadSize += o.maxReadSize; +// elapsedReadNanos += o.elapsedReadNanos; +// elapsedCacheReadNanos += o.elapsedCacheReadNanos; +// elapsedDiskReadNanos += o.elapsedDiskReadNanos; +// +// nwrites += o.nwrites; +// ncacheWrite += o.ncacheWrite; +// ncacheFlush += o.ncacheFlush; +// ndiskWrite += o.ndiskWrite; +// maxWriteSize += o.maxWriteSize; +// bytesWritten += o.bytesWritten; +// bytesWrittenOnDisk += o.bytesWrittenOnDisk; +// elapsedWriteNanos += o.elapsedWriteNanos; +// elapsedCacheWriteNanos += o.elapsedCacheWriteNanos; +// elapsedDiskWriteNanos += o.elapsedDiskWriteNanos; +// +// nforce += o.nforce; +// ntruncate += o.ntruncate; +// nreopen += o.nreopen; +// nwriteRootBlock += o.nwriteRootBlock; +// +// } +// +// /** +// * Returns a new {@link StoreCounters} containing the current counter values +// * minus the given counter values. +// * +// * @param o +// * +// * @return +// */ +// public StoreCounters subtract(final StoreCounters o) { +// +// // make a copy of the current counters. +// final StoreCounters t = new StoreCounters(this); +// +// // subtract out the given counters. 
+// t.nreads -= o.nreads; +// t.ncacheRead -= o.ncacheRead; +// t.ndiskRead -= o.ndiskRead; +// t.bytesRead -= o.bytesRead; +// t.bytesReadFromDisk -= o.bytesReadFromDisk; +// t.maxReadSize -= o.maxReadSize; +// t.elapsedReadNanos -= o.elapsedReadNanos; +// t.elapsedCacheReadNanos -= o.elapsedCacheReadNanos; +// t.elapsedDiskReadNanos -= o.elapsedDiskReadNanos; +// +// t.nwrites -= o.nwrites; +// t.ncacheWrite -= o.ncacheWrite; +// t.ncacheFlush -= o.ncacheFlush; +// t.ndiskWrite -= o.ndiskWrite; +// t.maxWriteSize -= o.maxWriteSize; +// t.bytesWritten -= o.bytesWritten; +// t.bytesWrittenOnDisk -= o.bytesWrittenOnDisk; +// t.elapsedWriteNanos -= o.elapsedWriteNanos; +// t.elapsedCacheWriteNanos -= o.elapsedCacheWriteNanos; +// t.elapsedDiskWriteNanos -= o.elapsedDiskWriteNanos; +// +// t.nforce -= o.nforce; +// t.ntruncate -= o.ntruncate; +// t.nreopen -= o.nreopen; +// t.nwriteRootBlock -= o.nwriteRootBlock; +// +// return t; +// +// } +// +// synchronized public CounterSet getCounters() { +// +// if (root == null) { +// +// root = new CounterSet(); +// +// // IRawStore API +// { +// +// /* +// * reads +// */ +// +// root.addCounter("nreads", new Instrument<Long>() { +// public void sample() { +// setValue(nreads); +// } +// }); +// +// root.addCounter("bytesRead", new Instrument<Long>() { +// public void sample() { +// setValue(bytesRead); +// } +// }); +// +// root.addCounter("readSecs", new Instrument<Double>() { +// public void sample() { +// final double elapsedReadSecs = (elapsedReadNanos / 1000000000.); +// setValue(elapsedReadSecs); +// } +// }); +// +// root.addCounter("bytesReadPerSec", +// new Instrument<Double>() { +// public void sample() { +// final double readSecs = (elapsedReadNanos / 1000000000.); +// final double bytesReadPerSec = (readSecs == 0L ? 0d +// : (bytesRead / readSecs)); +// setValue(bytesReadPerSec); +// } +// }); +// +// root.addCounter("maxReadSize", new Instrument<Long>() { +// public void sample() { +// setValue(maxReadSize); +// } +// }); +// +// /* +// * writes +// */ +// +// root.addCounter("nwrites", new Instrument<Long>() { +// public void sample() { +// setValue(nwrites); +// } +// }); +// +// root.addCounter("bytesWritten", new Instrument<Long>() { +// public void sample() { +// setValue(bytesWritten); +// } +// }); +// +// root.addCounter("writeSecs", new Instrument<Double>() { +// public void sample() { +// final double writeSecs = (elapsedWriteNanos / 1000000000.); +// setValue(writeSecs); +// } +// }); +// +// root.addCounter("bytesWrittenPerSec", +// new Instrument<Double>() { +// public void sample() { +// final double writeSecs = (elapsedWriteNanos / 1000000000.); +// final double bytesWrittenPerSec = (writeSecs == 0L ? 0d +// : (bytesWritten / writeSecs)); +// setValue(bytesWrittenPerSec); +// } +// }); +// +// root.addCounter("maxWriteSize", new Instrument<Long>() { +// public void sample() { +// setValue(maxWriteSize); +// } +// }); +// +// } +// +// /* +// * write cache statistics +// */ +// { +// +// final CounterSet writeCache = root.makePath("writeCache"); +// +// /* +// * read +// */ +// writeCache.addCounter("nread", new Instrument<Long>() { +// public void sample() { +// setValue(ncacheRead); +// } +// }); +// +// writeCache.addCounter("readHitRate", new Instrument<Double>() { +// public void sample() { +// setValue(nreads == 0L ? 
0d : (double) ncacheRead +// / nreads); +// } +// }); +// +// writeCache.addCounter("readSecs", new Instrument<Double>() { +// public void sample() { +// setValue(elapsedCacheReadNanos / 1000000000.); +// } +// }); +// +// /* +// * write +// */ +// +// // #of writes on the write cache. +// writeCache.addCounter("nwrite", new Instrument<Long>() { +// public void sample() { +// setValue(ncacheWrite); +// } +// }); +// +// /* +// * % of writes that are buffered vs writing through to the +// * disk. +// * +// * Note: This will be 1.0 unless you are writing large +// * records. Large records are written directly to the disk +// * rather than first into the write cache. When this happens +// * the writeHitRate on the cache can be less than one. +// */ +// writeCache.addCounter("writeHitRate", new Instrument<Double>() { +// public void sample() { +// setValue(nwrites == 0L ? 0d : (double) ncacheWrite +// / nwrites); +// } +// }); +// +// writeCache.addCounter("writeSecs", new Instrument<Double>() { +// public void sample() { +// setValue(elapsedCacheWriteNanos / 1000000000.); +// } +// }); +// +// // #of times the write cache was flushed to the disk. +// writeCache.addCounter("nflush", new Instrument<Long>() { +// public void sample() { +// setValue(ncacheFlush); +// } +// }); +// +// } +// +// // disk statistics +// { +// final CounterSet disk = root.makePath("disk"); +// +// /* +// * read +// */ +// +// disk.addCounter("nreads", new Instrument<Long>() { +// public void sample() { +// setValue(ndiskRead); +// } +// }); +// +// disk.addCounter("bytesRead", new Instrument<Long>() { +// public void sample() { +// setValue(bytesReadFromDisk); +// } +// }); +// +// disk.addCounter("bytesPerRead", new Instrument<Double>() { +// public void sample() { +// final double bytesPerDiskRead = (ndiskRead == 0 ? 0d +// : (bytesReadFromDisk / (double)ndiskRead)); +// setValue(bytesPerDiskRead); +// } +// }); +// +// disk.addCounter("readSecs", new Instrument<Double>() { +// public void sample() { +// final double diskReadSecs = (elapsedDiskReadNanos / 1000000000.); +// setValue(diskReadSecs); +// } +// }); +// +// disk.addCounter("bytesReadPerSec", +// new Instrument<Double>() { +// public void sample() { +// final double diskReadSecs = (elapsedDiskReadNanos / 1000000000.); +// final double bytesReadPerSec = (diskReadSecs == 0L ? 0d +// : bytesReadFromDisk / diskReadSecs); +// setValue(bytesReadPerSec); +// } +// }); +// +// disk.addCounter("secsPerRead", new Instrument<Double>() { +// public void sample() { +// final double diskReadSecs = (elapsedDiskReadNanos / 1000000000.); +// final double readLatency = (diskReadSecs == 0 ? 0d +// : diskReadSecs / ndiskRead); +// setValue(readLatency); +// } +// }); +// +// /* +// * write +// */ +// +// disk.addCounter("nwrites", new Instrument<Long>() { +// public void sample() { +// setValue(ndiskWrite); +// } +// }); +// +// disk.addCounter("bytesWritten", new Instrument<Long>() { +// public void sample() { +// setValue(bytesWrittenOnDisk); +// } +// }); +// +// disk.addCounter("bytesPerWrite", new Instrument<Double>() { +// public void sample() { +// final double bytesPerDiskWrite = (ndiskWrite == 0 ? 
0d +// : (bytesWrittenOnDisk / (double)ndiskWrite)); +// setValue(bytesPerDiskWrite); +// } +// }); +// +// disk.addCounter("writeSecs", new Instrument<Double>() { +// public void sample() { +// final double diskWriteSecs = (elapsedDiskWriteNanos / 1000000000.); +// setValue(diskWriteSecs); +// } +// }); +// +// disk.addCounter("bytesWrittenPerSec", +// new Instrument<Double>() { +// public void sample() { +// final double diskWriteSecs = (elapsedDiskWriteNanos / 1000000000.); +// final double bytesWrittenPerSec = (diskWriteSecs == 0L ? 0d +// : bytesWrittenOnDisk +// / diskWriteSecs); +// setValue(bytesWrittenPerSec); +// } +// }); +// +// disk.addCounter("secsPerWrite", new Instrument<Double>() { +// public void sample() { +// final double diskWriteSecs = (elapsedDiskWriteNanos / 1000000000.); +// final double writeLatency = (diskWriteSecs == 0 ? 0d +// : diskWriteSecs / ndiskWrite); +// setValue(writeLatency); +// } +// }); +// +// /* +// * other +// */ +// +// disk.addCounter("nforce", new Instrument<Long>() { +// public void sample() { +// setValue(nforce); +// } +// }); +// +// disk.addCounter("nextend", new Instrument<Long>() { +// public void sample() { +// setValue(ntruncate); +// } +// }); +// +// disk.addCounter("nreopen", new Instrument<Long>() { +// public void sample() { +// setValue(nreopen); +// } +// }); +// +// disk.addCounter("rootBlockWrites", new Instrument<Long>() { +// public void sample() { +// setValue(nwriteRootBlock); +// } +// }); +// +// } +// +// } +// +// return root; +// +// } +// private CounterSet root; +// +// /** +// * Human readable representation of the counters. +// */ +// public String toString() { +// +// return getCounters().toString(); +// +// } +// +// } // class StoreCounters /** * Performance counters for this class. @@ -1615,7 +1616,7 @@ */ storeCounters.nreads++; storeCounters.bytesRead+=nbytes; - storeCounters.ncacheRead++; +// storeCounters.ncacheRead++; storeCounters.elapsedReadNanos+=(System.nanoTime()-begin); // return the new buffer. @@ -1623,7 +1624,7 @@ } else { - storeCounters.elapsedCacheReadNanos+=(System.nanoTime()-beginCache); +// storeCounters.elapsedCacheReadNanos+=(System.nanoTime()-beginCache); } @@ -2109,10 +2110,10 @@ writeCache.write(addr, data); - storeCounters.ncacheWrite++; +// storeCounters.ncacheWrite++; +// +// storeCounters.elapsedCacheWriteNanos+=(System.nanoTime()-beginCache); - storeCounters.elapsedCacheWriteNanos+=(System.nanoTime()-beginCache); - } } else { Modified: trunk/bigdata/src/java/com/bigdata/resources/OverflowManager.java =================================================================== --- trunk/bigdata/src/java/com/bigdata/resources/OverflowManager.java 2010-07-31 00:52:15 UTC (rev 3381) +++ trunk/bigdata/src/java/com/bigdata/resources/OverflowManager.java 2010-08-01 18:53:21 UTC (rev 3382) @@ -724,33 +724,36 @@ + ".movePercentCpuTimeThreshold"; String DEFAULT_MOVE_PERCENT_CPU_TIME_THRESHOLD = ".7"; - - /** - * The maximum #of optional compacting merge operations that will be - * performed during a single overflow event (default - * {@value #DEFAULT_OPTIONAL_COMPACTING_MERGES_PER_OVERFLOW}). - * <p> - * Once this #of optional compacting merge tasks have been identified - * for a given overflow event, the remainder of the index partitions - * that are neither split, joined, moved, nor copied will use - * incremental builds. An incremental build is generally cheaper since - * it only copies the data on the mutable {@link BTree} for the - * lastCommitTime rather than the fused view. 
A compacting merge permits - * the older index segments to be released and results in a simpler view - * with view {@link IndexSegment}s. Either a compacting merge or an - * incremental build will permit old journals to be released once the - * commit points on those journals are no longer required. - * <p> - * Note: Mandatory compacting merges are identified based on - * {@link #MAXIMUM_JOURNALS_PER_VIEW} and - * {@link #MAXIMUM_SEGMENTS_PER_VIEW}. There is NO limit the #of - * mandatory compacting merges that will be performed during an - * asynchronous overflow event. However, each mandatory compacting merge - * does count towards the maximum #of optional merges. Therefore if the - * #of mandatory compacting merges is greater than this parameter then - * NO optional compacting merges will be selected in a given overflow - * cycle. - */ + + /** + * The maximum #of optional compacting merge operations that will be + * performed during a single overflow event (default + * {@value #DEFAULT_OPTIONAL_COMPACTING_MERGES_PER_OVERFLOW}). + * <p> + * Once this #of optional compacting merge tasks have been identified + * for a given overflow event, the remainder of the index partitions + * that are neither split, joined, moved, nor copied will use + * incremental builds. An incremental build is generally cheaper since + * it only copies the data on the mutable {@link BTree} for the + * lastCommitTime rather than the fused view. A compacting merge permits + * the older index segments to be released and results in a simpler view + * with view {@link IndexSegment}s. Either a compacting merge or an + * incremental build will permit old journals to be released once the + * commit points on those journals are no longer required. + * <p> + * Note: Mandatory compacting merges are identified based on + * {@link #MAXIMUM_JOURNALS_PER_VIEW} and + * {@link #MAXIMUM_SEGMENTS_PER_VIEW}. There is NO limit the #of + * mandatory compacting merges that will be performed during an + * asynchronous overflow event. However, each mandatory compacting merge + * does count towards the maximum #of optional merges. Therefore if the + * #of mandatory compacting merges is greater than this parameter then + * NO optional compacting merges will be selected in a given overflow + * cycle. + * + * @deprecated merges are now performed in priority order while time + * remains in a given asynchronous overflow cycle. + */ String MAXIMUM_OPTIONAL_MERGES_PER_OVERFLOW = OverflowManager.class .getName() + ".maximumOptionalMergesPerOverflow"; Modified: trunk/bigdata/src/java/com/bigdata/resources/ResourceEvents.java =================================================================== --- trunk/bigdata/src/java/com/bigdata/resources/ResourceEvents.java 2010-07-31 00:52:15 UTC (rev 3381) +++ trunk/bigdata/src/java/com/bigdata/resources/ResourceEvents.java 2010-08-01 18:53:21 UTC (rev 3382) @@ -112,7 +112,8 @@ /** * Leading zeros without commas used to format the partition identifiers - * into index segment file names. + * into index segment file names. This uses 10 digits, which is enough + * to represent {@link Integer#MAX_VALUE}. 
*/ static NumberFormat leadingZeros; @@ -130,7 +131,7 @@ leadingZeros = NumberFormat.getIntegerInstance(); - leadingZeros.setMinimumIntegerDigits(5); + leadingZeros.setMinimumIntegerDigits(10); leadingZeros.setGroupingUsed(false); Modified: trunk/bigdata/src/java/com/bigdata/resources/StoreManager.java =================================================================== --- trunk/bigdata/src/java/com/bigdata/resources/StoreManager.java 2010-07-31 00:52:15 UTC (rev 3381) +++ trunk/bigdata/src/java/com/bigdata/resources/StoreManager.java 2010-08-01 18:53:21 UTC (rev 3382) @@ -85,8 +85,9 @@ import com.bigdata.journal.ITx; import com.bigdata.journal.Name2Addr; import com.bigdata.journal.TemporaryStore; +import com.bigdata.journal.WORMStrategy; import com.bigdata.journal.WriteExecutorService; -import com.bigdata.journal.DiskOnlyStrategy.StoreCounters; +import com.bigdata.journal.WORMStrategy.StoreCounters; import com.bigdata.mdi.IPartitionMetadata; import com.bigdata.mdi.IResourceMetadata; import com.bigdata.mdi.IndexPartitionCause; @@ -2454,6 +2455,11 @@ ((DiskOnlyStrategy) getBufferStrategy()) .setStoreCounters(getStoreCounters()); + } else if (getBufferStrategy() instanceof WORMStrategy) { + + ((WORMStrategy) getBufferStrategy()) + .setStoreCounters(getStoreCounters()); + } } @@ -4556,7 +4562,7 @@ // make sure that directory exists. indexDir.mkdirs(); - final String partitionStr = (partitionId == -1 ? "" : "_part" + final String partitionStr = (partitionId == -1 ? "" : "_shardId" + leadingZeros.format(partitionId)); final String prefix = mungedName + "" + partitionStr + "_"; Modified: trunk/bigdata/src/java/com/bigdata/service/DataService.java =================================================================== --- trunk/bigdata/src/java/com/bigdata/service/DataService.java 2010-07-31 00:52:15 UTC (rev 3381) +++ trunk/bigdata/src/java/com/bigdata/service/DataService.java 2010-08-01 18:53:21 UTC (rev 3382) @@ -413,6 +413,7 @@ * reattaching the counters for the live {@link ManagedJournal} during * synchronous overflow. */ + @Override synchronized public void reattachDynamicCounters() { final long now = System.currentTimeMillis(); @@ -422,6 +423,9 @@ if (service.isOpen() && service.resourceManager.isRunning() && elapsed > 5000/* ms */) { + // inherit base class behavior + super.reattachDynamicCounters(); + // The service's counter set hierarchy. final CounterSet serviceRoot = service.getFederation() .getServiceCounterSet(); Modified: trunk/bigdata/src/java/com/bigdata/service/DefaultServiceFederationDelegate.java =================================================================== --- trunk/bigdata/src/java/com/bigdata/service/DefaultServiceFederationDelegate.java 2010-07-31 00:52:15 UTC (rev 3381) +++ trunk/bigdata/src/java/com/bigdata/service/DefaultServiceFederationDelegate.java 2010-08-01 18:53:21 UTC (rev 3382) @@ -41,7 +41,11 @@ import org.apache.log4j.Logger; import com.bigdata.counters.CounterSet; +import com.bigdata.counters.ICounterSet; +import com.bigdata.counters.IProcessCounters; import com.bigdata.counters.httpd.CounterSetHTTPD; +import com.bigdata.io.DirectBufferPool; +import com.bigdata.journal.ConcurrencyManager.IConcurrencyManagerCounters; import com.bigdata.util.httpd.AbstractHTTPD; /** @@ -92,9 +96,31 @@ } - /** NOP */ - public void reattachDynamicCounters() { + /** Reattaches the {@link DirectBufferPool} counters. */ + public void reattachDynamicCounters() { + // The service's counter set hierarchy. 
+ final CounterSet serviceRoot = service.getFederation() + .getServiceCounterSet(); + + // Ensure path exists. + final CounterSet tmp = serviceRoot.makePath(IProcessCounters.Memory); + + /* + * Add counters reporting on the various DirectBufferPools. + */ + synchronized (tmp) { + + // detach the old counters (if any). + tmp.detach("DirectBufferPool"); + + // attach the current counters. + tmp.makePath("DirectBufferPool").attach( + DirectBufferPool.getCounters()); + + } + + } /** Modified: trunk/src/resources/analysis/queries/benchmark.txt =================================================================== --- trunk/src/resources/analysis/queries/benchmark.txt 2010-07-31 00:52:15 UTC (rev 3381) +++ trunk/src/resources/analysis/queries/benchmark.txt 2010-08-01 18:53:21 UTC (rev 3382) @@ -75,7 +75,8 @@ http://localhost:8080/?regex=/([^/]*)/.*Memory/Bytes%20Free&correlated=true&depth=3&file=memory/BytesFree http://localhost:8080/?regex=/([^/]*)/.*Memory/Swap%20Bytes%20Used&correlated=true&depth=3&period=Minutes&file=memory/SwapBytesUsed http://localhost:8080/?regex=/([^/]*)/.*Memory/Major%20Page%20Faults%20Per%20Second&correlated=true&depth=3&period=Minutes&file=memory/MajorPageFaultsPerSecond -http://localhost:8080/?regex=/([^/]*)/.*IDataService/.*/Memory/DirectBufferPool/poolSize&correlated=true&depth=12&file=memory/directBufferPool/poolSize +http://localhost:8080/?regex=/([^/]*)/.*IDataService/.*/Memory/DirectBufferPool/totalBytesUsed&correlated=true&depth=12&file=memory/directBufferPool/totalBytesUsed +http://localhost:8080/?regex=/([^/]*)/.*IDataService/.*/Memory/DirectBufferPool/(.*)/poolSize&correlated=true&depth=12&file=memory/directBufferPool/poolSize http://localhost:8080/?regex=/([^/]*)/.*(IDataService|IClientService)/.*/Memory/Virtual%20Size&correlated=true&depth=12&file=memory/VirtualSize http://localhost:8080/?regex=/... [truncated message content] |
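
For readers skimming the patch, the two heap/naming changes called out in the log message come down to standard JDK calls. The following standalone sketch is not part of the patch (the class name is illustrative only, and no bigdata classes are used); it shows string interning as now applied to the btree/checkpoint class names in IndexMetadata, and the 10-digit, zero-padded partition-identifier formatting now used by ResourceEvents for index segment file names, which is wide enough for any int32 value:

    import java.text.NumberFormat;

    public class Rev3382Sketch {

        public static void main(String[] args) {

            // String interning: many IndexMetadata records can share a single
            // String instance for the class name instead of each holding its
            // own copy on the heap.
            final String btreeClassName = "com.bigdata.btree.BTree".intern();
            final String fromElsewhere = new String("com.bigdata.btree.BTree").intern();
            System.out.println(btreeClassName == fromElsewhere); // true: same instance

            // Index segment file names: 10 leading-zero digits are sufficient
            // to represent any non-negative int32 partition identifier.
            final NumberFormat leadingZeros = NumberFormat.getIntegerInstance();
            leadingZeros.setMinimumIntegerDigits(10);
            leadingZeros.setGroupingUsed(false);
            System.out.println(leadingZeros.format(42));                // 0000000042
            System.out.println(leadingZeros.format(Integer.MAX_VALUE)); // 2147483647
        }
    }

The interning pays off because the same class-name strings recur once per index partition; after interning they all reference one canonical String in the JVM's intern pool, which is what the log message means by removing names "redundantly stored in the heap".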