From: Bryan T. <tho...@us...> - 2007-03-15 16:11:16
Update of /cvsroot/cweb/bigdata/src/java/com/bigdata/service In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv10595/src/java/com/bigdata/service Added Files: EmbeddedDataService.java NIODataService.java IReadOnlyProcedure.java IReducer.java IProcedure.java DataService.java TransactionService.java OldTransactionServer.java IDataService.java IMapOp.java DataServiceClient.java Log Message: Refactoring to define service apis (data service, transaction manager service) and some approximate implementations of those services (not supporting service discovery, network protocol, or service robustness). Copied in the UML model so that it will actually get committed to CVS.... --- NEW FILE: NIODataService.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. 
Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. Modifications: */ /* * Created on Oct 9, 2006 */ package com.bigdata.service; import java.io.IOException; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; /** * The network-facing {@link DataService} interface. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ * * @todo Refactor code from the nio test suites. Note that the * {@link DataService} already breaks down the tasks into various thread * pools. This class needs to deploy one thread to accept connections, one * to accept requests (which may be assembled over a series of socket * reads). Once a request is complete, it is handed off to a pool of * worker threads iff it has high latency and otherwise executed * immediately (e.g., a tx abort is low latency and is always executed * directly - pretty much everything else is high latency and needs to be * managed by a worker pool). As worker threads complete, they formulate a * response and then place it on the queue for sending back responses to * clients. * * @todo break down into transaction service that directs 2-3 phase commits on * the data services involved in a given transaction. this will require a * protocol for notifying the transaction service when a client will write * on a data service instance. * * @todo provide a service for moving index partitions around to support load * distribution. * * @todo Support data replication, e.g., via pipelining writes or ROWAA, * including the case with RAM-only segments that gain failover through * replication. */ public class NIODataService { // // /** // * Open journals (indexed by segment). // * // * @todo Change to int32 keys? 
// * @todo Define Segment object to encapsulate both the Journal and the // * database as well as any metadata associated with the segment, e.g., // * load stats. // */ // Map<Long,Journal> journals = new HashMap<Long, Journal>(); // // /** // * // * @param segment // * @param properties // * @throws IOException // * // * @todo Define protocol for journal startup. // */ // public void openSegment(long segment, Properties properties) throws IOException { // // if( journals.containsKey(segment)) { // // throw new IllegalStateException(); // // } // // // @todo pass segment in when creating/opening a journal. // Journal journal = new Journal(properties); // // journals.put(segment, journal); // // } // // /** // * // * @param segment // * @param properties // * @throws IOException // * // * @todo Define protocol for journal shutdown. // */ // public void closeSegment(long segment,Properties properties) throws IOException { // // Journal journal = journals.remove(segment); // // if( journal == null ) throw new IllegalArgumentException(); // // /* // * @todo This is far too abrupt. We have to gracefully shut down the // * segment (both the journal and the read-optimized database). // */ // journal._bufferStrategy.close(); // // } // // /** // * Models a request from a client that has been read from the wire and is // * ready for processing. // * // * @author <a href="mailto:tho...@us...">Bryan Thompson</a> // * @version $Id$ // * // * @todo define this and define how it relates to client responses. // */ // public static class ClientRequest { // // } // // /** // * Models a response that is ready to be sent down the wire to a client. // * // * @author <a href="mailto:tho...@us...">Bryan Thompson</a> // * @version $Id$ // * // * @todo define this and define how it relates to client requests. 
// */ // public static class ClientResponse { // // } // // /** // * @todo create with NO open segments, and then accept requests to receive, // * open, create, send, close, or delete a segment. When opening a // * segment, open both the journal and the database. Keep properties // * for defaults? Server options only? // * // * @todo Work out relationship between per-segment and per-transaction // * request processing. If requests are FIFO per transaction, then that // * has to dominate the queues but we may want to have a set of worker // * threads that allow greater parallelism when processing requests in // * different transactions against different segments. // */ // public NIODataService(Properties properties) { // // Queue<ClientRequest> requests = new ConcurrentLinkedQueue<ClientRequest>(); // // Queue<ClientResponse> responses = new ConcurrentLinkedQueue<ClientResponse>(); // // // } // // /** // * The {@link ClientAcceptor} accepts new clients. // * // * @author <a href="mailto:tho...@us...">Bryan Thompson</a> // * @version $Id$ // */ // public class ClientAcceptor extends Thread { // // public ClientAcceptor() { // // super("Client-Acceptor"); // // } // // } // // /** // * The {@link ClientResponder} buffers Read, Write, and Delete requests from // * the client, places them into a per-transaction queue, and notices when // * results are available to send back to the client. // * // * @author <a href="mailto:tho...@us...">Bryan Thompson</a> // * @version $Id$ // * // * @todo Delete requests are basically a special case of write requests, but // * they probably deserve distinction in the protocol. 
// */ // public class ClientResponder extends Thread { // // public ClientResponder() { // // super("Client-Responder"); // // } // // } // // /** // * The {@link ClientRequestHandler} consumes buffered client requests from a // * per-transaction FIFO queue and places responses onto a queue where they // * are picked up by the {@link ClientResponder} and sent to the client. // * // * @author <a href="mailto:tho...@us...">Bryan Thompson</a> // * @version $Id$ // * // * @todo Reads from the journal must read through any FIFO queue, which // * means indexing the buffered request by transaction, arrival time, // * and objects written. If client requests write directly through then // * we can simplify this logic. However, I believe that we need to be // * able to suspend writes on the journal during commit processing. If // * the client had to block on writes for any transaction, that could // * introduce unacceptable latency. // */ // // public class ClientRequestHandler extends Thread { // // final Journal journal; // // /* // * @todo handshake with journal to make sure that the writer is // * exclusive, e.g., obtaining an exclusive file lock might work. // */ // public ClientRequestHandler(Journal journal) { // // super("Journal-Writer"); // // if (journal == null) // throw new IllegalArgumentException(); // // this.journal = journal; // // } // // /** // * Write request from client. // * // * @param txId // * The transaction identifier. // * @param objId // * The int32 within-segment persistent identifier. // * @param data // * The data to be written. The bytes from // * {@link ByteBuffer#position()} to // * {@link ByteBuffer#limit()} will be written. // */ // public void write(long txId, int objId, ByteBuffer data) { // // ITx transaction = journal.getTx(txId); // // if( transaction == null ) { // // // @todo Send back an error. 
// // throw new UnsupportedOperationException(); // // } // // transaction.write(objId,data); // // } // // /** // * Read request from the client. // * // * @param txId // * The transaction identifier. // * // * @param objId // * The int32 within-segment persistent identifier. // */ // public void read(long txId, int objId) { // // ITx transaction = journal.getTx(txId); // // if( transaction == null ) { // // // @todo Send back an error. // // throw new UnsupportedOperationException(); // // } // // /* // * @todo If we are doing a row scan or any kind of read-ahead then // * we can buffer the results into a block and send it back along // * with an object map so that the client can slice the individual // * rows out of the block. // */ // ByteBuffer data = transaction.read(objId, null); // // if( data == null ) { // // /* // * FIXME Resolve the object against the database. // */ // throw new UnsupportedOperationException("Read from database"); // // } // // /* // * FIXME Write the data onto a socket to get it back to the client. // */ // // } // // /** // * Delete request from client. // * // * @param txId // * The transaction identifier. // * @param objId // * The int32 within-segment persistent identifier. // */ // public void delete(long txId, int objId) { // // ITx transaction = journal.getTx(txId); // // if( transaction == null ) { // // // @todo Send back an error. // // throw new UnsupportedOperationException(); // // } // // transaction.delete(objId); // // } // // } } --- NEW FILE: DataService.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. 
You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. Modifications: */ /* * Created on Mar 14, 2007 */ package com.bigdata.service; import java.util.Properties; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.bigdata.journal.AbstractJournal; import com.bigdata.journal.ITx; import com.bigdata.journal.IsolationEnum; import com.bigdata.journal.Journal; import com.bigdata.objndx.BatchContains; import com.bigdata.objndx.BatchInsert; import com.bigdata.objndx.BatchLookup; import com.bigdata.objndx.BatchRemove; import com.bigdata.objndx.IBatchBTree; import com.bigdata.objndx.IBatchOp; import com.bigdata.objndx.IEntryIterator; import com.bigdata.objndx.IIndex; import com.bigdata.objndx.ILinearList; import com.bigdata.objndx.IReadOnlyBatchOp; import com.bigdata.objndx.ISimpleBTree; import com.bigdata.util.concurrent.DaemonThreadFactory; /** * An implementation of a data service suitable for use with RPC, direct client * calls (if decoupled by an operation queue), or a NIO interface. * <p> * This implementation is thread-safe. 
It will block for each operation. It MUST * be invoked within a pool of request handler threads servicing a network * interface in order to decouple data service operations from client requests. * When used as part of an embedded database, the client operations MUST be * buffered by a thread pool with a FIFO policy so that client requests will be * decoupled from data service operations. * <p> * The {@link #txService} provides concurrency for transaction processing. * <p> * The {@link #readService} provides concurrency for unisolated reads. * <p> * Unisolated writes are serialized using * {@link AbstractJournal#serialize(Callable)}. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ * * @see NIODataService, which contains some old code that can be refactored for * an NIO interface to the data service. * * @todo add assertOpen() throughout * * @todo declare interface for managing service shutdown()/shutdownNow()? * * @todo support group commit for unisolated writes. I may have to refactor some * to get group commit to work for both transaction commits and unisolated * writes. Basically, the tasks on the * {@link AbstractJournal#writeService write service} need to get * aggregated. * * @todo implement NIODataService, RPCDataService (possible), EmbeddedDataService * (uses queue to decouple operations), DataServiceClient (provides * translation from {@link ISimpleBTree} to {@link IBatchBTree}, provides * transparent partitioning of batch operations, handles handshaking and * leases with the metadata index locator service; abstract IO for * different client platforms (e.g., support PHP, C#)). Bundle ICU4J with * the client. * * @todo JobScheduler service for map/reduce (or Hadoop integration). * * @todo another data method will need to be defined to support GOM with * pre-fetch. The easy way to do this is to get 50 objects to either side * of the object having the supplied key. This is easy to compute using * the {@link ILinearList} interface. 
I am not sure about unisolated * operations for GOM.... Isolated operations are straightforward. The * other twist is supporting scalable link sets, link set indices (not * named, unless the identity of the object collecting the link set is * part of the key), and non-OID indices (requires changes to * generic-native). * * @todo Have the {@link DataService} notify the transaction manager when a * write is performed on that service so that all participating * {@link DataService} instances will participate in a 2-/3-phase commit * (and a simple commit can be used when the transaction write set is * localized on a single data service instance). The message needs to be * synchronous each time a new index partition is written on by the client * so that the transaction manager can locate the primary * {@link DataService} instance for the write when it needs to commit or * abort the tx. */ public class DataService implements IDataService { protected Journal journal; /** * Pool of threads for handling unisolated reads. */ final protected ExecutorService readService; /** * Pool of threads for handling concurrent transactions. */ final protected ExecutorService txService; /** * Options understood by the {@link DataService}. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public static class Options extends com.bigdata.journal.Options { /** * <code>readServicePoolSize</code> - The #of threads in the pool * handling concurrent unisolated read requests. * * @see #DEFAULT_READ_SERVICE_POOL_SIZE */ public static final String READ_SERVICE_POOL_SIZE = "readServicePoolSize"; /** * The default #of threads in the read service thread pool. */ public final static int DEFAULT_READ_SERVICE_POOL_SIZE = 20; /** * <code>txServicePoolSize</code> - The #of threads in the pool * handling concurrent transactions. 
* * @see #DEFAULT_TX_SERVICE_POOL_SIZE */ public static final String TX_SERVICE_POOL_SIZE = "txServicePoolSize"; /** * The default #of threads in the transaction service thread pool. */ public final static int DEFAULT_TX_SERVICE_POOL_SIZE = 100; } /** * * @param properties */ public DataService(Properties properties) { String val; final int txServicePoolSize; final int readServicePoolSize; /* * "readServicePoolSize" */ val = properties.getProperty(Options.READ_SERVICE_POOL_SIZE); if (val != null) { readServicePoolSize = Integer.parseInt(val); if (readServicePoolSize < 1 ) { throw new RuntimeException("The '" + Options.READ_SERVICE_POOL_SIZE + "' must be at least one."); } } else readServicePoolSize = Options.DEFAULT_READ_SERVICE_POOL_SIZE; /* * "txServicePoolSize" */ val = properties.getProperty(Options.TX_SERVICE_POOL_SIZE); if (val != null) { txServicePoolSize = Integer.parseInt(val); if (txServicePoolSize < 1 ) { throw new RuntimeException("The '" + Options.TX_SERVICE_POOL_SIZE + "' must be at least one."); } } else txServicePoolSize = Options.DEFAULT_TX_SERVICE_POOL_SIZE; /* * The journal's write service will be used to handle unisolated writes * and transaction commits. * * @todo parameterize for use of scale-up vs scale-out journal impls. */ journal = new Journal(properties); // setup thread pool for unisolated read operations. readService = Executors.newFixedThreadPool(readServicePoolSize, DaemonThreadFactory.defaultThreadFactory()); // setup thread pool for concurrent transactions. txService = Executors.newFixedThreadPool(txServicePoolSize, DaemonThreadFactory.defaultThreadFactory()); } /** * Polite shutdown does not accept new requests and will shutdown once * the existing requests have been processed. */ public void shutdown() { readService.shutdown(); txService.shutdown(); journal.shutdown(); } /** * Shutdown attempts to abort in-progress requests and shutdown as soon * as possible. 
*/ public void shutdownNow() { readService.shutdownNow(); txService.shutdownNow(); journal.close(); } /* * ITxCommitProtocol. */ public void commit(long tx) { // will place task on writeService and block iff necessary. journal.commit(tx); } public void abort(long tx) { // will place task on writeService iff read-write tx. journal.abort(tx); } /* * IDataService. */ private boolean isReadOnly(long startTime) { assert startTime != 0l; ITx tx = journal.getTx(startTime); if (tx == null) { throw new IllegalStateException("Unknown: tx=" + startTime); } return tx.isReadOnly(); } /** * * @todo the state of the op is changed as a side effect and needs to be * communicated back to a remote client. Also, the remote client does * not need to send uninitialized data across the network when the * batch operation will use the data purely for a response - we can * just initialize the data fields on this side of the interface and * then send them back across the network api. */ public void batchOp(long tx, String name, IBatchOp op) throws InterruptedException, ExecutionException { if( name == null ) throw new IllegalArgumentException(); if( op == null ) throw new IllegalArgumentException(); final boolean isolated = tx != 0L; final boolean readOnly = (op instanceof IReadOnlyBatchOp) || (isolated && isReadOnly(tx)); if(isolated) { txService.submit(new TxBatchTask(tx,name,op)).get(); } else if( readOnly ) { readService.submit(new UnisolatedReadBatchTask(name,op)).get(); } else { /* * Special case since incomplete writes MUST be discarded and * complete writes MUST be committed. 
*/ journal.serialize(new UnisolatedBatchReadWriteTask(name,op)).get(); } } public void submit(long tx, IProcedure proc) throws InterruptedException, ExecutionException { if( proc == null ) throw new IllegalArgumentException(); final boolean isolated = tx != 0L; final boolean readOnly = proc instanceof IReadOnlyProcedure; if(isolated) { txService.submit(new TxProcedureTask(tx,proc)).get(); } else if( readOnly ) { readService.submit(new UnisolatedReadProcedureTask(proc)).get(); } else { /* * Special case since incomplete writes MUST be discarded and * complete writes MUST be committed. */ journal.serialize(new UnisolatedReadWriteProcedureTask(proc)).get(); } } public RangeQueryResult rangeQuery(long tx, String name, byte[] fromKey, byte[] toKey, boolean countOnly, boolean keysOnly, boolean valuesOnly) throws InterruptedException, ExecutionException { if( name == null ) throw new IllegalArgumentException(); if (tx == 0L) throw new UnsupportedOperationException( "Unisolated context not allowed"); RangeQueryResult result = (RangeQueryResult)txService.submit( new RangeQueryTask(tx, name, fromKey, toKey, countOnly, keysOnly, valuesOnly)).get(); return result; } /** * @todo if unisolated or isolated at the read-commit level, then the * operation really needs to be broken down by partition or perhaps by * index segment leaf so that we do not have too much latency during a * read (this could be done for rangeQuery as well). * * @todo if fully isolated, then there is no problem running map. * * @todo The definition of a row is different if using a key formed from the * column name, application key, and timestamp. * * @todo For at least GOM we need to deserialize rows from byte[]s, so we * need to have the (de-)serializer to the application level value on * hand. 
*/ public void map(long tx, String name, byte[] fromKey, byte[] toKey, IMapOp op) throws InterruptedException, ExecutionException { if( name == null ) throw new IllegalArgumentException(); if (tx == 0L) throw new UnsupportedOperationException( "Unisolated context not allowed"); RangeQueryResult result = (RangeQueryResult) txService.submit( new RangeQueryTask(tx, name, fromKey, toKey, false, false, false)).get(); // @todo resolve the reducer service. IReducer reducer = null; op.apply(result.itr, reducer); } /** * Abstract class for tasks that execute batch api operations. There are * various concrete subclasses, each of which MUST be submitted to the * appropriate service for execution. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ private abstract class AbstractBatchTask implements Callable<Object> { private final String name; private final IBatchOp op; public AbstractBatchTask(String name, IBatchOp op) { this.name = name; this.op = op; } abstract IIndex getIndex(String name); public Object call() throws Exception { IIndex ndx = getIndex(name); if (ndx == null) throw new IllegalStateException("Index not registered: " + name); if( op instanceof BatchContains ) { ndx.contains((BatchContains) op); } else if( op instanceof BatchLookup ) { ndx.lookup((BatchLookup) op); } else if( op instanceof BatchInsert ) { ndx.insert((BatchInsert) op); } else if( op instanceof BatchRemove ) { ndx.remove((BatchRemove) op); } else { // Extension batch mutation operation. op.apply(ndx); } return null; } } /** * Resolves the named index against the transaction in order to provide * appropriate isolation for reads, read-committed reads, or writes. 
* * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ * * @see ITx * * @todo In order to allow multiple clients to do work on the same * transaction at once, we need a means to ensure that the same * transaction is not assigned to more than one thread in the * {@link DataService#txService}. In the absence of clients imposing * a protocol among themselves for this purpose, we can simply * maintain a mapping of transactions to threads. If a transaction is * currently bound to a thread (its callable task is executing) then * the current thread must wait. This protocol can be easily * implemented using thread local variables.<br> * Note: it is possible for this protocol to result in large numbers * of worker threads blocking, but as long as each worker thread makes * progress it should not be possible for the thread pool as a whole * to block. */ private class TxBatchTask extends AbstractBatchTask { private final ITx tx; public TxBatchTask(long startTime, String name, IBatchOp op) { super(name,op); assert startTime != 0L; tx = journal.getTx(startTime); if (tx == null) { throw new IllegalStateException("Unknown tx"); } if (!tx.isActive()) { throw new IllegalStateException("Tx not active"); } } public IIndex getIndex(String name) { return tx.getIndex(name); } } /** * Class used for unisolated <em>read</em> operations. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ private class UnisolatedReadBatchTask extends AbstractBatchTask { public UnisolatedReadBatchTask(String name, IBatchOp op) { super(name,op); } public IIndex getIndex(String name) { return journal.getIndex(name); } } /** * Class used for unisolated <em>write</em> operations. This class * performs the necessary handshaking with the journal to discard partial * writes in the event of an error during processing and to commit after a * successful write operation, thereby providing the ACID contract for an * unisolated write. 
* * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ private class UnisolatedBatchReadWriteTask extends UnisolatedReadBatchTask { public UnisolatedBatchReadWriteTask(String name, IBatchOp op) { super(name,op); } protected void abort() { journal.abort(); } public Long call() throws Exception { try { super.call(); // commit (synchronous, immediate). return journal.commit(); } catch(Throwable t) { abort(); throw new RuntimeException(t); } } } private class RangeQueryTask implements Callable<Object> { private final String name; private final byte[] fromKey; private final byte[] toKey; private final boolean countOnly; private final boolean keysOnly; private final boolean valuesOnly; private final ITx tx; public RangeQueryTask(long startTime, String name, byte[] fromKey, byte[] toKey, boolean countOnly, boolean keysOnly, boolean valuesOnly) { assert startTime != 0L; tx = journal.getTx(startTime); if (tx == null) { throw new IllegalStateException("Unknown tx"); } if (!tx.isActive()) { throw new IllegalStateException("Tx not active"); } if( tx.getIsolationLevel() == IsolationEnum.ReadCommitted ) { throw new UnsupportedOperationException("Read-committed not supported"); } this.name = name; this.fromKey = fromKey; this.toKey = toKey; this.countOnly = countOnly; this.keysOnly = keysOnly; this.valuesOnly = valuesOnly; } public IIndex getIndex(String name) { return tx.getIndex(name); } public Object call() throws Exception { IIndex ndx = getIndex(name); final int count = ndx.rangeCount(fromKey, toKey); final IEntryIterator itr = (countOnly ? null : ndx.rangeIterator( fromKey, toKey)); return new RangeQueryResult(count, itr, tx.getStartTimestamp(), name, fromKey, toKey, countOnly, keysOnly, valuesOnly); } } /** * @todo must keep track of the open iterators on the transaction and * invalidate them once the transaction completes. 
* * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public static class RangeQueryResult { public final int count; public final IEntryIterator itr; public final long startTime; public final String name; public final byte[] fromKey; public final byte[] toKey; public final boolean countOnly; public final boolean keysOnly; public final boolean valuesOnly; public RangeQueryResult(int count, IEntryIterator itr, long startTime, String name, byte[] fromKey, byte[] toKey, boolean countOnly, boolean keysOnly, boolean valuesOnly) { this.count = count; this.itr = itr; this.startTime = startTime; this.name = name; this.fromKey = fromKey; this.toKey = toKey; this.countOnly = countOnly; this.keysOnly = keysOnly; this.valuesOnly = valuesOnly; } } /** * Abstract class for tasks that execute batch api operations. There are * various concrete subclasses, each of which MUST be submitted to the * appropriate service for execution. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ private abstract class AbstractProcedureTask implements Callable<Object> { protected final IProcedure proc; public AbstractProcedureTask(IProcedure proc) { this.proc = proc; } } /** * Resolves the named index against the transaction in order to provide * appropriate isolation for reads, read-committed reads, or writes. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ * * @see ITx * * @todo In order to allow multiple clients to do work on the same * transaction at once, we need a means to ensure that the same * transaction is not assigned to more than one thread in the * {@link DataService#txService}. In the absence of clients imposing * a protocol among themselves for this purpose, we can simply * maintain a mapping of transactions to threads. If a transaction is * currently bound to a thread (its callable task is executing) then * the current thread must wait. 
This protocol can be easily * implemented using thread local variables.<br> * Note: it is possible for this protocol to result in large numbers * of worker threads blocking, but as long as each worker thread makes * progress it should not be possible for the thread pool as a whole * to block. */ private class TxProcedureTask extends AbstractProcedureTask { private final ITx tx; public TxProcedureTask(long startTime, IProcedure proc) { super(proc); assert startTime != 0L; tx = journal.getTx(startTime); if (tx == null) { throw new IllegalStateException("Unknown tx"); } if (!tx.isActive()) { throw new IllegalStateException("Tx not active"); } } public Object call() throws Exception { proc.apply(tx.getStartTimestamp(),tx); return null; } } /** * Class used for unisolated <em>read</em> operations. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ private class UnisolatedReadProcedureTask extends AbstractProcedureTask { public UnisolatedReadProcedureTask(IProcedure proc) { super(proc); } public Object call() throws Exception { proc.apply(0L,journal); return null; } } /** * Class used for unisolated <em>write</em> operations. This class * performs the necessary handshaking with the journal to discard partial * writes in the event of an error during processing and to commit after a * successful write operation, thereby providing the ACID contract for an * unisolated write. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ private class UnisolatedReadWriteProcedureTask extends UnisolatedReadProcedureTask { public UnisolatedReadWriteProcedureTask(IProcedure proc) { super(proc); } protected void abort() { journal.abort(); } public Long call() throws Exception { try { super.call(); // commit (synchronous, immediate). 
return journal.commit(); } catch(Throwable t) { abort(); throw new RuntimeException(t); } } } } --- NEW FILE: IDataService.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. Modifications: */ /* * Created on Mar 15, 2007 */ package com.bigdata.service; import java.util.concurrent.ExecutionException; import com.bigdata.journal.ITransactionManager; import com.bigdata.journal.ITxCommitProtocol; import com.bigdata.journal.IsolationEnum; import com.bigdata.objndx.BTree; import com.bigdata.objndx.IBatchOp; import com.bigdata.service.DataService.RangeQueryResult; /** * Data service interface. 
 * <p>
 * The data service exposes the methods on this interface to the client and the
 * {@link ITxCommitProtocol} methods to the {@link ITransactionManager} service.
 * <p>
 * The data service exposes both isolated (transactional) and unisolated batch
 * operations on scalable named btrees. Transactions are identified by their
 * start time. BTrees are identified by name. The btree batch API provides for
 * existence testing, lookup, insert, removal, and an extensible mutation
 * operation. Other operations exposed by this interface include: remote
 * procedure execution, key range traversal, and mapping of an operator over a
 * key range.
 * <p>
 * Unisolated processing is broken down into idempotent operations (reads) and
 * mutation operations (insert, remove, the extensible batch operator, and
 * remote procedure execution).
 * <p>
 * Unisolated writes are serialized and ACID. If an unisolated write succeeds,
 * then it will commit immediately. If the unisolated write fails, the partial
 * write on the journal will be discarded.
 *
 * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
 * @version $Id$
 */
public interface IDataService extends ITxCommitProtocol {

    /**
     * Used by the client to submit a batch operation on a named B+Tree
     * (synchronous).
     * <p>
     * Unisolated operations SHOULD be used to achieve "auto-commit" semantics.
     * Fully isolated transactions are useful IFF multiple operations must be
     * composed into an ACID unit.
     * <p>
     * While unisolated batch operations on a single data service are ACID,
     * clients are required to locate all index partitions for the logical
     * operation and distribute their operation across the distinct data service
     * instances holding the affected index partitions. In practice, this means
     * that the contract for ACID unisolated operations is limited to operations
     * where the data is located on a single data service instance. For ACID
     * operations that cross multiple data service instances the client MUST use
     * a fully isolated transaction.
     * While read-committed transactions impose
     * low system overhead, clients interested in the highest possible total
     * throughput SHOULD choose unisolated read operations in preference to a
     * read-committed transaction.
     * <p>
     * This method is thread-safe. It will block for each operation. It should
     * be invoked within a pool of request handler threads servicing a network
     * interface, thereby decoupling data service operations from client
     * requests. When used as part of an embedded database, the client
     * operations MUST be buffered by a thread pool with a FIFO policy so that
     * client requests may be decoupled from data service operations.
     *
     * @param tx
     *            The transaction identifier -or- zero (0L) IFF the operation is
     *            NOT isolated by a transaction.
     * @param name
     *            The index name (required).
     * @param op
     *            The batch operation.
     *
     * @exception InterruptedException
     *                if the operation was interrupted (typically by
     *                {@link #shutdownNow()}).
     * @exception ExecutionException
     *                If the operation caused an error. See
     *                {@link ExecutionException#getCause()} for the underlying
     *                error.
     */
    public void batchOp(long tx, String name, IBatchOp op)
            throws InterruptedException, ExecutionException;

    /**
     * Submit a procedure.
     * <p>
     * Unisolated operations SHOULD be used to achieve "auto-commit" semantics.
     * Fully isolated transactions are useful IFF multiple operations must be
     * composed into an ACID unit.
     * <p>
     * While unisolated batch operations on a single data service are ACID,
     * clients are required to locate all index partitions for the logical
     * operation and distribute their operation across the distinct data service
     * instances holding the affected index partitions. In practice, this means
     * that the contract for ACID unisolated operations is limited to operations
     * where the data is located on a single data service instance. For ACID
     * operations that cross multiple data service instances the client MUST use
     * a fully isolated transaction.
     * While read-committed transactions impose
     * low system overhead, clients interested in the highest possible total
     * throughput SHOULD choose unisolated read operations in preference to a
     * read-committed transaction.
     *
     * @param tx
     *            The transaction identifier -or- zero (0L) IFF the operation is
     *            NOT isolated by a transaction.
     * @param proc
     *            The procedure to be executed.
     *
     * @throws InterruptedException
     * @throws ExecutionException
     */
    public void submit(long tx, IProcedure proc)
            throws InterruptedException, ExecutionException;

    /**
     * Streaming traversal of keys and/or values in a given key range.
     * <p>
     * Note: The rangeQuery operation is NOT allowed for either unisolated reads
     * or read-committed transactions. The underlying constraint is that the
     * {@link BTree} does NOT support traversal under concurrent modification,
     * so this operation is limited to read-only or fully isolated
     * transactions.
     *
     * @param tx
     * @param name
     * @param fromKey
     * @param toKey
     * @param countOnly
     * @param keysOnly
     * @param valuesOnly
     *
     * @exception InterruptedException
     *                if the operation was interrupted (typically by
     *                {@link #shutdownNow()}).
     * @exception ExecutionException
     *                If the operation caused an error. See
     *                {@link ExecutionException#getCause()} for the underlying
     *                error.
     * @exception UnsupportedOperationException
     *                If the tx is zero (0L) (indicating an unisolated
     *                operation) -or- if the identified transaction is
     *                {@link IsolationEnum#ReadCommitted}.
     */
    public RangeQueryResult rangeQuery(long tx, String name, byte[] fromKey,
            byte[] toKey, boolean countOnly, boolean keysOnly,
            boolean valuesOnly) throws InterruptedException, ExecutionException;

    /**
     * Maps an operation against all key/value pairs in a key range, writing the
     * result onto a reducer service.
     */
    public void map(long tx, String name, byte[] fromKey, byte[] toKey,
            IMapOp op) throws InterruptedException, ExecutionException;

}

--- NEW FILE: EmbeddedDataService.java ---
/**

The Notice below must appear in each file of the Source Code of any
copy you distribute of the Licensed Product. Contributors to any
Modifications may add their own copyright notices to identify their
own contributions.

License:

The contents of this file are subject to the CognitiveWeb Open Source
License Version 1.1 (the License). You may not copy or use this file,
in either source code or executable form, except in compliance with
the License. You may obtain a copy of the License from

  http://www.CognitiveWeb.org/legal/license/

Software distributed under the License is distributed on an AS IS
basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
the License for the specific language governing rights and limitations
under the License.

Copyrights:

Portions created by or assigned to CognitiveWeb are Copyright
(c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact
information for CognitiveWeb is available at

  http://www.CognitiveWeb.org

Portions Copyright (c) 2002-2003 Bryan Thompson.

Acknowledgements:

Special thanks to the developers of the Jabber Open Source License 1.0
(JOSL), from which this License was derived. This License contains
terms that differ from JOSL.

Special thanks to the CognitiveWeb Open Source Contributors for their
suggestions and support of the Cognitive Web.

Modifications:

*/
/*
 * Created on Mar 15, 2007
 */

package com.bigdata.service;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.bigdata.objndx.IBatchOp;
import com.bigdata.service.DataService.RangeQueryResult;
import com.bigdata.util.concurrent.DaemonThreadFactory;

/**
 * Implementation suitable for a standalone embedded database.
 *
 * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
 * @version $Id$
 *
 * @todo either decouple client operations from the data service (they are
 *       synchronous) or drop this class and just use DataService directly for
 *       an embedded data service (or potentially integrate a
 *       {@link DataService} and {@link TransactionService} instance together
 *       using delegation patterns).
 */
public class EmbeddedDataService implements IDataService {

    private final DataService delegate;

    /**
     * Pool of threads for decoupling client operations.
     */
    final protected ExecutorService opService;

    public EmbeddedDataService(Properties properties) {

        delegate = new DataService(properties);

        // setup thread pool for decoupling client operations.
        opService = Executors.newFixedThreadPool(100, DaemonThreadFactory
                .defaultThreadFactory());

    }

    /**
     * Polite shutdown does not accept new requests and will shut down once
     * the existing requests have been processed.
     */
    public void shutdown() {

        delegate.shutdown();

    }

    /**
     * Shutdown attempts to abort in-progress requests and shut down as soon
     * as possible.
     */
    public void shutdownNow() {

        delegate.shutdownNow();

    }

    public void batchOp(long tx, String name, IBatchOp op)
            throws InterruptedException, ExecutionException {

        delegate.batchOp(tx, name, op);

    }

    public void map(long tx, String name, byte[] fromKey, byte[] toKey,
            IMapOp op) throws InterruptedException, ExecutionException {

        delegate.map(tx, name, fromKey, toKey, op);

    }

    public RangeQueryResult rangeQuery(long tx, String name, byte[] fromKey,
            byte[] toKey, boolean countOnly, boolean keysOnly,
            boolean valuesOnly) throws InterruptedException, ExecutionException {

        return delegate.rangeQuery(tx, name, fromKey, toKey, countOnly,
                keysOnly, valuesOnly);

    }

    public void submit(long tx, IProcedure proc) throws InterruptedException,
            ExecutionException {

        delegate.submit(tx, proc);

    }

    public void abort(long tx) {

        delegate.abort(tx);

    }

    public void commit(long tx) {

        delegate.commit(tx);

    }

}

--- NEW FILE: DataServiceClient.java ---
/**

The Notice below must appear in each file of the Source Code of any
copy you distribute of the Licensed Product. Contributors to any
Modifications may add their own copyright notices to identify their
own contributions.

License:

The contents of this file are subject to the CognitiveWeb Open Source
License Version 1.1 (the License). You may not copy or use this file,
in either source code or executable form, except in compliance with
the License. You may obtain a copy of the License from

  http://www.CognitiveWeb.org/legal/license/

Software distributed under the License is distributed on an AS IS
basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
the License for the specific language governing rights and limitations
under the License.

Copyrights:

Portions created by or assigned to CognitiveWeb are Copyright
(c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact
information for CognitiveWeb is available at

  http://www.CognitiveWeb.org

Portions Copyright (c) 2002-2003 Bryan Thompson.

Acknowledgements:

Special thanks to the developers of the Jabber Open Source License 1.0
(JOSL), from which this License was derived. This License contains
terms that differ from JOSL.

Special thanks to the CognitiveWeb Open Source Contributors for their
suggestions and support of the Cognitive Web.

Modifications:

*/
/*
 * Created on Mar 15, 2007
 */

package com.bigdata.service;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import com.bigdata.objndx.IBatchOp;
import com.bigdata.service.DataService.RangeQueryResult;

/**
 * Client facing interface for communications with a data service.
 *
 * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
 * @version $Id$
 *
 * @todo provide implementations for embedded vs remote data service instances.
 *       Only remote data service instances discovered via the metadata locator
 *       service will support a scale-out solution.
 */
public class DataServiceClient implements IDataService {

    final IDataService delegate;

    public DataServiceClient(Properties properties) {

        // @todo provide option for other kinds of connections.
        delegate = new EmbeddedDataServiceClient(properties);

    }

    private class EmbeddedDataServiceClient extends DataService {

        EmbeddedDataServiceClient(Properties properties) {

            super(properties);

        }

    }

    /*
     * @todo implement remote data service client talking to NIO service
     * instance. this needs to locate the transaction manager service and
     * the metadata service for each index used by the client.
     */
    abstract private class NIODataServiceClient implements IDataService {

    }

    /**
     * Polite shutdown does not accept new requests and will shut down once
     * the existing requests have been processed.
     */
    public void shutdown() {

        ((DataService)delegate).shutdown();

    }

    /**
     * Shutdown attempts to abort in-progress requests and shut down as soon
     * as possible.
     */
    public void shutdownNow() {

        ((DataService)delegate).shutdownNow();

    }

    public void batchOp(long tx, String name, IBatchOp op)
            throws InterruptedException, ExecutionException {

        delegate.batchOp(tx, name, op);

    }

    public void map(long tx, String name, byte[] fromKey, byte[] toKey,
            IMapOp op) throws InterruptedException, ExecutionException {

        delegate.map(tx, name, fromKey, toKey, op);

    }

    public RangeQueryResult rangeQuery(long tx, String name, byte[] fromKey,
            byte[] toKey, boolean countOnly, boolean keysOnly,
            boolean valuesOnly) throws InterruptedException, ExecutionException {

        return delegate.rangeQuery(tx, name, fromKey, toKey, countOnly,
                keysOnly, valuesOnly);

    }

    public void submit(long tx, IProcedure proc) throws InterruptedException,
            ExecutionException {

        delegate.submit(tx, proc);

    }

    public void abort(long tx) {

        delegate.abort(tx);

    }

    public void commit(long tx) {

        delegate.commit(tx);

    }

}

--- NEW FILE: IProcedure.java ---
/**

The Notice below must appear in each file of the Source Code of any
copy you distribute of the Licensed Product. Contributors to any
Modifications may add their own copyright notices to identify their
own contributions.

License:

The contents of this file are subject to the CognitiveWeb Open Source
License Version 1.1 (the License). You may not copy or use this file,
in either source code or executable form, except in compliance with
the License. You may obtain a copy of the License from

  http://www.CognitiveWeb.org/legal/license/

Software distributed under the License is distributed on an AS IS
basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
the License for the specific language governing rights and limitations
under the License.

Copyrights:

Portions created by or assigned to CognitiveWeb are Copyright
(c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact
information for CognitiveWeb is available at

  http://www.CognitiveWeb.org

Portions Copyright (c) 2002-2003 Bryan Thompson.

Acknowledgements:

Special thanks to the developers of the Jabber Open Source License 1.0
(JOSL), from which this License was derived. This License contains
terms that differ from JOSL.

Special thanks to the CognitiveWeb Open Source Contributors for their
suggestions and support of the Cognitive Web.

Modifications:

*/
/*
 * Created on Mar 15, 2007
 */

package com.bigdata.service;

import com.bigdata.journal.IIndexStore;
import com.bigdata.journal.IJournal;
import com.bigdata.journal.ITx;

/**
 * A procedure to be executed on an {@link IDataService}.
 *
 * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
 * @version $Id$
 */
public interface IProcedure {

    /**
     * Run the procedure.
     * <p>
     * Unisolated procedures have "auto-commit" ACID properties for the local
     * {@link IDataService} on which they execute, but DO NOT have distributed
     * ACID properties. In order for a distributed procedure to be ACID, the
     * procedure MUST be fully isolated.
     *
     * @param tx
     *            The transaction identifier (aka start time) -or- zero (0L) IFF
     *            this is an unisolated operation.
     * @param store
     *            The store against which writes will be made. If the procedure
     *            is running inside of a transaction, then this will be an
     *            {@link ITx}. If the procedure is running unisolated, then
     *            this will be an {@link IJournal}.
     */
    public void apply(long tx, IIndexStore store);

}

--- NEW FILE: TransactionService.java ---
/**

The Notice below must appear in each file of the Source Code of any
copy you distribute of the Licensed Product. Contributors to any
Modifications may add their own copyright notices to identify their
own contributions.

License:

The contents of this file are subject to the CognitiveWeb Open Source
License Version 1.1 (the License). You may not copy or use th...
[truncated message content] |
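
Editorial sketch (not part of the commit): the unisolated write contract documented in IDataService and implemented by UnisolatedReadWriteProcedureTask (commit immediately after a successful write, discard partial writes on error) can be illustrated with a small standalone program. Everything here is an assumption made for illustration: ToyJournal, ToyProcedure and UnisolatedWriteTask are hypothetical stand-ins, not the bigdata Journal, IProcedure or DataService classes, and the lambdas need a newer Java than the committed code targets.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;

public class UnisolatedWriteSketch {

    /** Hypothetical stand-in for a journal: buffers writes until commit. */
    static class ToyJournal {
        private final Map<String, String> committed = new HashMap<>();
        private final Map<String, String> pending = new HashMap<>();
        private long commitCounter = 0L;

        void put(String key, String value) { pending.put(key, value); }

        /** Reads see committed data only. */
        String read(String key) { return committed.get(key); }

        /** Makes buffered writes visible and returns a commit counter. */
        long commit() {
            committed.putAll(pending);
            pending.clear();
            return ++commitCounter;
        }

        /** Discards buffered (partial) writes. */
        void abort() { pending.clear(); }
    }

    /** Hypothetical, simplified analogue of a procedure run on the store. */
    interface ToyProcedure {
        void apply(long tx, ToyJournal store) throws Exception;
    }

    /** Commits after a successful write; aborts and rethrows on any error. */
    static class UnisolatedWriteTask implements Callable<Long> {
        private final ToyJournal journal;
        private final ToyProcedure proc;

        UnisolatedWriteTask(ToyJournal journal, ToyProcedure proc) {
            this.journal = journal;
            this.proc = proc;
        }

        public Long call() throws Exception {
            try {
                proc.apply(0L, journal); // 0L marks an unisolated operation
                return journal.commit();
            } catch (Throwable t) {
                journal.abort();
                throw new RuntimeException(t);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ToyJournal journal = new ToyJournal();
        // Successful write: committed as soon as the procedure returns.
        new UnisolatedWriteTask(journal, (tx, store) -> store.put("a", "1")).call();
        try {
            // Failing write: the partial write to "b" must be discarded.
            new UnisolatedWriteTask(journal, (tx, store) -> {
                store.put("b", "2");
                throw new IllegalStateException("simulated failure");
            }).call();
        } catch (RuntimeException expected) {
            // the task aborted the journal before rethrowing
        }
        System.out.println(journal.read("a")); // prints 1
        System.out.println(journal.read("b")); // prints null
    }
}
```

The sketch runs the task on the calling thread. In the committed code the tasks are handed to thread pools, and unisolated writes are serialized, which this toy does not attempt to model.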