From: Bryan T. <tho...@us...> - 2007-02-15 22:01:28
|
Update of /cvsroot/cweb/bigdata/src/java/com/bigdata/journal In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv24236/src/java/com/bigdata/journal Modified Files: DiskBackedBufferStrategy.java IBufferStrategy.java Tx.java BasicBufferStrategy.java DirectBufferStrategy.java DiskOnlyStrategy.java AbstractBufferStrategy.java TemporaryStore.java Added Files: IDiskBasedStrategy.java Log Message: Modified the IndexSegmentBuilder to use the new TemporaryStore and removed the fullyBuffer boolean option since data will be automatically buffered out to 100M and then spill over onto disk. Index: DiskOnlyStrategy.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/DiskOnlyStrategy.java,v retrieving revision 1.16 retrieving revision 1.17 diff -C2 -d -r1.16 -r1.17 *** DiskOnlyStrategy.java 15 Feb 2007 20:59:21 -0000 1.16 --- DiskOnlyStrategy.java 15 Feb 2007 22:01:18 -0000 1.17 *************** *** 25,29 **** * nextOffset (i.e., has written all data that is dirty on the buffer). */ ! public class DiskOnlyStrategy extends AbstractBufferStrategy { /** --- 25,30 ---- * nextOffset (i.e., has written all data that is dirty on the buffer). */ ! public class DiskOnlyStrategy extends AbstractBufferStrategy implements ! IDiskBasedStrategy { /** *************** *** 61,76 **** private boolean open; ! public File getFile() { ! return file; } ! // public FileChannel getFileChannel() { ! // ! // return channel; ! // ! // } DiskOnlyStrategy(long maximumExtent, FileMetadata fileMetadata) { --- 62,83 ---- private boolean open; ! final public int getHeaderSize() { ! return headerSize; } ! final public File getFile() { ! ! return file; ! ! } + final public RandomAccessFile getRandomAccessFile() { + + return raf; + + } + DiskOnlyStrategy(long maximumExtent, FileMetadata fileMetadata) { *************** *** 377,381 **** force(true); ! 
System.err.println("Disk file: newLength="+newExtent); } catch(IOException ex) { --- 384,388 ---- force(true); ! System.err.println("Disk file: newLength="+cf.format(newExtent)); } catch(IOException ex) { *************** *** 391,393 **** --- 398,406 ---- } + public long transferTo(RandomAccessFile out) throws IOException { + + return super.transferFromDiskTo(this, out); + + } + } Index: AbstractBufferStrategy.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/AbstractBufferStrategy.java,v retrieving revision 1.9 retrieving revision 1.10 diff -C2 -d -r1.9 -r1.10 *** AbstractBufferStrategy.java 15 Feb 2007 20:59:21 -0000 1.9 --- AbstractBufferStrategy.java 15 Feb 2007 22:01:18 -0000 1.10 *************** *** 1,5 **** --- 1,10 ---- package com.bigdata.journal; + import java.io.IOException; + import java.io.RandomAccessFile; import java.nio.ByteBuffer; + import java.nio.channels.FileChannel; + import java.text.Format; + import java.text.NumberFormat; import com.bigdata.rawstore.Addr; *************** *** 40,43 **** --- 45,58 ---- protected int nextOffset; + static final NumberFormat cf; + + static { + + cf = NumberFormat.getIntegerInstance(); + + cf.setGroupingUsed(true); + + } + final public long getInitialExtent() { *************** *** 179,181 **** --- 194,301 ---- } + /** + * Helper method used by {@link DiskBackedBufferStrategy} and + * {@link DiskOnlyStrategy} to implement + * {@link IBufferStrategy#transferTo(RandomAccessFile)} + * + * @param src + * The source. + * @param out + * The output file. + * + * @return The #of bytes transferred. + * + * @throws IOException + */ + protected long transferFromDiskTo(IDiskBasedStrategy src,RandomAccessFile out) throws IOException { + + final long begin = System.currentTimeMillis(); + + // #of bytes to transfer. + final long count = src.getNextOffset(); + + // the output channel. 
+ final FileChannel outChannel = out.getChannel(); + + // current position on the output channel. + final long toPosition = outChannel.position(); + + if(toPosition + count > Integer.MAX_VALUE) { + + throw new IOException("Index segment exceeds int32 bytes."); + + } + + /* + * Transfer data from channel to channel. + */ + + final FileChannel tmpChannel = src.getRandomAccessFile().getChannel(); + + /* + * Set the fromPosition on source channel. We want everything after the + * file header. + */ + tmpChannel.position(src.getHeaderSize()); + + /* + * Extend the output file. This is required at least for some + * circumstances. + */ + out.setLength(toPosition+count); + + /* + * Transfer the data. It is possible that this will take multiple + * writes for at least some implementations. + */ + + // System.err.println("fromPosition="+tmpChannel.position()+", toPosition="+toPosition+", count="+count); + + int nwrites = 0; // #of write operations. + + { + + long n = count; + + long to = toPosition; + + while (n > 0) { + + long nxfer = outChannel.transferFrom(tmpChannel, to, n); + + to += nxfer; + + n -= nxfer; + + nwrites++; + + // // Verify transfer is complete. + // if (nxfer != count) { + // + // throw new IOException("Expected to transfer " + count + // + ", but transferred " + nxfer); + // + // } + + } + + } + + /* + * Update the position on the output channel since transferFrom does + * NOT do this itself. 
+ */ + outChannel.position(toPosition+count); + + final long elapsed = System.currentTimeMillis() - begin; + + System.err.println("\nTransferred " + count + + " bytes from disk channel to disk channel (offset=" + + toPosition + ") in " + nwrites + " writes and " + elapsed + + "ms"); + + return count; + + } + } Index: Tx.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/Tx.java,v retrieving revision 1.27 retrieving revision 1.28 diff -C2 -d -r1.27 -r1.28 *** Tx.java 15 Feb 2007 20:59:21 -0000 1.27 --- Tx.java 15 Feb 2007 22:01:18 -0000 1.28 *************** *** 85,100 **** * concurrently. We do not even need a read-lock on the indices isolated * by the transaction since they are read-only. This might prove to be a ! * nice way to leverage multiple processors / cores on a data server. * ! * @todo support {@link PartitionedIndex}es. * ! * @todo Make the {@link IsolatedBTree}s safe across {@link Journal#overflow()} ! * events. When {@link PartitionedIndex}es are used this adds a ! * requirement for tracking which {@link IndexSegment}s and ! * {@link Journal}s are required to support the {@link IsolatedBTree}. ! * Deletes of old journals and index segments must be deferred until no * transaction remains which can read those data. This metadata must be * restart-safe so that resources are eventually deleted. On restart, ! * active transactions will abort and their resources may be released. * There is also a requirement for quickly locating the specific journal * and index segments required to support isolation of an index. This --- 85,103 ---- * concurrently. We do not even need a read-lock on the indices isolated * by the transaction since they are read-only. This might prove to be a ! * nice way to leverage multiple processors / cores on a data server. The ! * size limit on the transaction write set is currently 2G, but the ! * transaction will run in memory up to 100M. * ! 
* @todo Support transactions where the indices isolated by the transactions are ! * {@link PartitionedIndex}es. * ! * @todo Track which {@link IndexSegment}s and {@link Journal}s are required ! * to support the {@link IsolatedBTree}s in use by a {@link Tx}. Deletes ! * of old journals and index segments MUST be deferred until no * transaction remains which can read those data. This metadata must be * restart-safe so that resources are eventually deleted. On restart, ! * active transactions will have been discarded abort and their resources ! * released. (Do we need a restart-safe means to indicate the set of ! * running transactions?)<br> * There is also a requirement for quickly locating the specific journal * and index segments required to support isolation of an index. This *************** *** 103,107 **** * far) as well as an index into the named indices index -- perhaps simply * an index by timestamp into the root addresses (or whole root block ! * views). * * @todo The various public methods on this API that have {@link RunState} --- 106,111 ---- * far) as well as an index into the named indices index -- perhaps simply * an index by timestamp into the root addresses (or whole root block ! * views, or moving the root addresses out of the root block and into the ! * store with only the address of the root addresses in the root block). * * @todo The various public methods on this API that have {@link RunState} *************** *** 127,139 **** final static String IS_COMPLETE = "Transaction is complete"; ! /* ! * */ ! final private Journal journal; /** * The timestamp assigned to this transaction. */ ! final private long timestamp; /** --- 131,144 ---- final static String IS_COMPLETE = "Transaction is complete"; ! /** ! * The transaction uses the {@link Journal} for some handshaking in the ! * commit protocol and to locate the named indices that it isolates. */ ! final protected Journal journal; /** * The timestamp assigned to this transaction. */ ! 
final protected long timestamp; /** *************** *** 141,145 **** * object was created. */ ! final private long commitCounter; private RunState runState; --- 146,150 ---- * object was created. */ ! final protected long commitCounter; private RunState runState; *************** *** 157,168 **** * time it is invoked. */ ! final private IRawStore tmpStore = new TemporaryStore(); /** * BTrees isolated by this transactions. - * - * @todo in order to survive overflow this mapping must be persistent. */ ! private Map<String,IsolatedBTree> btrees = new HashMap<String,IsolatedBTree>(); /** --- 162,171 ---- * time it is invoked. */ ! final protected IRawStore tmpStore = new TemporaryStore(); /** * BTrees isolated by this transactions. */ ! private Map<String, IsolatedBTree> btrees = new HashMap<String, IsolatedBTree>(); /** Index: TemporaryStore.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/TemporaryStore.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** TemporaryStore.java 15 Feb 2007 20:59:21 -0000 1.1 --- TemporaryStore.java 15 Feb 2007 22:01:18 -0000 1.2 *************** *** 280,306 **** } - /** - * Return a temporary file for use by the store. The file will be - * automatically deleted if the JVM exits. - * - * @return A temporary file for use by the store. 
- */ - protected File getTempFile() { - - try { - - File file = File.createTempFile("transientOverflow", ".store"); - - file.deleteOnExit(); - - return file; - - } catch(IOException ex) { - - throw new RuntimeException(ex); - - } - - } - } --- 280,282 ---- Index: DiskBackedBufferStrategy.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/DiskBackedBufferStrategy.java,v retrieving revision 1.8 retrieving revision 1.9 diff -C2 -d -r1.8 -r1.9 *** DiskBackedBufferStrategy.java 8 Feb 2007 21:32:10 -0000 1.8 --- DiskBackedBufferStrategy.java 15 Feb 2007 22:01:18 -0000 1.9 *************** *** 18,22 **** * @version $Id$ */ ! abstract public class DiskBackedBufferStrategy extends BasicBufferStrategy { /** --- 18,23 ---- * @version $Id$ */ ! abstract public class DiskBackedBufferStrategy extends BasicBufferStrategy ! implements IDiskBasedStrategy { /** *************** *** 35,39 **** private boolean open = false; ! public boolean isOpen() { return open; --- 36,58 ---- private boolean open = false; ! final public int getHeaderSize() { ! ! return headerSize; ! ! } ! ! final public File getFile() { ! ! return file; ! ! } ! ! final public RandomAccessFile getRandomAccessFile() { ! ! return raf; ! ! } ! ! final public boolean isOpen() { return open; *************** *** 41,45 **** } ! public boolean isStable() { return true; --- 60,64 ---- } ! 
final public boolean isStable() { return true; *************** *** 148,150 **** --- 167,175 ---- } + public long transferTo(RandomAccessFile out) throws IOException { + + return super.transferFromDiskTo(this, out); + + } + } Index: DirectBufferStrategy.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/DirectBufferStrategy.java,v retrieving revision 1.10 retrieving revision 1.11 diff -C2 -d -r1.10 -r1.11 *** DirectBufferStrategy.java 9 Feb 2007 16:13:18 -0000 1.10 --- DirectBufferStrategy.java 15 Feb 2007 22:01:18 -0000 1.11 *************** *** 104,108 **** force(true); ! System.err.println("Disk file: newLength="+newExtent); } catch(IOException ex) { --- 104,108 ---- force(true); ! System.err.println("Disk file: newLength="+cf.format(newExtent)); } catch(IOException ex) { --- NEW FILE: IDiskBasedStrategy.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. 
Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. Modifications: */ /* * Created on Feb 15, 2007 */ package com.bigdata.journal; import java.io.File; import java.io.RandomAccessFile; /** * An interface for implementations backed by a file on disk. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public interface IDiskBasedStrategy extends IBufferStrategy { /** * The size of the file header in bytes. */ public int getHeaderSize(); /** * The backing file. */ public File getFile(); /** * The object used to read and write on that file. */ public RandomAccessFile getRandomAccessFile(); } Index: IBufferStrategy.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/IBufferStrategy.java,v retrieving revision 1.13 retrieving revision 1.14 diff -C2 -d -r1.13 -r1.14 *** IBufferStrategy.java 8 Feb 2007 21:32:10 -0000 1.13 --- IBufferStrategy.java 15 Feb 2007 22:01:18 -0000 1.14 *************** *** 1,4 **** --- 1,7 ---- package com.bigdata.journal; + import java.io.IOException; + import java.io.RandomAccessFile; + import com.bigdata.rawstore.Addr; import com.bigdata.rawstore.IRawStore; *************** *** 92,94 **** --- 95,112 ---- ForceEnum forceOnCommitEnum); + /** + * A block operation that transfers the serialized records (aka the written + * on portion of the user extent) en mass from the buffer onto an output + * file. The buffered records are written "in order" starting at the current + * position on the output file. The file is grown if necessary. + * + * @param out + * The file to which the buffer contents will be transferred. + * + * @return The #of bytes written. 
+ * + * @throws IOException + */ + public long transferTo(RandomAccessFile out) throws IOException; + } Index: BasicBufferStrategy.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/journal/BasicBufferStrategy.java,v retrieving revision 1.14 retrieving revision 1.15 diff -C2 -d -r1.14 -r1.15 *** BasicBufferStrategy.java 15 Feb 2007 20:59:21 -0000 1.14 --- BasicBufferStrategy.java 15 Feb 2007 22:01:18 -0000 1.15 *************** *** 1,5 **** --- 1,8 ---- package com.bigdata.journal; + import java.io.IOException; + import java.io.RandomAccessFile; import java.nio.ByteBuffer; + import java.nio.channels.FileChannel; import com.bigdata.rawstore.Addr; *************** *** 211,217 **** userExtent = newUserExtent; ! System.err.println("Buffer: newCapacity="+newCapacity); } } --- 214,266 ---- userExtent = newUserExtent; ! System.err.println("Buffer: newCapacity=" + cf.format(newCapacity)); } + public long transferTo(RandomAccessFile out) throws IOException { + + long count = nextOffset; + + final FileChannel outChannel = out.getChannel(); + + // current position on the output channel. + final long toPosition = outChannel.position(); + + if(toPosition + count > Integer.MAX_VALUE) { + + throw new IOException("Index segment exceeds int32 bytes."); + + } + + /* + * use a single nio operation to write all the data onto the output + * channel. + */ + + final long begin = System.currentTimeMillis(); + + // setup the buffer for the operation. + directBuffer.limit(nextOffset); + directBuffer.position(0); + + // write the data. 
+ final long nwritten = outChannel.write(directBuffer); + + if( nwritten != count ) { + + throw new AssertionError("Expected to write " + count + + " bytes but wrote " + nwritten); + + } + + final long elapsed = System.currentTimeMillis() - begin; + + System.err.println("\nTransferred " + count + + " bytes from memory to disk at offset=" + toPosition + " in " + + elapsed + "ms"); + + return count; + + } + } |