From: Bryan T. <tho...@us...> - 2007-03-06 20:38:21
|
Update of /cvsroot/cweb/bigdata/src/java/com/bigdata/objndx In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv23960/src/java/com/bigdata/objndx Modified Files: IndexSegmentMerger.java NodeSerializer.java IndexSegmentBuilder.java BTree.java IndexSegmentFileStore.java Node.java MutableKeyBuffer.java FusedView.java Added Files: IFusedView.java Removed Files: RangeIterator.java IRangeIterator.java Log Message: Refactoring to introduce asynchronous handling of overflow events in support of a scale-up/scale-out design. Index: IndexSegmentFileStore.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/objndx/IndexSegmentFileStore.java,v retrieving revision 1.8 retrieving revision 1.9 diff -C2 -d -r1.8 -r1.9 *** IndexSegmentFileStore.java 22 Feb 2007 16:59:35 -0000 1.8 --- IndexSegmentFileStore.java 6 Mar 2007 20:38:05 -0000 1.9 *************** *** 171,174 **** --- 171,180 ---- } + + public long size() { + + return metadata.length; + + } /** Index: Node.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/objndx/Node.java,v retrieving revision 1.24 retrieving revision 1.25 diff -C2 -d -r1.24 -r1.25 *** Node.java 13 Feb 2007 23:01:02 -0000 1.24 --- Node.java 6 Mar 2007 20:38:05 -0000 1.25 *************** *** 1895,1898 **** --- 1895,1907 ---- * index of the children before modifying their keys. * + * @todo 85% of the use of this method is updateEntryCount(). Since that + * method is only called on update, we would do well to buffer hard + * references during descent and test the buffer in this method before + * performing a full search. Since concurrent writers are not allowed, + * we only need a single buffer whose height is the height of the tree. + * This should prove especially beneficial for larger branching factors. + * For smaller branching factors the cost might be so small as to be + * ignorable. + * * @see Leaf#merge(Leaf sibling,boolean isRightSibling) */ Index: MutableKeyBuffer.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/objndx/MutableKeyBuffer.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** MutableKeyBuffer.java 26 Jan 2007 02:39:23 -0000 1.1 --- MutableKeyBuffer.java 6 Mar 2007 20:38:05 -0000 1.2 *************** *** 1,8 **** package com.bigdata.objndx; - /** * A mutable implementation of {@link IKeyBuffer}. * * @todo track prefix length for mutable keys (update when first/last key are * updated). at present the node/leaf logic directly manipulates the --- 1,8 ---- package com.bigdata.objndx; /** * A mutable implementation of {@link IKeyBuffer}. * + * @todo 27% of the search cost is dealing with the prefix. * @todo track prefix length for mutable keys (update when first/last key are * updated). at present the node/leaf logic directly manipulates the Index: FusedView.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/objndx/FusedView.java,v retrieving revision 1.5 retrieving revision 1.6 diff -C2 -d -r1.5 -r1.6 *** FusedView.java 17 Feb 2007 21:34:21 -0000 1.5 --- FusedView.java 6 Mar 2007 20:38:05 -0000 1.6 *************** *** 63,67 **** * {@link FusedView} instances if not in a more efficient manner. */ ! public class FusedView implements IIndex { /** --- 63,67 ---- * {@link FusedView} instances if not in a more efficient manner. */ ! public class FusedView implements IIndex, IFusedView { /** Index: IndexSegmentBuilder.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/objndx/IndexSegmentBuilder.java,v retrieving revision 1.25 retrieving revision 1.26 diff -C2 -d -r1.25 -r1.26 *** IndexSegmentBuilder.java 21 Feb 2007 20:17:21 -0000 1.25 --- IndexSegmentBuilder.java 6 Mar 2007 20:38:05 -0000 1.26 *************** *** 93,99 **** * have to buffer them in memory or have to write them onto a temporary file and * then copy them into place after the last leaf has been processed. The code ! * currently uses a temporary file for this purpose. This space demand was not ! * present in West's algorithm because it did not attempt to place the leaves ! * contiguously onto the store. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> --- 93,99 ---- * have to buffer them in memory or have to write them onto a temporary file and * then copy them into place after the last leaf has been processed. The code ! * abstracts this decision using a {@link TemporaryRawStore} for this purpose. ! * This space demand was not present in West's algorithm because it did not ! * attempt to place the leaves contiguously onto the store. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> *************** *** 110,114 **** * * FIXME use the shortest separator key. ! * * @see IndexSegment * @see IndexSegmentFile --- 110,114 ---- * * FIXME use the shortest separator key. ! * * @see IndexSegment * @see IndexSegmentFile *************** *** 288,291 **** --- 288,296 ---- /** + * The data throughput rate in megabytes per second. + */ + final float mbPerSec; + + /** * <p> * Builds an index segment on the disk from a {@link BTree}. The index *************** *** 714,717 **** --- 719,726 ---- elapsed = System.currentTimeMillis() - begin; + // data rate in MB/sec. + mbPerSec = (elapsed == 0 ? 0 : md.length / Bytes.megabyte32 + / (elapsed / 1000f)); + NumberFormat cf = NumberFormat.getNumberInstance(); *************** *** 726,733 **** System.err.println("index segment build: total=" + elapsed + "ms := setup(" + elapsed_setup + "ms) + build(" ! + elapsed_build + "ms) + write(" + elapsed_write ! + "ms); " + cf.format(plan.nentries) + " entries, " + fpf.format(((double) md.length / Bytes.megabyte32)) ! + "MB"); log.info("finished: total=" + elapsed + "ms := setup(" --- 735,742 ---- System.err.println("index segment build: total=" + elapsed + "ms := setup(" + elapsed_setup + "ms) + build(" ! + elapsed_build + "ms) + write(" + elapsed_write + "ms); " ! + cf.format(plan.nentries) + " entries, " + fpf.format(((double) md.length / Bytes.megabyte32)) ! + "MB" + ", rate=" + fpf.format(mbPerSec) + "MB/sec"); log.info("finished: total=" + elapsed + "ms := setup(" *************** *** 735,739 **** + "ms) + write(" + elapsed_write + "ms); nentries=" + plan.nentries + ", branchingFactor=" + m + ", nnodes=" ! + nnodesWritten + ", nleaves=" + nleavesWritten); } catch (Throwable ex) { --- 744,751 ---- + "ms) + write(" + elapsed_write + "ms); nentries=" + plan.nentries + ", branchingFactor=" + m + ", nnodes=" ! + nnodesWritten + ", nleaves=" + nleavesWritten+ ! fpf.format(((double) md.length / Bytes.megabyte32)) ! + "MB"+", rate="+fpf.format(mbPerSec)+"MB/sec"); ! } catch (Throwable ex) { --- IRangeIterator.java DELETED --- Index: BTree.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/objndx/BTree.java,v retrieving revision 1.35 retrieving revision 1.36 diff -C2 -d -r1.35 -r1.36 *** BTree.java 21 Feb 2007 20:17:21 -0000 1.35 --- BTree.java 6 Mar 2007 20:38:05 -0000 1.36 *************** *** 411,416 **** NodeFactory.INSTANCE, // recordCompressor, // ! // FIXME only use checksum for stores that are not fully buffered. ! true || !store.isFullyBuffered()/* useChecksum */ ); --- 411,423 ---- NodeFactory.INSTANCE, // recordCompressor, // ! /* ! * Note: there is less need to use checksum for stores that are ! * not fully buffered since the data are always read from memory ! * which we presume is already parity checked. While a checksum ! * on a fully buffered store could detect an overwrite, the ! * journal architecture makes that extremely unlikely and one ! * has never been observed. ! */ ! !store.isFullyBuffered()/* useChecksum */ ); --- NEW FILE: IFusedView.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. Modifications: */ /* * Created on Mar 6, 2007 */ package com.bigdata.objndx; /** * A marker interface indicating fused view providing read-only operations on * multiple B+-Trees mapping variable length unsigned byte[] keys to arbitrary * values. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public interface IFusedView { } --- RangeIterator.java DELETED --- Index: NodeSerializer.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/objndx/NodeSerializer.java,v retrieving revision 1.32 retrieving revision 1.33 diff -C2 -d -r1.32 -r1.33 *** NodeSerializer.java 21 Feb 2007 20:17:21 -0000 1.32 --- NodeSerializer.java 6 Mar 2007 20:38:05 -0000 1.33 *************** *** 191,195 **** * {@link #branchingFactor}. */ ! public static final transient int DEFAULT_BUFFER_CAPACITY_PER_ENTRY = Bytes.kilobyte32 * 1; /** --- 191,195 ---- * {@link #branchingFactor}. */ ! public static final transient int DEFAULT_BUFFER_CAPACITY_PER_ENTRY = Bytes.kilobyte32 / 4; /** Index: IndexSegmentMerger.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/objndx/IndexSegmentMerger.java,v retrieving revision 1.9 retrieving revision 1.10 diff -C2 -d -r1.9 -r1.10 *** IndexSegmentMerger.java 21 Feb 2007 20:17:21 -0000 1.9 --- IndexSegmentMerger.java 6 Mar 2007 20:38:05 -0000 1.10 *************** *** 48,68 **** package com.bigdata.objndx; - import java.io.File; import java.io.IOException; - import java.io.RandomAccessFile; import java.nio.ByteBuffer; - import java.nio.channels.FileChannel; import java.text.NumberFormat; import java.util.Iterator; import java.util.NoSuchElementException; import org.apache.log4j.Level; import org.apache.log4j.Logger; import com.bigdata.objndx.IndexSegmentBuilder.NOPNodeFactory; import com.bigdata.objndx.IndexSegmentBuilder.SimpleLeafData; - import com.bigdata.rawstore.Addr; import com.bigdata.rawstore.Bytes; - import com.bigdata.rawstore.IRawStore; import cutthecrap.utils.striterators.Expander; --- 48,65 ---- package com.bigdata.objndx; import java.io.IOException; import java.nio.ByteBuffer; import java.text.NumberFormat; import java.util.Iterator; import java.util.NoSuchElementException; + import java.util.Vector; import org.apache.log4j.Level; import org.apache.log4j.Logger; + import com.bigdata.journal.TemporaryRawStore; import com.bigdata.objndx.IndexSegmentBuilder.NOPNodeFactory; import com.bigdata.objndx.IndexSegmentBuilder.SimpleLeafData; import com.bigdata.rawstore.Bytes; import cutthecrap.utils.striterators.Expander; *************** *** 71,87 **** /** * Class supporting a compacting merge of two btrees into a series of ordered ! * leaves on a temporary file in support of a compacting merge of mutable ! * {@link BTree}s and/or immutable {@link IndexSegment}s. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ * ! * @todo write tests: merging a tree with itself, merging trees w/o deletion ! * markers, merging trees w/ deletion markers, merging trees w/ age-based ! * version expiration, merging trees with count-based version expiration. ! * ! * @todo Support delete during merge to support transactions (a TimestampValue ! * having a greater timestamp and a null value is interpreted as a delete ! * marker). * * @todo Support deletion based on history policy (requires timestamps in the --- 68,80 ---- /** * Class supporting a compacting merge of two btrees into a series of ordered ! * leaves written on a {@link TemporaryRawStore} in support of a compacting ! * merge of mutable {@link BTree}s and/or immutable {@link IndexSegment}s. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ * ! * @todo Support merge rule that knows how to process timestamped entries and ! * deletion markers and that removes deletion markers iff a full ! * compacting merge is requested. * * @todo Support deletion based on history policy (requires timestamps in the *************** *** 92,103 **** * well. * - * FIXME rewrite to use {@link IRawStore} objects as buffers ala the - * {@link IndexSegmentBuilder} and see if we are better off using memory or disk - * to buffer the merge. - * * @see {@link FusedView}, which provides a dynamic view of two or more btrees. * However, this class is more efficient when we are going to do a bulk * merge operation since it performs the merge and computes the #of output * entries in one pass. */ public class IndexSegmentMerger { --- 85,94 ---- * well. * * @see {@link FusedView}, which provides a dynamic view of two or more btrees. * However, this class is more efficient when we are going to do a bulk * merge operation since it performs the merge and computes the #of output * entries in one pass. + * + * @todo parameterize recordCompressor. */ public class IndexSegmentMerger { *************** *** 122,125 **** --- 113,121 ---- /** + * @todo parameterize useChecksum. + */ + final boolean useChecksum = false; + + /** * Compacting merge of two btrees, writing the results onto a file. The file * data format is simply a sequence of leaves using the specified branching *************** *** 129,152 **** * leaves. * - * @param raf - * The file on which the results are written as a series of - * leaves. Typically this file will be created in a temporary - * directory and the file will be removed when the results of the - * compacting merge are no longer required. * @param m ! * The branching factor used by the leaves written on that file. * @param in1 * A btree. * @param in2 * Another btree. - * - * @todo exclusive lock on the output file. */ ! public IndexSegmentMerger(File outFile, int m, AbstractBTree in1, ! AbstractBTree in2) throws IOException { - if (outFile == null) - throw new IllegalArgumentException(); - if (m < AbstractBTree.MIN_BRANCHING_FACTOR) throw new IllegalArgumentException(); --- 125,138 ---- * leaves. * * @param m ! * The branching factor used by the leaves written on the ! * temporary store. * @param in1 * A btree. * @param in2 * Another btree. */ ! public IndexSegmentMerger(int m, AbstractBTree in1, AbstractBTree in2) throws IOException { if (m < AbstractBTree.MIN_BRANCHING_FACTOR) throw new IllegalArgumentException(); *************** *** 158,182 **** throw new IllegalArgumentException(); ! // @todo verify that we are compacting trees for the same index ! ! this.outFile = outFile; ! ! if( outFile.exists() && outFile.length() > 0) { ! ! throw new IOException("output file exists: " ! + outFile.getAbsoluteFile()); ! ! } ! ! // this is a temporary file so make sure that it will disappear. ! outFile.deleteOnExit(); ! ! out = new RandomAccessFile(outFile,"rw"); // reads leaves from the 1st btree. ! itr1 = in1.leafIterator(); // reads leaves from the 2nd btree. ! itr2 = in2.leafIterator(); // output leaf - reused for each leaf written. --- 144,156 ---- throw new IllegalArgumentException(); ! tmpStore = new TemporaryRawStore(); // reads leaves from the 1st btree. ! // itr1 = in1.leafIterator(); ! itr1 = new SourceLeafIterator(in1); // reads leaves from the 2nd btree. ! // itr2 = in2.leafIterator(); ! itr2 = new SourceLeafIterator(in2); // output leaf - reused for each leaf written. *************** *** 184,195 **** leaf.reset(m); ! // @todo should we always use checksums for the temporary file? ! final boolean useChecksum = true; ! ! // Used to serialize the stack and leaves for the output tree. ! int initialBufferCapacity = 0; // will be estimated. nodeSer = new NodeSerializer(NOPNodeFactory.INSTANCE, m, ! initialBufferCapacity, new IndexSegment.CustomAddressSerializer(), in1.nodeSer.keySerializer, --- 158,165 ---- leaf.reset(m); ! // Used to serialize the leaves. nodeSer = new NodeSerializer(NOPNodeFactory.INSTANCE, m, ! 0 /*initialBufferCapacity will be estimated*/, new IndexSegment.CustomAddressSerializer(), in1.nodeSer.keySerializer, *************** *** 201,227 **** } ! final File outFile; /** ! * Used to write the merged output file. */ ! final RandomAccessFile out; final NodeSerializer nodeSer; ! final Iterator itr1; // reads leaves from the 1st btree. ! Leaf leaf1 = null; // current leaf in 1st btree. ! int index1 = 0; // current entry index in current leaf of 1st btree. ! boolean exhausted1 = false; // true iff the 1st iterator is exhausted. ! final Iterator itr2; // reads leaves from the 2nd btree. ! Leaf leaf2 = null; // current leaf in 2nd btree. ! int index2 = 0; // current entry index in current leaf of 2nd btree. ! boolean exhausted2 = false; // true iff the 2nd iterator is exhausted. /** ! * the output leaf. we reuse this for each leaf that we write. when the ! * leaf is full we write it onto the file and then reset it so that it ! * is ready to accept more keys. */ final SimpleLeafData leaf; --- 171,273 ---- } ! /** ! * FIXME Support n-way merge. ! * ! * @param m ! * The output branching factor. ! * @param srcs ! * The source indices in reverse timestamp order (by increasing ! * age). ! * ! * @throws IOException ! */ ! public IndexSegmentMerger(int m, AbstractBTree[] srcs) throws IOException { ! ! throw new UnsupportedOperationException(); ! ! } ! ! /** ! * Used to buffer the output of the merge process. ! */ ! final TemporaryRawStore tmpStore; /** ! * The address at which each leaf in written in the {@link #tmpStore}. The ! * entries in this list are ordered. The first entry is the first leaf ! * written, the second entry is the second leaf written, etc. */ ! final Vector<Long> addrs = new Vector<Long>(); final NodeSerializer nodeSer; + + final SourceLeafIterator itr1; + final SourceLeafIterator itr2; ! // final Iterator itr1; // reads leaves from the 1st btree. ! // Leaf leaf1 = null; // current leaf in 1st btree. ! // int index1 = 0; // current entry index in current leaf of 1st btree. ! // boolean exhausted1 = false; // true iff the 1st iterator is exhausted. ! // ! // final Iterator itr2; // reads leaves from the 2nd btree. ! // Leaf leaf2 = null; // current leaf in 2nd btree. ! // int index2 = 0; // current entry index in current leaf of 2nd btree. ! // boolean exhausted2 = false; // true iff the 2nd iterator is exhausted. ! /** ! * @todo this will need to be modified to use a key range so that we can ! * evict an index partition worth of data at a time from a btree on ! * the journal. Once we do that, is there any point to operating at ! * the leaf iterator level vs just using an {@link EntryIterator}? ! * ! * @author <a href="mailto:tho...@us...">Bryan Thompson</a> ! * @version $Id$ ! */ ! public static class SourceLeafIterator { ! ! final Iterator itr; // reads leaves from the source btree. ! Leaf leaf = null; // current leaf in source btree. ! int index = 0; // current entry index in current leaf of source btree. ! boolean exhausted = false; // true iff the source btree is exhausted. + public SourceLeafIterator(AbstractBTree src) { + + itr = src.leafIterator(); + + } + + public Leaf next() { + + return (Leaf)itr.next(); + + } + + /** + * If the current leaf is not fully consumed then return immediately. + * Otherwise read the next leaf from the source btree into {@link #leaf}. + * If there are no more leaves available then {@link #exhausted} is set + * to false. {@link #index} is reset to zero(0) in either case. + * + * @return true unless this source btree is exhausted. + */ + protected boolean nextLeaf() { + if (index < leaf.nkeys) + return !exhausted; + index = 0; + if (itr.hasNext()) { + leaf = (Leaf) itr.next(); + } else { + leaf = null; + exhausted = true; + } + return !exhausted; + } + + } + /** ! * The output leaf. We reuse this for each leaf that we write. When the ! * output leaf is full we write it onto the {@link #tmpStore} and then reset ! * it so that it is ready to accept more keys. */ final SimpleLeafData leaf; *************** *** 306,314 **** */ ! leaf1 = (Leaf)itr1.next(); ! leaf2 = (Leaf)itr2.next(); ! while( !exhausted1 && ! exhausted2 ) { applyMergeRule(); --- 352,360 ---- */ ! itr1.leaf = itr1.next(); ! itr2.leaf = itr2.next(); ! while( !itr1.exhausted && ! itr2.exhausted ) { applyMergeRule(); *************** *** 317,334 **** // copy anything remaining in the 1st btree. ! while(!exhausted1) { ! outputKey(leaf1,index1++); ! nextLeaf1(); } // copy anything remaining in the 2nd btree. ! while(!exhausted2) { ! outputKey(leaf2,index2++); ! nextLeaf2(); } --- 363,380 ---- // copy anything remaining in the 1st btree. ! while(!itr1.exhausted) { ! outputKey(itr1.leaf,itr1.index++); ! itr1.nextLeaf(); } // copy anything remaining in the 2nd btree. ! while(!itr2.exhausted) { ! outputKey(itr2.leaf,itr2.index++); ! itr2.nextLeaf(); } *************** *** 343,369 **** } ! // // synch to disk (not necessary since file is not reused). ! // out.getChannel().force(false); ! final long elapsed = System.currentTimeMillis()-begin; ! final long length = out.length(); ! ! NumberFormat cf = NumberFormat.getNumberInstance(); ! ! cf.setGroupingUsed(true); ! ! NumberFormat fpf = NumberFormat.getNumberInstance(); ! ! fpf.setGroupingUsed(false); ! ! fpf.setMaximumFractionDigits(2); ! System.err.println("merge: " + elapsed + "ms, " + cf.format(nentries) ! + " entries, " ! + fpf.format(((double) length / Bytes.megabyte32)) + "MB"); ! return new MergedLeafIterator(outFile, out, leaf.m, nentries, nleaves, ! maxLeafBytes, nodeSer); } --- 389,419 ---- } ! /* ! * reporting. ! */ ! { ! final long elapsed = System.currentTimeMillis() - begin; ! final long length = tmpStore.size(); ! NumberFormat cf = NumberFormat.getNumberInstance(); ! cf.setGroupingUsed(true); ! ! NumberFormat fpf = NumberFormat.getNumberInstance(); ! ! fpf.setGroupingUsed(false); ! ! fpf.setMaximumFractionDigits(2); ! ! System.err.println("merge: " + elapsed + "ms, " ! + cf.format(nentries) + " entries, " ! + fpf.format(((double) length / Bytes.megabyte32)) + "MB"); ! ! } ! ! return new MergedLeafIterator(tmpStore, addrs, leaf.m, nentries, ! nleaves, maxLeafBytes, nodeSer); } *************** *** 389,393 **** protected void applyMergeRule() throws IOException { ! assert !exhausted1 && !exhausted2; /* --- 439,443 ---- protected void applyMergeRule() throws IOException { ! assert !itr1.exhausted && !itr2.exhausted; /* *************** *** 397,403 **** * do less allocation and less search for this case. */ ! byte[] key1 = leaf1.keys.getKey(index1); ! byte[] key2 = leaf2.keys.getKey(index2); int ret = BytesUtil.compareBytes(key1,key2); --- 447,453 ---- * do less allocation and less search for this case. */ ! byte[] key1 = itr1.leaf.keys.getKey(itr1.index); ! byte[] key2 = itr2.leaf.keys.getKey(itr2.index); int ret = BytesUtil.compareBytes(key1,key2); *************** *** 409,430 **** */ ! outputKey(leaf1,index1++); ! index2++; ! nextLeaf1(); ! nextLeaf2(); } else if(ret<0) { ! outputKey(leaf1,index1++); ! nextLeaf1(); } else { ! outputKey(leaf2,index2++); ! nextLeaf2(); } --- 459,480 ---- */ ! outputKey(itr1.leaf,itr1.index++); ! itr2.index++; ! itr1.nextLeaf(); ! itr2.nextLeaf(); } else if(ret<0) { ! outputKey(itr1.leaf,itr1.index++); ! itr1.nextLeaf(); } else { ! outputKey(itr2.leaf,itr2.index++); ! itr2.nextLeaf(); } *************** *** 432,476 **** } ! /** ! * If the current leaf is not fully consumed then return immediately. ! * Otherwise read the next leaf from the 1nd source btree into ! * {@link #leaf1}. If there are no more leaves available then ! * {@link #exhausted1} is set to false. {@link #index1} is reset to zero(0) ! * in either case. ! * ! * @return true unless this source btree is exhausted. ! */ ! protected boolean nextLeaf1() { ! if(index1<leaf1.nkeys) return !exhausted1; ! index1 = 0; ! if (itr1.hasNext()) { ! leaf1 = (Leaf) itr1.next(); ! } else { ! leaf1 = null; ! exhausted1 = true; ! } ! return !exhausted1; ! } ! ! /** ! * If the current leaf is not fully consumed then return immediately. ! * Otherwise read the next leaf from the 2nd source btree into ! * {@link #leaf2}. If there are no more leaves available then ! * {@link #exhausted2} is set to false. {@link #index2} is reset to zero(0) ! * in either case. ! * ! * @return true unless this source btree is exhausted. ! */ ! protected boolean nextLeaf2() { ! if(index2<leaf2.nkeys) return !exhausted2; ! index2 = 0; ! if (itr2.hasNext()) { ! leaf2 = (Leaf) itr2.next(); ! } else { ! leaf2 = null; ! exhausted2 = true; ! } ! return !exhausted2; ! } /** --- 482,526 ---- } ! // /** ! // * If the current leaf is not fully consumed then return immediately. ! // * Otherwise read the next leaf from the 1nd source btree into ! // * {@link #leaf1}. If there are no more leaves available then ! // * {@link #exhausted1} is set to false. {@link #index1} is reset to zero(0) ! // * in either case. ! // * ! // * @return true unless this source btree is exhausted. ! // */ ! // protected boolean nextLeaf1() { ! // if(index1<leaf1.nkeys) return !exhausted1; ! // index1 = 0; ! // if (itr1.hasNext()) { ! // leaf1 = (Leaf) itr1.next(); ! // } else { ! // leaf1 = null; ! // exhausted1 = true; ! // } ! // return !exhausted1; ! // } ! // ! // /** ! // * If the current leaf is not fully consumed then return immediately. ! // * Otherwise read the next leaf from the 2nd source btree into ! // * {@link #leaf2}. If there are no more leaves available then ! // * {@link #exhausted2} is set to false. {@link #index2} is reset to zero(0) ! // * in either case. ! // * ! // * @return true unless this source btree is exhausted. ! // */ ! // protected boolean nextLeaf2() { ! // if(index2<leaf2.nkeys) return !exhausted2; ! // index2 = 0; ! // if (itr2.hasNext()) { ! // leaf2 = (Leaf) itr2.next(); ! // } else { ! // leaf2 = null; ! // exhausted2 = true; ! // } ! // return !exhausted2; ! // } /** *************** *** 489,493 **** if(DEBUG) log.debug("#leavesWritten=" + nleaves + ", src=" ! + (src == leaf1 ? "leaf1" : "leaf2") + ", srcpos=" + srcpos); MutableKeyBuffer keys = (MutableKeyBuffer) leaf.keys; --- 539,543 ---- if(DEBUG) log.debug("#leavesWritten=" + nleaves + ", src=" ! + (src == itr1.leaf ? "leaf1" : "leaf2") + ", srcpos=" + srcpos); MutableKeyBuffer keys = (MutableKeyBuffer) leaf.keys; *************** *** 530,561 **** ByteBuffer buf = nodeSer.putNodeOrLeaf( leaf ); ! FileChannel outChannel = out.getChannel(); ! ! // position on the channel before the write. ! final long offset = outChannel.position(); ! ! if(offset>Integer.MAX_VALUE) { ! ! throw new IOException("Index segment exceeds int32 bytes."); ! ! } final int nbytes = buf.limit(); ! /* ! * write header containing the #of bytes in the record. ! * ! * @todo it is unelegant to have to read in this this 4 byte header for ! * each leaf. perhaps it would have better performance to write a header ! * block at the end of the merge file that indexed into the leaves? ! */ ! out.writeInt(nbytes); ! ! // write the compressed record on the channel. ! final int nbytes2 = outChannel.write(buf); ! ! assert nbytes2 == buf.limit(); ! System.err.print("."); // wrote a leaf. nleaves++; --- 580,592 ---- ByteBuffer buf = nodeSer.putNodeOrLeaf( leaf ); ! // write the record final int nbytes = buf.limit(); ! final long addr = tmpStore.write(buf); ! ! addrs.add(addr); ! System.err.print(">"); // wrote a leaf. nleaves++; *************** *** 577,582 **** public static class MergedLeafIterator implements Iterator<ILeafData> { ! public final File file; ! protected final RandomAccessFile raf; public final int m; public final int nentries; --- 608,613 ---- public static class MergedLeafIterator implements Iterator<ILeafData> { ! public final TemporaryRawStore tmpStore; ! public final Vector<Long> addrs; public final int m; public final int nentries; *************** *** 586,599 **** private int leafIndex = 0; - - /** - * Offset of the last leaf read from the file. - */ - private int offset; - - /** - * Used to read leaves from the file. - */ - private final ByteBuffer buf; /** --- 617,620 ---- *************** *** 604,609 **** /** ! * @param file ! * @param raf * @param m * @param nentries --- 625,632 ---- /** ! * @param tmpStore ! * @param addrs ! * An ordered list of the addresses at which the leaves may ! * be found in <i>tmpStore</i>. * @param m * @param nentries *************** *** 612,621 **** * @param nodeSer */ ! public MergedLeafIterator(File file, RandomAccessFile raf, int m, ! int nentries, int nleaves, int maxLeafBytes, ! NodeSerializer nodeSer) throws IOException { ! this.file = file; ! this.raf = raf; this.m = m; this.nentries = nentries; --- 635,644 ---- * @param nodeSer */ ! public MergedLeafIterator(TemporaryRawStore tmpStore, ! Vector<Long> addrs, int m, int nentries, int nleaves, ! int maxLeafBytes, NodeSerializer nodeSer) throws IOException { ! this.tmpStore = tmpStore; ! this.addrs = addrs; this.m = m; this.nentries = nentries; *************** *** 626,640 **** nodeSer.keySerializer, nodeSer.valueSerializer, nodeSer.recordCompressor, nodeSer.useChecksum); - - // note: allocates direct buffer when size is large. - this.buf = NodeSerializer.alloc(maxLeafBytes); - - // rewind. - raf.seek(0); } /** ! * Close the channel and delete the merge file. * <p> * This is invoked automatically when the iterator is exhausted. --- 649,657 ---- nodeSer.keySerializer, nodeSer.valueSerializer, nodeSer.recordCompressor, nodeSer.useChecksum); } /** ! * Closes and deletes the backing store. * <p> * This is invoked automatically when the iterator is exhausted. *************** *** 643,679 **** */ public void close() { - - if (raf.getChannel().isOpen()) { - - try { - - System.err.println("Closing MergedLeafIterator: file="+file); ! raf.close(); ! ! } catch (IOException ex) { ! ! throw new RuntimeException(ex); ! ! } ! ! if (file.exists() && !file.delete()) { ! ! log.warn("Could not delete file: " + file.getAbsoluteFile()); ! ! } ! ! } } /** ! * Test whether more leaves are available. ! * <p> ! * Automatically closes out the backing buffer when all leaves have been ! * processed. ! * ! * FIXME I am not seeing an automatic close of this iterator when ! * invoked by {@link MergedEntryIterator}. */ public boolean hasNext() { --- 660,673 ---- */ public void close() { ! log.info("Closing temporary store"); ! ! tmpStore.close(); } /** ! * Test whether more leaves are available. Automatically closes out the ! * backing buffer when all leaves have been processed. */ public boolean hasNext() { *************** *** 695,743 **** public ILeafData next() { ! if(!hasNext()) throw new NoSuchElementException(); ! ! try { ! ! // #of bytes in the next leaf. ! int nbytes = raf.readInt(); ! ! offset += Bytes.SIZEOF_INT; ! ! if (DEBUG) ! log.debug("will read " + nbytes + " bytes at offset=" ! + offset); ! ! buf.limit(nbytes); ! buf.position(0); ! ! int nread = raf.getChannel().read(buf); ! ! assert nread == nbytes; ! ! offset += nread; ! long addr = Addr.toLong(nbytes, offset); ! /* ! * Note: this is using a nodeSer whose node factory does not ! * require a non-null btree reference. ! */ ! ! // @todo cleanup buffer logic here and elsewhere. ! buf.position(0); ! ! ILeafData leaf = nodeSer.getLeaf(null/*btree*/, addr, buf); ! leafIndex++; ! ! return leaf; ! } ! catch (IOException ex) { ! throw new RuntimeException(ex); ! } } --- 689,712 ---- public ILeafData next() { ! if (!hasNext()) ! throw new NoSuchElementException(); ! // the address of the next leaf. ! final long addr = addrs.get(leafIndex); ! /* ! * Note: this is using a nodeSer whose node factory does not require ! * a non-null btree reference. ! */ ! ByteBuffer buf = tmpStore.read(addr); ! ILeafData leaf = nodeSer.getLeaf(null/* btree */, addr, buf); ! leafIndex++; ! System.err.print("<"); // read a leaf. ! return leaf; } |