From: Bryan T. <tho...@us...> - 2007-04-25 16:05:45
|
Update of /cvsroot/cweb/bigdata/src/java/com/bigdata/service In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv11220/src/java/com/bigdata/service Modified Files: MetadataService.java ClientIndexView.java BigdataClient.java IMetadataService.java Log Message: Implemented range count for partitioned indices. Index: BigdataClient.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/BigdataClient.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** BigdataClient.java 25 Apr 2007 14:57:11 -0000 1.2 --- BigdataClient.java 25 Apr 2007 16:05:37 -0000 1.3 *************** *** 51,55 **** import java.rmi.RemoteException; import java.util.UUID; - import java.util.concurrent.ExecutionException; import net.jini.config.Configuration; --- 51,54 ---- *************** *** 106,109 **** --- 105,113 ---- * transaction identifier for its read and write operations. * + * @todo write tests where an index is statically partitioned over multiple data + * services and verify that the {@link ClientIndexView} is consistent. + * <p> + * Work towards the same guarantee when dynamic partitioning is enabled. + * * @todo support transactions (there is no transaction manager service yet and * the 2-/3-phase commit protocol has not been implemented on the *************** *** 112,117 **** * @todo Write or refactor logic to map operations across multiple partitions. * ! * FIXME Support factory for indices, reuse cached information across ! * transactional and non-transactional views of the same index. * * @todo Use lambda expressions (and downloaded code) for server-side logic for --- 116,121 ---- * @todo Write or refactor logic to map operations across multiple partitions. * ! * @todo reuse cached information across transactional and non-transactional ! * views of the same index. 
* * @todo Use lambda expressions (and downloaded code) for server-side logic for *************** *** 598,625 **** /** ! * @todo setup cache. test cache and lookup on metadata service if a ! * cache miss. * * @param name * @param key * @return */ ! public IPartitionMetadata getPartition(String name, byte[] key) { ! ! // synchronized(indexCache) { ! // ! // Map<Integer,IDataService> partitionCache = indexCache.get(name); ! // ! // if(partitionCache==null) { ! // ! // partitionCache = new ConcurrentHashMap<Integer, IDataService>(); ! // ! // indexCache.put(name, partitionCache); ! // ! // } ! // ! // IDataService dataService = ! // ! // } /* --- 602,649 ---- /** ! * @todo setup cache. test cache and lookup on metadata service if a ! * cache miss. the cache should be based on a lease and the data ! * service should know whether an index partition has been moved ! * and notify the client that it needs to re-discover the data ! * service for an index partition. the cache is basically a ! * partial copy of the metadata index that is local to the client. ! * The cache needs to have "fake" entries that are the ! * left-sibling of each real partition entry so that it can ! * correctly determine when there is a cache miss. ! * <p> ! * Note that index partition definitions will evolve slowly over ! * time through splits and joins of index segments. again, the ! * client should presume consistency of its information but the ! * data service should know when it no longer has information for ! * a key range in a partition (perhaps passing the partitionId and ! * a timestamp for the last partition update to the data service ! * with each request). ! * <p> ! * Provide a historical view of the index partition definitions ! * when transactional isolation is in use by the client. This ! * should make it possible for a client to not be perturbed by ! * split/joins of index partitions when executing with ! * transactional isolation. ! * <p> ! 
* Note that service failover is at least partly orthogonal to the ! * partition metadata inasmuch as the index partition ! * definitions themselves do not evolve (the same separator keys ! * are in place and the same resources have the consistent data ! * for a view of the index partition), but it is possible that the ! * data services have changed. It is an open question how to ! * maintain isolation with failover while supporting failover ! * without aborting the transaction. The most obvious thing is to ! * have a transactionally isolated client pursue the failover ! * services already defined in the historical transaction without ! * causing the partition metadata to be updated on the metadata ! * service. (Unisolated clients would begin to see updated ! * partition metadata more or less immediately.) * + * @param tx * @param name * @param key * @return */ ! public IPartitionMetadata getPartition(long tx, String name, byte[] key) { /* *************** *** 632,636 **** try { ! pmd = getMetadataService().getPartition(name, key); } catch(Exception ex) { --- 656,664 ---- try { ! byte[] val = getMetadataService().getPartition(name, key); ! ! if(val ==null) return null; ! ! 
pmd = (IPartitionMetadata) SerializerUtil.deserialize(val); } catch(Exception ex) { *************** *** 645,648 **** --- 673,692 ---- // // private Map<String, Map<Integer, IDataService>> indexCache = new ConcurrentHashMap<String, Map<Integer, IDataService>>(); + // + // synchronized(indexCache) { + // + // Map<Integer,IDataService> partitionCache = indexCache.get(name); + // + // if(partitionCache==null) { + // + // partitionCache = new ConcurrentHashMap<Integer, IDataService>(); + // + // indexCache.put(name, partitionCache); + // + // } + // + // IDataService dataService = + // + // } /** Index: ClientIndexView.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/ClientIndexView.java,v retrieving revision 1.3 retrieving revision 1.4 diff -C2 -d -r1.3 -r1.4 *** ClientIndexView.java 25 Apr 2007 14:57:10 -0000 1.3 --- ClientIndexView.java 25 Apr 2007 16:05:37 -0000 1.4 *************** *** 57,61 **** --- 57,63 ---- import com.bigdata.btree.IEntryIterator; import com.bigdata.btree.IIndex; + import com.bigdata.io.SerializerUtil; import com.bigdata.scaleup.IPartitionMetadata; + import com.bigdata.scaleup.MetadataIndex; import com.bigdata.scaleup.PartitionedIndexView; import com.bigdata.service.BigdataClient.BigdataFederation; *************** *** 65,68 **** --- 67,74 ---- * A client-side view of an index. * + * @todo consider writing a client interface to the {@link MetadataIndex} so + * that this code can look identical to the code that we would write if + * the metadata index were local. + * * @todo cache leased information about index partitions of interest to the * client. The cache will be a little tricky since we need to know when *************** *** 175,179 **** public boolean contains(byte[] key) { ! IPartitionMetadata pmd = fed.getPartition(name, key); IDataService dataService = fed.getDataService(pmd); --- 181,185 ---- public boolean contains(byte[] key) { ! 
IPartitionMetadata pmd = fed.getPartition(tx,name, key); IDataService dataService = fed.getDataService(pmd); *************** *** 197,201 **** public Object insert(Object key, Object value) { ! IPartitionMetadata pmd = fed.getPartition(name, (byte[])key); IDataService dataService = fed.getDataService(pmd); --- 203,207 ---- public Object insert(Object key, Object value) { ! IPartitionMetadata pmd = fed.getPartition(tx,name, (byte[])key); IDataService dataService = fed.getDataService(pmd); *************** *** 223,227 **** public Object lookup(Object key) { ! IPartitionMetadata pmd = fed.getPartition(name, (byte[])key); IDataService dataService = fed.getDataService(pmd); --- 229,233 ---- public Object lookup(Object key) { ! IPartitionMetadata pmd = fed.getPartition(tx,name, (byte[])key); IDataService dataService = fed.getDataService(pmd); *************** *** 246,250 **** public Object remove(Object key) { ! IPartitionMetadata pmd = fed.getPartition(name, (byte[])key); IDataService dataService = fed.getDataService(pmd); --- 252,256 ---- public Object remove(Object key) { ! IPartitionMetadata pmd = fed.getPartition(tx,name, (byte[])key); IDataService dataService = fed.getDataService(pmd); *************** *** 286,319 **** public int rangeCount(byte[] fromKey, byte[] toKey) { ! IPartitionMetadata pmd1 = fed.getPartition(name, (byte[])fromKey); ! ! IPartitionMetadata pmd2 = fed.getPartition(name, (byte[])toKey); ! if(pmd2.getPartitionId()!=pmd1.getPartitionId()) { ! throw new UnsupportedOperationException( ! "Can not span partitions at this time"); } - - IDataService dataService = fed.getDataService(pmd1); ! int rangeCount = 0; ! ! try { ! rangeCount += dataService.rangeCount(IDataService.UNISOLATED, name, ! fromKey, toKey); ! ! } catch(Exception ex) { ! throw new RuntimeException(ex); } ! ! 
return rangeCount; } public IEntryIterator rangeIterator(byte[] fromKey, byte[] toKey) { // TODO Auto-generated method stub --- 292,388 ---- public int rangeCount(byte[] fromKey, byte[] toKey) { ! IMetadataService metadataService = getMetadataService(); ! final int fromIndex; ! final int toIndex; ! try { ! ! // index of the first partition to check. ! fromIndex = (fromKey == null ? 0 : metadataService ! .findIndexOfPartition(name, fromKey)); ! // index of the last partition to check. ! toIndex = (toKey == null ? 0 : metadataService ! .findIndexOfPartition(name, toKey)); ! ! } catch (IOException ex) { ! ! throw new RuntimeException(ex); } ! // per javadoc, keys out of order return zero(0). ! if (toIndex < fromIndex) ! return 0; ! // use two counters so that we can look for overflow. ! int count = 0; ! int lastCount = 0; ! for (int index = fromIndex; index <= toIndex; index++) { ! ! IPartitionMetadata pmd; ! ! try { ! ! byte[] tmp = metadataService.getPartitionAtIndex(name, ! fromIndex); ! ! if (tmp == null) ! throw new AssertionError(); ! ! pmd = (IPartitionMetadata) SerializerUtil.deserialize(tmp); ! ! } catch(IOException ex) { ! ! throw new RuntimeException(ex); ! ! } ! ! // // The first key that would enter the nth partition. ! // byte[] separatorKey = mdi.keyAt(index); ! ! IDataService dataService = fed.getDataService(pmd); ! ! try { ! ! /* ! * Add in the count from that partition. ! * ! * @todo modify to request only the range count that actually ! * lies within the partition so that the data service can check ! * the range and notify clients that appear to be requesting ! * data for index partitions that have been relocated. ! */ ! ! count += dataService.rangeCount(tx, name, fromKey, toKey); ! ! } catch(Exception ex) { ! ! throw new RuntimeException(ex); ! ! } ! ! if(count<lastCount) { ! ! // more than would fit in an Integer. ! return Integer.MAX_VALUE; ! ! } ! ! lastCount = count; } ! ! 
return count; } + /** + * FIXME provide an {@link IEntryIterator} that kinds the use of a series of + * {@link ResultSet}s to produce the range iterator. We need an outer loop + * over the partitions spanned by the key range and then an inner loop until + * we have exhausted the key range overlapping with each partition. + */ public IEntryIterator rangeIterator(byte[] fromKey, byte[] toKey) { // TODO Auto-generated method stub Index: IMetadataService.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/IMetadataService.java,v retrieving revision 1.7 retrieving revision 1.8 diff -C2 -d -r1.7 -r1.8 *** IMetadataService.java 25 Apr 2007 14:57:10 -0000 1.7 --- IMetadataService.java 25 Apr 2007 16:05:37 -0000 1.8 *************** *** 73,118 **** */ public interface IMetadataService extends IDataService { ! ! // /** ! // * Return the partition metadata for the index partition that includes the ! // * specified key. ! // * ! // * @param key ! // * The key. ! // * ! // * @return The metadata index partition in which that key is or would be ! // * located. ! // * ! // * @todo return lease for the index partition that would contain the key. ! // * ! // * @todo abstract away from Jini so that we can support other fabrics ! // * (OSGi/SCA). ! // * ! // * @todo Either the client or the metadata service should support ! // * pre-caching of some number of index partitions surrounding that ! // * partition. ! // * ! // * @todo do a variant that supports a key range - this should really just be ! // * the same as ! // * {@link IDataService#rangeQuery(long, String, byte[], byte[], int, int)} ! // * with the client addressing the metadata index rather than the data ! // * index (likewise for this method as well). ! // * ! // * @todo update the {@link PartitionMetadata} data model to reflect a single ! // * point of responsibility with a media replication chain for ! // * failover. 
Either this method or a variant method needs to return ! // * the partition metadata itself so that {@link DataService}s can ! // * configure their downstream media replication pipelines. ! // */ ! // public PartitionMetadata getPartition(String name, byte[] key) throws IOException; ! // ! /* * methods that require access to the metadata server for their * implementations. */ /** ! * Return the UUID of an under utilized data service. */ public ServiceID getUnderUtilizedDataService() throws IOException; --- 73,90 ---- */ public interface IMetadataService extends IDataService { ! /* * methods that require access to the metadata server for their * implementations. + * + * @todo the tx identifier will have to be passed in for clients that want to + * use transactional isolation to achieve a consistent and stable view of + * the metadata index as of the start time of their transaction. */ /** ! * Return the identifier of an under utilized data service. ! * ! * @todo convert to return a UUID to keep Jini isolated from the core impl. */ public ServiceID getUnderUtilizedDataService() throws IOException; *************** *** 129,132 **** --- 101,106 ---- * * @throws IOException + * + * @todo convert serviceID to a UUID to keep Jini encapsulated. */ public IDataService getDataServiceByID(ServiceID serviceID) *************** *** 173,190 **** * @param key * The key. - * @return The metadata for the index partition in which that key would be - * found. - * - * @throws IOException * ! * FIXME offer a variant that reports the index partitions spanned by a key ! * range and write tests for that. Note that the remote API for that method ! * should use a result-set data model to efficiently communicate the data ! * when there are a large #of spanned partitions. * * @see MetadataIndex#find(byte[]) */ ! public IPartitionMetadata getPartition(String name, byte[] key) throws IOException; } --- 147,196 ---- * @param key * The key. * ! 
* @return The serialized {@link IPartitionMetadata} spanning the given key ! * or <code>null</code> if there are no partitions defined. ! * ! * @throws IOException * * @see MetadataIndex#find(byte[]) */ ! public byte[] getPartition(String name, byte[] key) throws IOException; + /** + * Find the index of the partition spanning the given key. + * + * @return The index of the partition spanning the given key or + * <code>-1</code> iff there are no partitions defined. + * + * @exception IllegalStateException + * if there are partitions defined but no partition spans the + * key. In this case the {@link MetadataIndex} lacks an entry + * for the key <code>new byte[]{}</code>. + */ + public int findIndexOfPartition(String name,byte[] key) throws IOException; + + /** + * The partition at that index. + * + * @param name + * The name of the scale-out index. + * @param index + * The entry index in the metadata index. + * + * @return The serialized {@link IPartitionMetadata} for the entry with + * that index. + * + * @throws IOException + * + * @todo this is subject to concurrent modification of the metadata index + * which can cause the index to identify a different partition. client + * requests that use {@link #findIndexOfPartition(String, byte[])} and + * {@link #getPartitionAtIndex(String, int)} really need to refer to + * the same historical version of the metadata index (this affects + * range count and range iterator requests and to some extent batch + * operations that span multiple index partitions). 
+ */ + public byte[] getPartitionAtIndex(String name, int index ) throws IOException; + } Index: MetadataService.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/MetadataService.java,v retrieving revision 1.9 retrieving revision 1.10 diff -C2 -d -r1.9 -r1.10 *** MetadataService.java 25 Apr 2007 14:57:11 -0000 1.9 --- MetadataService.java 25 Apr 2007 16:05:37 -0000 1.10 *************** *** 73,76 **** --- 73,80 ---- * later. * + * @todo support transactionally isolated views onto the metadata index by + * passing in the tx identifier and using the appropriate historical view + * of the metadata index. + * * @todo Provide a means to reconstruct the metadata index from the journal and * index segment data files. We tag each journal and index segment with a *************** *** 155,159 **** } ! public IPartitionMetadata getPartition(String name,byte[] key) throws IOException { // the name of the metadata index itself. --- 159,168 ---- } ! ! /** ! * Note: This is equivalent to {@link MetadataIndex#find(byte[])} except that ! * it does not deserialize the {@link IPartitionMetadata}. ! */ ! public byte[] getPartition(String name,byte[] key) throws IOException { // the name of the metadata index itself. *************** *** 169,185 **** } ! /* ! * @todo this winds up deserializing the value into a PartitionMetadata ! * object and then re-serializing it to return to the remote client. ! */ ! IPartitionMetadata pmd = mdi.find(key); ! if( pmd == null ) { ! throw new IllegalStateException("No partitioned in index: "+name); } ! return pmd; } --- 178,233 ---- } ! final int index = mdi.findIndexOf(key); ! if(index == -1) return null; ! ! byte[] val = (byte[]) mdi.valueAt(index); ! ! return val; ! ! } ! ! /** ! * This is equivalent to {@link MetadataIndex#findIndexOf(byte[])}. ! */ ! public int findIndexOfPartition(String name,byte[] key) throws IOException { ! ! 
// the name of the metadata index itself. ! final String metadataName = getMetadataName(name); ! ! // make sure there is a metadata index for that btree. ! MetadataIndex mdi = (MetadataIndex) journal.getIndex(metadataName); ! ! if(mdi == null) { ! throw new IllegalArgumentException("Index not registered: " + name); } + + final int index = mdi.findIndexOf(key); + + return index; ! } ! ! public byte[] getPartitionAtIndex(String name, int index ) throws IOException { ! ! // the name of the metadata index itself. ! final String metadataName = getMetadataName(name); ! ! // make sure there is a metadata index for that btree. ! MetadataIndex mdi = (MetadataIndex) journal.getIndex(metadataName); ! ! if(mdi == null) { ! ! throw new IllegalArgumentException("Index not registered: " + name); ! ! } ! ! if(index == -1) return null; ! ! byte[] val = (byte[]) mdi.valueAt(index); ! ! return val; } |