From: Bryan T. <tho...@us...> - 2007-03-18 12:59:39
|
Update of /cvsroot/cweb/bigdata/src/java/com/bigdata/service In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv12438/src/java/com/bigdata/service Modified Files: EmbeddedDataService.java IDataService.java DataService.java IMapOp.java DataServiceClient.java NIODataService.java TransactionService.java Added Files: MapReduceService.java IMetadataService.java IServiceShutdown.java package.html MetadataService.java Log Message: A little more work setting up for the services architecture. Index: NIODataService.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/NIODataService.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** NIODataService.java 15 Mar 2007 16:11:10 -0000 1.1 --- NIODataService.java 17 Mar 2007 23:14:58 -0000 1.2 *************** *** 86,90 **** * replication. */ ! public class NIODataService { // --- 86,100 ---- * replication. */ ! public class NIODataService implements IServiceShutdown { ! ! public void shutdown() { ! // TODO Auto-generated method stub ! ! } ! ! public void shutdownNow() { ! // TODO Auto-generated method stub ! ! } // Index: DataService.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/DataService.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** DataService.java 15 Mar 2007 16:11:11 -0000 1.1 --- DataService.java 17 Mar 2007 23:14:58 -0000 1.2 *************** *** 135,139 **** * abort the tx. */ ! public class DataService implements IDataService { protected Journal journal; --- 135,139 ---- * abort the tx. */ ! public class DataService implements IDataService, IServiceShutdown { protected Journal journal; *************** *** 286,293 **** */ ! public void commit(long tx) { // will place task on writeService and block iff necessary. ! journal.commit(tx); } --- 286,293 ---- */ ! 
public long commit(long tx) { // will place task on writeService and block iff necessary. ! return journal.commit(tx); } *************** *** 390,396 **** } ! public RangeQueryResult rangeQuery(long tx, String name, byte[] fromKey, byte[] toKey, ! boolean countOnly, boolean keysOnly, boolean valuesOnly) ! throws InterruptedException, ExecutionException { if( name == null ) throw new IllegalArgumentException(); --- 390,395 ---- } ! public RangeQueryResult rangeQuery(long tx, String name, byte[] fromKey, ! byte[] toKey, int flags) throws InterruptedException, ExecutionException { if( name == null ) throw new IllegalArgumentException(); *************** *** 400,406 **** "Unisolated context not allowed"); ! RangeQueryResult result = (RangeQueryResult)txService.submit( ! new RangeQueryTask(tx, name, fromKey, toKey, countOnly, ! keysOnly, valuesOnly)).get(); return result; --- 399,404 ---- "Unisolated context not allowed"); ! RangeQueryResult result = (RangeQueryResult) txService.submit( ! new RangeQueryTask(tx, name, fromKey, toKey, flags)).get(); return result; *************** *** 408,445 **** } ! /** ! * @todo if unisolated or isolated at the read-commit level, then the ! * operation really needs to be broken down by partition or perhaps by ! * index segment leaf so that we do not have too much latency during a ! * read (this could be done for rangeQuery as well). ! * ! * @todo if fully isolated, then there is no problem running map. ! * ! * @todo The definition of a row is different if using a key formed from the ! * column name, application key, and timestamp. ! * ! * @todo For at least GOM we need to deserialize rows from byte[]s, so we ! * need to have the (de-)serializer to the application level value on ! * hand. ! */ ! public void map(long tx, String name, byte[] fromKey, byte[] toKey, ! IMapOp op) throws InterruptedException, ExecutionException { ! ! if( name == null ) throw new IllegalArgumentException(); ! ! if (tx == 0L) ! 
throw new UnsupportedOperationException( ! "Unisolated context not allowed"); ! ! RangeQueryResult result = (RangeQueryResult) txService.submit( ! new RangeQueryTask(tx, name, fromKey, toKey, false, false, ! false)).get(); ! ! // @todo resolve the reducer service. ! IReducer reducer = null; ! ! op.apply(result.itr, reducer); ! ! } /** --- 406,444 ---- } ! // /** ! // * @todo if unisolated or isolated at the read-commit level, then the ! // * operation really needs to be broken down by partition or perhaps by ! // * index segment leaf so that we do not have too much latency during a ! // * read (this could be done for rangeQuery as well). ! // * ! // * @todo if fully isolated, then there is no problem running map. ! // * ! // * @todo The definition of a row is different if using a key formed from the ! // * column name, application key, and timestamp. ! // * ! // * @todo For at least GOM we need to deserialize rows from byte[]s, so we ! // * need to have the (de-)serializer to the application level value on ! // * hand. ! // */ ! // public void map(long tx, String name, byte[] fromKey, byte[] toKey, ! // IMapOp op) throws InterruptedException, ExecutionException { ! // ! // if( name == null ) throw new IllegalArgumentException(); ! // ! // if (tx == 0L) ! // throw new UnsupportedOperationException( ! // "Unisolated context not allowed"); ! // ! // int flags = 0; // @todo set to deliver keys + values for map op. ! // ! // RangeQueryResult result = (RangeQueryResult) txService.submit( ! // new RangeQueryTask(tx, name, fromKey, toKey, flags)).get(); ! // ! // // @todo resolve the reducer service. ! // IReducer reducer = null; ! // ! // op.apply(result.itr, reducer); ! // ! // } /** *************** *** 630,641 **** private final byte[] fromKey; private final byte[] toKey; ! private final boolean countOnly; ! private final boolean keysOnly; ! private final boolean valuesOnly; private final ITx tx; ! 
public RangeQueryTask(long startTime, String name, byte[] fromKey, byte[] toKey, ! boolean countOnly, boolean keysOnly, boolean valuesOnly) { assert startTime != 0L; --- 629,638 ---- private final byte[] fromKey; private final byte[] toKey; ! private final int flags; private final ITx tx; ! public RangeQueryTask(long startTime, String name, byte[] fromKey, ! byte[] toKey, int flags) { assert startTime != 0L; *************** *** 664,670 **** this.fromKey = fromKey; this.toKey = toKey; ! this.countOnly = countOnly; ! this.keysOnly = keysOnly; ! this.valuesOnly = valuesOnly; } --- 661,665 ---- this.fromKey = fromKey; this.toKey = toKey; ! this.flags = flags; } *************** *** 681,684 **** --- 676,681 ---- final int count = ndx.rangeCount(fromKey, toKey); + + boolean countOnly = false; final IEntryIterator itr = (countOnly ? null : ndx.rangeIterator( *************** *** 686,690 **** return new RangeQueryResult(count, itr, tx.getStartTimestamp(), ! name, fromKey, toKey, countOnly, keysOnly, valuesOnly); } --- 683,687 ---- return new RangeQueryResult(count, itr, tx.getStartTimestamp(), ! name, fromKey, toKey, flags); } *************** *** 709,719 **** public final byte[] fromKey; public final byte[] toKey; ! public final boolean countOnly; ! public final boolean keysOnly; ! public final boolean valuesOnly; public RangeQueryResult(int count, IEntryIterator itr, long startTime, String name, ! byte[] fromKey, byte[] toKey, boolean countOnly, ! boolean keysOnly, boolean valuesOnly) { this.count = count; --- 706,713 ---- public final byte[] fromKey; public final byte[] toKey; ! public final int flags; public RangeQueryResult(int count, IEntryIterator itr, long startTime, String name, ! byte[] fromKey, byte[] toKey, int flags) { this.count = count; *************** *** 725,731 **** this.fromKey = fromKey; this.toKey = toKey; ! this.countOnly = countOnly; ! this.keysOnly = keysOnly; ! 
this.valuesOnly = valuesOnly; } --- 719,723 ---- this.fromKey = fromKey; this.toKey = toKey; ! this.flags = flags; } Index: IDataService.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/IDataService.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** IDataService.java 15 Mar 2007 16:11:11 -0000 1.1 --- IDataService.java 17 Mar 2007 23:14:58 -0000 1.2 *************** *** 174,180 **** * @param fromKey * @param toKey ! * @param countOnly ! * @param keysOnly ! * @param valuesOnly * * @exception InterruptedException --- 174,178 ---- * @param fromKey * @param toKey ! * @param flags (@todo define flags: count yes/no, keys yes/no, values yes/no) * * @exception InterruptedException *************** *** 191,203 **** */ public RangeQueryResult rangeQuery(long tx, String name, byte[] fromKey, ! byte[] toKey, boolean countOnly, boolean keysOnly, ! boolean valuesOnly) throws InterruptedException, ExecutionException; ! /** ! * Maps an operation against all key/value pairs in a key range, writing the ! * result onto a reducer service. ! */ ! public void map(long tx, String name, byte[] fromKey, byte[] toKey, ! IMapOp op) throws InterruptedException, ExecutionException; } --- 189,200 ---- */ public RangeQueryResult rangeQuery(long tx, String name, byte[] fromKey, ! byte[] toKey, int flags) throws InterruptedException, ExecutionException; ! // /** ! // * Execute a map worker task against all key/value pairs in a key range, ! // * writing the results onto N partitions of an intermediate file. ! // */ ! // public void map(long tx, String name, byte[] fromKey, byte[] toKey, ! 
// IMapOp op) throws InterruptedException, ExecutionException; } Index: DataServiceClient.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/DataServiceClient.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** DataServiceClient.java 15 Mar 2007 16:11:12 -0000 1.1 --- DataServiceClient.java 17 Mar 2007 23:14:58 -0000 1.2 *************** *** 116,125 **** } ! public void map(long tx, String name, byte[] fromKey, byte[] toKey, IMapOp op) throws InterruptedException, ExecutionException { ! delegate.map(tx, name, fromKey, toKey, op); ! } ! public RangeQueryResult rangeQuery(long tx, String name, byte[] fromKey, byte[] toKey, boolean countOnly, boolean keysOnly, boolean valuesOnly) throws InterruptedException, ExecutionException { ! return delegate.rangeQuery(tx, name, fromKey, toKey, countOnly, keysOnly, valuesOnly); } --- 116,125 ---- } ! // public void map(long tx, String name, byte[] fromKey, byte[] toKey, IMapOp op) throws InterruptedException, ExecutionException { ! // delegate.map(tx, name, fromKey, toKey, op); ! // } ! public RangeQueryResult rangeQuery(long tx, String name, byte[] fromKey, byte[] toKey, int flags) throws InterruptedException, ExecutionException { ! return delegate.rangeQuery(tx, name, fromKey, toKey, flags); } *************** *** 132,137 **** } ! public void commit(long tx) { ! delegate.commit(tx); } --- 132,137 ---- } ! public long commit(long tx) { ! return delegate.commit(tx); } --- NEW FILE: package.html --- <html> <head> <title>Services</title> </head> <body> <p> This package provides implementations of services (data service, transaction manager service, metadata locator service). 
</p> </body> </html> Index: TransactionService.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/TransactionService.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** TransactionService.java 15 Mar 2007 16:11:11 -0000 1.1 --- TransactionService.java 17 Mar 2007 23:14:58 -0000 1.2 *************** *** 98,102 **** * and index segments? */ ! public class TransactionService implements ITransactionManager { /** --- 98,102 ---- * and index segments? */ ! public class TransactionService implements ITransactionManager, IServiceShutdown { /** --- NEW FILE: MetadataService.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. 
Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. Modifications: */ /* * Created on Mar 17, 2007 */ package com.bigdata.service; import java.net.InetSocketAddress; import java.util.Properties; import com.bigdata.journal.Journal; import com.bigdata.scaleup.MasterJournal; import com.bigdata.scaleup.MetadataIndex; /** * Implementation of a metadata service for a named scale-out index. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public class MetadataService implements IMetadataService, IServiceShutdown { /** * The name of the index. */ protected final String name; /** * The name of the journal on which the metadata index is stored. * * @todo support two-tier metadata index and reconcile with * {@link MetadataIndex} and {@link MasterJournal}. */ protected final Journal journal; protected MetadataIndex mdi; public MetadataService(Properties properties) { /* * @todo setup/resolve the journal and the metadata index on * the journal. */ throw new UnsupportedOperationException(); } public static void main(String[] args) { } public InetSocketAddress getDataService(byte[] key) { // TODO Auto-generated method stub return null; } public int getEntryCount() { // TODO Auto-generated method stub return 0; } public int rangeCount(byte[] fromKey,byte[] toKey) { // TODO Auto-generated method stub return 0; } public void shutdown() { // TODO Auto-generated method stub } public void shutdownNow() { // TODO Auto-generated method stub } } Index: EmbeddedDataService.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/EmbeddedDataService.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** EmbeddedDataService.java 15 Mar 2007 16:11:10 -0000 1.1 --- EmbeddedDataService.java 17 Mar 2007 23:14:58 -0000 1.2 *************** *** 69,73 **** * using delegation patterns). */ ! 
public class EmbeddedDataService implements IDataService { private final DataService delegate; --- 69,73 ---- * using delegation patterns). */ ! public class EmbeddedDataService implements IDataService, IServiceShutdown { private final DataService delegate; *************** *** 112,121 **** } ! public void map(long tx, String name, byte[] fromKey, byte[] toKey, IMapOp op) throws InterruptedException, ExecutionException { ! delegate.map(tx, name, fromKey, toKey, op); ! } ! public RangeQueryResult rangeQuery(long tx, String name, byte[] fromKey, byte[] toKey, boolean countOnly, boolean keysOnly, boolean valuesOnly) throws InterruptedException, ExecutionException { ! return delegate.rangeQuery(tx, name, fromKey, toKey, countOnly, keysOnly, valuesOnly); } --- 112,121 ---- } ! // public void map(long tx, String name, byte[] fromKey, byte[] toKey, IMapOp op) throws InterruptedException, ExecutionException { ! // delegate.map(tx, name, fromKey, toKey, op); ! // } ! public RangeQueryResult rangeQuery(long tx, String name, byte[] fromKey, byte[] toKey, int flags) throws InterruptedException, ExecutionException { ! return delegate.rangeQuery(tx, name, fromKey, toKey, flags); } *************** *** 128,133 **** } ! public void commit(long tx) { ! delegate.commit(tx); } --- 128,133 ---- } ! public long commit(long tx) { ! return delegate.commit(tx); } Index: IMapOp.java =================================================================== RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/IMapOp.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** IMapOp.java 15 Mar 2007 16:11:12 -0000 1.1 --- IMapOp.java 17 Mar 2007 23:14:58 -0000 1.2 *************** *** 1,5 **** package com.bigdata.service; ! import com.bigdata.objndx.IEntryIterator; /** --- 1,5 ---- package com.bigdata.service; ! import com.bigdata.objndx.BytesUtil; /** *************** *** 12,29 **** /** ! * The name of the reducer service. */ ! public String getReducer(); /** ! 
* Apply the operator to the key/value stream, writing results onto the ! * reducer service. * ! * @param src ! * The key/value stream. ! * @param reducer ! * The reducer service. */ ! public void apply(IEntryIterator src,IReducer reducer); ! } \ No newline at end of file --- 12,55 ---- /** ! * The hash function used to assign map output keys to reduce tasks. This is ! * normally <code>hash(key) mod R</code>, where hash(key) is ! * {@link BytesUtil#hash(byte[])} and R is the #of reduce tasks. ! * ! * @param key ! * @return */ ! public int reduceHashCode(byte[] key); ! ! /** ! * Each map task will be presented with key-value pairs. When the source is ! * an index, the key-value pairs will be presented in key order. The map ! * operator is responsible for writing zero or more key value pairs on the ! * output sink. Those key value pairs will be assigned to N different reduce ! * tasks by applying the user-defined hash function to the output key. ! * ! * @param key ! * The input key. ! * @param val ! * The input value. ! * @param out ! * The output sink. ! */ ! public void map(byte[] key, byte[] val, IOutput out); /** ! * Each reduce task will be presented with a series of key-value pairs in ! * key order. However, the keys will be distributed across the N reduce ! * tasks by the user-defined hash function, so this is NOT a total ordering ! * over the intermediate keys. * ! * @param key ! * @param val */ ! public void reduce(byte[] key, byte[] val); ! public static interface IOutput { ! ! public void append(byte[] key,byte[] val); ! ! } ! } --- NEW FILE: IServiceShutdown.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). 
You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. Modifications: */ /* * Created on Mar 17, 2007 */ package com.bigdata.service; /** * API for service shutdown. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ * * @todo reconcile with jini startup/shutdown. */ public interface IServiceShutdown { /** * The service will no longer accept new requests, but existing requests * will be processed (synchronous). * * @return Once the service has finished processing pending requests. */ public void shutdown(); /** * The service will no longer accept new requests and will make a best * effort attempt to terminate all existing requests and return ASAP. * * @return Once the service has shutdown. */ public void shutdownNow(); } --- NEW FILE: MapReduceService.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. 
License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. Modifications: */ /* * Created on Mar 17, 2007 */ package com.bigdata.service; import java.util.Properties; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import com.bigdata.util.concurrent.DaemonThreadFactory; /** * A draft implementation of a map/reduce service. Map/reduce is a functional * programming style in which a program is broken down into a <i>map</i> and a * <i>reduce</i> operation. Those operations are trivially parallelized and * distributed across one or more worker tasks on available hosts. There will be * M map tasks and N reduce tasks for each map/reduce operation. * <p> * The inputs to the map operation are key-value pairs. Logically, each map * operation processes a key-value pair, writing a set of intermediate key-value * pairs as its output. 
The outputs are automatically partitioned into N local * temporary files (one per reduce task) using a user-defined hash function. * <p> * Each reduce task reads from the M distinct files (one per map operation) * having data for the intermediate key-value partition assigned to that reduce * task. The keys in those partitions are essentially random since they are * assigned to partitions by the (user-defined) hash function. Before execution, * the reduce task inputs are placed into a total order, e.g., using a sort or * bulk index load. The reduce task is then run on the total order, writing its * outputs onto a single output file. There will be one such output file per * reduce task. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ * * @todo the reduce output is written to an arbitrary sink. common uses are * probably a NOP (the N merged indices are the outputs) a 2nd stage * reduce (producing a total merge of the intermediate values) or various * kinds of local flat files. * * @todo the master needs to keep track of the state each worker task on each * host. * * @todo the master needs a queue for input jobs and should expose an HTML * monitor for job status, as well as email notification of results. * * @todo The Apache Hadoop project provides a MapReduce implementation. Explore * possible ways in which an integration could be achieved. * * @todo support "debugging" using a version that executes tasks for a single * partition of subset of the data. */ public class MapReduceService implements IServiceShutdown { /** * Queue of executing jobs. */ final protected ExecutorService jobService = Executors .newSingleThreadExecutor(DaemonThreadFactory .defaultThreadFactory()); /** * @todo define lookup of the bigdata instance against which the named * indices will be resolved. 
* * @param properties */ public MapReduceService(Properties properties) { } public void shutdown() { jobService.shutdown(); } public void shutdownNow() { jobService.shutdownNow(); } /** * Submit a job. * @param m * @param n * @param tx * @param name * @param fromKey * @param toKey * @param op */ public Future submit(int m, int n, long tx, String name, byte[] fromKey, byte[] toKey, IMapOp op) { return jobService.submit(new Job(m,n,tx,name,fromKey,toKey,op)); } /** * A scheduled map/reduce task. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public static class Job implements Callable<Object> { final int m; final int n; final long tx; final String name; final byte[] fromKey; final byte[] toKey; final IMapOp op; /* * @todo status for each of the M map tasks and N reduce tasks, * including where those tasks are running. */ final Object status = null; /* * @todo these of course need to be remote tasks that are tracked with * a heartbeat. */ final ExecutorService mapService; final ExecutorService reduceService; /** * A map/reduce job whose inputs are a key range of a named index. * * @param m * @param n * @param tx * @param fromKey * @param toKey * @param op * * @todo verify named index exists. * * @todo by passing in the isolation level rather than the start time * (or 0L) we can use the most recent data and do not retain old * resources (journals and index segments) however, the downside * is that you cannot create a multi-job transaction. by passing in * the tx time, you can create a multi-job transaction if that is * desired. * * @todo supporting unisolated reads requires traversal under concurrent * modification, e.g., cursor restart after each "batch" read. 
*/ public Job(int m, int n, long tx, String name, byte[] fromKey, byte[] toKey, IMapOp op) { assert m > 0; assert n > 0; assert name != null; this.m = m; this.n = n; this.tx = tx; this.name = name; this.fromKey = fromKey; this.toKey = toKey; this.op = op; mapService = Executors.newFixedThreadPool(m, DaemonThreadFactory.defaultThreadFactory()); reduceService = Executors.newFixedThreadPool(n, DaemonThreadFactory.defaultThreadFactory()); } /** * Resolve the metadata service for the named index. * * @param name * The index name. * * @return The metadata service for the named index. * * @todo change the return type. */ protected IMetadataService getMetadataService(String name) { throw new UnsupportedOperationException(); } /** * @todo assuming the data is in indices, partition the input data. this * can be done with high accuracy by doing a rangeCount for the * index, and then requesting the M-1 keys at the index entry * positions that evenly divide the key space. since M can be * rather large, we can start tasks as we go. if we have to * restart a task, we can get its (approximate) key range by * issuing new queries against the index. * * @todo an alternative would assign map tasks to the hosts on which the * data resides (this can apply with index inputs or with flat * file inputs). if we are aware of the network topology, then we * can assign the map tasks to hosts "near" the hosts on which the * data resides. * * @todo start up M map tasks; there is no point starting reduce tasks * until the map tasks have completed since we need to provide a * total ordering into the reduce tasks. */ public Object call() throws Exception { IMetadataService mds = getMetadataService(name); final int nentries = mds.rangeCount(fromKey,toKey); return null; } } /** * A worker for a map task. 
* * @todo buffer the intermediate results for each map task on M buffers * (indexed by the user-defined hash of the intermediate key); the * buffers are btrees multiplexed on a journal; on the overflow, evict * one index segment per btree resulting in N index segments per map * task (so that the reduce task will start with each input partition * in sorted order); * * @todo the map task should be run on the data service (i.e., define a * procedure and submit it to the data service). * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public static class MapWorker implements Callable<Object> { protected final long tx; protected final String name; protected final byte[] fromKey; protected final byte[] toKey; protected final IMapOp op; protected final int taskId; protected final int numReduce; public MapWorker(long tx, String name, byte[] fromKey, byte[] toKey, IMapOp op, int taskId, int numReduce) { this.tx = tx; this.name = name; this.fromKey = fromKey; this.toKey = toKey; this.op = op; this.taskId = taskId; this.numReduce = numReduce; } public Object call() throws Exception { // TODO Auto-generated method stub return null; } } /** * A worker for a reduce task. * * @todo reduce tasks may begin running as soon as intermediate output files * become available; the input to each reduce task is M index segments * (one per map task); the data in those segments are already in * sorted order, but they need to be placed into a total sorted order * before running the reduce task. Given the tools on hand, the * easiest way to achieve a total order over the reduce task inputs is * to build a partitioned index. Since each reduce task input is * already in sorted order, we can build the total sorted order by * reading from the M input segments in parallel (an M-way merge). 
* Since M can be quite high and the keys are randomly distributed * across the input by the user-defined hash function, this merge * operation will need to scale up to a large fan-in (100,000+). * * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public static class ReduceWorker implements Callable<Object> { public Object call() throws Exception { // TODO Auto-generated method stub return null; } } } --- NEW FILE: IMetadataService.java --- /** The Notice below must appear in each file of the Source Code of any copy you distribute of the Licensed Product. Contributors to any Modifications may add their own copyright notices to identify their own contributions. License: The contents of this file are subject to the CognitiveWeb Open Source License Version 1.1 (the License). You may not copy or use this file, in either source code or executable form, except in compliance with the License. You may obtain a copy of the License from http://www.CognitiveWeb.org/legal/license/ Software distributed under the License is distributed on an AS IS basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. Copyrights: Portions created by or assigned to CognitiveWeb are Copyright (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact information for CognitiveWeb is available at http://www.CognitiveWeb.org Portions Copyright (c) 2002-2003 Bryan Thompson. Acknowledgements: Special thanks to the developers of the Jabber Open Source License 1.0 (JOSL), from which this License was derived. This License contains terms that differ from JOSL. Special thanks to the CognitiveWeb Open Source Contributors for their suggestions and support of the Cognitive Web. Modifications: */ /* * Created on Mar 17, 2007 */ package com.bigdata.service; import java.net.InetSocketAddress; /** * A metadata service for a named index. 
* <p> * The metadata service maintains locator information for the data service * instances responsible for each partition in the named index. Partitions * are automatically split when they overflow (~200M) and joined when they * underflow (~50M). * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public interface IMetadataService { /** * The approximate number of entries in the index (non-transactional). */ public int getEntryCount(); /** * The approximate number of entries in the index for the specified key * range (non-transactional). * * @param fromKey * @param toKey * @return */ public int rangeCount(byte[] fromKey,byte[] toKey); /** * Return the address of the {@link IDataService} that has current primary * responsibility for the index partition that includes the specified key. * * @param key * The key. * * @return The locator for the {@link IDataService} with primary * responsibility for the index partition in which that key would be * located. * * @todo return primary and secondary data service locators with lease. * * @todo return primary and secondary data service locators with lease for * the index partition that would contain the key plus some number of * index partitions surrounding that partition. */ public InetSocketAddress getDataService(byte[] key); } |