From: Bryan T. <tho...@us...> - 2007-02-15 22:01:28
Update of /cvsroot/cweb/bigdata/src/java/com/bigdata/objndx
In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv24236/src/java/com/bigdata/objndx

Modified Files:
	IndexSegmentBuilder.java
Log Message:
Modified the IndexSegmentBuilder to use the new TemporaryStore and removed the
fullyBuffer boolean option, since data will now be buffered automatically up to
100M and then spill over onto disk.

Index: IndexSegmentBuilder.java
===================================================================
RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/objndx/IndexSegmentBuilder.java,v
retrieving revision 1.23
retrieving revision 1.24
diff -C2 -d -r1.23 -r1.24
*** IndexSegmentBuilder.java	9 Feb 2007 16:13:18 -0000	1.23
--- IndexSegmentBuilder.java	15 Feb 2007 22:01:18 -0000	1.24
***************
*** 52,56 ****
  import java.io.ByteArrayOutputStream;
  import java.io.File;
- import java.io.FileOutputStream;
  import java.io.IOException;
  import java.io.ObjectOutputStream;
--- 52,55 ----
***************
*** 65,74 ****
  import com.bigdata.journal.Journal;
  import com.bigdata.objndx.IndexSegment.CustomAddressSerializer;
  import com.bigdata.rawstore.Addr;
  import com.bigdata.rawstore.Bytes;
  import com.bigdata.rawstore.IRawStore;
- import com.bigdata.rawstore.SimpleFileRawStore;
- import com.bigdata.rawstore.SimpleMemoryRawStore;
  
  /**
--- 64,72 ----
  import com.bigdata.journal.Journal;
+ import com.bigdata.journal.TemporaryStore;
  import com.bigdata.objndx.IndexSegment.CustomAddressSerializer;
  import com.bigdata.rawstore.Addr;
  import com.bigdata.rawstore.Bytes;
  import com.bigdata.rawstore.IRawStore;
  
  /**
***************
*** 157,175 ****
  
      /**
-      * The temporary file created to hold leaves unless the index build
-      * operation is fully buffered. When created, this file is deleted
-      * regardless of the outcome of the operation.
-      */
-     public final File leafFile;
- 
-     /**
-      * The temporary file created to hold nodes unless either (a) the index
-      * build operation is fully buffered; or (b) the index segment will consist
-      * of just a root leaf. When created, this file is deleted regardless of the
-      * outcome of the operation.
-      */
-     public final File nodeFile;
- 
-     /**
       * The file on which the {@link IndexSegment} is written. The file is closed
       * regardless of the outcome of the operation.
--- 155,158 ----
***************
*** 181,185 ****
       * a region of the {@link #outFile}.
       */
!     protected Buffer leafBuffer;
  
      /**
--- 164,168 ----
       * a region of the {@link #outFile}.
       */
!     protected TemporaryStore leafBuffer;
  
      /**
***************
*** 187,191 ****
       * a region of the {@link #outFile}.
       */
!     protected Buffer nodeBuffer;
  
      /**
--- 170,174 ----
       * a region of the {@link #outFile}.
       */
!     protected TemporaryStore nodeBuffer;
  
      /**
***************
*** 364,372 ****
       * @throws IOException
       * 
!      * @todo make fullyBuffer, checksum, and record compression parameters in
!      *       this constructor variant
       * 
!      * FIXME test with and without each of these options {fullyBuffer,
!      * useChecksum, recordCompressor}.
       */
      public IndexSegmentBuilder(File outFile, File tmpDir, AbstractBTree btree,
--- 347,355 ----
       * @throws IOException
       * 
!      * @todo make checksum, and record compression parameters in this
!      *       constructor variant
       * 
!      * FIXME test with and without each of these options { useChecksum,
!      * recordCompressor}.
       */
      public IndexSegmentBuilder(File outFile, File tmpDir, AbstractBTree btree,
***************
*** 375,380 ****
  
          this(outFile, tmpDir, btree.getEntryCount(), btree.entryIterator(), m,
!                 btree.nodeSer.valueSerializer, true/* fullyBuffer */,
!                 false/* useChecksum */, null/*new RecordCompressor()*/, errorRate);
  
      }
--- 358,363 ----
  
          this(outFile, tmpDir, btree.getEntryCount(), btree.entryIterator(), m,
!                 btree.nodeSer.valueSerializer, false/* useChecksum */,
!                 null/* new RecordCompressor() */, errorRate);
  
      }
***************
*** 403,414 ****
       * @param valueSerializer
       *            Used to serialize values in the new {@link IndexSegment}.
-      * @param fullyBuffer
-      *            When true the nodes and leaves will be serialized into memory
-      *            until the build is complete and then batched onto disk
-      *            (faster, but more memory overhead). When false a temporary
-      *            file will be used to store the nodes and the leaves will be
-      *            written to disk as they are populated (slower, but less memory
-      *            overhead). If latency is a concern then specify
-      *            <code>fullyBuffer := true</code>.
       * @param useChecksum
       *            whether or not checksums are computed for nodes and leaves.
--- 386,389 ----
***************
*** 436,440 ****
      public IndexSegmentBuilder(File outFile, File tmpDir, final int entryCount,
              IEntryIterator entryIterator, final int m,
!             IValueSerializer valueSerializer, boolean fullyBuffer, boolean useChecksum,
              RecordCompressor recordCompressor,
              final double errorRate) throws IOException {
--- 411,415 ----
      public IndexSegmentBuilder(File outFile, File tmpDir, final int entryCount,
              IEntryIterator entryIterator, final int m,
!             IValueSerializer valueSerializer, boolean useChecksum,
              RecordCompressor recordCompressor,
              final double errorRate) throws IOException {
***************
*** 546,563 ****
          }
  
-         /*
-          * the temporary file used to buffer leaves if the index build operation
-          * is not fully buffered.
-          */
-         leafFile = (!fullyBuffer ? File.createTempFile("index",
-                 ".leaves.seg", tmpDir) : null);
- 
-         /*
-          * the temporary file is used if there are nodes to write and the build
-          * operation is not fully buffered.
-          */
-         nodeFile = (plan.nnodes > 0 && !fullyBuffer ? File.createTempFile(
-                 "index", ".nodes.seg", tmpDir) : null);
- 
          final FileChannel outChannel;
--- 521,524 ----
***************
*** 588,593 ****
           * index build operation.
           */
!         leafBuffer = fullyBuffer ? new MemoryBuffer() : new FileBuffer(
!                 leafFile, mode);
  
          /*
--- 549,553 ----
           * index build operation.
           */
!         leafBuffer = new TemporaryStore();
  
          /*
***************
*** 599,604 ****
           * abstraction for a disk file.
           */
!         nodeBuffer = plan.nnodes > 0 && fullyBuffer ? new MemoryBuffer()
!                 : plan.nnodes > 0 ? new FileBuffer(nodeFile, mode) : null;
  
          /*
--- 559,563 ----
           * abstraction for a disk file.
           */
!         nodeBuffer = plan.nnodes > 0 ? new TemporaryStore() : null;
  
          /*
***************
*** 799,812 ****
          if (leafBuffer != null && leafBuffer.isOpen()) {
              try {
!                 leafBuffer.close();
              } catch (Throwable t) {
              }
          }
-         if (leafFile != null) {
-             if(!leafFile.delete()) {
-                 log.warn("Could not delete temporary file: "
-                         + leafFile.getAbsoluteFile());
-             }
-         }
  
          /*
--- 758,765 ----
          if (leafBuffer != null && leafBuffer.isOpen()) {
              try {
!                 leafBuffer.close(); // also deletes the file if any.
              } catch (Throwable t) {
              }
          }
  
          /*
***************
*** 815,828 ****
          if (nodeBuffer != null && nodeBuffer.isOpen()) {
              try {
!                 nodeBuffer.close();
              } catch (Throwable t) {
              }
          }
-         if (nodeFile != null) {
-             if(!nodeFile.delete()) {
-                 log.warn("Could not delete temporary file: "
-                         + nodeFile.getAbsoluteFile());
-             }
-         }
  
      }
--- 768,775 ----
          if (nodeBuffer != null && nodeBuffer.isOpen()) {
              try {
!                 nodeBuffer.close(); // also deletes the file if any.
              } catch (Throwable t) {
              }
          }
  
      }
***************
*** 1318,1322 ****
  
          // Transfer the leaf buffer en mass onto the output channel.
!         long count = leafBuffer.transferTo(out);
  
          // The offset to the start of the node region.
--- 1265,1269 ----
  
          // Transfer the leaf buffer en mass onto the output channel.
!         long count = leafBuffer.getBufferStrategy().transferTo(out);
  
          // The offset to the start of the node region.
***************
*** 1341,1345 ****
  
              // transfer the nodes en mass onto the output channel.
!             long count = nodeBuffer.transferTo(out);
  
              // Close the buffer.
--- 1288,1292 ----
  
              // transfer the nodes en mass onto the output channel.
!             long count = nodeBuffer.getBufferStrategy().transferTo(out);
  
              // Close the buffer.
***************
*** 1717,1947 ****
  
      }
- 
-     /**
-      * An interface that abstracts the buffering of nodes or leaves.
-      * 
-      * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
-      * @version $Id$
-      */
-     static interface Buffer extends IRawStore {
- 
-         /**
-          * A block operation that transfers the serialized records en mass from
-          * the buffer onto a file. The buffered records are written "in order"
-          * starting at the current position on the file. The file is grow if
-          * necessary.
-          * 
-          * @param out
-          *            The file to which the buffer contents will be transferred.
-          * 
-          * @return The #of bytes written.
-          * 
-          * @throws IOException
-          */
-         public long transferTo(RandomAccessFile out) throws IOException;
- 
-     }
- 
-     /**
-      * Note: the comments below really apply to the base
-      * {@link SimpleFileRawStore} class.
-      * 
-      * @todo consider the file mode and buffering. We should at least buffer
-      *       several pages of data per write and can experiment with writing
-      *       through (vs caching writes in the OS layer). The file does not need
-      *       to be "live" until it is completely written, so there is no need to
-      *       update file metadata until the end of the build process.
-      * 
-      * @todo Consider using
-      *       {@link FileOutputStream#FileOutputStream(File, boolean)} to open
-      *       the temporary file in an append only mode and then get the
-      *       {@link FileChannel} from {@link FileOutputStream#getChannel()}.
-      *       Does this improve performance? Can we still read from the channel?
-      *       Try this on the {@link Journal} as well, at least for cases where
-      *       we will never read from the journal (i.e., fully buffered).
-      * 
-      * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
-      * @version $Id$
-      */
-     static class FileBuffer extends SimpleFileRawStore implements Buffer {
- 
-         public FileBuffer(File file, String mode) throws IOException {
- 
-             super(file,mode);
- 
-         }
- 
- //        public long write(ByteBuffer data) {
- //
- //            System.err.println("\nwriting "+data.remaining()+" bytes on buffer.");
- //
- //            return super.write(data);
- //
- //        }
- 
-         public long transferTo(RandomAccessFile out) throws IOException {
- 
-             final long begin = System.currentTimeMillis();
- 
-             // #of bytes to transfer.
-             final long count = raf.length();
- 
-             final FileChannel outChannel = out.getChannel();
- 
-             // current position on the output channel.
-             final long toPosition = outChannel.position();
- 
-             if(toPosition + count > Integer.MAX_VALUE) {
- 
-                 throw new IOException("Index segment exceeds int32 bytes.");
- 
-             }
- 
-             /*
-              * Transfer data from channel to channel.
-              */
- 
-             final FileChannel tmpChannel = raf.getChannel();
- 
-             // Set the fromPosition on source channel.
-             tmpChannel.position(0);
- 
-             /*
-              * Extend the output file. This is required at least for some
-              * circumstances.
-              */
-             out.setLength(toPosition+count);
- 
-             /*
-              * Transfer the data. It is possible that this will take multiple
-              * writes for at least some implementations.
-              */
- 
- //            System.err.println("fromPosition="+tmpChannel.position()+", toPosition="+toPosition+", count="+count);
- 
-             int nwrites = 0; // #of write operations.
- 
-             {
- 
-                 long n = count;
- 
-                 long to = toPosition;
- 
-                 while (n > 0) {
- 
-                     long nxfer = outChannel.transferFrom(tmpChannel, to, n);
- 
-                     to += nxfer;
- 
-                     n -= nxfer;
- 
-                     nwrites++;
- 
- //                    // Verify transfer is complete.
- //                    if (nxfer != count) {
- //
- //                        throw new IOException("Expected to transfer " + count
- //                                + ", but transferred " + nxfer);
- //
- //                    }
- 
-                 }
- 
-             }
- 
-             /*
-              * Update the position on the output channel since transferFrom does
-              * NOT do this itself.
-              */
-             outChannel.position(toPosition+count);
- 
-             final long elapsed = System.currentTimeMillis() - begin;
- 
-             System.err.println("\nTransferred " + count
-                     + " bytes from disk channel to disk channel (offset="
-                     + toPosition + ") in " + nwrites + " writes and " + elapsed
-                     + "ms");
- 
-             return count;
- 
-         }
- 
-     }
- 
-     /**
-      * Buffers in memory and bulks transfers all buffered data to the output
-      * channel in a single nio operation (maximum speed, but also maximum memory
-      * overhead).
-      * 
-      * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
-      * @version $Id$
-      */
-     static class MemoryBuffer extends SimpleMemoryRawStore implements Buffer {
- 
- //        public long write(ByteBuffer data) {
- //
- //            System.err.println("\nwriting "+data.remaining()+" bytes on buffer.");
- //
- //            return super.write(data);
- //
- //        }
- 
-         public long transferTo(RandomAccessFile out) throws IOException {
- 
-             long count = 0L;
- 
-             final int n = records.size();
- 
-             ByteBuffer[] bufs = new ByteBuffer[n];
- 
-             for( int i=0; i<n; i++) {
- 
-                 final byte[] b = records.get(i);
- 
-                 count += b.length;
- 
-                 bufs[i] = ByteBuffer.wrap(b);
- 
-             }
- 
-             final FileChannel outChannel = out.getChannel();
- 
-             // current position on the output channel.
-             final long toPosition = outChannel.position();
- 
-             if(toPosition + count > Integer.MAX_VALUE) {
- 
-                 throw new IOException("Index segment exceeds int32 bytes.");
- 
-             }
- 
-             /*
-              * use a single nio operation to write all the data onto the output
-              * channel.
-              */
- 
-             final long begin = System.currentTimeMillis();
- 
-             // write the data.
-             final long nwritten = outChannel.write(bufs);
- 
-             if( nwritten != count ) {
- 
-                 throw new AssertionError("Expected to write " + count
-                         + " bytes but wrote " + nwritten);
- 
-             }
- 
-             final long elapsed = System.currentTimeMillis() - begin;
- 
-             System.err.println("\nTransferred " + count + " bytes in " + n
-                     + " records from memory to disk at offset=" + toPosition
-                     + " in " + elapsed + "ms");
- 
-             return count;
- 
-         }
- 
-     }
  
  }
--- 1664,1667 ----
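The heart of the change is behavioral: instead of the caller choosing between a pure in-memory buffer and a temporary file via the fullyBuffer flag, the nodes and leaves are now written onto a TemporaryStore, which buffers in memory, spills over onto disk once a threshold is crossed (100M per the log message), and deletes any backing file on close(). The sketch below illustrates only that buffer-then-spill pattern; SpillBuffer, its methods, and the threshold handling are hypothetical names for illustration, not the bigdata TemporaryStore API.

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

/**
 * Hypothetical illustration of a buffer-then-spill store (NOT the bigdata
 * TemporaryStore API): records are held in memory until a size threshold is
 * crossed, then overflow onto a temporary file that close() deletes.
 */
class SpillBuffer {

    private final long threshold;                 // e.g. 100 * 1024 * 1024 per the log message.
    private final ByteArrayOutputStream memory = new ByteArrayOutputStream();
    private File file;                            // non-null once we have spilled onto disk.
    private RandomAccessFile raf;
    private long size;

    SpillBuffer(long threshold) {
        this.threshold = threshold;
    }

    /** Append a record, spilling onto disk once the threshold is exceeded. */
    public void write(byte[] record) throws IOException {
        if (raf == null && size + record.length > threshold) {
            // Spill: create the temporary file and flush the in-memory bytes onto it.
            file = File.createTempFile("spill", ".tmp");
            raf = new RandomAccessFile(file, "rw");
            raf.write(memory.toByteArray());
            memory.reset();
        }
        if (raf == null) {
            memory.write(record);
        } else {
            raf.write(record);
        }
        size += record.length;
    }

    /** Transfer all buffered bytes onto the output channel at its current position. */
    public long transferTo(FileChannel out) throws IOException {
        if (raf == null) {
            // Still fully in memory: write the single backing array.
            ByteBuffer b = ByteBuffer.wrap(memory.toByteArray());
            while (b.hasRemaining()) {
                out.write(b);
            }
        } else {
            // Spilled: channel-to-channel transfer from the start of the spill file.
            FileChannel src = raf.getChannel();
            long pos = 0;
            while (pos < size) {
                pos += src.transferTo(pos, size - pos, out);
            }
        }
        return size;
    }

    /** Close the store; also deletes the backing file, if one was created. */
    public void close() throws IOException {
        if (raf != null) {
            raf.close();
            file.delete();
        }
    }
}

The practical effect is that the memory-versus-latency trade-off described in the removed fullyBuffer javadoc no longer has to be decided up front: small builds stay entirely in memory, while large builds overflow onto disk on their own.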
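For callers of the long-form constructor, the only source change is that the fullyBuffer argument drops out of the parameter list, as the constructor hunks above show. A before/after sketch, with illustrative variable names taken from the parameter names in the diff:

// Against revision 1.23: the caller picked the buffering strategy.
new IndexSegmentBuilder(outFile, tmpDir, entryCount, entryIterator, m,
        valueSerializer, true /* fullyBuffer */, false /* useChecksum */,
        null /* recordCompressor */, errorRate);

// Against revision 1.24: buffering is handled internally by TemporaryStore.
new IndexSegmentBuilder(outFile, tmpDir, entryCount, entryIterator, m,
        valueSerializer, false /* useChecksum */,
        null /* recordCompressor */, errorRate);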