|
From: Bryan T. <tho...@us...> - 2007-04-20 16:36:38
|
Update of /cvsroot/cweb/bigdata/src/java/com/bigdata/service In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv16005/src/java/com/bigdata/service Modified Files: AbstractServer.java DataServer.java EmbeddedDataService.java IDataService.java DataService.java DataServiceClient.java MetadataServer.java Log Message: Updated the IDataService interface and now have client talking over JERI to a data service instance for various btree operations, range count, and range query. Index: DataServer.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/DataServer.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** DataServer.java 27 Mar 2007 14:34:23 -0000 1.4 --- DataServer.java 20 Apr 2007 16:36:27 -0000 1.5 *************** *** 52,55 **** --- 52,57 ---- import java.util.Properties; + import net.jini.config.Configuration; + import com.bigdata.journal.IJournal; import com.sun.jini.start.LifeCycle; *************** *** 58,71 **** * The bigdata data server. * - * @todo reduce the permissions required to start the server with the server - * starter. - * * @see src/resources/config for sample configurations. * - * @todo write tests against an standalone installation and then see what it - * looks like when the data services are running on more than one host. - * note that unisolated operations can be tested without a transaction - * server. - * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ --- 60,65 ---- *************** *** 74,78 **** --- 68,75 ---- /** + * Creates a new {@link DataServer}. + * * @param args + * The name of the {@link Configuration} file for the service. */ public DataServer(String[] args) { *************** *** 88,91 **** --- 85,94 ---- } + /** + * Starts a new {@link DataServer}. + * + * @param args + * The name of the {@link Configuration} file for the service. + */ public static void main(String[] args) { *************** *** 104,108 **** * service. */ ! protected void destroy() { DataService service = (DataService)impl; --- 107,111 ---- * service. */ ! public void destroy() { DataService service = (DataService)impl; Index: DataService.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/DataService.java,v retrieving revision 1.7 retrieving revision 1.8 diff -C2 -d -r1.7 -r1.8 *** DataService.java 16 Apr 2007 10:35:28 -0000 1.7 --- DataService.java 20 Apr 2007 16:36:27 -0000 1.8 *************** *** 48,55 **** --- 48,59 ---- package com.bigdata.service; + import java.io.Externalizable; import java.io.IOException; + import java.io.ObjectOutputStream; + import java.io.Serializable; import java.net.InetSocketAddress; import java.rmi.Remote; import java.util.Properties; + import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; *************** *** 61,64 **** --- 65,69 ---- import com.bigdata.btree.BatchLookup; import com.bigdata.btree.BatchRemove; + import com.bigdata.btree.BytesUtil; import com.bigdata.btree.IBatchBTree; import com.bigdata.btree.IBatchOp; *************** *** 68,74 **** import com.bigdata.btree.IReadOnlyBatchOp; import com.bigdata.btree.ISimpleBTree; import com.bigdata.journal.AbstractJournal; import com.bigdata.journal.ITx; - import com.bigdata.journal.IsolationEnum; import com.bigdata.journal.Journal; import com.bigdata.util.concurrent.DaemonThreadFactory; --- 73,79 ---- import com.bigdata.btree.IReadOnlyBatchOp; import com.bigdata.btree.ISimpleBTree; + import com.bigdata.isolation.UnisolatedBTree; import com.bigdata.journal.AbstractJournal; import com.bigdata.journal.ITx; import com.bigdata.journal.Journal; import com.bigdata.util.concurrent.DaemonThreadFactory; *************** *** 345,359 **** } /** * ! * @todo the state of the op is changed as a side effect and needs to be ! * communicated back to a remote client. Also, the remote client does ! * not need to send uninitialized data across the network when the ! * batch operation will use the data purely for a response - we can ! * just initialize the data fields on this side of the interface and ! * then send them back across the network api. */ ! public void batchOp(long tx, String name, IBatchOp op) throws InterruptedException, ExecutionException { --- 350,434 ---- } + + public void registerIndex(String name,UUID indexUUID) { + + journal.serialize(new RegisterIndexTask(name,indexUUID)); + + } + + public void dropIndex(String name) { + + journal.serialize(new DropIndexTask(name)); + + } + + public void batchInsert(long tx, String name, int ntuples, byte[][] keys, + byte[][] vals) throws InterruptedException, ExecutionException { + + batchOp( tx, name, new BatchInsert(ntuples, keys, vals)); + + } + + public boolean[] batchContains(long tx, String name, int ntuples, byte[][] keys + ) throws InterruptedException, ExecutionException { + + BatchContains op = new BatchContains(ntuples, keys, new boolean[ntuples]); + + batchOp( tx, name, op ); + + return op.contains; + + } + + public byte[][] batchLookup(long tx, String name, int ntuples, byte[][] keys) + throws InterruptedException, ExecutionException { + + BatchLookup op = new BatchLookup(ntuples,keys,new byte[ntuples][]); + + batchOp(tx, name, op); + + return (byte[][])op.values; + + } + + public byte[][] batchRemove(long tx, String name, int ntuples, + byte[][] keys, boolean returnOldValues) + throws InterruptedException, ExecutionException { + + BatchRemove op = new BatchRemove(ntuples,keys,new byte[ntuples][]); + + batchOp(tx, name, op); + + return returnOldValues ? (byte[][])op.values : null; + + } /** + * Executes a batch operation on a named btree. * ! * @param tx ! * The transaction identifier -or- zero (0L) IFF the operation is ! * NOT isolated by a transaction. ! * @param name ! * The index name (required). ! * @param op ! * The batch operation. ! * ! * @exception InterruptedException ! * if the operation was interrupted (typically by ! * {@link #shutdownNow()}. ! * @exception ExecutionException ! * If the operation caused an error. See ! * {@link ExecutionException#getCause()} for the underlying ! * error. ! * ! * @todo it is possible to have concurrent execution of batch operations for ! * distinct indices. In order to support this, the write thread would ! * have to become a pool of N worker threads fed from a queue of ! * operations. Concurrent writers can execute as long as they are ! * writing on different indices. (Concurrent readers can execute as ! * long as they are reading from a historical commit time.) */ ! protected void batchOp(long tx, String name, IBatchOp op) throws InterruptedException, ExecutionException { *************** *** 415,435 **** } ! public RangeQueryResult rangeQuery(long tx, String name, byte[] fromKey, ! byte[] toKey, int flags) throws InterruptedException, ExecutionException { if( name == null ) throw new IllegalArgumentException(); ! if (tx == 0L) ! throw new UnsupportedOperationException( ! "Unisolated context not allowed"); ! ! RangeQueryResult result = (RangeQueryResult) txService.submit( ! new RangeQueryTask(tx, name, fromKey, toKey, flags)).get(); ! return result; } ! // /** // * @todo if unisolated or isolated at the read-commit level, then the --- 490,555 ---- } + + public int rangeCount(long tx, String name, byte[] fromKey, byte[] toKey) + throws InterruptedException, ExecutionException { + + if (name == null) + throw new IllegalArgumentException(); + + final RangeCountTask task = new RangeCountTask(tx, name, fromKey, toKey); + + final boolean isolated = tx != 0L; + + if (isolated) { + + return (Integer) readService.submit(task).get(); + + } else { + + return (Integer) journal.serialize(task).get(); + + } + + } ! /** ! * ! * FIXME the iterator needs to be aware of the defintion of a "row" for the ! * sparse row store so that we can respect the atomic guarentee for reads as ! * well as writes. ! * ! * FIXME support filters. there are a variety of use cases from clients that ! * are aware of version counters and delete markers to clients that encode a ! * column name and datum or write time into the key to those that will ! * filter based on inspection of the value associated with the key, e.g., ! * only values having some attribute. ! * ! * @todo if we allow the filter to cause mutations (e.g., deleting matching ! * entries) then we have to examine the operation to determine whether ! * or not we need to use the {@link #txService} or the ! * {@link #readService} ! */ ! public ResultSet rangeQuery(long tx, String name, byte[] fromKey, ! byte[] toKey, int capacity, int flags) throws InterruptedException, ExecutionException { if( name == null ) throw new IllegalArgumentException(); ! final RangeQueryTask task = new RangeQueryTask(tx, name, fromKey, ! toKey, capacity, flags); ! ! final boolean isolated = tx != 0L; ! if(isolated) { ! ! return (ResultSet) readService.submit(task).get(); ! ! } else { ! ! return (ResultSet) journal.serialize(task).get(); ! ! } } ! // /** // * @todo if unisolated or isolated at the read-commit level, then the *************** *** 458,462 **** // int flags = 0; // @todo set to deliver keys + values for map op. // ! // RangeQueryResult result = (RangeQueryResult) txService.submit( // new RangeQueryTask(tx, name, fromKey, toKey, flags)).get(); // --- 578,582 ---- // int flags = 0; // @todo set to deliver keys + values for map op. // ! // ResultSet result = (ResultSet) txService.submit( // new RangeQueryTask(tx, name, fromKey, toKey, flags)).get(); // *************** *** 467,470 **** --- 587,671 ---- // // } + + /** + * + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ + private abstract class AbstractIndexManagementTask implements Callable<Object> { + + protected final String name; + + protected AbstractIndexManagementTask(String name) { + + if(name==null) throw new IllegalArgumentException(); + + this.name = name; + + } + + } + + private class RegisterIndexTask extends AbstractIndexManagementTask { + + protected final UUID indexUUID; + + public RegisterIndexTask(String name,UUID indexUUID) { + + super(name); + + if(indexUUID==null) throw new IllegalArgumentException(); + + this.indexUUID = indexUUID; + + } + + public Object call() throws Exception { + + IIndex ndx = journal.getIndex(name); + + if(ndx != null) { + + if(!ndx.getIndexUUID().equals(indexUUID)) { + + throw new IllegalStateException( + "Index already registered with that name and a different indexUUID"); + + } + + return ndx; + + } + + ndx = journal.registerIndex(name, new UnisolatedBTree(journal, indexUUID)); + + journal.commit(); + + return ndx; + + } + + } + + private class DropIndexTask extends AbstractIndexManagementTask { + + public DropIndexTask(String name) { + + super(name); + + } + + public Object call() throws Exception { + + journal.dropIndex(name); + + journal.commit(); + + return null; + + } + + } /** *************** *** 650,684 **** } ! private class RangeQueryTask implements Callable<Object> { private final String name; private final byte[] fromKey; private final byte[] toKey; - private final int flags; private final ITx tx; ! public RangeQueryTask(long startTime, String name, byte[] fromKey, ! byte[] toKey, int flags) { ! assert startTime != 0L; ! tx = journal.getTx(startTime); ! if (tx == null) { ! throw new IllegalStateException("Unknown tx"); } ! if (!tx.isActive()) { ! throw new IllegalStateException("Tx not active"); } ! if( tx.getIsolationLevel() == IsolationEnum.ReadCommitted ) { ! throw new UnsupportedOperationException("Read-committed not supported"); } --- 851,985 ---- } ! private class RangeCountTask implements Callable<Object> { + // startTime or 0L iff unisolated. + private final long startTime; private final String name; private final byte[] fromKey; private final byte[] toKey; private final ITx tx; ! public RangeCountTask(long startTime, String name, byte[] fromKey, ! byte[] toKey) { ! this.startTime = startTime; ! if(startTime != 0L) { ! ! /* ! * Isolated read. ! */ ! ! tx = journal.getTx(startTime); ! ! if (tx == null) { ! ! throw new IllegalStateException("Unknown tx"); ! ! } ! ! if (!tx.isActive()) { ! ! throw new IllegalStateException("Tx not active"); ! ! } ! ! } else { ! ! /* ! * Unisolated read. ! */ ! ! tx = null; ! ! } ! this.name = name; ! this.fromKey = fromKey; ! this.toKey = toKey; ! ! } ! ! public IIndex getIndex(String name) { ! ! if(tx==null) { ! ! return journal.getIndex(name); ! ! } else { ! return tx.getIndex(name); } + + } + + public Object call() throws Exception { ! IIndex ndx = getIndex(name); ! ! if(ndx==null) { ! throw new IllegalStateException("No such index: "+name); } + + return new Integer(ndx.rangeCount(fromKey, toKey)); ! } ! ! } ! ! private class RangeQueryTask implements Callable<Object> { ! ! // startTime or 0L iff unisolated. ! private final long startTime; ! private final String name; ! private final byte[] fromKey; ! private final byte[] toKey; ! private final int capacity; ! private final int flags; ! ! private final ITx tx; ! ! public RangeQueryTask(long startTime, String name, byte[] fromKey, ! byte[] toKey, int capacity, int flags) { ! ! this.startTime = startTime; ! ! if(startTime != 0L) { ! /* ! * Isolated read. ! */ ! ! tx = journal.getTx(startTime); ! ! if (tx == null) { ! ! throw new IllegalStateException("Unknown tx"); ! ! } ! ! if (!tx.isActive()) { ! ! throw new IllegalStateException("Tx not active"); ! ! } ! ! // if( tx.getIsolationLevel() == IsolationEnum.ReadCommitted ) { ! // ! // throw new UnsupportedOperationException("Read-committed not supported"); ! // ! // } ! ! } else { ! ! /* ! * Unisolated read. ! */ ! ! tx = null; } *************** *** 687,690 **** --- 988,992 ---- this.fromKey = fromKey; this.toKey = toKey; + this.capacity = capacity; this.flags = flags; *************** *** 693,697 **** public IIndex getIndex(String name) { ! return tx.getIndex(name); } --- 995,1007 ---- public IIndex getIndex(String name) { ! if(tx==null) { ! ! return journal.getIndex(name); ! ! } else { ! ! return tx.getIndex(name); ! ! } } *************** *** 701,758 **** IIndex ndx = getIndex(name); ! final int count = ndx.rangeCount(fromKey, toKey); ! boolean countOnly = false; ! final IEntryIterator itr = (countOnly ? null : ndx.rangeIterator( ! fromKey, toKey)); ! return new RangeQueryResult(count, itr, tx.getStartTimestamp(), ! name, fromKey, toKey, flags); } } ! /** - * @todo must keep track of the open iterators on the transaction and - * invalidate them once the transaction completes. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ ! public static class RangeQueryResult { ! public final int count; ! public final IEntryIterator itr; ! public final long startTime; ! public final String name; ! public final byte[] fromKey; ! public final byte[] toKey; ! public final int flags; ! public RangeQueryResult(int count, IEntryIterator itr, long startTime, String name, ! byte[] fromKey, byte[] toKey, int flags) { ! this.count = count; ! this.itr = itr; - this.startTime = startTime; - this.name = name; - this.fromKey = fromKey; - this.toKey = toKey; - this.flags = flags; - } } ! /** ! * Abstract class for tasks that execute batch api operations. There are ! * various concrete subclasses, each of which MUST be submitted to the ! * appropriate service for execution. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> --- 1011,1160 ---- IIndex ndx = getIndex(name); ! if(ndx==null) { ! ! throw new IllegalStateException("No such index: "+name); ! ! } ! final boolean sendKeys = (flags & KEYS) != 0; ! final boolean sendVals = (flags & VALS) != 0; ! /* ! * setup iterator since we will visit keys and/or values in the key ! * range. ! */ ! return new ResultSet(ndx, fromKey, toKey, capacity, sendKeys, ! sendVals); } } ! /** * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ + * + * @todo implement {@link Externalizable}, probably buffer the + * {@link ObjectOutputStream} and focus on byte[] transfers. */ ! public static class ResultSet implements Serializable { ! /** ! * Total #of key-value pairs within the key range (approximate). ! */ ! public final int rangeCount; ! /** ! * Actual #of key-value pairs in the {@link ResultSet} ! */ ! public final int ntuples; ! /** ! * True iff the iterator exhausted the available keys such that no more ! * results would be available if you formed the successor of the ! * {@link #lastKey}. ! */ ! final public boolean exhausted; ! ! /** ! * The last key visited by the iterator <em>regardless</em> of the ! * filter imposed -or- <code>null</code> iff no keys were visited by ! * the iterator for the specified key range. ! * ! * @see #nextKey() ! */ ! public final byte[] lastKey; ! /** ! * The next key that should be used to retrieve keys and/or values ! * starting from the first possible successor of the {@link #lastKey} ! * visited by the iterator in this operation (the successor is formed by ! * appending a <code>nul</code> byte to the {@link #lastKey}). ! * ! * @return The successor of {@link #lastKey} -or- <code>null</code> ! * iff the iterator exhausted the available keys. ! * ! * @exception UnsupportedOperationException ! * if the {@link #lastKey} is <code>null</code>. ! */ ! public byte[] nextKey() { ! if (lastKey == null) ! throw new UnsupportedOperationException(); ! return BytesUtil.successor(lastKey); } + /** + * The visited keys iff the {@link RangeQueryEnum#Keys} flag was set. + */ + public final byte[][] keys; + + /** + * The visited values iff the {@link RangeQueryEnum#Values} flag was + * set. + */ + public final byte[][] vals; + + public ResultSet(final IIndex ndx, final byte[] fromKey, + final byte[] toKey, final int capacity, final boolean sendKeys, + final boolean sendVals) { + + // The upper bound on the #of key-value pairs in the range. + rangeCount = ndx.rangeCount(fromKey, toKey); + + final int limit = (rangeCount > capacity ? capacity : rangeCount); + + int ntuples = 0; + + keys = (sendKeys ? new byte[limit][] : null); + + vals = (sendVals ? new byte[limit][] : null); + + // iterator that will visit the key range. + IEntryIterator itr = ndx.rangeIterator(fromKey, toKey); + + /* + * true if any keys were visited regardless of whether or not they + * satisified the optional filter. This is used to make sure that we + * always return the lastKey visited if any keys were visited and + * otherwise set lastKey := null. + */ + boolean anything = false; + + while (ntuples < limit && itr.hasNext()) { + + anything = true; + + byte[] val = (byte[]) itr.next(); + + if (sendVals) + vals[ntuples] = val; + + if (sendKeys) + keys[ntuples] = itr.getKey(); + + // #of results that will be returned. + ntuples++; + + } + + this.ntuples = ntuples; + + this.lastKey = (anything ? itr.getKey() : null); + + this.exhausted = ! itr.hasNext(); + + } + } ! /** ! * Abstract class for tasks that execute {@link IProcedure} operations. ! * There are various concrete subclasses, each of which MUST be submitted to ! * the appropriate service for execution. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> Index: IDataService.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/IDataService.java,v retrieving revision 1.7 retrieving revision 1.8 diff -C2 -d -r1.7 -r1.8 *** IDataService.java 16 Apr 2007 10:02:50 -0000 1.7 --- IDataService.java 20 Apr 2007 16:36:27 -0000 1.8 *************** *** 45,56 **** import java.io.IOException; import java.util.concurrent.ExecutionException; import com.bigdata.btree.BTree; ! import com.bigdata.btree.IBatchOp; import com.bigdata.journal.ITransactionManager; import com.bigdata.journal.ITxCommitProtocol; ! import com.bigdata.journal.IsolationEnum; ! import com.bigdata.service.DataService.RangeQueryResult; /** --- 45,56 ---- import java.io.IOException; + import java.util.UUID; import java.util.concurrent.ExecutionException; import com.bigdata.btree.BTree; ! import com.bigdata.isolation.UnisolatedBTree; import com.bigdata.journal.ITransactionManager; import com.bigdata.journal.ITxCommitProtocol; ! import com.bigdata.service.DataService.ResultSet; /** *************** *** 94,97 **** --- 94,160 ---- /** + * A constant that may be used as the transaction identifier when the + * operation is <em>unisolated</em> (non-transactional). The value of + * this constant is ZERO (0L). + */ + public static final long UNISOLATED = 0L; + + /** + * Flag specifies that keys in the key range will be returned. When not + * given, the keys will NOT be included in the {@link ResultSet} sent to the + * client. + */ + public static final int KEYS = 1 << 0; + + /** + * Flag specifies that values in the key range will be returned. When not + * given, the values will NOT be included in the {@link ResultSet} sent to + * the client. + */ + public static final int VALS = 1 << 1; + + /** + * Register a named mutable B+Tree on the {@link DataService} (unisolated). + * The keys will be variable length unsigned byte[]s. The values will be + * variable length byte[]s. The B+Tree will support version counters and + * delete markers (it will be compatible with the use of transactions for + * concurrency control). + * + * @param name + * The name that can be used to recover the index. + * + * @param indexUUID + * The UUID that identifies the index. When the mutable B+Tree is + * part of a scale-out index, then you MUST provide the indexUUID + * for that scale-out index. Otherwise this MUST be a random + * UUID, e.g., using {@link UUID#randomUUID()}. + * + * @return The object that would be returned by {@link #getIndex(String)}. + * + * @todo exception if index exists? + * + * @todo provide configuration options {whether the index supports isolation + * or not ({@link BTree} vs {@link UnisolatedBTree}), the branching + * factor for the index, and the value serializer. For a client server + * divide I think that we can always go with an + * {@link UnisolatedBTree}. We should pass in the UUID so that this + * can be used by the {@link MetadataService} to create mutable btrees + * to absorb writes when one or more partitions of a scale out index + * are mapped onto the {@link DataService}. + */ + public void registerIndex(String name,UUID uuid) throws IOException; + + /** + * Drops the named index (unisolated). + * + * @param name + * The name of the index to be dropped. + * + * @exception IllegalArgumentException + * if <i>name</i> does not identify a registered index. + */ + public void dropIndex(String name) throws IOException; + + /** * <p> * Used by the client to submit a batch operation on a named B+Tree *************** *** 133,136 **** --- 196,201 ---- * The batch operation. * + * @exception IOException + * if there was a problem with the RPC. * @exception InterruptedException * if the operation was interrupted (typically by *************** *** 141,154 **** * error. * ! * @todo it is possible to have concurrent execution of batch operations for ! * distinct indices. In order to support this, the write thread would ! * have to become a pool of N worker threads fed from a queue of ! * operations. Concurrent writers can execute as long as they are ! * writing on different indices. (Concurrent readers can execute as ! * long as they are reading from a historical commit time.) */ ! public void batchOp(long tx, String name, IBatchOp op) throws InterruptedException, ExecutionException, IOException; ! /** * <p> --- 206,228 ---- * error. * ! * @todo javadoc update. ! * @todo support extension operations (read or mutable). */ ! public void batchInsert(long tx, String name, int ntuples, byte[][] keys, ! byte[][] values) throws InterruptedException, ExecutionException, ! IOException; ! ! public boolean[] batchContains(long tx, String name, int ntuples, ! byte[][] keys) throws InterruptedException, ExecutionException, ! IOException; ! ! public byte[][] batchLookup(long tx, String name, int ntuples, ! byte[][] keys) throws InterruptedException, ExecutionException, ! IOException; ! ! public byte[][] batchRemove(long tx, String name, int ntuples, ! byte[][] keys, boolean returnOldValues) throws InterruptedException, ExecutionException, IOException; ! /** * <p> *************** *** 156,172 **** * </p> * <p> ! * Note: The rangeQuery operation is NOT allowed for read-committed ! * transactions (the underlying constraint is that the {@link BTree} does ! * NOT support traversal under concurrent modification so this operation is ! * limited to read-only or fully isolated transactions or to unisolated ! * reads against a historical commit time). * </p> * * @param tx * @param name * @param fromKey * @param toKey * @param flags ! * (@todo define flags: count yes/no, keys yes/no, values yes/no) * * @exception InterruptedException --- 230,257 ---- * </p> * <p> ! * In order to visit all keys in a range, clients are expected to issue ! * repeated calls in which the <i>fromKey</i> is incremented to the ! * successor of the last key visited until either an empty {@link ResultSet} ! * is returned or the {@link ResultSet#isLast()} flag is set, indicating ! * that all keys up to (but not including) the <i>startKey</i> have been ! * visited. * </p> * * @param tx + * The transaction identifier -or- zero (0L) IFF the operation is + * NOT isolated by a transaction. * @param name + * The index name (required). * @param fromKey + * The starting key for the scan. * @param toKey + * The first key that will not be visited. + * @param capacity + * The maximum #of key-values to return. (This must be rounded up + * if necessary in order to all values selected for a single row + * of a sparse row store.) * @param flags ! * One or more flags formed by bitwise OR of the constants ! * defined by {@link RangeQueryEnum}. * * @exception InterruptedException *************** *** 177,194 **** * {@link ExecutionException#getCause()} for the underlying * error. - * @exception UnsupportedOperationException - * If the tx is zero (0L) (indicating an unisolated - * operation) -or- if the identifed transaction is - * {@link IsolationEnum#ReadCommitted}. * ! * FIXME support filters. there are a variety of use cases from clients that ! * are aware of version counters and delete markers to clients that encode a ! * column name and datum or write time into the key to those that will ! * filter based on inspection of the value associated with the key, e.g., ! * only values having some attribute. */ ! public RangeQueryResult rangeQuery(long tx, String name, byte[] fromKey, ! byte[] toKey, int flags) throws InterruptedException, ExecutionException, IOException, IOException; /** * <p> --- 262,360 ---- * {@link ExecutionException#getCause()} for the underlying * error. * ! * @todo provide for optional filter. */ ! public ResultSet rangeQuery(long tx, String name, byte[] fromKey, ! byte[] toKey, int capacity, int flags) throws InterruptedException, ExecutionException, IOException, IOException; + + /** + * <p> + * Range count of entries in a key range for the named index on this + * {@link DataService}. + * </p> + * + * @param tx + * The transaction identifier -or- zero (0L) IFF the operation is + * NOT isolated by a transaction. + * @param name + * The index name (required). + * @param fromKey + * The starting key for the scan. + * @param toKey + * The first key that will not be visited. + * + * @return The upper bound estimate of the #of key-value pairs in the key + * range of the partition(s) of the named index found on this + * {@link DataService}. + * + * @exception InterruptedException + * if the operation was interrupted (typically by + * {@link #shutdownNow()}. + * @exception ExecutionException + * If the operation caused an error. See + * {@link ExecutionException#getCause()} for the underlying + * error. + */ + public int rangeCount(long tx, String name, byte[] fromKey, byte[] toKey) + throws InterruptedException, ExecutionException, IOException, + IOException; + + // /** + // * Typesafe enum for flags that control the behavior of + // * {@link IDataService#rangeQuery(long, String, byte[], byte[], int, int)} + // * + // * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + // * @version $Id$ + // */ + // public static enum RangeQueryEnum { + // + // /** + // * Flag specifies that keys in the key range will be returned. When not + // * given, the keys will NOT be included in the {@link ResultSetChunk}s + // * sent to the client. + // */ + // Keys(1 << 1), + // + // /** + // * Flag specifies that values in the key range will be returned. When + // * not given, the values will NOT be included in the + // * {@link ResultSetChunk}s sent to the client. + // */ + // Values(1 << 2); + // + // private final int flag; + // + // private RangeQueryEnum(int flag) { + // + // this.flag = flag; + // + // } + // + // /** + // * True iff this flag is set. + // * + // * @param flags + // * An integer on which zero or more flags have been set. + // * + // * @return True iff this flag is set. + // */ + // public boolean isSet(int flags) { + // + // return (flags & flag) == 1; + // + // } + // + // /** + // * The bit mask for this flag. + // */ + // public final int valueOf() { + // + // return flag; + // + // } + // + // }; + /** * <p> *************** *** 220,223 **** --- 386,390 ---- * The procedure to be executed. * + * @throws IOException * @throws InterruptedException * @throws ExecutionException Index: MetadataServer.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/MetadataServer.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** MetadataServer.java 27 Mar 2007 14:34:24 -0000 1.1 --- MetadataServer.java 20 Apr 2007 16:36:27 -0000 1.2 *************** *** 188,192 **** * metadata service. */ ! protected void destroy() { MetadataService service = (MetadataService)impl; --- 188,192 ---- * metadata service. */ ! public void destroy() { MetadataService service = (MetadataService)impl; Index: DataServiceClient.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/DataServiceClient.java,v retrieving revision 1.5 retrieving revision 1.6 diff -C2 -d -r1.5 -r1.6 *** DataServiceClient.java 13 Apr 2007 15:04:19 -0000 1.5 --- DataServiceClient.java 20 Apr 2007 16:36:27 -0000 1.6 *************** *** 54,58 **** import com.bigdata.btree.IBatchOp; import com.bigdata.journal.ValidationError; ! import com.bigdata.service.DataService.RangeQueryResult; /** --- 54,58 ---- import com.bigdata.btree.IBatchOp; import com.bigdata.journal.ValidationError; ! import com.bigdata.service.DataService.ResultSet; /** *************** *** 66,70 **** * service will support a scale-out solution. */ ! public class DataServiceClient implements IDataService { final IDataService delegate; --- 66,70 ---- * service will support a scale-out solution. */ ! abstract public class DataServiceClient implements IDataService { final IDataService delegate; *************** *** 105,115 **** } ! public void batchOp(long tx, String name, IBatchOp op) throws InterruptedException, ExecutionException, IOException { ! delegate.batchOp(tx, name, op); ! } ! ! public RangeQueryResult rangeQuery(long tx, String name, byte[] fromKey, byte[] toKey, int flags) throws InterruptedException, ExecutionException, IOException { ! return delegate.rangeQuery(tx, name, fromKey, toKey, flags); ! } public void submit(long tx, IProcedure proc) throws InterruptedException, ExecutionException, IOException { --- 105,115 ---- } ! // public void batchOp(long tx, String name, IBatchOp op) throws InterruptedException, ExecutionException, IOException { ! // delegate.batchOp(tx, name, op); ! // } ! // ! // public ResultSet rangeQuery(long tx, String name, byte[] fromKey, byte[] toKey, int flags) throws InterruptedException, ExecutionException, IOException { ! // return delegate.rangeQuery(tx, name, fromKey, toKey, flags); ! // } public void submit(long tx, IProcedure proc) throws InterruptedException, ExecutionException, IOException { Index: EmbeddedDataService.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/EmbeddedDataService.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** EmbeddedDataService.java 13 Apr 2007 15:04:19 -0000 1.4 --- EmbeddedDataService.java 20 Apr 2007 16:36:27 -0000 1.5 *************** *** 56,60 **** import com.bigdata.btree.IBatchOp; import com.bigdata.journal.ValidationError; ! import com.bigdata.service.DataService.RangeQueryResult; import com.bigdata.util.concurrent.DaemonThreadFactory; --- 56,60 ---- import com.bigdata.btree.IBatchOp; import com.bigdata.journal.ValidationError; ! import com.bigdata.service.DataService.ResultSet; import com.bigdata.util.concurrent.DaemonThreadFactory; *************** *** 71,75 **** * using delegation patterns). */ ! public class EmbeddedDataService implements IDataService, IServiceShutdown { private final DataService delegate; --- 71,75 ---- * using delegation patterns). */ ! abstract public class EmbeddedDataService implements IDataService, IServiceShutdown { private final DataService delegate; *************** *** 118,124 **** // } ! public RangeQueryResult rangeQuery(long tx, String name, byte[] fromKey, byte[] toKey, int flags) throws InterruptedException, ExecutionException { ! return delegate.rangeQuery(tx, name, fromKey, toKey, flags); ! } public void submit(long tx, IProcedure proc) throws InterruptedException, ExecutionException { --- 118,124 ---- // } ! // public ResultSet rangeQuery(long tx, String name, byte[] fromKey, byte[] toKey, int flags) throws InterruptedException, ExecutionException { ! // return delegate.rangeQuery(tx, name, fromKey, toKey, flags); ! // } public void submit(long tx, IProcedure proc) throws InterruptedException, ExecutionException { Index: AbstractServer.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/AbstractServer.java,v retrieving revision 1.5 retrieving revision 1.6 diff -C2 -d -r1.5 -r1.6 *** AbstractServer.java 27 Mar 2007 14:34:23 -0000 1.5 --- AbstractServer.java 20 Apr 2007 16:36:27 -0000 1.6 *************** *** 185,188 **** --- 185,197 ---- private boolean open = false; + + /** + * Return the assigned {@link ServiceID}. + */ + public ServiceID getServiceID() { + + return serviceID; + + } /** *************** *** 460,463 **** --- 469,474 ---- log.info("serviceID=" + serviceID); + this.serviceID = serviceID; + if (serviceIdFile != null) { *************** *** 649,653 **** * state. */ ! protected void destroy() { shutdownNow(); --- 660,664 ---- * state. */ ! public void destroy() { shutdownNow(); |