From: Bryan T. <tho...@us...> - 2007-03-29 17:01:39
Update of /cvsroot/cweb/bigdata/src/java/com/bigdata/scaleup In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv16125/src/java/com/bigdata/scaleup Modified Files: PartitionedIndexView.java PartitionMetadata.java MasterJournal.java SegmentMetadata.java SlaveJournal.java AbstractPartitionTask.java IResourceMetadata.java IsolatablePartitionedIndexView.java JournalMetadata.java Log Message: Fixed bug in overflow handling for triple store. Added DataService UUID[] to partition metadata. Index: IsolatablePartitionedIndexView.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/scaleup/IsolatablePartitionedIndexView.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** IsolatablePartitionedIndexView.java 12 Mar 2007 18:06:12 -0000 1.2 --- IsolatablePartitionedIndexView.java 29 Mar 2007 17:01:33 -0000 1.3 *************** *** 49,67 **** import com.bigdata.isolation.IIsolatableIndex; import com.bigdata.isolation.UnisolatedBTree; import com.bigdata.objndx.IEntryIterator; /** * A {@link PartitionedIndexView} that supports transactions and deletion ! * markers. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ - * - * FIXME implement; support processing of delete markers. They can exist in the mutable - * btree and in index segments that are not either a clean first eviction or a - * full compacting merge (e.g., they can still exist in a compacting merge if - * there are other index segments or btrees that are part of a partition but are - * not participating in the compacting merge). */ public class IsolatablePartitionedIndexView extends PartitionedIndexView implements IIsolatableIndex { --- 49,93 ---- import com.bigdata.isolation.IIsolatableIndex; + import com.bigdata.isolation.IsolatableFusedView; import com.bigdata.isolation.UnisolatedBTree; + import com.bigdata.objndx.IBatchBTree; import com.bigdata.objndx.IEntryIterator; + import com.bigdata.objndx.ReadOnlyFusedView; /** * A {@link PartitionedIndexView} that supports transactions and deletion ! * markers. Write operations are passed through to the base class, which in turn ! * delegates them to the {@link UnisolatedBTree} supplied to the constructor. ! * Read operations understand deletion markers. Processing deletion markers ! * requires that the source(s) for an index partition view are read in order ! * from the most recent (the mutable btree that is absorbing writes for the ! * index partition) to the earliest historical resource. The first entry for the ! * key in any source is the value that will be reported on a read. If the entry ! * is deleted, then the read will report that no entry exists for that key. ! * <p> ! * Note that deletion markers can exist in both the mutable btree absorbing ! * writes and in historical journals and index segments having data for the ! * partition view. Deletion markers are expunged from index segments only by a ! * full compacting merge of all index segments having live data for the ! * partition. ! * <p> ! * Implementation note: both the write operations and the {@link IBatchBTree} ! * operations are inherited from the base class. Only non-batch read operations ! * are overridden by this class. ! * ! * FIXME implement; support processing of delete markers - basically they have ! * to be processed on read so that a delete on the mutable btree overrides an ! * historical value, and a deletion marker in a more recent index segment !
* overrides a deletion marker in an earlier index segment. Deletion markers can ! * exist in both the mutable btree and in index segments that are not either a ! * clean first eviction or a full compacting merge (e.g., they can still exist ! * in a compacting merge if there are other index segments or btrees that are ! * part of a partition but are not participating in the compacting merge). ! * ! * @see IsolatableFusedView ! * @see ReadOnlyFusedView * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public class IsolatablePartitionedIndexView extends PartitionedIndexView implements IIsolatableIndex { *************** *** 69,126 **** /** * @param btree * @param mdi */ public IsolatablePartitionedIndexView(UnisolatedBTree btree, MetadataIndex mdi) { super(btree, mdi); - // throw new UnsupportedOperationException(); - } - - public boolean contains(byte[] key) { - // TODO Auto-generated method stub - return false; - } - - public Object insert(Object key, Object value) { - // TODO Auto-generated method stub - return null; - } - - public Object lookup(Object key) { - // TODO Auto-generated method stub - return null; - } - - public int rangeCount(byte[] fromKey, byte[] toKey) { - // TODO Auto-generated method stub - return 0; - } - - public IEntryIterator rangeIterator(byte[] fromKey, byte[] toKey) { - // TODO Auto-generated method stub - return null; - } - - public Object remove(Object key) { - // TODO Auto-generated method stub - return null; - } - - public void contains(int ntuples, byte[][] keys, boolean[] contains) { - // TODO Auto-generated method stub - - } - - public void insert(int ntuples, byte[][] keys, Object[] values) { - // TODO Auto-generated method stub - - } - - public void lookup(int ntuples, byte[][] keys, Object[] values) { - // TODO Auto-generated method stub - - } - - public void remove(int ntuples, byte[][] keys, Object[] values) { - // TODO Auto-generated method stub } --- 95,105 ---- /** * @param btree + * The btree that will absorb writes for the index partitions. * @param mdi + * The metadata index. */ public IsolatablePartitionedIndexView(UnisolatedBTree btree, MetadataIndex mdi) { + super(btree, mdi); } Index: JournalMetadata.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/scaleup/JournalMetadata.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** JournalMetadata.java 27 Mar 2007 17:11:41 -0000 1.2 --- JournalMetadata.java 29 Mar 2007 17:01:33 -0000 1.3 *************** *** 44,48 **** package com.bigdata.scaleup; - import java.io.File; import java.util.UUID; --- 44,47 ---- *************** *** 61,85 **** protected final String filename; protected final ResourceState state; protected final UUID uuid; ! public File getFile() { ! return new File(filename); } /** ! * Always returns ZERO (0L) since we can not accurately estimate the #of ! * bytes on the journal dedicated to a given partition of a named index. */ ! public long size() { ! return 0L; } ! public ResourceState state() { return state; } ! public UUID getUUID() { return uuid; } --- 60,108 ---- protected final String filename; + protected final long nbytes; protected final ResourceState state; protected final UUID uuid; ! public final boolean isIndexSegment() { ! ! return false; ! ! } ! ! public final boolean isJournal() { ! ! return true; ! ! } ! ! public final String getFile() { ! ! return filename; ! } /** !
* Note: this value is typically zero (0L) since we can not accurately ! * estimate the #of bytes on the journal dedicated to a given partition of a ! * named index. The value is originally set to zero (0L) by the ! * {@link JournalMetadata#JournalMetadata(Journal, ResourceState)} ! * constructor. */ ! public final long size() { ! ! return nbytes; ! } ! public final ResourceState state() { ! return state; + } ! public final UUID getUUID() { ! return uuid; + } *************** *** 94,97 **** --- 117,126 ---- this.filename = journal.getFile().toString(); + /* + * Note: 0L since we can not easily estimate the #of bytes on the + * journal that are dedicated to an index partition. + */ + this.nbytes = 0L; + this.state = state; *************** *** 100,102 **** --- 129,149 ---- } + public JournalMetadata(String file, long nbytes, ResourceState state, UUID uuid) { + + if(file == null) throw new IllegalArgumentException(); + + if(state == null) throw new IllegalArgumentException(); + + if(uuid == null) throw new IllegalArgumentException(); + + this.filename = file; + + this.nbytes = nbytes; + + this.state = state; + + this.uuid = uuid; + + } + } Index: IResourceMetadata.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/scaleup/IResourceMetadata.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** IResourceMetadata.java 27 Mar 2007 17:11:41 -0000 1.2 --- IResourceMetadata.java 29 Mar 2007 17:01:33 -0000 1.3 *************** *** 60,66 **** /** ! * The store file. */ ! public File getFile(); /** --- 60,80 ---- /** ! * True iff this resource is an {@link IndexSegment}. Each ! * {@link IndexSegment} contains historical read-only data for exactly one ! * partition of a scale-out index. */ ! public boolean isIndexSegment(); ! ! /** ! * True iff this resource is a {@link Journal}. When the resource is a ! * {@link Journal}, there will be a named mutable btree on the journal that ! * is absorbing writes for one or more index partitions of a scale-out index. ! */ ! public boolean isJournal(); ! ! /** ! * The name of the file containing the resource. ! */ ! public String getFile(); /** Index: AbstractPartitionTask.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/scaleup/AbstractPartitionTask.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** AbstractPartitionTask.java 27 Mar 2007 17:11:41 -0000 1.4 --- AbstractPartitionTask.java 29 Mar 2007 17:01:33 -0000 1.5 *************** *** 48,52 **** import java.util.concurrent.Executors; - import com.bigdata.isolation.IIsolatableIndex; import com.bigdata.isolation.UnisolatedBTree; import com.bigdata.isolation.Value; --- 48,51 ---- *************** *** 56,60 **** import com.bigdata.objndx.IndexSegmentBuilder; import com.bigdata.objndx.IndexSegmentMerger; - import com.bigdata.objndx.IndexSegmentMetadata; import com.bigdata.objndx.RecordCompressor; import com.bigdata.objndx.IndexSegmentMerger.MergedEntryIterator; --- 55,58 ---- *************** *** 90,98 **** * @todo parameterize useChecksum, recordCompressor. * - * @todo assigning sequential segment identifiers may impose an unnecessary - * message overhead since we can just use the temporary file mechanism - * and then inspect the {@link IndexSegmentMetadata} to learn more - * about a given store file. - * * @todo try performance with and without checksums and with and without * record compression.
--- 88,91 ---- *************** *** 129,138 **** */ protected final RecordCompressor recordCompressor = null; ! ! /** ! * The serializer used by all {@link IIsolatableIndex}s. ! */ ! static protected final IValueSerializer valSer = Value.Serializer.INSTANCE; ! /** * --- 122,126 ---- */ protected final RecordCompressor recordCompressor = null; ! /** * *************** *** 220,224 **** private final IResourceMetadata src; - private final int segId; /** --- 208,211 ---- *************** *** 227,236 **** * The source for the build operation. Only those entries in * the described key range will be used. - * @param segId - * The output segment identifier. */ public BuildTask(MasterJournal master, String name, UUID indexUUID, int branchingFactor, double errorRate, int partId, ! byte[] fromKey, byte[] toKey, IResourceMetadata src, int segId) { super(master, name, indexUUID, branchingFactor, errorRate, partId, --- 214,221 ---- * The source for the build operation. Only those entries in * the described key range will be used. */ public BuildTask(MasterJournal master, String name, UUID indexUUID, int branchingFactor, double errorRate, int partId, ! byte[] fromKey, byte[] toKey, IResourceMetadata src) { super(master, name, indexUUID, branchingFactor, errorRate, partId, *************** *** 239,244 **** this.src = src; - this.segId = segId; - } --- 224,227 ---- *************** *** 255,264 **** AbstractBTree src = master.getIndex(name,this.src); ! File outFile = master.getSegmentFile(name, partId, segId); IndexSegmentBuilder builder = new IndexSegmentBuilder(outFile, master.tmpDir, src.rangeCount(fromKey, toKey), src .rangeIterator(fromKey, toKey), branchingFactor, ! valSer, useChecksum, recordCompressor, errorRate, indexUUID); IResourceMetadata[] resources = new SegmentMetadata[] { new SegmentMetadata( --- 238,248 ---- AbstractBTree src = master.getIndex(name,this.src); ! File outFile = master.getSegmentFile(name, partId); IndexSegmentBuilder builder = new IndexSegmentBuilder(outFile, master.tmpDir, src.rangeCount(fromKey, toKey), src .rangeIterator(fromKey, toKey), branchingFactor, ! src.getNodeSerializer().getValueSerializer(), useChecksum, ! recordCompressor, errorRate, indexUUID); IResourceMetadata[] resources = new SegmentMetadata[] { new SegmentMetadata( *************** *** 282,286 **** abstract static class AbstractMergeTask extends AbstractPartitionTask { - protected final int segId; protected final boolean fullCompactingMerge; --- 266,269 ---- *************** *** 296,300 **** protected AbstractMergeTask(MasterJournal master, String name, UUID indexUUID, int branchingFactor, double errorRate, ! int partId, byte[] fromKey, byte[] toKey, int segId, boolean fullCompactingMerge) { --- 279,283 ---- protected AbstractMergeTask(MasterJournal master, String name, UUID indexUUID, int branchingFactor, double errorRate, ! int partId, byte[] fromKey, byte[] toKey, boolean fullCompactingMerge) { *************** *** 302,307 **** fromKey, toKey); - this.segId = segId; - this.fullCompactingMerge = fullCompactingMerge; --- 285,288 ---- *************** *** 326,330 **** // output file for the merged segment. ! File outFile = master.getSegmentFile(name, partId, segId); IResourceMetadata[] resources = getResources(); --- 307,311 ---- // output file for the merged segment. ! File outFile = master.getSegmentFile(name, partId); IResourceMetadata[] resources = getResources(); *************** *** 339,347 **** } // merge the data from the btree on the slave and the index // segment. 
MergedLeafIterator mergeItr = new IndexSegmentMerger( tmpFileBranchingFactor, srcs).merge(); ! // build the merged index segment. IndexSegmentBuilder builder = new IndexSegmentBuilder(outFile, --- 320,331 ---- } + final IValueSerializer valSer = srcs[0].getNodeSerializer() + .getValueSerializer(); + // merge the data from the btree on the slave and the index // segment. MergedLeafIterator mergeItr = new IndexSegmentMerger( tmpFileBranchingFactor, srcs).merge(); ! // build the merged index segment. IndexSegmentBuilder builder = new IndexSegmentBuilder(outFile, *************** *** 383,390 **** // new segment definitions. ! final SegmentMetadata[] newSegs = new SegmentMetadata[2]; // assume only the last segment is live. ! final SegmentMetadata oldSeg = pmd.segs[pmd.segs.length-1]; newSegs[0] = new SegmentMetadata(oldSeg.filename, oldSeg.nbytes, --- 367,374 ---- // new segment definitions. ! final IResourceMetadata[] newSegs = new IResourceMetadata[2]; // assume only the last segment is live. ! final SegmentMetadata oldSeg = (SegmentMetadata)pmd.resources[pmd.resources.length-1]; newSegs[0] = new SegmentMetadata(oldSeg.filename, oldSeg.nbytes, *************** *** 394,398 **** .length(), ResourceState.Live, builder.segmentUUID); ! mdi.put(fromKey, new PartitionMetadata(0, segId + 1, newSegs)); return null; --- 378,382 ---- .length(), ResourceState.Live, builder.segmentUUID); ! mdi.put(fromKey, new PartitionMetadata(0, pmd.dataServices, newSegs)); return null; *************** *** 436,444 **** public MergeTask(MasterJournal master, String name, UUID indexUUID, int branchingFactor, double errorRate, int partId, ! byte[] fromKey, byte[] toKey, IResourceMetadata[] resources, ! int segId) { super(master, name, indexUUID, branchingFactor, errorRate, partId, ! fromKey, toKey, segId, false); this.resources = resources; --- 420,428 ---- public MergeTask(MasterJournal master, String name, UUID indexUUID, int branchingFactor, double errorRate, int partId, ! byte[] fromKey, byte[] toKey, IResourceMetadata[] resources ! ) { super(master, name, indexUUID, branchingFactor, errorRate, partId, ! fromKey, toKey, false); this.resources = resources; *************** *** 490,497 **** public FullMergeTask(MasterJournal master, String name, UUID indexUUID, int branchingFactor, double errorRate, int partId, ! byte[] fromKey, byte[] toKey, long commitTime, int segId) { super(master, name, indexUUID, branchingFactor, errorRate, partId, ! fromKey, toKey, segId, true); this.commitTime = commitTime; --- 474,481 ---- public FullMergeTask(MasterJournal master, String name, UUID indexUUID, int branchingFactor, double errorRate, int partId, ! byte[] fromKey, byte[] toKey, long commitTime) { super(master, name, indexUUID, branchingFactor, errorRate, partId, ! 
fromKey, toKey, true); this.commitTime = commitTime; Index: SlaveJournal.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/scaleup/SlaveJournal.java,v retrieving revision 1.6 retrieving revision 1.7 diff -C2 -d -r1.6 -r1.7 *** SlaveJournal.java 27 Mar 2007 14:34:22 -0000 1.6 --- SlaveJournal.java 29 Mar 2007 17:01:33 -0000 1.7 *************** *** 50,57 **** import com.bigdata.isolation.IIsolatableIndex; import com.bigdata.isolation.UnisolatedBTree; - import com.bigdata.journal.IJournal; import com.bigdata.journal.Journal; import com.bigdata.journal.Name2Addr; - import com.bigdata.journal.ResourceManager; import com.bigdata.objndx.BTree; import com.bigdata.objndx.IEntryIterator; --- 50,55 ---- *************** *** 61,69 **** /** ! * Class delegates the {@link #overflow()} event to a master ! * {@link IJournal}. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public class SlaveJournal extends Journal { --- 59,69 ---- /** ! * Class delegates the {@link #overflow()} event to a {@link MasterJournal}. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ + * + * FIXME refactor the metadata index so that it may be run as an embedded + * process or remote service. */ public class SlaveJournal extends Journal { *************** *** 106,115 **** * The overflow event is delegated to the master. */ ! public void overflow() { // handles event reporting. super.overflow(); ! master.overflow(); } --- 106,115 ---- * The overflow event is delegated to the master. */ ! public boolean overflow() { // handles event reporting. super.overflow(); ! return master.overflow(); } *************** *** 233,247 **** /* ! * @todo the assigned random UUID for the metadata index must be used by ! * all B+Tree objects having data for the metadata index so once we ! * support partitions in the metadata index itself this UUID must be ! * propagated to all of those downstream objects. */ MetadataIndex mdi = new MetadataIndex(this, ! BTree.DEFAULT_BRANCHING_FACTOR, UUID.randomUUID(), btree ! .getIndexUUID(), name); ! // create the initial partition which can accept any key. ! mdi.put(new byte[]{}, new PartitionMetadata(0)); // add to the persistent name map. --- 233,264 ---- /* ! * Note: there are two UUIDs here - the UUID for the metadata index ! * describing the partitions of the named scale-out index and the UUID ! * of the named scale-out index. The metadata index UUID MUST be used by ! * all B+Tree objects having data for the metadata index (its mutable ! * btrees on journals and its index segments) while the managed named ! * index UUID MUST be used by all B+Tree objects having data for the ! * named index (its mutable btrees on journals and its index segments). */ + + final UUID metadataIndexUUID = UUID.randomUUID(); + + final UUID managedIndexUUID = btree.getIndexUUID(); + MetadataIndex mdi = new MetadataIndex(this, ! BTree.DEFAULT_BRANCHING_FACTOR, metadataIndexUUID, ! managedIndexUUID, name); ! /* ! * Create the initial partition which can accept any key. ! * ! * @todo specify the DataService(s) that will accept writes for this ! * index partition. This should be done as part of refactoring the ! * metadata index into a first level service. ! */ ! ! final UUID[] dataServices = new UUID[]{}; ! ! mdi.put(new byte[]{}, new PartitionMetadata(0, dataServices )); // add to the persistent name map. *************** *** 363,376 **** final PartitionMetadata pmd = (PartitionMetadata) itr.next(); !
for (int i = 0; i < pmd.segs.length; i++) { ! SegmentMetadata smd = pmd.segs[i]; ! File file = new File(smd.filename); ! if (file.exists() && !file.delete()) { ! log.warn("Could not remove file: " ! + file.getAbsolutePath()); } --- 380,397 ---- final PartitionMetadata pmd = (PartitionMetadata) itr.next(); ! for (int i = 0; i < pmd.resources.length; i++) { ! IResourceMetadata rmd = pmd.resources[i]; ! if (rmd.isIndexSegment()) { ! File file = new File(rmd.getFile()); ! if (file.exists() && !file.delete()) { ! ! log.warn("Could not remove file: " ! + file.getAbsolutePath()); ! ! } } Index: SegmentMetadata.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/scaleup/SegmentMetadata.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** SegmentMetadata.java 27 Mar 2007 17:11:41 -0000 1.4 --- SegmentMetadata.java 29 Mar 2007 17:01:33 -0000 1.5 *************** *** 44,48 **** package com.bigdata.scaleup; - import java.io.File; import java.util.UUID; --- 44,47 ---- *************** *** 54,57 **** --- 53,58 ---- * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ + * + * @todo make fields protected/private. */ public class SegmentMetadata implements IResourceMetadata { *************** *** 74,77 **** --- 75,90 ---- final public UUID uuid; + public final boolean isIndexSegment() { + + return true; + + } + + public final boolean isJournal() { + + return false; + + } + public SegmentMetadata(String filename,long nbytes,ResourceState state, UUID uuid ) { *************** *** 101,106 **** } ! public File getFile() { ! return new File(filename); } --- 114,119 ---- } ! public String getFile() { ! return filename; } Index: PartitionedIndexView.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/scaleup/PartitionedIndexView.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** PartitionedIndexView.java 27 Mar 2007 14:34:22 -0000 1.2 --- PartitionedIndexView.java 29 Mar 2007 17:01:33 -0000 1.3 *************** *** 179,187 **** int n = 0; ! for(int i=0; i<pmd.segs.length; i++) { ! if(pmd.segs[i].state != ResourceState.Live) continue; ! segs[n++] = (IndexSegment) master.getIndex(getName(), pmd.segs[i]); } --- 179,187 ---- int n = 0; ! for(int i=0; i<pmd.resources.length; i++) { ! if(pmd.resources[i].state() != ResourceState.Live) continue; ! segs[n++] = (IndexSegment) master.getIndex(getName(), pmd.resources[i]); } *************** *** 312,318 **** for (int i = 0; i < resources.length; i++) { ! SegmentMetadata seg = pmd.segs[i]; ! if (seg.state != ResourceState.Live) continue; --- 312,318 ---- for (int i = 0; i < resources.length; i++) { ! IResourceMetadata seg = pmd.resources[i]; ! if (seg.state() != ResourceState.Live) continue; Index: PartitionMetadata.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/scaleup/PartitionMetadata.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** PartitionMetadata.java 27 Mar 2007 17:11:41 -0000 1.4 --- PartitionMetadata.java 29 Mar 2007 17:01:33 -0000 1.5 *************** *** 50,53 **** --- 50,55 ---- import java.util.UUID; + import com.bigdata.isolation.UnisolatedBTree; + import com.bigdata.journal.Journal; import com.bigdata.objndx.IValueSerializer; import com.bigdata.objndx.IndexSegment; *************** *** 57,89 **** * partition. * ! 
* FIXME add ordered UUID[] of the data services on which the index partition ! * has been mapped. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ ! public class PartitionMetadata { /** * The unique partition identifier. */ ! final int partId; ! /** ! * The next unique within partition segment identifier to be assigned. */ ! final int nextSegId; ! /** ! * Zero or more files containing {@link IndexSegment}s holding live ! * data for this partition. The entries in the array reflect the ! * creation time of the index segments. The earliest segment is listed ! * first. The most recently created segment is listed last. */ ! final SegmentMetadata[] segs; ! public PartitionMetadata(int partId) { ! this(partId, 0, new SegmentMetadata[] {}); } --- 59,101 ---- * partition. * ! * @todo provide a persistent event log or just integrate the state changes over ! * the historical states of the partition description in the metadata ! * index? ! * ! * @todo aggregate resource load statistics. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ ! public class PartitionMetadata /*implements Externalizable*/ { /** * The unique partition identifier. */ ! final protected int partId; ! /** ! * The ordered list of data services on which data for this partition will ! * be written and from which data for this partition may be read. */ ! final protected UUID[] dataServices; ! /** ! * Zero or more files containing {@link Journal}s or {@link IndexSegment}s ! * holding live data for this partition. The entries in the array reflect ! * the creation time of the index segments. The earliest segment is listed ! * first. The most recently created segment is listed last. Only the ! * {@link ResourceState#Live} resources must be read in order to provide a ! * consistent view of the data for the index partition. ! * {@link ResourceState#Dead} resources will eventually be scheduled for ! * restart-safe deletion. ! * ! * @see ResourceState */ ! final protected IResourceMetadata[] resources; ! public PartitionMetadata(int partId, UUID[] dataServices ) { ! this(partId, dataServices, new IResourceMetadata[] {}); } *************** *** 94,112 **** * The unique partition identifier assigned by the * {@link MetadataIndex}. ! * @param segs ! * A description of each {@link IndexSegment} associated with ! * that partition. */ ! public PartitionMetadata(int partId, int nextSegId, SegmentMetadata[] segs) { ! this.partId = partId; ! this.nextSegId = nextSegId; ! this.segs = segs; } /** * The #of live index segments (those having data that must be included * to construct a fused view representing the current state of the --- 106,162 ---- * The unique partition identifier assigned by the * {@link MetadataIndex}. ! * @param dataServices ! * The ordered array of data service identifiers on which data ! * for this partition will be written and from which data for ! * this partition may be read. ! * @param resources ! * A description of each {@link Journal} or {@link IndexSegment} ! * resource associated with that partition. */ ! public PartitionMetadata(int partId, UUID[] dataServices, ! IResourceMetadata[] resources) { ! if (dataServices == null) ! throw new IllegalArgumentException(); ! if (resources == null) ! throw new IllegalArgumentException(); ! this.partId = partId; ! ! this.dataServices = dataServices; ! ! this.resources = resources; } /** + * The #of data services on which the data for this partition will be + * written and from which they may be read. 
+ * + * @return The replication count for the index partition. + */ + public int getDataServiceCount() { + + return dataServices.length; + + } + + /** + * The ordered list of data services on which the data for this partition + * will be written and from which the data for this partition may be read. + * The first data service is always the primary. Writes SHOULD be pipelined + * from the primary to the secondaries in the same order as they appear in + * this array. + * + * @return A copy of the array of data service identifiers. + */ + public UUID[] getDataServices() { + + return dataServices.clone(); + + } + + /** * The #of live index segments (those having data that must be included * to construct a fused view representing the current state of the *************** *** 119,125 **** int count = 0; ! for (int i = 0; i < segs.length; i++) { ! if (segs[i].state == ResourceState.Live) count++; --- 169,175 ---- int count = 0; ! for (int i = 0; i < resources.length; i++) { ! if (resources[i].state() == ResourceState.Live) count++; *************** *** 141,149 **** int k = 0; ! for (int i = 0; i < segs.length; i++) { ! if (segs[i].state == ResourceState.Live) { ! files[k++] = segs[i].filename; } --- 191,199 ---- int k = 0; ! for (int i = 0; i < resources.length; i++) { ! if (resources[i].state() == ResourceState.Live) { ! files[k++] = resources[i].getFile(); } *************** *** 166,175 **** return false; ! if (segs.length != o2.segs.length) return false; ! for (int i = 0; i < segs.length; i++) { ! if (!segs[i].equals(o2.segs[i])) return false; --- 216,235 ---- return false; ! if (dataServices.length != o2.dataServices.length) return false; ! if (resources.length != o2.resources.length) ! return false; ! for (int i = 0; i < dataServices.length; i++) { ! ! if (!dataServices[i].equals(o2.dataServices[i])) ! return false; ! ! } ! ! for (int i = 0; i < resources.length; i++) { ! ! if (!resources[i].equals(o2.resources[i])) return false; *************** *** 186,279 **** } - // /** - // * The metadata about an index segment life cycle as served by a - // * specific service instance on some host. - // * - // * @todo we need to track load information for the service and the host. - // * however that information probably does not need to be restart - // * safe so it is easily maintained within a rather small hashmap - // * indexed by the service address. - // * - // * @author <a href="mailto:tho...@us...">Bryan - // * Thompson</a> - // * @version $Id$ - // */ - // public static class IndexSegmentServiceMetadata { - // - // /** - // * The service that is handling this index segment. This service - // * typically handles many index segments and multiplexes them on a - // * single journal. - // * - // * @todo When a client looks up an index segment in the metadata index, - // * what we send them is the set of key-addr entries from the leaf - // * in which the index segment was found. If a request by the - // * client to that service discovers that the service no longer - // * handles a key range, that the service is dead, etc., then the - // * client will have to invalidate its cache entry and lookup the - // * current location of the index segment in the metadata index. - // */ - // public InetSocketAddress addr; - // - // } - // - // /** - // * An array of the services that are registered as handling this index - // * segment. One of these services is the master and accepts writes from - // * the client. The other services mirror the segment and provide - // * redundancy for failover and load balancing.
The order in which the - // * segments are listed in this array could reflect the master (at - // * position zero) and the write pipeline from the master to the - // * secondaries could be simply the order of the entries in the array. - // */ - // public IndexSegmentServiceMetadata services[]; - // - // /** - // * The time that the index segment was started on that service. - // */ - // public long startTime; - // - // /** - // * A log of events for the index segment. This could just be a linked - // * list of strings that get serialized as a single string. Each event is - // * then a semi-structured string, typically generated by a purpose - // * specific logging appender. - // */ - // public Vector<Event> eventLog; - // - // public PartitionMetadata(InetSocketAddress addr) { - // } - // - // /** - // * An event for an index segment. - // * - // * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - // * @version $Id$ - // */ - // public static class Event { - // - //// public long timestamp; - // - // public String msg; - // - // /** - // * Serialization for an event. - // * - // * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - // * @version $Id$ - // */ - // public static class Serializer /*implements ...*/{ - // - // } - // - // } - /** * Serialization for an index segment metadata entry. * ! * FIXME implement {@link Externalizable} and use explicit versioning. ! * ! * FIXME assumes that resources are {@link IndexSegment}s rather than ! * either index segments or journals. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> --- 246,255 ---- } /** * Serialization for an index segment metadata entry. * ! * FIXME convert to use {@link UnisolatedBTree} (so byte[] values that we ! * (de-)serialize on a one-by-one basis ourselves), implement ! * {@link Externalizable} and use explicit versioning and packed integers. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> *************** *** 289,292 **** --- 265,270 ---- public transient static final PartitionMetadata.Serializer INSTANCE = new Serializer(); + // private static final transient int VERSION0 = 0x0; + public Serializer() { } *************** *** 294,323 **** public void putValues(DataOutputStream os, Object[] values, int nvals) throws IOException { ! for (int i = 0; i < nvals; i++) { PartitionMetadata val = (PartitionMetadata) values[i]; ! final int nsegs = val.segs.length; ! os.writeInt(val.partId); ! os.writeInt(val.nextSegId); ! ! os.writeInt(nsegs); ! for (int j = 0; j < nsegs; j++) { ! SegmentMetadata segmentMetadata = val.segs[j]; ! os.writeUTF(segmentMetadata.filename); ! os.writeLong(segmentMetadata.nbytes); ! os.writeInt(segmentMetadata.state.valueOf()); ! os.writeLong(segmentMetadata.uuid.getMostSignificantBits()); ! os.writeLong(segmentMetadata.uuid.getLeastSignificantBits()); } --- 272,317 ---- public void putValues(DataOutputStream os, Object[] values, int nvals) throws IOException { ! for (int i = 0; i < nvals; i++) { PartitionMetadata val = (PartitionMetadata) values[i]; ! final int nservices = val.dataServices.length; ! ! final int nresources = val.resources.length; ! os.writeInt(val.partId); + + os.writeInt(nservices); ! os.writeInt(nresources); ! for( int j=0; j<nservices; j++) { ! ! final UUID serviceUUID = val.dataServices[j]; ! ! os.writeLong(serviceUUID.getMostSignificantBits()); ! ! os.writeLong(serviceUUID.getLeastSignificantBits()); ! ! } ! ! for (int j = 0; j < nresources; j++) { ! IResourceMetadata rmd = val.resources[j]; ! os.writeBoolean(rmd.isIndexSegment()); ! !
os.writeUTF(rmd.getFile()); ! os.writeLong(rmd.size()); ! os.writeInt(rmd.state().valueOf()); ! final UUID resourceUUID = rmd.getUUID(); ! os.writeLong(resourceUUID.getMostSignificantBits()); ! ! os.writeLong(resourceUUID.getLeastSignificantBits()); } *************** *** 334,360 **** final int partId = is.readInt(); ! final int nextSegId = is.readInt(); ! ! final int nsegs = is.readInt(); ! PartitionMetadata val = new PartitionMetadata(partId, ! nextSegId, new SegmentMetadata[nsegs]); ! for (int j = 0; j < nsegs; j++) { String filename = is.readUTF(); long nbytes = is.readLong(); ! ResourceState state = ResourceState ! .valueOf(is.readInt()); UUID uuid = new UUID(is.readLong()/*MSB*/,is.readLong()/*LSB*/); ! val.segs[j] = new SegmentMetadata(filename, nbytes, state, uuid); } ! values[i] = val; } --- 328,364 ---- final int partId = is.readInt(); ! final int nservices = is.readInt(); ! final int nresources = is.readInt(); ! ! final UUID[] services = new UUID[nservices]; ! ! final IResourceMetadata[] resources = new IResourceMetadata[nresources]; ! for (int j = 0; j < nservices; j++) { + services[j] = new UUID(is.readLong()/*MSB*/,is.readLong()/*LSB*/); + + } + + for (int j = 0; j < nresources; j++) { + + boolean isIndexSegment = is.readBoolean(); + String filename = is.readUTF(); long nbytes = is.readLong(); ! ResourceState state = ResourceState.valueOf(is.readInt()); UUID uuid = new UUID(is.readLong()/*MSB*/,is.readLong()/*LSB*/); ! resources[j] = (isIndexSegment ? new SegmentMetadata( ! filename, nbytes, state, uuid) ! : new JournalMetadata(filename, nbytes, state, uuid)); } ! values[i] = new PartitionMetadata(partId, services, resources); } Index: MasterJournal.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/scaleup/MasterJournal.java,v retrieving revision 1.5 retrieving revision 1.6 diff -C2 -d -r1.5 -r1.6 *** MasterJournal.java 27 Mar 2007 17:11:41 -0000 1.5 --- MasterJournal.java 29 Mar 2007 17:01:33 -0000 1.6 *************** *** 371,375 **** * <code>segment.dir</code> - The property whose value is the name of * the top level directory beneath which new segment files will be ! * created. When not specified the default is a directory named "<i>segs" * in the directory named by the {@link #JOURNAL_DIR} property. */ --- 371,375 ---- * <code>segment.dir</code> - The property whose value is the name of * the top level directory beneath which new segment files will be ! * created. When not specified the default is a directory named "<i>resources" * in the directory named by the {@link #JOURNAL_DIR} property. */ *************** *** 444,448 **** val = properties.getProperty(Options.SEGMENT_DIR); ! segmentDir = val == null?new File(journalDir,"segs"):new File(val); if(segmentDir.exists() && !segmentDir.isDirectory()) { --- 444,448 ---- val = properties.getProperty(Options.SEGMENT_DIR); ! segmentDir = val == null?new File(journalDir,"resources"):new File(val); if(segmentDir.exists() && !segmentDir.isDirectory()) { *************** *** 523,527 **** /* ! * Create the initial slave slave. */ this.slave = createSlave(this,properties); --- 523,527 ---- /* ! * Create the initial slave journal. */ this.slave = createSlave(this,properties); *************** *** 631,636 **** * respects transaction isolation. */ ! public void overflow() { ! /* * Create the new buffer. --- 631,640 ---- * respects transaction isolation. */ ! public boolean overflow() { ! ! System.err.println("*** Overflow *** "); ! ! 
final Object state = willOverflow(); ! /* * Create the new buffer. *************** *** 675,681 **** --- 679,694 ---- // immediate shutdown of the old journal. oldJournal.closeAndDelete(); + + didOverflow(state); + + // handled overflow by opening a new journal. + return true; } + protected Object willOverflow() {return null;} + + protected void didOverflow(Object state) {} + /** * Does not queue indices for eviction, forcing the old journal to remain *************** *** 901,907 **** final PartitionMetadata pmd = mdi.get(separatorKey); - // the next segment identifier to be assigned. - final int segId = pmd.nextSegId; - if (pmd.getLiveCount()==0) { --- 914,917 ---- *************** *** 917,926 **** */ ! File outFile = getSegmentFile(name,pmd.partId,segId); IndexSegmentBuilder builder = new IndexSegmentBuilder(outFile, tmpDir, oldIndex.btree.getEntryCount(), oldIndex.btree ! .getRoot().entryIterator(), mseg, ! Value.Serializer.INSTANCE, true/* useChecksum */, null/* new RecordCompressor() */, 0d, oldIndex.btree .getIndexUUID()); --- 927,937 ---- */ ! File outFile = getSegmentFile(name,pmd.partId); IndexSegmentBuilder builder = new IndexSegmentBuilder(outFile, tmpDir, oldIndex.btree.getEntryCount(), oldIndex.btree ! .getRoot().entryIterator(), mseg, oldIndex ! .getBTree().getNodeSerializer() ! .getValueSerializer(), true/* useChecksum */, null/* new RecordCompressor() */, 0d, oldIndex.btree .getIndexUUID()); *************** *** 929,933 **** * update the metadata index for this partition. */ ! mdi.put(separatorKey, new PartitionMetadata(0, segId + 1, new SegmentMetadata[] { new SegmentMetadata("" + outFile, outFile.length(), ResourceState.Live, --- 940,944 ---- * update the metadata index for this partition. */ ! mdi.put(separatorKey, new PartitionMetadata(0, pmd.dataServices, new SegmentMetadata[] { new SegmentMetadata("" + outFile, outFile.length(), ResourceState.Live, *************** *** 955,959 **** // output file for the merged segment. ! File outFile = getSegmentFile(name, pmd.partId, segId); // merge the data from the btree on the slave and the index --- 966,970 ---- // output file for the merged segment. ! File outFile = getSegmentFile(name, pmd.partId); // merge the data from the btree on the slave and the index *************** *** 965,969 **** IndexSegmentBuilder builder = new IndexSegmentBuilder(outFile, null, mergeItr.nentries, new MergedEntryIterator(mergeItr), ! mseg, oldIndex.btree.getNodeSerializer() .getValueSerializer(), false/* useChecksum */, null/* recordCompressor */, 0d/* errorRate */, --- 976,980 ---- IndexSegmentBuilder builder = new IndexSegmentBuilder(outFile, null, mergeItr.nentries, new MergedEntryIterator(mergeItr), ! mseg, oldIndex.getBTree().getNodeSerializer() .getValueSerializer(), false/* useChecksum */, null/* recordCompressor */, 0d/* errorRate */, *************** *** 999,1003 **** // assume only the last segment is live. ! final SegmentMetadata oldSeg = pmd.segs[pmd.segs.length-1]; newSegs[0] = new SegmentMetadata(oldSeg.filename, oldSeg.nbytes, --- 1010,1014 ---- // assume only the last segment is live. ! final SegmentMetadata oldSeg = (SegmentMetadata)pmd.resources[pmd.resources.length-1]; newSegs[0] = new SegmentMetadata(oldSeg.filename, oldSeg.nbytes, *************** *** 1007,1011 **** .length(), ResourceState.Live, builder.segmentUUID); ! mdi.put(separatorKey, new PartitionMetadata(0, segId + 1, newSegs)); // /* --- 1018,1022 ---- .length(), ResourceState.Live, builder.segmentUUID); ! 
mdi.put(separatorKey, new PartitionMetadata(0, pmd.dataServices, newSegs)); // /* *************** *** 1026,1032 **** // assuming at most one dead and one live segment. ! if(pmd.segs.length>1) { ! final SegmentMetadata deadSeg = pmd.segs[0]; if(deadSeg.state!=ResourceState.Dead) { --- 1037,1043 ---- // assuming at most one dead and one live segment. ! if(pmd.resources.length>1) { ! final SegmentMetadata deadSeg = (SegmentMetadata)pmd.resources[0]; if(deadSeg.state!=ResourceState.Dead) { *************** *** 1068,1073 **** * The unique within index partition identifier - see * {@link PartitionMetadata#partId}. - * @param segId - * The unique within partition segment identifier. * * @todo munge the index name so that we can support unicode index names in --- 1079,1082 ---- *************** *** 1077,1081 **** * segmentId in the filenames. */ ! protected File getSegmentFile(String name,int partId,int segId) { File parent = getPartitionDirectory(name,partId); --- 1086,1090 ---- * segmentId in the filenames. */ ! protected File getSegmentFile(String name,int partId) { File parent = getPartitionDirectory(name,partId); *************** *** 1087,1093 **** } ! File file = new File(parent, segId + SEG); ! ! return file; } --- 1096,1108 ---- } ! try { ! ! return File.createTempFile(name, SEG, parent); ! ! } catch(IOException ex) { ! ! throw new RuntimeException(ex); ! ! } } *************** *** 1320,1324 **** // open the file. IndexSegmentFileStore fileStore = new IndexSegmentFileStore( ! resource.getFile()); // load the btree. --- 1335,1339 ---- // open the file. IndexSegmentFileStore fileStore = new IndexSegmentFileStore( ! new File(resource.getFile())); // load the btree.
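
Note on the deletion-marker semantics documented in the IsolatablePartitionedIndexView javadoc above: the read rule reduces to a short resolution loop. The following self-contained sketch uses hypothetical stand-in types (plain Maps and a toy Entry class), not the bigdata classes; only the semantics come from the javadoc - sources are consulted from the most recent (the mutable btree absorbing writes) to the earliest historical resource, the first entry found for a key wins, and a deletion marker is reported as "no entry".

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FusedReadSketch {

    /** A value plus a deletion marker flag (toy stand-in for isolation.Value). */
    static class Entry {
        final Object value;
        final boolean deleted;
        Entry(Object value, boolean deleted) {
            this.value = value;
            this.deleted = deleted;
        }
    }

    /**
     * Read through the sources in order from the most recent to the earliest.
     * The first entry found for the key is authoritative; a deletion marker
     * hides any older value for that key.
     */
    static Object lookup(String key, List<Map<String, Entry>> sources) {
        for (Map<String, Entry> src : sources) {
            Entry e = src.get(key);
            if (e != null) {
                return e.deleted ? null : e.value;
            }
        }
        return null; // no source has an entry for the key.
    }

    public static void main(String[] args) {
        Map<String, Entry> mutableBTree = new HashMap<String, Entry>();
        Map<String, Entry> indexSegment = new HashMap<String, Entry>();
        indexSegment.put("k", new Entry("v0", false)); // historical value.
        mutableBTree.put("k", new Entry(null, true));  // deletion marker.
        // Most recent source first.
        List<Map<String, Entry>> view = Arrays.asList(mutableBTree, indexSegment);
        System.out.println(lookup("k", view)); // prints null: the delete wins.
    }
}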
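Note on the PartitionMetadata changes: a partition record now carries an ordered dataServices UUID[] (primary first, with writes pipelined to the secondaries in array order) alongside a resources array that may mix journals and index segments. A usage sketch, assuming compile access to the com.bigdata.scaleup classes shown in this diff; the UUID values are made up:

import java.util.UUID;

import com.bigdata.scaleup.IResourceMetadata;
import com.bigdata.scaleup.PartitionMetadata;

public class PartitionMetadataUsage {

    public static void main(String[] args) {

        final UUID primary = UUID.randomUUID();
        final UUID secondary = UUID.randomUUID();

        // An initial partition: two data services, no resources yet.
        PartitionMetadata pmd = new PartitionMetadata(0/* partId */,
                new UUID[] { primary, secondary },
                new IResourceMetadata[] {});

        // The replication count is just the #of data services.
        System.out.println(pmd.getDataServiceCount()); // 2

        // getDataServices() returns a clone; callers can not modify the record.
        final UUID[] copy = pmd.getDataServices();
        copy[0] = UUID.randomUUID(); // does not affect pmd.
    }
}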
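Note on the overflow fix: MasterJournal.overflow() now returns a boolean and brackets the journal swap with two protected extension points, willOverflow() and didOverflow(Object), so a subclass can carry state across the swap (e.g., for the triple store indices). A minimal sketch; the subclass name is hypothetical and the Properties constructor is assumed, since this diff does not show MasterJournal's constructors:

import java.util.Properties;

import com.bigdata.scaleup.MasterJournal;

public class OverflowHooksSketch extends MasterJournal {

    public OverflowHooksSketch(Properties properties) {
        super(properties); // assumed constructor signature.
    }

    /** Invoked before the new journal is created. */
    protected Object willOverflow() {
        // e.g., capture which indices exist on the old journal.
        return "state-captured-before-overflow";
    }

    /** Invoked after the old journal has been closed and deleted. */
    protected void didOverflow(Object state) {
        // e.g., re-register those indices against the new journal.
        System.err.println("overflow complete: " + state);
    }
}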