From: <tho...@us...> - 2010-11-04 12:45:25
|
Revision: 3887 http://bigdata.svn.sourceforge.net/bigdata/?rev=3887&view=rev Author: thompsonbry Date: 2010-11-04 12:45:18 +0000 (Thu, 04 Nov 2010) Log Message: ----------- Added some fields from AbstractBufferStrategy for error messages to RWStrategy. Added support for HA (readFromLocalStore and writeRawBuffer). Modified Paths: -------------- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/WORMStrategy.java branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2010-11-04 11:04:52 UTC (rev 3886) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2010-11-04 12:45:18 UTC (rev 3887) @@ -55,8 +55,8 @@ import com.bigdata.io.FileChannelUtility; import com.bigdata.io.IReopenChannel; import com.bigdata.journal.AbstractBufferStrategy; -import com.bigdata.journal.DiskOnlyStrategy; import com.bigdata.journal.StoreTypeEnum; +import com.bigdata.journal.WORMStrategy; import com.bigdata.journal.ha.HAWriteMessage; import com.bigdata.rawstore.Bytes; import com.bigdata.rawstore.IRawStore; @@ -75,7 +75,7 @@ * <ol> * <li>Gathered writes. This case is used by the {@link RWStore}.</li> * <li>Pure append of sequentially allocated records. This case is used by the - * {@link DiskOnlyStrategy} (WORM) and by the {@link IndexSegmentBuilder}.</li> + * {@link WORMStrategy} (WORM) and by the {@link IndexSegmentBuilder}.</li> * <li>Write of a single large buffer owned by the caller. 
This case may be used * when the caller wants to manage the buffers or when the caller's buffer is * larger than the write cache.</li> @@ -1482,7 +1482,7 @@ /** * A {@link WriteCache} implementation suitable for an append-only file such - * as the {@link DiskOnlyStrategy} or the output file of the + * as the {@link WORMStrategy} or the output file of the * {@link IndexSegmentBuilder}. * * @author <a href="mailto:tho...@us...">Bryan @@ -1622,10 +1622,12 @@ * Called by WriteCacheService to process a direct write for large * blocks and also to flush data from dirty caches. */ - protected boolean writeOnChannel(final ByteBuffer data, final long firstOffsetIgnored, - final Map<Long, RecordMetadata> recordMap, final long nanos) throws InterruptedException, IOException { + protected boolean writeOnChannel(final ByteBuffer data, + final long firstOffsetIgnored, + final Map<Long, RecordMetadata> recordMap, final long nanos) + throws InterruptedException, IOException { - final long begin = System.nanoTime(); + final long begin = System.nanoTime(); final int nbytes = data.remaining(); Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java 2010-11-04 11:04:52 UTC (rev 3886) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java 2010-11-04 12:45:18 UTC (rev 3887) @@ -76,7 +76,7 @@ * offset plus record length exceeds the {@link #nextOffset} on which data * would be written may be easily detected. 
*/ - protected static final String ERR_ADDRESS_NOT_WRITTEN = "Address never written."; + public static final String ERR_ADDRESS_NOT_WRITTEN = "Address never written."; /** * Text of the error message used when a ZERO (0L) is passed as an address @@ -99,19 +99,19 @@ * array or native memory (both are limited to int32 bytes since they * are addressed by a Java <code>int</code>). */ - protected static final String ERR_INT32 = "Would exceed int32 bytes (not allowed unless backed by disk)."; + public static final String ERR_INT32 = "Would exceed int32 bytes (not allowed unless backed by disk)."; /** * Text of the error message used when * {@link IBufferStrategy#truncate(long)} would truncate data that has * already been written. */ - protected static final String ERR_TRUNCATE = "Would truncate written data."; + public static final String ERR_TRUNCATE = "Would truncate written data."; /** * Error message used when the writes are not allowed. */ - protected static final String ERR_READ_ONLY = "Read only"; + public static final String ERR_READ_ONLY = "Read only"; /** * Error message used when the record size is invalid (e.g., negative). @@ -119,14 +119,21 @@ * @todo There is some overlap with {@link #ERR_RECORD_LENGTH_ZERO} and * {@link #ERR_BUFFER_EMPTY}. */ - protected static final String ERR_BAD_RECORD_SIZE = "Bad record size"; + public static final String ERR_BAD_RECORD_SIZE = "Bad record size"; /** - * Error message used when the store is closed. + * Error message used when the store is closed but the operation requires + * that the store is open. */ public static final String ERR_NOT_OPEN = "Not open"; /** + * Error message used when the store is open but the operation requires that + * the store is closed. + */ + public static final String ERR_OPEN = "Open"; + + /** * Error message used when an operation would write more data than would be * permitted onto a buffer. 
*/ Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java 2010-11-04 11:04:52 UTC (rev 3886) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/RWStrategy.java 2010-11-04 12:45:18 UTC (rev 3887) @@ -33,7 +33,7 @@ import org.apache.log4j.Logger; import com.bigdata.counters.CounterSet; -import com.bigdata.journal.WORMStrategy.StoreCounters; +import com.bigdata.ha.QuorumRead; import com.bigdata.journal.ha.HAWriteMessage; import com.bigdata.mdi.IResourceMetadata; import com.bigdata.quorum.Quorum; @@ -41,6 +41,8 @@ import com.bigdata.rawstore.IAddressManager; import com.bigdata.rwstore.IAllocationContext; import com.bigdata.rwstore.RWStore; +import com.bigdata.rwstore.RWStore.StoreCounters; +import com.bigdata.util.ChecksumError; /** * A highly scalable persistent {@link IBufferStrategy} wrapping the @@ -89,15 +91,26 @@ */ final private long m_initialExtent; + /** + * The HA {@link Quorum} (optional). + */ + private final Quorum<?,?> m_quorum; + /** * * @param fileMetadata - * @param quorum + * @param quorum The HA {@link Quorum} (optional). */ RWStrategy(final FileMetadata fileMetadata, final Quorum<?, ?> quorum) { + if (fileMetadata == null) + throw new IllegalArgumentException(); + m_uuid = fileMetadata.rootBlock.getUUID(); + // MAY be null. + m_quorum = quorum; + m_store = new RWStore(fileMetadata, quorum); m_initialExtent = fileMetadata.file.length(); @@ -110,31 +123,50 @@ } - /* - * FIXME This does not handle the read-from-peer HA integration. See - * WORMStrategy#read(). - * - * FIXME This does not update the StoreCounters. - */ public ByteBuffer read(final long addr) { - final int rwaddr = decodeAddr(addr); - final int sze = decodeSize(addr); + try { + // Try reading from the local store. 
+ return readFromLocalStore(addr); + } catch (InterruptedException e) { + // wrap and rethrow. + throw new RuntimeException(e); + } catch (ChecksumError e) { + /* + * Note: This assumes that the ChecksumError is not wrapped by + * another exception. If it is, then the ChecksumError would not be + * caught. + */ + // log the error. + try { + log.error(e + " : addr=" + toString(addr), e); + } catch (Throwable ignored) { + // ignore error in logging system. + } + // update the performance counters. + final StoreCounters<?> c = (StoreCounters<?>) m_store.getStoreCounters() + .acquire(); + try { + c.checksumErrorCount++; + } finally { + c.release(); + } + if (m_quorum != null && m_quorum.isHighlyAvailable()) { + if (m_quorum.isQuorumMet()) { + try { + // Read on another node in the quorum. + final byte[] a = ((QuorumRead<?>) m_quorum.getMember()) + .readFromQuorum(m_uuid, addr); + return ByteBuffer.wrap(a); + } catch (Throwable t) { + throw new RuntimeException("While handling: " + e, t); + } + } + } + // Otherwise rethrow the checksum error. 
+ throw e; + } - if (rwaddr == 0L || sze == 0) { - throw new IllegalArgumentException(); - } - - /** - * Allocate buffer to include checksum to allow single read - * but then return ByteBuffer excluding those bytes - */ - final byte buf[] = new byte[sze+4]; // 4 bytes for checksum - - m_store.getData(rwaddr, buf); - - return ByteBuffer.wrap(buf, 0, sze); - } public long write(final ByteBuffer data) { @@ -145,11 +177,9 @@ public long write(final ByteBuffer data, final IAllocationContext context) { - if (!isOpen()) - throw new IllegalStateException(); - - if (data == null) - throw new IllegalArgumentException(); + if (data == null) + throw new IllegalArgumentException( + AbstractBufferStrategy.ERR_BUFFER_NULL); if (data.hasArray() && data.arrayOffset() != 0) { /* @@ -166,7 +196,8 @@ final int nbytes = data.remaining(); if (nbytes == 0) - throw new IllegalArgumentException(); + throw new IllegalArgumentException( + AbstractBufferStrategy.ERR_BUFFER_EMPTY); final long rwaddr = m_store.alloc(data.array(), nbytes, context); @@ -208,9 +239,18 @@ * this data, and if not free immediately, otherwise defer. */ public void delete(final long addr, final IAllocationContext context) { + + final int rwaddr = decodeAddr(addr); - final int rwaddr = decodeAddr(addr); final int sze = decodeSize(addr); + + if (rwaddr == 0L) + throw new IllegalArgumentException( + AbstractBufferStrategy.ERR_ADDRESS_IS_NULL); + + if (sze == 0) + throw new IllegalArgumentException( + AbstractBufferStrategy.ERR_BAD_RECORD_SIZE); m_store.free(rwaddr, sze, context); @@ -316,10 +356,10 @@ private void assertOpen() { if (!m_store.isOpen()) - throw new IllegalStateException(); - + throw new IllegalStateException(AbstractBufferStrategy.ERR_NOT_OPEN); + } - + public void close() { // throw exception if open per the API. 
@@ -332,7 +372,7 @@ public void deleteResources() { if (m_store.isOpen()) - throw new IllegalStateException(); + throw new IllegalStateException(AbstractBufferStrategy.ERR_OPEN); final File file = m_store.getStoreFile(); @@ -534,25 +574,39 @@ /* * IHABufferStrategy */ - - /** - * Operation is not supported. - */ - public void writeRawBuffer(HAWriteMessage msg, ByteBuffer b) + + public void writeRawBuffer(final HAWriteMessage msg, final ByteBuffer b) throws IOException, InterruptedException { - throw new UnsupportedOperationException(); + m_store.writeRawBuffer(msg, b); } - /** - * Operation is not supported. - */ public ByteBuffer readFromLocalStore(final long addr) throws InterruptedException { - throw new UnsupportedOperationException(); + final int rwaddr = decodeAddr(addr); + final int sze = decodeSize(addr); + + if (rwaddr == 0L) + throw new IllegalArgumentException( + AbstractBufferStrategy.ERR_ADDRESS_IS_NULL); + + if (sze == 0) + throw new IllegalArgumentException( + AbstractBufferStrategy.ERR_BAD_RECORD_SIZE); + + /** + * Allocate buffer to include checksum to allow single read but then + * return ByteBuffer excluding those bytes + */ + final byte buf[] = new byte[sze + 4]; // 4 bytes for checksum + + m_store.getData(rwaddr, buf); + + return ByteBuffer.wrap(buf, 0, sze); + } /** Modified: branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/WORMStrategy.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/WORMStrategy.java 2010-11-04 11:04:52 UTC (rev 3886) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/journal/WORMStrategy.java 2010-11-04 12:45:18 UTC (rev 3887) @@ -2231,9 +2231,6 @@ } - /** - * Extend file if required for HAWriteMessage - just call through to truncate - */ public void setExtentForLocalStore(final long extent) throws IOException, InterruptedException { Modified: 
branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-04 11:04:52 UTC (rev 3886) +++ branches/JOURNAL_HA_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2010-11-04 12:45:18 UTC (rev 3887) @@ -59,6 +59,8 @@ import com.bigdata.io.IReopenChannel; import com.bigdata.io.writecache.BufferedWrite; import com.bigdata.io.writecache.WriteCache; +import com.bigdata.io.writecache.WriteCacheService; +import com.bigdata.journal.AbstractBufferStrategy; import com.bigdata.journal.AbstractJournal; import com.bigdata.journal.CommitRecordIndex; import com.bigdata.journal.CommitRecordSerializer; @@ -71,7 +73,7 @@ import com.bigdata.journal.Options; import com.bigdata.journal.RootBlockUtility; import com.bigdata.journal.RootBlockView; -import com.bigdata.journal.WORMStrategy.StoreCounters; +import com.bigdata.journal.ha.HAWriteMessage; import com.bigdata.quorum.Quorum; import com.bigdata.rawstore.IRawStore; import com.bigdata.util.ChecksumUtility; @@ -376,31 +378,23 @@ } - /* - * FIXME Update counters when writing on the disk. + /** + * {@inheritDoc} + * <p> + * Note: The performance counters for writes to the disk are reported by + * the {@link WriteCacheService}. The {@link RWStore} never writes + * directly onto the disk (other than the root blocks). */ @Override protected boolean writeOnChannel(final ByteBuffer data, final long firstOffsetignored, final Map<Long, RecordMetadata> recordMap, final long nanos) throws InterruptedException, IOException { -// final long begin = System.nanoTime(); final Lock readLock = m_extensionLock.readLock(); readLock.lock(); try { boolean ret = super.writeOnChannel(data, firstOffsetignored, recordMap, nanos); -// // Update counters. 
-// final long elapsed = (System.nanoTime() - begin); -// final StoreCounters<?> c = (StoreCounters<?>) storeCounters.get() -// .acquire(); -// try { -// c.ndiskWrite += nwrites; -// c.bytesWrittenOnDisk += nbytes; -// c.elapsedDiskWriteNanos += elapsed; -// } finally { -// c.release(); -// } return ret; } finally { readLock.unlock(); @@ -416,14 +410,6 @@ }; -// private String m_filename; - -// private final FileMetadataView m_fmv; - -// private volatile IRootBlockView m_rb; - -// volatile private long m_commitCounter; - volatile private int m_metaBitsAddr; /** @@ -571,8 +557,10 @@ } private void assertOpen() { - if(!m_open) - throw new IllegalStateException(); + + if (!m_open) + throw new IllegalStateException(AbstractBufferStrategy.ERR_NOT_OPEN); + } synchronized public void close() { @@ -987,7 +975,9 @@ * address of the BlobHeader record. */ public void getData(final long addr, final byte buf[]) { - getData(addr, buf, 0, buf.length); + + getData(addr, buf, 0, buf.length); + } public void getData(final long addr, final byte buf[], final int offset, @@ -999,6 +989,8 @@ return; } + final long begin = System.nanoTime(); + final Lock readLock = m_extensionLock.readLock(); readLock.lock(); @@ -1048,23 +1040,43 @@ } } - try { - final long paddr = physicalAddress((int) addr); - if (paddr == 0) { - assertAllocators(); - - log.warn("Address " + addr + " did not resolve to physical address"); - - throw new IllegalArgumentException("Address " + addr + " did not resolve to physical address"); + { + final StoreCounters<?> storeCounters = (StoreCounters<?>) this.storeCounters + .get().acquire(); + try { + final int nbytes = length; + if (nbytes > storeCounters.maxReadSize) { + storeCounters.maxReadSize = nbytes; + } + } finally { + storeCounters.release(); + } + } + + try { + + final long paddr = physicalAddress((int) addr); + + if (paddr == 0) { + + assertAllocators(); + + final String msg = "Address did not resolve to physical address: " + + addr; + + log.warn(msg); + + 
throw new IllegalArgumentException(msg); + } - /** - * Check WriteCache first - * - * Note that the buffer passed in should include the checksum - * value, so the cached data is 4 bytes less than the - * buffer size. - */ + /** + * Check WriteCache first + * + * Note that the buffer passed in should include the checksum + * value, so the cached data is 4 bytes less than the buffer + * size. + */ final ByteBuffer bbuf; try { bbuf = m_writeCache.read(paddr); @@ -1089,7 +1101,24 @@ buf[offset+i] = in[i]; } m_cacheReads++; + /* + * Hit on the write cache. + * + * Update the store counters. + */ + final StoreCounters<?> c = (StoreCounters<?>) storeCounters + .get().acquire(); + try { + final int nbytes = length; + c.nreads++; + c.bytesRead += nbytes; + c.elapsedReadNanos += (System.nanoTime() - begin); + } finally { + c.release(); + } } else { + // Read through to the disk. + final long beginDisk = System.nanoTime(); // If checksum is required then the buffer should be sized to include checksum in final 4 bytes final ByteBuffer bb = ByteBuffer.wrap(buf, offset, length); FileChannelUtility.readAll(m_reopener, bb, paddr); @@ -1111,6 +1140,19 @@ } m_diskReads++; + // Update counters. + final StoreCounters<?> c = (StoreCounters<?>) storeCounters.get() + .acquire(); + try { + final int nbytes = length; + c.nreads++; + c.bytesRead += nbytes; + c.bytesReadFromDisk += nbytes; + c.elapsedReadNanos += (System.nanoTime() - begin); + c.elapsedDiskReadNanos += (System.nanoTime() - beginDisk); + } finally { + c.release(); + } } } catch (Throwable e) { log.error(e,e); @@ -3232,10 +3274,14 @@ return tmp; } - + /** * Striped performance counters for {@link IRawStore} access, including * operations that read or write through to the underlying media. + * <p> + * Note: The performance counters for writes to the disk are reported by the + * {@link WriteCacheService}. The {@link RWStore} never writes directly onto + * the disk (other than the root blocks). 
* * @author <a href="mailto:tho...@us...">Bryan * Thompson</a> @@ -3243,8 +3289,8 @@ * * @todo report elapsed time and average latency for force, reopen, and * writeRootBlock. - * - * FIXME CAT may be much faster than striped locks (2-3x faster). + * + * FIXME CAT may be much faster than striped locks (2-3x faster). */ static public class StoreCounters<T extends StoreCounters<T>> extends StripedCounters<T> { @@ -3289,10 +3335,11 @@ */ public volatile long nwrites; - /** - * #of write requests that write through to the backing file. - */ - public volatile long ndiskWrite; + // This is reported by the WriteCacheService. +// /** +// * #of write requests that write through to the backing file. +// */ +// public volatile long ndiskWrite; /** * The size of the largest record read. @@ -3308,21 +3355,23 @@ * #of bytes written. */ public volatile long bytesWritten; + + // This is reported by the WriteCacheService. +// /** +// * #of bytes that have been written on the disk. +// */ +// public volatile long bytesWrittenOnDisk; /** - * #of bytes that have been written on the disk. - */ - public volatile long bytesWrittenOnDisk; - - /** * Total elapsed time for writes. */ public volatile long elapsedWriteNanos; - /** - * Total elapsed time for writing on the disk. - */ - public volatile long elapsedDiskWriteNanos; + // This is reported by the WriteCacheService. +// /** +// * Total elapsed time for writing on the disk. +// */ +// public volatile long elapsedDiskWriteNanos; /** * #of times the data were forced to the disk. 
@@ -3381,12 +3430,12 @@ checksumErrorCount += o.checksumErrorCount; nwrites += o.nwrites; - ndiskWrite += o.ndiskWrite; +// ndiskWrite += o.ndiskWrite; maxWriteSize = Math.max(maxWriteSize, o.maxWriteSize); bytesWritten += o.bytesWritten; - bytesWrittenOnDisk += o.bytesWrittenOnDisk; +// bytesWrittenOnDisk += o.bytesWrittenOnDisk; elapsedWriteNanos += o.elapsedWriteNanos; - elapsedDiskWriteNanos += o.elapsedDiskWriteNanos; +// elapsedDiskWriteNanos += o.elapsedDiskWriteNanos; nforce += o.nforce; ntruncate += o.ntruncate; @@ -3412,12 +3461,12 @@ t.checksumErrorCount -= o.checksumErrorCount; t.nwrites -= o.nwrites; - t.ndiskWrite -= o.ndiskWrite; +// t.ndiskWrite -= o.ndiskWrite; t.maxWriteSize -= o.maxWriteSize; // @todo report max? min? t.bytesWritten -= o.bytesWritten; - t.bytesWrittenOnDisk -= o.bytesWrittenOnDisk; +// t.bytesWrittenOnDisk -= o.bytesWrittenOnDisk; t.elapsedWriteNanos -= o.elapsedWriteNanos; - t.elapsedDiskWriteNanos -= o.elapsedDiskWriteNanos; +// t.elapsedDiskWriteNanos -= o.elapsedDiskWriteNanos; t.nforce -= o.nforce; t.ntruncate -= o.ntruncate; @@ -3442,12 +3491,12 @@ checksumErrorCount = 0; nwrites = 0; - ndiskWrite = 0; +// ndiskWrite = 0; maxWriteSize = 0; bytesWritten = 0; - bytesWrittenOnDisk = 0; +// bytesWrittenOnDisk = 0; elapsedWriteNanos = 0; - elapsedDiskWriteNanos = 0; +// elapsedDiskWriteNanos = 0; nforce = 0; ntruncate = 0; @@ -3605,51 +3654,51 @@ * write */ - disk.addCounter("nwrites", new Instrument<Long>() { - public void sample() { - setValue(ndiskWrite); - } - }); +// disk.addCounter("nwrites", new Instrument<Long>() { +// public void sample() { +// setValue(ndiskWrite); +// } +// }); +// +// disk.addCounter("bytesWritten", new Instrument<Long>() { +// public void sample() { +// setValue(bytesWrittenOnDisk); +// } +// }); +// +// disk.addCounter("bytesPerWrite", new Instrument<Double>() { +// public void sample() { +// final double bytesPerDiskWrite = (ndiskWrite == 0 ? 
0d +// : (bytesWrittenOnDisk / (double) ndiskWrite)); +// setValue(bytesPerDiskWrite); +// } +// }); +// +// disk.addCounter("writeSecs", new Instrument<Double>() { +// public void sample() { +// final double diskWriteSecs = (elapsedDiskWriteNanos / 1000000000.); +// setValue(diskWriteSecs); +// } +// }); +// +// disk.addCounter("bytesWrittenPerSec", new Instrument<Double>() { +// public void sample() { +// final double diskWriteSecs = (elapsedDiskWriteNanos / 1000000000.); +// final double bytesWrittenPerSec = (diskWriteSecs == 0L ? 0d +// : bytesWrittenOnDisk / diskWriteSecs); +// setValue(bytesWrittenPerSec); +// } +// }); +// +// disk.addCounter("secsPerWrite", new Instrument<Double>() { +// public void sample() { +// final double diskWriteSecs = (elapsedDiskWriteNanos / 1000000000.); +// final double writeLatency = (diskWriteSecs == 0 ? 0d +// : diskWriteSecs / ndiskWrite); +// setValue(writeLatency); +// } +// }); - disk.addCounter("bytesWritten", new Instrument<Long>() { - public void sample() { - setValue(bytesWrittenOnDisk); - } - }); - - disk.addCounter("bytesPerWrite", new Instrument<Double>() { - public void sample() { - final double bytesPerDiskWrite = (ndiskWrite == 0 ? 0d - : (bytesWrittenOnDisk / (double) ndiskWrite)); - setValue(bytesPerDiskWrite); - } - }); - - disk.addCounter("writeSecs", new Instrument<Double>() { - public void sample() { - final double diskWriteSecs = (elapsedDiskWriteNanos / 1000000000.); - setValue(diskWriteSecs); - } - }); - - disk.addCounter("bytesWrittenPerSec", new Instrument<Double>() { - public void sample() { - final double diskWriteSecs = (elapsedDiskWriteNanos / 1000000000.); - final double bytesWrittenPerSec = (diskWriteSecs == 0L ? 
0d - : bytesWrittenOnDisk / diskWriteSecs); - setValue(bytesWrittenPerSec); - } - }); - - disk.addCounter("secsPerWrite", new Instrument<Double>() { - public void sample() { - final double diskWriteSecs = (elapsedDiskWriteNanos / 1000000000.); - final double writeLatency = (diskWriteSecs == 0 ? 0d - : diskWriteSecs / ndiskWrite); - setValue(writeLatency); - } - }); - /* * other */ @@ -3752,4 +3801,12 @@ } + public void writeRawBuffer(final HAWriteMessage msg, final ByteBuffer b) + throws IOException, InterruptedException { + + m_writeCache.newWriteCache(b, true/* useChecksums */, + true/* bufferHasData */, m_reopener).flush(false/* force */); + + } + } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |