From: Bryan T. <tho...@us...> - 2007-03-11 11:43:19
|
Update of /cvsroot/cweb/bigdata/src/java/com/bigdata/scaleup In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv5433/src/java/com/bigdata/scaleup Modified Files: Name2MetadataAddr.java MetadataIndex.java SlaveJournal.java AbstractPartitionTask.java Added Files: PartitionedIndexView.java MasterJournal.java IsolatablePartitionedIndexView.java Removed Files: PartitionedIndex.java PartitionedJournal.java Log Message: Continued minor refactoring in line with model updates. --- NEW FILE: IsolatablePartitionedIndexView.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. Modifications: */ /* * Created on Feb 12, 2007 */ package com.bigdata.scaleup; import com.bigdata.isolation.IIsolatableIndex; import com.bigdata.isolation.UnisolatedBTree; import com.bigdata.objndx.IEntryIterator; /** * A {@link PartitionedIndexView} that supports transactions and deletion * markers. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ * * FIXME implement; support processing of delete markers. They can exist in the mutable * btree and in index segments that are not either a clean first eviction or a * full compacting merge (e.g., they can still exist in a compacting merge if * there are other index segments or btrees that are part of a partition but are * not partitipating in the compacting merge). */ public class IsolatablePartitionedIndexView extends PartitionedIndexView implements IIsolatableIndex { /** * @param btree * @param mdi */ public IsolatablePartitionedIndexView(UnisolatedBTree btree, MetadataIndex mdi) { super(btree, mdi); throw new UnsupportedOperationException(); } public boolean contains(byte[] key) { // TODO Auto-generated method stub return false; } public Object insert(Object key, Object value) { // TODO Auto-generated method stub return null; } public Object lookup(Object key) { // TODO Auto-generated method stub return null; } public int rangeCount(byte[] fromKey, byte[] toKey) { // TODO Auto-generated method stub return 0; } public IEntryIterator rangeIterator(byte[] fromKey, byte[] toKey) { // TODO Auto-generated method stub return null; } public Object remove(Object key) { // TODO Auto-generated method stub return null; } public void contains(int ntuples, byte[][] keys, boolean[] contains) { // TODO Auto-generated method stub } public void insert(int ntuples, byte[][] keys, Object[] values) { // TODO Auto-generated method stub } public void lookup(int ntuples, byte[][] keys, Object[] values) { // TODO Auto-generated method stub } public void remove(int ntuples, byte[][] keys, Object[] values) { // TODO Auto-generated method stub } } Index: MetadataIndex.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/scaleup/MetadataIndex.java,v retrieving revision 1.5 retrieving revision 1.6 diff -C2 -d -r1.5 -r1.6 *** MetadataIndex.java 6 Mar 2007 20:38:06 -0000 1.5 --- MetadataIndex.java 11 Mar 2007 11:42:42 -0000 1.6 *************** *** 44,50 **** package com.bigdata.scaleup; ! import com.bigdata.objndx.BTree; import com.bigdata.objndx.BTreeMetadata; import com.bigdata.rawstore.IRawStore; --- 44,53 ---- package com.bigdata.scaleup; ! import com.bigdata.isolation.IsolatedBTree; ! import com.bigdata.journal.Journal; ! import com.bigdata.journal.Tx; import com.bigdata.objndx.BTree; import com.bigdata.objndx.BTreeMetadata; + import com.bigdata.objndx.IndexSegment; import com.bigdata.rawstore.IRawStore; *************** *** 64,67 **** --- 67,76 ---- * @todo mutation operations need to be synchronized. * + * @todo Track which {@link IndexSegment}s and {@link Journal}s are required + * to support the {@link IsolatedBTree}s in use by a {@link Tx}. Deletes + * of old journals and index segments MUST be deferred until no + * transaction remains which can read those data. This metadata must be + * restart-safe so that resources are eventually deleted. + * * @todo define a UUID so that is at least possible to rename a partitioned * index? the uuid would be store in the metadata record for the metadata *************** *** 78,82 **** /** * The name of the metadata index, which is the always the same as the name ! * under which the corresponding {@link PartitionedIndex} was registered. */ private final String name; --- 87,91 ---- /** * The name of the metadata index, which is the always the same as the name ! * under which the corresponding {@link PartitionedIndexView} was registered. */ private final String name; *************** *** 84,88 **** /** * The name of the metadata index, which is the always the same as the name ! * under which the corresponding {@link PartitionedIndex} was registered. */ final public String getName() { --- 93,97 ---- /** * The name of the metadata index, which is the always the same as the name ! * under which the corresponding {@link PartitionedIndexView} was registered. */ final public String getName() { *************** *** 101,105 **** * @param name * The name of the metadata index - this MUST be the name under ! * which the corresponding {@link PartitionedIndex} was * registered. */ --- 110,114 ---- * @param name * The name of the metadata index - this MUST be the name under ! * which the corresponding {@link PartitionedIndexView} was * registered. */ *************** *** 139,143 **** /** * The name of the metadata index, which is the always the same as the name ! * under which the corresponding {@link PartitionedIndex} was registered. */ public final String name; --- 148,152 ---- /** * The name of the metadata index, which is the always the same as the name ! * under which the corresponding {@link PartitionedIndexView} was registered. */ public final String name; --- PartitionedJournal.java DELETED --- --- NEW FILE: MasterJournal.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations [...1306 lines suppressed...] } return seg; } /** * The maximum #of index segments that will be held open without a hard * reference existing for that index segment in the application. */ final int INDEX_SEGMENT_LRU_CAPACITY = 5; /** * A cache for recently used index segments designed to prevent their being * swept by the VM between uses. */ private final WeakValueCache<String/*filename*/, IndexSegment> resourceCache; } Index: AbstractPartitionTask.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/scaleup/AbstractPartitionTask.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** AbstractPartitionTask.java 8 Mar 2007 18:14:06 -0000 1.1 --- AbstractPartitionTask.java 11 Mar 2007 11:42:42 -0000 1.2 *************** *** 100,104 **** IPartitionTask { ! protected final PartitionedJournal master; /** * Branching factor used for generated {@link IndexSegment}(s). --- 100,104 ---- IPartitionTask { ! protected final MasterJournal master; /** * Branching factor used for generated {@link IndexSegment}(s). *************** *** 171,175 **** * correct separator keys must be scheduled. */ ! public AbstractPartitionTask(PartitionedJournal master, String name, int branchingFactor, double errorRate, int partId, byte[] fromKey, byte[] toKey) { --- 171,175 ---- * correct separator keys must be scheduled. */ ! public AbstractPartitionTask(MasterJournal master, String name, int branchingFactor, double errorRate, int partId, byte[] fromKey, byte[] toKey) { *************** *** 227,231 **** * The output segment identifier. */ ! public BuildTask(PartitionedJournal master, String name, int branchingFactor, double errorRate, int partId, byte[] fromKey, byte[] toKey, IResourceMetadata src, int segId) { --- 227,231 ---- * The output segment identifier. */ ! public BuildTask(MasterJournal master, String name, int branchingFactor, double errorRate, int partId, byte[] fromKey, byte[] toKey, IResourceMetadata src, int segId) { *************** *** 290,294 **** * True iff this will be a full compacting merge. */ ! protected AbstractMergeTask(PartitionedJournal master, String name, int branchingFactor, double errorRate, int partId, byte[] fromKey, byte[] toKey, int segId, --- 290,294 ---- * True iff this will be a full compacting merge. */ ! protected AbstractMergeTask(MasterJournal master, String name, int branchingFactor, double errorRate, int partId, byte[] fromKey, byte[] toKey, int segId, *************** *** 429,433 **** * The output segment identifier. */ ! public MergeTask(PartitionedJournal master, String name, int branchingFactor, double errorRate, int partId, byte[] fromKey, byte[] toKey, IResourceMetadata[] resources, --- 429,433 ---- * The output segment identifier. */ ! public MergeTask(MasterJournal master, String name, int branchingFactor, double errorRate, int partId, byte[] fromKey, byte[] toKey, IResourceMetadata[] resources, *************** *** 483,487 **** * merge operation. */ ! public FullMergeTask(PartitionedJournal master, String name, int branchingFactor, double errorRate, int partId, byte[] fromKey, byte[] toKey, long commitTime, int segId) { --- 483,487 ---- * merge operation. */ ! public FullMergeTask(MasterJournal master, String name, int branchingFactor, double errorRate, int partId, byte[] fromKey, byte[] toKey, long commitTime, int segId) { *************** *** 496,500 **** protected IResourceMetadata[] getResources() { ! final PartitionedIndex oldIndex = ((PartitionedIndex) master .getIndex(name, commitTime)); --- 496,500 ---- protected IResourceMetadata[] getResources() { ! final PartitionedIndexView oldIndex = ((PartitionedIndexView) master .getIndex(name, commitTime)); Index: SlaveJournal.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/scaleup/SlaveJournal.java,v retrieving revision 1.3 retrieving revision 1.4 diff -C2 -d -r1.3 -r1.4 *** SlaveJournal.java 8 Mar 2007 18:14:06 -0000 1.3 --- SlaveJournal.java 11 Mar 2007 11:42:42 -0000 1.4 *************** *** 47,50 **** --- 47,51 ---- import java.util.Properties; + import com.bigdata.isolation.IIsolatableIndex; import com.bigdata.isolation.UnisolatedBTree; import com.bigdata.journal.IJournal; *************** *** 52,56 **** import com.bigdata.journal.Name2Addr; import com.bigdata.objndx.BTree; - import com.bigdata.objndx.BTreeMetadata; import com.bigdata.objndx.IEntryIterator; import com.bigdata.objndx.IIndex; --- 53,56 ---- *************** *** 82,88 **** protected Name2MetadataAddr name2MetadataAddr; ! private final PartitionedJournal master; ! protected PartitionedJournal getMaster() { return master; --- 82,88 ---- protected Name2MetadataAddr name2MetadataAddr; ! private final MasterJournal master; ! protected MasterJournal getMaster() { return master; *************** *** 90,94 **** } ! public SlaveJournal(PartitionedJournal master,Properties properties) { super(properties); --- 90,94 ---- } ! public SlaveJournal(MasterJournal master,Properties properties) { super(properties); *************** *** 155,159 **** */ ! name2MetadataAddr = (Name2MetadataAddr)BTreeMetadata .load(this, addr); --- 155,159 ---- */ ! name2MetadataAddr = (Name2MetadataAddr)BTree .load(this, addr); *************** *** 166,172 **** /** ! * Registers and returns a {@link PartitionedIndex} under the given name and * assigns an {@link UnisolatedBTree} to absorb writes for that ! * {@link PartitionedIndex}. The resulting index will support transactional * isolation. * <p> --- 166,172 ---- /** ! * Registers and returns a {@link PartitionedIndexView} under the given name and * assigns an {@link UnisolatedBTree} to absorb writes for that ! * {@link PartitionedIndexView}. The resulting index will support transactional * isolation. * <p> *************** *** 176,181 **** * zero {@link IndexSegment}s. * <p> ! * Note: The returned object is invalid once the {@link PartitionedJournal} ! * {@link PartitionedJournal#overflow()}s. * <p> * Note: You MUST {@link #commit()} before the registered index will be --- 176,181 ---- * zero {@link IndexSegment}s. * <p> ! * Note: The returned object is invalid once the {@link MasterJournal} ! * {@link MasterJournal#overflow()}s. * <p> * Note: You MUST {@link #commit()} before the registered index will be *************** *** 189,195 **** /** ! * Registers and returns a {@link PartitionedIndex} under the given name and * assigns the supplied {@link IIndex} to absorb writes for that ! * {@link PartitionedIndex}. * <p> * A {@link MetadataIndex} is also registered under the given name and an --- 189,195 ---- /** ! * Registers and returns a {@link PartitionedIndexView} under the given name and * assigns the supplied {@link IIndex} to absorb writes for that ! * {@link PartitionedIndexView}. * <p> * A {@link MetadataIndex} is also registered under the given name and an *************** *** 198,203 **** * zero {@link IndexSegment}s. * <p> ! * Note: The returned object is invalid once the {@link PartitionedJournal} ! * {@link PartitionedJournal#overflow()}s. * <p> * Note: You MUST {@link #commit()} before the registered index will be --- 198,203 ---- * zero {@link IndexSegment}s. * <p> ! * Note: The returned object is invalid once the {@link MasterJournal} ! * {@link MasterJournal#overflow()}s. * <p> * Note: You MUST {@link #commit()} before the registered index will be *************** *** 243,272 **** /* ! * Now register the partitioned index on the super class. */ ! return super.registerIndex(name, new PartitionedIndex( ! (BTree) btree, mdi)); } public IIndex getIndex(String name) { IIndex index = super.getIndex(name); ! ! if(index instanceof BTree) { ! index = new PartitionedIndex((BTree)index,getMetadataIndex(name)); } return index; ! } /** ! * Return the {@link MetadataIndex} for the named {@link PartitionedIndex}. ! * This object is used to maintain the definitions of the partitions for ! * that index, including where each file is located that contains live data ! * for the index. */ public MetadataIndex getMetadataIndex(String name) { --- 243,314 ---- /* ! * Now register the view on the super class. The view will delegate ! * writes and the commit protocol to the btree specified by the caller, ! * but will support index partitions and will read from a fused view of ! * the resources for each index partition. */ ! return super.registerIndex(name, getView((BTree) btree, mdi)); } + /** + * Returns a {@link PartitionedIndexView} or + * {@link IsolatablePartitionedIndexView} depending on whether or not the + * named index supports isolation. + */ public IIndex getIndex(String name) { + /* + * Obtain the persistence capable index. If this is a cache hit, then it + * will be the view. Otherwise this will be either a BTree or an + * UnisolatedBTree that was just loaded from the store and we will have + * to wrap it up as a view. + */ IIndex index = super.getIndex(name); ! ! if (index instanceof BTree) { ! ! /* ! * Wrap up the mutable B+-Tree responsible for absorbing writes as ! * a view. ! */ ! MetadataIndex mdi = getMetadataIndex(name); + /* + * Choose the type of view based on whether or not the registered index + * supports isolation. + */ + index = getView((BTree)index,mdi); + } return index; ! } /** ! * Choose the type of view based on whether or not the registered index ! * supports isolation. ! */ ! private PartitionedIndexView getView(BTree btree, MetadataIndex mdi) { ! ! if (btree instanceof IIsolatableIndex) { ! ! return new IsolatablePartitionedIndexView((UnisolatedBTree) btree, ! mdi); ! } else { ! ! return new PartitionedIndexView((BTree) btree, mdi); ! ! } ! ! } ! ! /** ! * Return the {@link MetadataIndex} for the named ! * {@link PartitionedIndexView}. This object is used to maintain the ! * definitions of the partitions for that index, including where each ! * resource is located that contains data for each partition of the index. */ public MetadataIndex getMetadataIndex(String name) { *************** *** 279,292 **** /** ! * FIXME write tests. use to clean up the test suites. note that we can ! * not drop files until we have done an atomic commit. also note that * restart-safe removal of files is not going to happen without some ! * additional sophistication. one solution is to periodically compare ! * the named indices and the segments directory, either deleting or * flagging for an operator those things which do not belong. */ public void dropIndex(String name) { ! PartitionedIndex ndx = (PartitionedIndex) getIndex(name); if (ndx == null) --- 321,335 ---- /** ! * FIXME write tests. use to clean up the test suites. note that we can not ! * drop files until we have done an atomic commit. also note that * restart-safe removal of files is not going to happen without some ! * additional sophistication, eg, placing a task to schedule deletion of ! * resources onto the restart safe schedule. one solution is to periodically ! * compare the named indices and the segments directory, either deleting or * flagging for an operator those things which do not belong. */ public void dropIndex(String name) { ! PartitionedIndexView ndx = (PartitionedIndexView) getIndex(name); if (ndx == null) *************** *** 354,356 **** } ! } \ No newline at end of file --- 397,399 ---- } ! } --- NEW FILE: PartitionedIndexView.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. Modifications: */ /* * Created on Feb 7, 2007 */ package com.bigdata.scaleup; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.NoSuchElementException; import com.bigdata.journal.ICommitter; import com.bigdata.journal.Journal; import com.bigdata.objndx.AbstractBTree; import com.bigdata.objndx.BTree; import com.bigdata.objndx.BatchContains; import com.bigdata.objndx.BatchInsert; import com.bigdata.objndx.BatchLookup; import com.bigdata.objndx.BatchRemove; import com.bigdata.objndx.EmptyEntryIterator; import com.bigdata.objndx.ReadOnlyFusedView; import com.bigdata.objndx.IEntryIterator; import com.bigdata.objndx.IFusedView; import com.bigdata.objndx.IIndex; import com.bigdata.objndx.IndexSegment; /** * A mutable B+-Tree that is dynamically partitioned into one or more key * ranges. Each key range is a <i>partition</i>. A partition is defined by the * lowest value key that can enter that partition (the separator key). A * {@link MetadataIndex} contains the definitions of the partitions. Each * partition has a mutable {@link BTree} that will absorb writes for a * partition. The same {@link BTree} is used to absorb writes distined for any * partitions of the same index on the same {@link Journal}. This class * internally tracks whether or not a write has occurred on each partition. That * information is used to guide eviction decisions when the * {@link Journal#overflow()}s and buffered writes are migrated onto one or * more {@link IndexSegment}s each of which contains historical read-only data * for a single index partition. * <p> * Writes on a {@link PartitionIndex} simply write through to the {@link BTree} * corresponding to that index (having the same name) on the * {@link Journal journal} (they are effectively multiplexed on the journal). * <p> * Read operations process the spanned partitions sequence so that the * aggregated results are in index order. For each partition, the read request * is constructed using ah {@link IFusedView} of the mutable {@link BTree} used * to absorb writes and the live {@link IndexSegment}s for that partition. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ * * @todo add a restart safe data structure tracking each partition for which * writes have been absorbed on the {@link Journal}. this could be just * an ordered set of partition identifiers for which writes have been * absorbed that is maintained by an insertion sort, an ordered array of * <partId:writeCounter>, or a btree mapping partition identifiers to * write counters. The data need to be associated as part of the metadata * for the btree that is absorbing writes for that partition. The simplest * way to do this is to subclass BTree or UnisolatedBTree so that they are * partition aware and automatically track the #of writes destined for * each partition. * * @todo For a scale-out solution the clients are responsible for talking with * the metadata service and locating the data service for each partition. * The client is responsible for directing the writes to the correct data * service. The data service simply fronts for the journal and directs the * writes onto the {@link BTree} absorbing writes for the named index. It * is a (client) error for a write to be directed to a {@link Journal} not * tasked with buffering data for the partition into which that write * should go. */ public class PartitionedIndexView implements IIndex, ICommitter { /** * The mutable {@link BTree} used to absorb all writes for the * {@link PartitionedIndexView}. */ protected final BTree btree; /** * The metadata index used to locate the partitions relevant to a given read * operation. */ private final MetadataIndex mdi; /** * A cache of the fused views for in use partitions. * * @todo reconcile this with * {@link MasterJournal#getIndex(String, IResourceMetadata)} * which provides a weak value cache for index segments. */ private final Map<Integer,ReadOnlyFusedView> views = new HashMap<Integer,ReadOnlyFusedView>(); public MasterJournal getMaster() { return getSlave().getMaster(); } public SlaveJournal getSlave() { return (SlaveJournal)getBTree().getStore(); } /** * The mutable {@link BTree} used to absorb writes. */ public BTree getBTree() { return btree; } /** * Opens and returns the live {@link IndexSegment}s for the partition. * These {@link IndexSegments} MUST be explicitly closed in order to reclaim * resources and release the lock on the backing files. * * @param pmd * The partition. * * @return The live {@link IndexSegment}s for that partition. */ protected IndexSegment[] openIndexSegments(PartitionMetadata pmd) { final int liveCount = pmd.getLiveCount(); IndexSegment[] segs = new IndexSegment[liveCount]; MasterJournal master = getMaster(); int n = 0; for(int i=0; i<pmd.segs.length; i++) { if(pmd.segs[i].state != ResourceState.Live) continue; segs[n++] = (IndexSegment) master.getIndex(getName(), pmd.segs[i]); } assert n == liveCount; return segs; } /** * Close all open views, including any backing index segments. */ protected void closeViews() { Iterator<Map.Entry<Integer,ReadOnlyFusedView>> itr = views.entrySet().iterator(); while(itr.hasNext()) { ReadOnlyFusedView view = itr.next().getValue(); // @todo assumes one open segment per view. IndexSegment seg = (IndexSegment)view.srcs[1]; seg.close(); } views.clear(); } /** * Return a fused view on the partition in which the key would be found. The * view is cached and will be reused on subsequent requests. This operation * can have high latency if the view for the partition is not in the cache. * * @param key * The search key. * * @return The fused view. * * @todo use an weak value LRU policy to close out partitions that are not * being actively used. note that we still need to know which * partitions have been touched on a {@link PartitionedIndexView} when we * export its data in response to a journal overflow event. This * suggests a hard reference queue to hold onto recently used views * and a persistent bitmap or flag in the metadata index to identify * those partitions for which we need to export data. Note that we * MUST hold onto any view that is currently being used to report data * to the application, e.g., by a striterator. This means that we need * to use a hash map with weak or soft reference values. * <p> * Alternatively, perhaps we can write a custom iterator over the * mutable btree for the index that automatically exports or performs * a compacting merge for each partition for which it encounters data * on an entry scan. */ protected IIndex getView(byte[] key) { PartitionMetadata pmd = mdi.find(key); ReadOnlyFusedView view = views.get(pmd.partId); if(view==null) { if(pmd.getLiveCount()==0) { // the btree is the view. return btree; } /* * Open the live index segments for this partition (high latency). */ IndexSegment[] segs = openIndexSegments(pmd); AbstractBTree[] sources = new AbstractBTree[segs.length+1]; // the mutable btree. sources[0] = btree; // the immutable historical segments. System.arraycopy(segs, 0, sources, 1, segs.length); // create the fused view. view = new ReadOnlyFusedView(sources); // place the view in the cache. views.put(pmd.partId, view); } return view; } /** * Return the resources required to provide a coherent view of the partition * in which the key is found. * * @param key * The key. * * @return The resources required to read on the partition containing that * key. The resources are arranged in reverse timestamp order * (increasing age). * * @todo reconcile with {@link #getView(byte[])}? */ protected IResourceMetadata[] getResources(byte[] key) { JournalMetadata journalResource = new JournalMetadata((Journal) btree .getStore(), ResourceState.Live); PartitionMetadata pmd = mdi.find(key); final int liveCount = pmd.getLiveCount(); IResourceMetadata[] resources = new IResourceMetadata[liveCount + 1]; int n = 0; resources[n++] = journalResource; for (int i = 0; i < resources.length; i++) { SegmentMetadata seg = pmd.segs[i]; if (seg.state != ResourceState.Live) continue; resources[n++] = seg; } assert n == resources.length; return resources; } public String getName() { return mdi.getName(); } /** * @param btree * The mutable {@link BTree} used to absorb all writes for the * {@link PartitionedIndexView}. * @param mdi * The metadata index used to locate the partitions relevant to a * given read operation. */ public PartitionedIndexView(BTree btree, MetadataIndex mdi) { this.btree = btree; this.mdi = mdi; } /* * Non-batch API. * * The write operations are trivial since we always direct them to the named * btree on the journal. * * The point read operations are also trivial conceptually. The are always * directed to the fused view of the mutable btree on the journal and the * live index segments for the partition that spans the search key. * * The rangeCount and rangeIterator operations are more complex. The need to * be processed by each partition spanned by the from/to key range. We need * to present the operation to each partition in turn and append the results * together. */ public Object insert(Object key, Object value) { return btree.insert(key, value); } public Object remove(Object key) { return btree.remove(key); } public boolean contains(byte[] key) { return getView(key).contains(key); } public Object lookup(Object key) { return getView((byte[])key).lookup(key); } /** * For each partition spanned by the key range, report the sum of the counts * for each source in the view for that partition (since this is based on * {@link ReadOnlyFusedView}s for each partition, it will may overcount the #of * entries actually in the range on each partition). * <p> * If the count would exceed {@link Integer#MAX_VALUE} then the result is * {@link Integer#MAX_VALUE}. */ public int rangeCount(byte[] fromKey, byte[] toKey) { // index of the first partition to check. final int fromIndex = (fromKey == null ? 0 : mdi.findIndexOf(fromKey)); // index of the last partition to check. final int toIndex = (toKey == null ? 0 : mdi.findIndexOf(toKey)); // per javadoc, keys out of order returns zero(0). if(toIndex<fromIndex) return 0; // use to counters so that we can look for overflow. int count = 0; int lastCount = 0; for( int index = fromIndex; index<=toIndex; index++) { // The first key that would enter the nth partition. byte[] separatorKey = mdi.keyAt(index); // Add in the count from that partition. count += getView(separatorKey).rangeCount(fromKey, toKey); if(count<lastCount) { return Integer.MAX_VALUE; } lastCount = count; } return count; } /** * Return an iterator that visits all values in the key range in key order. * The iterator will visit each partition spanned by the key range in turn. */ public IEntryIterator rangeIterator(byte[] fromKey, byte[] toKey) { // index of the first partition to check. final int fromIndex = (fromKey == null ? 0 : mdi.findIndexOf(fromKey)); // index of the last partition to check. final int toIndex = (toKey == null ? 0 : mdi.findIndexOf(toKey)); // keys are out of order. if(toIndex<fromIndex) return EmptyEntryIterator.INSTANCE; // iterator that will visit all key/vals in that range. return new PartitionedRangeIterator(fromKey, toKey, fromIndex, toIndex); } /* * Batch API. * * The write operations are trivial since we always direct them to the named * btree on the journal. * * The read operations are more complex. We need to partition the set of * keys based on the partitions to which those keys would be directed. This * is basically a join against the {@link MetadataIndex}. This operation is * also required on the rangeCount and rangeIterator methods on the * non-batch api. */ public void insert(BatchInsert op) { btree.insert(op); } public void remove(BatchRemove op) { btree.remove(op); } // FIXME contains(batch) public void contains(BatchContains op) { throw new UnsupportedOperationException(); } // FIXME lookup(batch) public void lookup(BatchLookup op) { throw new UnsupportedOperationException(); } /** * An iterator that visits all key/value entries in a specified key range * across one or more partitions. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ private class PartitionedRangeIterator implements IEntryIterator { /** * The first key to visit or null iff there is no lower bound. */ private final byte[] fromKey; /** * The first key NOT to visit or null iff there is no upper bound. */ private final byte[] toKey; /** * The index of the first partition spanned by the key range. */ private final int fromIndex; /** * The index of the last partition spanned by the key range. */ private final int toIndex; /** * The index of the partition whose entries are currently being visited. */ private int index; /** * The iterator for the current partition. */ private IEntryIterator src; /** * The last visited key and null iff we have not visited anything yet. */ private byte[] key = null; /** * The last visited value. */ private Object val = null; /** * * @param fromKey * The first key to visit or null iff there is no lower * bound. * @param toKey * The first key NOT to visit or null iff there is no upper * bound. * @param fromIndex * The index of the first partition spanned by the key range. * @param toIndex * The index of the last partition spanned by the key range. */ public PartitionedRangeIterator(byte[] fromKey, byte[] toKey, int fromIndex,int toIndex) { assert fromIndex >= 0; assert toIndex >= fromIndex; this.fromKey = fromKey; this.toKey = toKey; this.fromIndex = fromIndex; this.toIndex = toIndex; // the first partition to visit. this.index = fromIndex; // The first key that would enter that partition. byte[] separatorKey = mdi.keyAt(index); // The rangeIterator for that partition. src = getView(separatorKey).rangeIterator(fromKey, toKey); } public boolean hasNext() { if(src.hasNext()) return true; // The current partition has been exhausted. if(index < toIndex) { // the next partition to visit. index++; // The first key that would enter that partition. byte[] separatorKey = mdi.keyAt(index); // The rangeIterator for that partition. src = getView(separatorKey).rangeIterator(fromKey, toKey); return src.hasNext(); } else { // All partitions have been exhausted. return false; } } public Object next() { if (!hasNext()) { throw new NoSuchElementException(); } /* * eagerly fetch the key and value so that we can report them using * getKey() and getValue() */ val = src.next(); key = src.getKey(); return val; } public Object getValue() { if (key == null) { // nothing has been visited yet. throw new IllegalStateException(); } return val; } public byte[] getKey() { if (key == null) { // nothing has been visited yet. throw new IllegalStateException(); } return key; } /** * Not supported. * * @exception UnsupportedOperationException * Always. */ public void remove() { throw new UnsupportedOperationException(); } } /** * The commit is delegated to the mutable B+-Tree since that is where all * the writes are absorbed. */ public long handleCommit() { return btree.handleCommit(); } } Index: Name2MetadataAddr.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/scaleup/Name2MetadataAddr.java,v retrieving revision 1.3 retrieving revision 1.4 diff -C2 -d -r1.3 -r1.4 *** Name2MetadataAddr.java 17 Feb 2007 21:34:21 -0000 1.3 --- Name2MetadataAddr.java 11 Mar 2007 11:42:41 -0000 1.4 *************** *** 50,57 **** import com.bigdata.journal.Name2Addr; import com.bigdata.objndx.BTreeMetadata; - import com.bigdata.objndx.IIndex; import com.bigdata.rawstore.IRawStore; /** * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ --- 50,59 ---- import com.bigdata.journal.Name2Addr; import com.bigdata.objndx.BTreeMetadata; import com.bigdata.rawstore.IRawStore; /** + * Extension of {@link Name2Addr} for locating the {@link MetadataIndex} + * associated with a named {@link PartitionedIndexView}. + * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ *************** *** 70,79 **** } - - protected IIndex loadBTree(IRawStore store, String name, long addr) { - - return (MetadataIndex)BTreeMetadata.load(this.store, addr); - - } } --- 72,75 ---- --- PartitionedIndex.java DELETED --- |