From: Bryan T. <tho...@us...> - 2007-04-21 10:37:23
Update of /cvsroot/cweb/bigdata/src/java/com/bigdata/service
In directory sc8-pr-cvs4.sourceforge.net:/tmp/cvs-serv23696/src/java/com/bigdata/service

Modified Files:
	DataServer.java IWritePipeline.java IDataService.java DataService.java
	DataServiceClient.java EmbeddedDataService.java IReadOnlyProcedure.java
	IProcedure.java

Log Message:
Wrote test case for IDataService#submit(...) and modified that method to
return an optional result from the procedure.

Index: DataServer.java
===================================================================
RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/DataServer.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -C2 -d -r1.5 -r1.6
*** DataServer.java	20 Apr 2007 16:36:27 -0000	1.5
--- DataServer.java	21 Apr 2007 10:37:16 -0000	1.6
***************
*** 59,62 ****
--- 59,67 ----
  /**
  * The bigdata data server.
+ * <p>
+ * The {@link DataServer} starts the {@link DataService}. The server and
+ * service are configured using a {@link Configuration} file whose name is
+ * passed to the {@link DataServer#DataServer(String[])} constructor or
+ * {@link #main(String[])}.
  *
  * @see src/resources/config for sample configurations.
***************
*** 86,91 ****
  /**
! * Starts a new {@link DataServer}.
! *
  * @param args
  * The name of the {@link Configuration} file for the service.
--- 91,101 ----
  /**
! * Starts a new {@link DataServer}. This can be done programmatically
! * by executing
! * <pre>
! * new DataServer(args).run();
! * </pre>
! * within a {@link Thread}.
! *
  * @param args
  * The name of the {@link Configuration} file for the service.

Index: IWritePipeline.java
===================================================================
RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/IWritePipeline.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -C2 -d -r1.1 -r1.2
*** IWritePipeline.java	18 Mar 2007 22:29:43 -0000	1.1
--- IWritePipeline.java	21 Apr 2007 10:37:16 -0000	1.2
***************
*** 48,51 ****
--- 48,53 ----
  package com.bigdata.service;
+ import com.bigdata.rawstore.IRawStore;
+ 
  /**
  * An interface used to pipeline writes against index partitions over one or
***************
*** 60,63 ****
--- 62,112 ----
  * on which they are writing and the write requests should be chained down
  * the configured write pipeline.
+ *
+ * @todo verify that conditional insert logic can not cause inconsistent data to
+ * appear depending on the order in which writes are received by the data
+ * services in a pipeline. For the pipeline to be consistent the order in
+ * which client operations execute MUST NOT differ on different data
+ * services in the pipeline for the same index partition. Consider that
+ * two clients are loading RDF documents whose terms overlap. If the order
+ * of the client operations differs on the different services for the
+ * pipeline, then different term identifiers could be assigned to the same
+ * term by different data services.
+ *
+ * @todo commit (and group commit) semantics must be respected by the pipeline.
+ * this is basically a (potentially special) case of a 2-phase commit. If,
+ * e.g., the last data service in the pipeline suddenly runs out of
+ * disk or otherwise "hiccups", then it will not be consistent with
+ * the other replicas of the index partition in that pipeline. I need to
+ * think through how to handle this further. For example, the commit could
+ * propagate along the pipeline, but that does not work if group commit is
+ * triggered by different events (latency and data volume) on different
+ * data services.
+ * <p>
+ * A distributed file system solves this problem by having only a single
+ * data service that is the chokepoint for concurrency control (and hence
+ * for consistency control) for any given index partition. Essentially,
+ * the distributed file system provides media redundancy - the same bytes
+ * in the same order appear in each copy of the file backing the journal.
+ * <p>
+ * So, it seems that a way to achieve that without a distributed file
+ * system is to have the write pipeline operate at the {@link IRawStore}
+ * API. It simply streams writes down to the next service in the pipeline
+ * and that is the sole way in which downstream services have their stores
+ * written. Since low-level writes can be 1 GB/sec on a transient buffer,
+ * this protocol could be separated from the data service and become a
+ * media redundancy protocol only. Downstream writes would not even need
+ * to sync to disk on "sync" but only on buffer overflow since the data
+ * service at the head of the pipeline is already providing restart-safe
+ * state and they are providing redundancy for the first point of failure.
+ * If the data service does fail, then the first media redundancy service
+ * would sync its state to disk and take over as the data service.
+ * <p>
+ * Work through how the service accepts responsibility for media
+ * redundancy for files, how it names its local files (source host /
+ * filename?), how replicated files are managed (close, closeAndDelete,
+ * bulk read, syncToDisk, etc.)
+ * <p>
+ * Work through how index partitions can be shed or picked up by data
+ * services in this media redundancy model.
  */
  public interface IWritePipeline {

Index: IDataService.java
===================================================================
RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/IDataService.java,v
retrieving revision 1.9
retrieving revision 1.10
diff -C2 -d -r1.9 -r1.10
*** IDataService.java	20 Apr 2007 17:24:23 -0000	1.9
--- IDataService.java	21 Apr 2007 10:37:16 -0000	1.10
***************
*** 45,48 ****
--- 45,49 ----
  import java.io.IOException;
+ import java.io.Serializable;
  import java.util.UUID;
  import java.util.concurrent.ExecutionException;
***************
*** 299,359 ****
  throws InterruptedException, ExecutionException, IOException, IOException;
! 
! // /**
! // * Typesafe enum for flags that control the behavior of
! // * {@link IDataService#rangeQuery(long, String, byte[], byte[], int, int)}
! // *
! // * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
! // * @version $Id$
! // */
! // public static enum RangeQueryEnum {
! //
! // /**
! // * Flag specifies that keys in the key range will be returned. When not
! // * given, the keys will NOT be included in the {@link ResultSetChunk}s
! // * sent to the client.
! // */
! // Keys(1 << 1),
! //
! // /**
! // * Flag specifies that values in the key range will be returned. When
! // * not given, the values will NOT be included in the
! // * {@link ResultSetChunk}s sent to the client.
! // */
! // Values(1 << 2);
! //
! // private final int flag;
! //
! // private RangeQueryEnum(int flag) {
! //
! // this.flag = flag;
! //
! // }
! //
! // /**
! // * True iff this flag is set.
! // *
! // * @param flags
! // * An integer on which zero or more flags have been set.
! // *
! // * @return True iff this flag is set.
! // */
! // public boolean isSet(int flags) {
! //
! // return (flags & flag) == 1;
! //
! // }
! //
! // /**
! // * The bit mask for this flag.
! // */
! // public final int valueOf() {
! //
! // return flag;
! //
! // }
! //
! // };
! 
  /**
  * <p>
--- 300,304 ----
  throws InterruptedException, ExecutionException, IOException, IOException;
! 
  /**
  * <p>
***************
*** 385,401 ****
  * The procedure to be executed.
  *
  * @throws IOException
  * @throws InterruptedException
  * @throws ExecutionException
  */
! public void submit(long tx, IProcedure proc) throws InterruptedException, ExecutionException, IOException;
- // /**
- // * Execute a map worker task against all key/value pairs in a key range,
- // * writing the results onto N partitions of an intermediate file.
- // */
- // public void map(long tx, String name, byte[] fromKey, byte[] toKey,
- // IMapOp op) throws InterruptedException, ExecutionException;
- 
  }
--- 330,344 ----
  * The procedure to be executed.
  *
+ * @return The result, which is entirely defined by the procedure
+ * implementation and which MAY be null. In general, this MUST be
+ * {@link Serializable} since it may have to pass across a network
+ * interface.
+ *
  * @throws IOException
  * @throws InterruptedException
  * @throws ExecutionException
  */
! public Object submit(long tx, IProcedure proc) throws InterruptedException, ExecutionException, IOException;
  }

Index: DataServiceClient.java
===================================================================
RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/DataServiceClient.java,v
retrieving revision 1.7
retrieving revision 1.8
diff -C2 -d -r1.7 -r1.8
*** DataServiceClient.java	20 Apr 2007 17:24:24 -0000	1.7
--- DataServiceClient.java	21 Apr 2007 10:37:16 -0000	1.8
***************
*** 112,118 ****
  // }
! public void submit(long tx, IProcedure proc) throws InterruptedException, ExecutionException, IOException {
! delegate.submit(tx, proc);
! }
  public void abort(long tx) throws IOException {
--- 112,118 ----
  // }
! // public void submit(long tx, IProcedure proc) throws InterruptedException, ExecutionException, IOException {
! // delegate.submit(tx, proc);
! // }
  public void abort(long tx) throws IOException {

Index: IProcedure.java
===================================================================
RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/IProcedure.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -C2 -d -r1.1 -r1.2
*** IProcedure.java	15 Mar 2007 16:11:11 -0000	1.1
--- IProcedure.java	21 Apr 2007 10:37:16 -0000	1.2
***************
*** 1,45 ****
  /**
! The Notice below must appear in each file of the Source Code of any
! copy you distribute of the Licensed Product. Contributors to any
! Modifications may add their own copyright notices to identify their
! own contributions.
! License:
! The contents of this file are subject to the CognitiveWeb Open Source
! License Version 1.1 (the License). You may not copy or use this file,
! in either source code or executable form, except in compliance with
! the License. You may obtain a copy of the License from
! http://www.CognitiveWeb.org/legal/license/
! Software distributed under the License is distributed on an AS IS
! basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
! the License for the specific language governing rights and limitations
! under the License.
! Copyrights:
! Portions created by or assigned to CognitiveWeb are Copyright
! (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact
! information for CognitiveWeb is available at
! http://www.CognitiveWeb.org
! Portions Copyright (c) 2002-2003 Bryan Thompson.
! Acknowledgements:
! Special thanks to the developers of the Jabber Open Source License 1.0
! (JOSL), from which this License was derived. This License contains
! terms that differ from JOSL.
! Special thanks to the CognitiveWeb Open Source Contributors for their
! suggestions and support of the Cognitive Web.
! Modifications:
! */
  /*
  * Created on Mar 15, 2007
--- 1,45 ----
  /**
! The Notice below must appear in each file of the Source Code of any
! copy you distribute of the Licensed Product. Contributors to any
! Modifications may add their own copyright notices to identify their
! own contributions.
! License:
! The contents of this file are subject to the CognitiveWeb Open Source
! License Version 1.1 (the License). You may not copy or use this file,
! in either source code or executable form, except in compliance with
! the License. You may obtain a copy of the License from
! http://www.CognitiveWeb.org/legal/license/
! Software distributed under the License is distributed on an AS IS
! basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
! the License for the specific language governing rights and limitations
! under the License.
! Copyrights:
! Portions created by or assigned to CognitiveWeb are Copyright
! (c) 2003-2003 CognitiveWeb. All Rights Reserved. Contact
! information for CognitiveWeb is available at
! http://www.CognitiveWeb.org
! Portions Copyright (c) 2002-2003 Bryan Thompson.
! Acknowledgements:
! Special thanks to the developers of the Jabber Open Source License 1.0
! (JOSL), from which this License was derived. This License contains
! terms that differ from JOSL.
! Special thanks to the CognitiveWeb Open Source Contributors for their
! suggestions and support of the Cognitive Web.
! Modifications:
! */
  /*
  * Created on Mar 15, 2007
***************
*** 48,51 ****
--- 48,53 ----
  package com.bigdata.service;
+ import java.io.Serializable;
+ 
  import com.bigdata.journal.IIndexStore;
  import com.bigdata.journal.IJournal;
***************
*** 54,62 ****
  /**
  * A procedure to be executed on an {@link IDataService}.
  *
  * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
  * @version $Id$
  */
! public interface IProcedure {
  /**
--- 56,72 ----
  /**
  * A procedure to be executed on an {@link IDataService}.
+ * <p>
+ * Note: while this interface is {@link Serializable}, that provides only for
+ * communicating state to the {@link IDataService}. If an instance of this
+ * procedure will cross a network interface, then the implementation Class MUST
+ * be available to the {@link IDataService} on which it will execute. This can
+ * be as simple as bundling the procedure into a JAR that is part of the
+ * CLASSPATH used to start a {@link DataService} or you can use downloaded code
+ * with the JINI codebase mechanism.
  *
  * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
  * @version $Id$
  */
! public interface IProcedure extends Serializable {
  /**
***************
*** 76,81 ****
  * {@link ITx}. If the procedure is running unisolated, then
  * this will be an {@link IJournal}.
  */
! public void apply(long tx,IIndexStore store);
! 
  }
--- 86,96 ----
  * {@link ITx}. If the procedure is running unisolated, then
  * this will be an {@link IJournal}.
+ *
+ * @return The result, which is entirely defined by the procedure
+ * implementation and which MAY be null. In general, this MUST be
+ * {@link Serializable} since it may have to pass across a network
+ * interface.
  */
! public Object apply(long tx, IIndexStore store);
! 
  }

Index: EmbeddedDataService.java
===================================================================
RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/EmbeddedDataService.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -C2 -d -r1.6 -r1.7
*** EmbeddedDataService.java	20 Apr 2007 17:24:24 -0000	1.6
--- EmbeddedDataService.java	21 Apr 2007 10:37:16 -0000	1.7
***************
*** 121,127 ****
  // }
! public void submit(long tx, IProcedure proc) throws InterruptedException, ExecutionException {
! delegate.submit(tx, proc);
! }
  public void abort(long tx) throws IOException {
--- 121,127 ----
  // }
! // public void submit(long tx, IProcedure proc) throws InterruptedException, ExecutionException {
! // delegate.submit(tx, proc);
! // }
  public void abort(long tx) throws IOException {

Index: IReadOnlyProcedure.java
===================================================================
RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/IReadOnlyProcedure.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -C2 -d -r1.1 -r1.2
*** IReadOnlyProcedure.java	15 Mar 2007 16:11:10 -0000	1.1
--- IReadOnlyProcedure.java	21 Apr 2007 10:37:16 -0000	1.2
***************
*** 48,52 ****
  package com.bigdata.service;
! public interface IReadOnlyProcedure {
  }
\ No newline at end of file
--- 48,59 ----
  package com.bigdata.service;
! /**
! * Procedures that implement this marker interface will be executed within a
! * read-only context and MUST NOT attempt to write on an index.
! *
! * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
! * @version $Id$
! */
! public interface IReadOnlyProcedure extends IProcedure {
  }
\ No newline at end of file

Index: DataService.java
===================================================================
RCS file: /cvsroot/cweb/bigdata/src/java/com/bigdata/service/DataService.java,v
retrieving revision 1.9
retrieving revision 1.10
diff -C2 -d -r1.9 -r1.10
*** DataService.java	20 Apr 2007 17:24:23 -0000	1.9
--- DataService.java	21 Apr 2007 10:37:16 -0000	1.10
***************
*** 49,53 ****
  import java.io.IOException;
- import java.io.Serializable;
  import java.net.InetSocketAddress;
  import java.rmi.Remote;
--- 49,52 ----
***************
*** 71,74 ****
--- 70,75 ----
  import com.bigdata.isolation.UnisolatedBTree;
  import com.bigdata.journal.AbstractJournal;
+ import com.bigdata.journal.IAtomicStore;
+ import com.bigdata.journal.ICommitter;
  import com.bigdata.journal.ITx;
  import com.bigdata.journal.Journal;
***************
*** 76,81 ****
  /**
! * An implementation of a data service suitable for use with RPC, direct client
! * calls (if decoupled by an operation queue), or a NIO interface.
  * <p>
  * This implementation is thread-safe. It will block for each operation. It MUST
--- 77,82 ----
  /**
! * An implementation of a network-capable {@link IDataService}. The service is
! * started using the {@link DataServer} class.
  * <p>
  * This implementation is thread-safe. It will block for each operation. It MUST
***************
*** 101,113 ****
  * an NIO interface to the data service.
  *
! * @todo add assertOpen() throughout
! *
! * @todo declare interface for managing service shutdown()/shutdownNow()?
  *
  * @todo support group commit for unisolated writes. i may have to refactor some
  * to get group commit to work for both transaction commits and unisolated
  * writes. basically, the tasks on the
! * {@link AbstractJournal#writeService write service} need to get
! * aggregated.
  *
  * @todo implement NIODataService, RPCDataService(possible), EmbeddedDataService
--- 102,135 ----
  * an NIO interface to the data service.
  *
! * @todo Note that "auto-commit" is provided for unisolated writes. This relies
! * on two things. First, only the {@link UnisolatedBTree} recoverable from
! * {@link AbstractJournal#getIndex(String)} is mutable - all other ways to
! * recover the named index will return a read-only view of a historical
! * committed state. Second, an explicit {@link IAtomicStore#commit()} must
! * be performed to make the changes restart-safe. The commit can only be
! * performed when no unisolated write is executing (even presuming that
! * different mutable btrees can receive writes concurrently) since it will
! * cause all dirty {@link ICommitter} to become restart-safe. Group commit
! * is essential to high throughput when unisolated writes are relatively
! * small.
  *
  * @todo support group commit for unisolated writes. i may have to refactor some
  * to get group commit to work for both transaction commits and unisolated
  * writes. basically, the tasks on the
! * {@link AbstractJournal#writeService} need to get aggregated (or each
! * commit examines the length of the write queue (basically, is there
! * another write in the queue or is this the last one), latency and data
! * volume since the last commit and makes a decision whether or not to
! * commit at that time; if the commit is deferred, then it is placed onto
! * a queue of operations that have not finished and for which we can not
! * yet report "success" - even though additional unisolated writes must
! * continue to run; we also need to make sure that a commit will occur at
! * the first opportunity following the minimum latency -- even if no
! * unisolated writes are scheduled (or a single client would hang waiting
! * for a commit).
! *
! * @todo add assertOpen() throughout
! *
! * @todo declare interface for managing service shutdown()/shutdownNow()?
  *
  * @todo implement NIODataService, RPCDataService(possible), EmbeddedDataService
***************
*** 155,159 ****
  * bi-directional transfer?
  *
! * @todo We will use non-blocking I/O for the page transfer protocol. Review
  * options to secure that protocol since we can not use JERI for that
  * purpose. For example, using SSL or an SSH tunnel. For most purposes I
--- 177,182 ----
  * bi-directional transfer?
  *
! * @todo We will use non-blocking I/O for the data transfer protocol in order to
! * support an efficient pipelining of writes across data services. Review
  * options to secure that protocol since we can not use JERI for that
  * purpose. For example, using SSL or an SSH tunnel. For most purposes I
***************
*** 458,462 ****
  }
! public void submit(long tx, IProcedure proc) throws InterruptedException, ExecutionException {
--- 481,485 ----
  }
! public Object submit(long tx, IProcedure proc) throws InterruptedException, ExecutionException {
***************
*** 469,477 ****
  if(isolated) {
! txService.submit(new TxProcedureTask(tx,proc)).get();
  } else if( readOnly ) {
! readService.submit(new UnisolatedReadProcedureTask(proc)).get();
  } else {
--- 492,519 ----
  if(isolated) {
! return txService.submit(new TxProcedureTask(tx,proc)).get();
  } else if( readOnly ) {
! /*
! * FIXME The IReadOnlyProcedure interface is a promise that the procedure will
! * not write on an index (or anything on the store), but it is NOT a
! * guarantee. Consider removing that interface and the option to run
! * unisolated as anything but "read/write" since a "read-only" that
! * in fact attempted to write could cause problems with the index
! * data structure. Alternatively, examine other means for running a
! * "read-only" unisolated store that enforces read-only semantics.
! *
! * For example, since the IIndexStore defines only a single method,
! * getIndex(String), we could provide an implementation of that
! * method that always selected a historical committed state for the
! * index. This would make writes impossible since they would be
! * rejected by the index object itself.
! *
! * Fix this and write tests that demonstrate that writes are
! * rejected if the proc implements IReadOnlyProcedure.
! */
! 
! return readService.submit(new UnisolatedReadProcedureTask(proc)).get();
  } else {
***************
*** 481,485 ****
  * complete writes MUST be committed.
  */
! journal.serialize(new UnisolatedReadWriteProcedureTask(proc)).get();
  }
--- 523,527 ----
  * complete writes MUST be committed.
  */
! return journal.serialize(new UnisolatedReadWriteProcedureTask(proc)).get();
  }
***************
*** 511,523 ****
  /**
  *
! * FIXME the iterator needs to be aware of the definition of a "row" for the
! * sparse row store so that we can respect the atomic guarantee for reads as
! * well as writes.
  *
! * FIXME support filters. there are a variety of use cases from clients that
! * are aware of version counters and delete markers to clients that encode a
! * column name and datum or write time into the key to those that will
! * filter based on inspection of the value associated with the key, e.g.,
! * only values having some attribute.
  *
  * @todo if we allow the filter to cause mutations (e.g., deleting matching
--- 553,565 ----
  /**
  *
! * @todo the iterator needs to be aware of the definition of a "row" for the
! * sparse row store so that we can respect the atomic guarantee for
! * reads as well as writes.
  *
! * @todo support filters. there are a variety of use cases from clients that
! * are aware of version counters and delete markers to clients that
! * encode a column name and datum or write time into the key to those
! * that will filter based on inspection of the value associated with
! * the key, e.g., only values having some attribute.
  *
  * @todo if we allow the filter to cause mutations (e.g., deleting matching
***************
*** 1099,1105 ****
  public Object call() throws Exception {
! proc.apply(tx.getStartTimestamp(),tx);
! 
! return null;
  }
--- 1141,1145 ----
  public Object call() throws Exception {
! return proc.apply(tx.getStartTimestamp(),tx);
  }
***************
*** 1123,1129 ****
  public Object call() throws Exception {
! proc.apply(0L,journal);
! 
! return null;
  }
--- 1163,1167 ----
  public Object call() throws Exception {
! return proc.apply(0L,journal);
  }
***************
*** 1155,1166 ****
  }
! public Long call() throws Exception {
  try {
! super.call();
  // commit (synchronous, immediate).
! return journal.commit();
  } catch(Throwable t) {
--- 1193,1206 ----
  }
! public Object call() throws Exception {
  try {
! Object result = super.call();
  // commit (synchronous, immediate).
! journal.commit();
! 
! return result;
  } catch(Throwable t) {
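
To make the revised contract concrete, here is a minimal sketch of a procedure that hands a result back to the client through IDataService#submit(...). The IProcedure, IReadOnlyProcedure and submit(...) signatures follow the hunks above; the class name, the payload and the client-side variables are invented for the example and are not part of this commit.

package com.bigdata.service;

import com.bigdata.journal.IIndexStore;

/**
 * Example only: shows that IProcedure#apply(...) now returns an Object which
 * IDataService#submit(...) hands back to the client. The procedure does not
 * touch any index; it simply echoes a message together with the tx identifier.
 */
public class EchoProcedure implements IReadOnlyProcedure {

    private static final long serialVersionUID = 1L;

    private final String message;

    public EchoProcedure(String message) {

        this.message = message;

    }

    public Object apply(long tx, IIndexStore store) {

        // the returned value must be Serializable if it will cross a network
        // interface (a String is).
        return message + " (tx=" + tx + ")";

    }

}

On the client side the result comes straight back from submit(...); dataService below stands in for whatever IDataService reference the client holds and tx for whatever transaction identifier it is using:

    String echoed = (String) dataService.submit(tx, new EchoProcedure("hello"));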
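
The group commit @todo in DataService.java above describes a commit decision driven by the length of the write queue and by the latency and data volume since the last commit. The sketch below just writes that decision out as code so the prose is easier to follow; none of these names exist in the committed sources and the thresholds are placeholders.

package com.bigdata.service;

/**
 * Sketch only: one possible shape for the group commit decision described in
 * the DataService @todo. A commit is performed immediately when no further
 * unisolated write is queued, otherwise it is deferred until the latency or
 * the data volume since the last commit exceeds a configured bound.
 */
public class GroupCommitPolicy {

    private final long maxLatencyMillis;

    private final long maxBytesSinceCommit;

    private long lastCommitTime = System.currentTimeMillis();

    private long bytesSinceCommit = 0L;

    public GroupCommitPolicy(long maxLatencyMillis, long maxBytesSinceCommit) {

        this.maxLatencyMillis = maxLatencyMillis;

        this.maxBytesSinceCommit = maxBytesSinceCommit;

    }

    /**
     * Invoked after each unisolated write task completes.
     *
     * @param queueLength
     *            The #of unisolated writes still waiting to run.
     * @param bytesWritten
     *            The data volume written by the task that just completed.
     *
     * @return true iff the caller should commit the journal now.
     */
    synchronized public boolean shouldCommit(int queueLength, long bytesWritten) {

        bytesSinceCommit += bytesWritten;

        final long latency = System.currentTimeMillis() - lastCommitTime;

        if (queueLength == 0 || latency >= maxLatencyMillis
                || bytesSinceCommit >= maxBytesSinceCommit) {

            // the caller performs the actual commit; just reset the counters.
            lastCommitTime = System.currentTimeMillis();

            bytesSinceCommit = 0L;

            return true;

        }

        return false;

    }

}

A real implementation would also need a timer so that a deferred commit still happens once the minimum latency has elapsed even when no further unisolated writes arrive - otherwise a single client could hang waiting for its commit, as the @todo points out.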