From: Bryan T. <tho...@us...> - 2007-02-15 14:23:54
Update of /cvsroot/cweb/bigdata/src/java/com/bigdata/scaleup
In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv7688/src/java/com/bigdata/scaleup

Modified Files:
    PartitionedJournal.java Name2MetadataAddr.java
Log Message:
Added support for filtering and resolving on EntryIterator and worked through
most of the test suite for UnisolatedBTree.

Index: Name2MetadataAddr.java
===================================================================
RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/scaleup/Name2MetadataAddr.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -C2 -d -r1.1 -r1.2
*** Name2MetadataAddr.java 9 Feb 2007 16:13:18 -0000 1.1
--- Name2MetadataAddr.java 15 Feb 2007 14:23:50 -0000 1.2
***************
*** 49,53 ****
  import com.bigdata.journal.Name2Addr;
- import com.bigdata.objndx.BTree;
  import com.bigdata.objndx.BTreeMetadata;
  import com.bigdata.objndx.IIndex;
--- 49,52 ----

Index: PartitionedJournal.java
===================================================================
RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/scaleup/PartitionedJournal.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -C2 -d -r1.5 -r1.6
*** PartitionedJournal.java 15 Feb 2007 01:34:23 -0000 1.5
--- PartitionedJournal.java 15 Feb 2007 14:23:50 -0000 1.6
***************
*** 73,82 ****
  * </p>
  * <p>
! * An {@link PartitionedIndex} is dynamically decomposed into key-range
! * partitions. Each partition is defined by the first key that may be inserted
! * on, or read from, that parition. A total ordering over partitions and their
! * locations is maintained in a metadata index. An insert or read is directed to
! * the first partition having a minimum key greater than or equal to the probe
! * key.
  * </p>
  * <p>
--- 73,83 ----
  * </p>
  * <p>
! * A {@link PartitionedIndex} is an {@link IIndex} that is dynamically
! * decomposed into key-range partitions. Each partition is defined by a
! * <i>separator key</i>. The separator key is the first key that may be
! * inserted into, or read from, that partition. A total ordering over partitions
! * and their locations is maintained in a {@link MetadataIndex}. An insert or
! * read is directed to the partition having the largest separator key less than
! * or equal to the probe key.
  * </p>
  * <p>
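The routing rule in the revised paragraph amounts to a floor lookup over the separator keys. The following sketch is illustrative only and does not use the actual MetadataIndex API; the PartitionRouter class, the long keys, and the partition identifiers are hypothetical stand-ins.

    import java.util.Map;
    import java.util.NavigableMap;
    import java.util.TreeMap;

    // Hypothetical sketch of separator-key routing (not the bigdata MetadataIndex).
    // Each entry maps a partition's separator key (the first key that may be
    // inserted into, or read from, that partition) to a partition identifier.
    public class PartitionRouter {

        private final NavigableMap<Long, String> partitions = new TreeMap<Long, String>();

        public void addPartition(long separatorKey, String partitionId) {
            partitions.put(separatorKey, partitionId);
        }

        /**
         * Returns the partition responsible for the probe key: the partition
         * with the largest separator key less than or equal to that key.
         */
        public String lookup(long probeKey) {
            Map.Entry<Long, String> entry = partitions.floorEntry(probeKey);
            if (entry == null)
                throw new IllegalArgumentException(
                        "probe key orders before the first separator key");
            return entry.getValue();
        }
    }

For example, with separator keys {0, 100, 200}, a probe key of 150 is routed to the partition whose separator key is 100.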
***************
*** 84,104 ****
  * that buffers writes for that partition and zero or more {@link IndexSegment}s
  * per partition. The relationship between keys and partitions is managed by a
! * {@link MetadataIndex}.
  * </p>
  * <p>
! * All writes bound for any partition of any index are absorbed on this
! * {@link Journal}. For each {@link PartitionedIndex}, there is a
! * corresponding {@link BTree} that absorbs writes (including deletes) on the
! * slave. On overflow, {@link Journal}, the backing store is frozen and a new
! * buffer and backing store are deployed to absorb further writes. During this
! * time, reads on the {@link PartitionedJournal} are served by a {@link FusedView}
! * of the data on the new slave and the data on the old slave.
  * </p>
  * <p>
! * While reads and writes proceed in the forground on the new buffer and backing
! * store, a background thread builds an {@link IndexSegment} from each modified
! * {@link BTree} corresponding to a {@link PartitionedIndex}. Once all indices
! * have been processed, the old buffer and backing store are released and their
! * use is atomic replaced by the use of the new index segments.
  * </p>
  * <p>
--- 85,113 ----
  * that buffers writes for that partition and zero or more {@link IndexSegment}s
  * per partition. The relationship between keys and partitions is managed by a
! * {@link MetadataIndex}. Partitions may be multiplexed onto the same
! * {@link Journal} in order to reduce the #of disk files that are being actively
! * written and maximize the use of sequential IO on a given IO channel.
  * </p>
  * <p>
! * All writes bound for any partition of any index are absorbed on the
! * {@link Journal} to which that partition has been assigned. For each
! * {@link PartitionedIndex}, there is a corresponding {@link BTree} that
! * absorbs writes (including deletes) on the {@link Journal}. The
! * {@link PartitionedJournal} actually delegates all storage services to a
! * {@link SlaveJournal}. When the {@link SlaveJournal} {@link #overflow()}s,
! * it is frozen and a new {@link SlaveJournal} is deployed to absorb further
! * writes. During this time, reads on the {@link PartitionedJournal} are served
! * by a {@link FusedView} of the data on the new {@link SlaveJournal} and the
! * data on the old {@link SlaveJournal}.
  * </p>
  * <p>
! * While reads and writes proceed in the foreground on the new
! * {@link SlaveJournal}, a background thread builds an {@link IndexSegment}
! * from each modified {@link BTree} corresponding to a {@link PartitionedIndex}.
! * Note that we would be free to delete the old {@link SlaveJournal} and old
! * {@link IndexSegment}s except that concurrent transactions may still need to
! * read from historical states which would be lost by the merge. However,
! * unisolated read and write operations and new transactions immediately begin
! * to use the new {@link IndexSegment}s.
  * </p>
  * <p>
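The overflow behavior above (freeze the old SlaveJournal, direct new writes to a fresh one, and serve reads from a fused view of both) can be pictured with a small sketch. The SimpleStore, InMemoryStore, and FusedReadView names below are invented simplifications, not the actual SlaveJournal or FusedView classes.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical simplification of the overflow + fused-view read pattern.
    interface SimpleStore {
        String read(String key);
        void write(String key, String value);
    }

    class InMemoryStore implements SimpleStore {
        private final Map<String, String> data = new ConcurrentHashMap<String, String>();
        public String read(String key) { return data.get(key); }
        public void write(String key, String value) { data.put(key, value); }
    }

    /** Reads try the store that absorbs new writes, then fall back to the frozen one. */
    class FusedReadView {
        private final SimpleStore current; // new store, absorbs all writes after overflow
        private final SimpleStore frozen;  // old store, read-only once frozen

        FusedReadView(SimpleStore current, SimpleStore frozen) {
            this.current = current;
            this.frozen = frozen;
        }

        String read(String key) {
            String value = current.read(key);
            return value != null ? value : frozen.read(key);
        }
    }

Once the background builds complete, the frozen store drops out of the view and unisolated operations use the new IndexSegments instead.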
***************
*** 106,127 ****
  * compacting merge MAY be performed. Otherwise a new {@link IndexSegment} is
  * generated, possibly resulting in more than one {@link IndexSegment} for the
! * same partition. Once all data from the slave are stable in the appropriate
! * {@link IndexSegment}s, the old {@link Journal} is closed and either deleted
! * synchronously or marked for deletion. Each {@link IndexSegment} built in this
! * manner contains a historical snapshot of data written on {@link Journal} for
! * a given partition. If multiple {@link IndexSegment} are allowed to accumulate
! * per partition, then the {@link IndexSegment}s are combined periodically
! * using a compacting merge (responsible for retaining the most recent versions
! * for any key and processing delete markers). The inputs to this process are
! * then deleted (or marked for deletion).
  * </p>
  * <p>
! * A partition overflows and is split when the total data size of an index
! * partition (estimated as the sum of the {@link IndexSegment}s and the data on
! * the {@link Journal} for that partition) exceeds a threshold. Likewise,
! * partitions may underflow and be joined. These operations require updates to
! * the metadata index so that futher requests are directed based on the new
! * parition boundaries.
  * </p>
  *
  * @todo When you open an index segment the nodes are fully buffered, so that
--- 115,177 ----
  * compacting merge MAY be performed. Otherwise a new {@link IndexSegment} is
  * generated, possibly resulting in more than one {@link IndexSegment} for the
! * same partition. Once all data from the {@link SlaveJournal} are stable in the
! * appropriate {@link IndexSegment}s, the old {@link SlaveJournal} is no longer
! * required for unisolated reads or writes or for new transactions (however, it
! * MAY be required to serve existing transactions reading from historical states
! * that are no longer present in the merged index segments). Each
! * {@link IndexSegment} built in this manner contains a snapshot of surviving
! * data written on the {@link Journal} for a given partition. If multiple
! * {@link IndexSegment}s are allowed to accumulate per partition, then the
! * {@link IndexSegment}s are combined periodically using a compacting merge
! * (responsible for retaining the most recent versions for any key and
! * processing delete markers). The inputs to a compacting merge are then no
! * longer required by unisolated reads or writes but MAY be required to support
! * concurrent transactions reading from historical states not preserved by the
! * merge.
  * </p>
  * <p>
! * An index partition overflows and is split when the total data size of an
! * index partition (estimated as the sum of the {@link IndexSegment}s and the
! * data on the {@link Journal} for that partition) exceeds a threshold.
! * Likewise, index partitions may underflow and be joined. These operations
! * require updates to the {@link MetadataIndex} so that further requests are
! * directed based on the new partition boundaries.
  * </p>
+ * <p>
+ * Either once the journal is frozen or periodically during its life we would
+ * evict an index to a perfect index segment on disk (write the leaves out in a
+ * linear sequence, complete with prior-next references, and build up a perfect
+ * index over those leaves - the branching factor here can be much higher in
+ * order to reduce the tree depth and optimize IO). The perfect index would have
+ * to have "delete" entries to mark deleted keys. Periodically we would merge
+ * evicted index segments. The goal is to balance write absorption and
+ * concurrency control within memory and using pure sequential IO against 100:1
+ * or better data on disk with random seeks for reading any given leaf. The
+ * index nodes would be written densely after the leaves such that it would be
+ * possible to fully buffer the index nodes with a single sequential IO. A key
+ * range scan would likewise be a sequential IO since we would know the start
+ * and end leaf for the key range directly from the index. A read would first
+ * resolve against the current journal, then the prior journal iff one is in the
+ * process of being partitioned out into individual perfect index segments, and
+ * then against those index segments in order. A "point" read would therefore be
+ * biased to be more efficient for "recently" written data, but reads otherwise
+ * must present a merged view of the data in each remaining historical perfect
+ * index that covers at least part of the key range. Periodically the index
+ * segments could be fully merged, at which point we would no longer have any
+ * delete entries in a given perfect index segment.
+ * </p>
+ * <p>
+ * At one extreme there will be one journal per disk and the journal will use
+ * the entire disk partition. In a pure write scenario the disk would perform
+ * only sequential IO. However, applications will also need to read data from
+ * disk. Read and write buffers need to be split. Write buffers are used to
+ * defer IOs until large sequential IOs may be realized. Read buffers are used
+ * for pre-fetching when the data on disk is much larger than the available RAM
+ * and the expectation of reuse is low while the expectation of completing a
+ * sequential scan is high. Direct buffering may be used for hot-spots but
+ * requires a means to bound the locality of the buffered segment, e.g., by not
+ * multiplexing an index segment that will be directly buffered and providing a
+ * sufficient resource abundance for high performance.
+ * </p>
  *
  * @todo When you open an index segment the nodes are fully buffered, so that
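The compacting merge described above keeps only the most recent version for each key and, in a full merge, can discard delete markers entirely. Here is a minimal sketch under the assumption that each segment is an ordered map and that a null value stands in for a delete marker; it does not reflect the actual IndexSegment merge code.

    import java.util.SortedMap;
    import java.util.TreeMap;

    // Hypothetical compacting merge of two segments for the same partition.
    // The newer segment wins for any key present in both; a null value is a
    // stand-in for a delete marker.
    class CompactingMerge {

        static SortedMap<Long, String> merge(SortedMap<Long, String> newer,
                                             SortedMap<Long, String> older,
                                             boolean fullMerge) {
            SortedMap<Long, String> out = new TreeMap<Long, String>(older);
            out.putAll(newer); // most recent versions override older ones
            if (fullMerge) {
                // A full merge may drop delete markers because no older segment
                // remains that could resurrect the deleted keys.
                out.values().removeIf(value -> value == null);
            }
            return out;
        }
    }

When more than two segments have accumulated for a partition, the same pass can be applied from newest to oldest before the optional marker removal.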
***************
*** 144,147 ****
--- 194,242 ----
  * and a distributed protocol for access to the various index partitions.
  *
+ * @todo I've been considering the problem of a distributed index more. A
+ * multiplexed journal holding segments for one or more clustered indices
+ * (containing anything from database rows to generic objects to terms to
+ * triples) would provide fast write absorption and good read rates
+ * without a secondary read-optimized segment. If this is combined with
+ * pipelined journals for per-index segment redundancy (a given segment is
+ * replicated on N=3+ hosts) then we can have both failover and
+ * load-balancing for read-intensive segments.
+ * <p>
+ * Clustered indices support (re-)distribution and redundancy since their
+ * rows are their essential data while the index simply provides efficient
+ * access. Therefore the same objects may be written onto multiple
+ * replications of an index range hosted by different journals. The object
+ * data will remain invariant, but the entailed index data will be
+ * different for each journal on which the object is written.
+ * <p>
+ * This requires a variety of metadata for each index segment. If a
+ * segment is defined by an index identifier and a separator key, then the
+ * metadata model needs to identify the master journal for a given segment
+ * (writes are directed to this journal) and the secondary journals for
+ * the segment (the master writes through to the secondary journals using
+ * a pipeline). Likewise, the journal must be able to identify the root
+ * for any given committed state of each index range written on that
+ * journal.
+ * <p>
+ * We need to track write load and read load per index range in the
+ * journal, per journal, per IO channel, per disk, per host, and per
+ * network segment. Write load can be reduced by splitting a segment, by
+ * using a host with faster resources, or by reducing other utilization of
+ * the host. The latter would include the case of preferring reads from a
+ * secondary journal for that index segment. It is an extremely cheap
+ * action to offload readers to a secondary service. Likewise, failover of
+ * the master for an index range is also inexpensive since the data are
+ * already in place on several secondaries.
+ * <p>
+ * A new replication of an index range may be built up gradually, e.g., by
+ * moving a leaf at a time and pipelining only the sub-range of the index
+ * range that has already been mirrored. For simplicity, the new copy of
+ * the index range would not be visible in the index metadata until a full
+ * copy of the index range was live. Registration of the copy with the
+ * metadata index is then trivial. Until that time, the metadata index
+ * would only know that a copy was being developed. If one of the existing
+ * replicas is not heavily loaded then it can handle the creation of the
+ * new copy.
+ *
  * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
  * @version $Id$
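The master/secondary arrangement in the final @todo can be made concrete with a small sketch of the write-through pipeline and of offloading reads to the least loaded replica. Every name below (ReplicatedRange, JournalClient, currentLoad) is invented for illustration; only the idea of a master that pipelines writes to N secondaries comes from the note above.

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    // Hypothetical sketch of master/secondary handling for one index range.
    class ReplicatedRange {

        interface JournalClient {
            void write(byte[] key, byte[] value);
            byte[] read(byte[] key);
            double currentLoad(); // e.g., recent IO utilization in [0,1]
        }

        private final JournalClient master;
        private final List<JournalClient> secondaries = new ArrayList<JournalClient>();

        ReplicatedRange(JournalClient master, List<JournalClient> secondaries) {
            this.master = master;
            this.secondaries.addAll(secondaries);
        }

        /** Writes go to the master, which then forwards them to each secondary. */
        void write(byte[] key, byte[] value) {
            master.write(key, value);
            for (JournalClient secondary : secondaries) {
                secondary.write(key, value); // stand-in for the write-through pipeline
            }
        }

        /** Reads may be offloaded to the least loaded replica, master included. */
        byte[] read(byte[] key) {
            List<JournalClient> candidates = new ArrayList<JournalClient>(secondaries);
            candidates.add(master);
            JournalClient leastLoaded = candidates.stream()
                    .min(Comparator.comparingDouble(JournalClient::currentLoad))
                    .orElse(master);
            return leastLoaded.read(key);
        }
    }

Gradual construction of a new replica, as suggested above, would simply add another JournalClient to the secondaries list once the full copy is live and registered with the metadata index.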