From: <tho...@us...> - 2010-09-08 13:16:42
|
Revision: 3515 http://bigdata.svn.sourceforge.net/bigdata/?rev=3515&view=rev Author: thompsonbry Date: 2010-09-08 13:16:30 +0000 (Wed, 08 Sep 2010) Log Message: ----------- Modified the ResourceService to use InetSocketAddress and NicUtil for configuration purposes. This change also touched the StoreManager and the MoveTask. Added a BOpEvaluationContext which specifies whether an operator must be mapped across shards, nodes, evaluated on the query controller, or if it may be evaluated without sending its inputs anywhere. This is only for scale-out. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IPipelineOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryClient.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/eval/JoinGraph.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/MapBindingSetsOverShardsBuffer.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/mutation/InsertOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ndx/AbstractSampleIndex.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SortOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/resources/MoveTask.java 
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/resources/StoreManager.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/ManagedResourceService.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/ResourceService.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/service/TestReceiveBuffer.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/service/TestReceiveFile.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpEvaluationContext.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java 2010-09-07 20:14:45 UTC (rev 3514) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java 2010-09-08 13:16:30 UTC (rev 3515) @@ -135,6 +135,14 @@ BOp clone(); /** + * Return the evaluation context for the operator. The default is + * {@link BOpEvaluationContext#ANY}. Operators which must be mapped against + * shards, mapped against nodes, or evaluated on the query controller must + * override this method. + */ + public BOpEvaluationContext getEvaluationContext(); + + /** * Interface declaring well known annotations. 
*/ public interface Annotations { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2010-09-07 20:14:45 UTC (rev 3514) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2010-09-08 13:16:30 UTC (rev 3515) @@ -299,6 +299,7 @@ } + @SuppressWarnings("unchecked") public <T> T getProperty(final String name) { return (T) annotations.get(name); @@ -307,6 +308,7 @@ public <T> T getRequiredProperty(final String name) { + @SuppressWarnings("unchecked") final T tmp = (T) annotations.get(name); if (tmp == null) @@ -336,4 +338,10 @@ } + public BOpEvaluationContext getEvaluationContext() { + + return BOpEvaluationContext.ANY; + + } + } Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpEvaluationContext.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpEvaluationContext.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpEvaluationContext.java 2010-09-08 13:16:30 UTC (rev 3515) @@ -0,0 +1,48 @@ +package com.bigdata.bop; + +import com.bigdata.bop.bset.ConditionalRoutingOp; +import com.bigdata.bop.join.PipelineJoin; +import com.bigdata.bop.solutions.DistinctBindingSetOp; +import com.bigdata.bop.solutions.SliceOp; + +/** + * Type safe enumeration indicates where an operator may be evaluated. Operators + * fall into several distinct categories based on whether or not their inputs + * need to be made available on specific nodes ({@link #HASHED} or + * {@link #SHARDED}), whether they can be evaluated anywhere their inputs may + * exist ({@link #ANY}), or whether they must be evaluated at the query + * controller ({@link #CONTROLLER}). + * <p> + * Note: All operators are evaluated locally when running against a standalone + * database. 
+ */ +public enum BOpEvaluationContext { + + /** + * The operator may be evaluated anywhere, including piecewise evaluation on + * any node of the cluster where its inputs are available. This is used for + * operators which do not need to concentrate or coordinate their inputs + * such as {@link ConditionalRoutingOp}. + */ + ANY, + /** + * The input to the operator must be mapped across nodes using a hash + * partitioning schema and the operator must be evaluated on each hash + * partition. This is used for operators such as + * {@link DistinctBindingSetOp}. + */ + HASHED, + /** + * The input to the operator must be mapped across the shards on which the + * operator must read or write and the operator must be evaluated shard wise + * on the services having access to each shard. For example, + * {@link PipelineJoin}. + */ + SHARDED, + /** + * The operator must be evaluated on the query controller. For example, + * {@link SliceOp} may not be evaluated piecewise. + */ + CONTROLLER; + +} \ No newline at end of file Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpEvaluationContext.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IPipelineOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IPipelineOp.java 2010-09-07 20:14:45 UTC (rev 3514) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IPipelineOp.java 2010-09-08 13:16:30 UTC (rev 3515) @@ -50,10 +50,14 @@ * class depending on the operator). */ BOpStats newStats(); - + /** * Instantiate a buffer suitable as a sink for this operator. The buffer * will be provisioned based on the operator annotations. 
+ * <p> + * Note: if the operation swallows binding sets from the pipeline (such as + * operators which write on the database) then the operator MAY return an + * immutable empty buffer. * * @return The buffer. */ Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryClient.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryClient.java 2010-09-07 20:14:45 UTC (rev 3514) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryClient.java 2010-09-08 13:16:30 UTC (rev 3515) @@ -6,9 +6,9 @@ import com.bigdata.bop.BOp; /** - * Interface for a client executing queries. + * Interface for a client executing queries (the query controller). */ -public interface IQueryClient extends IQueryPeer, Remote { +public interface IQueryClient extends IQueryPeer { /* * @todo Could return a data structure which encapsulates the query results Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-09-07 20:14:45 UTC (rev 3514) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-09-08 13:16:30 UTC (rev 3515) @@ -28,7 +28,6 @@ package com.bigdata.bop.engine; import java.net.InetSocketAddress; -import java.nio.ByteBuffer; import java.rmi.RemoteException; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -410,6 +409,11 @@ /** * The currently executing queries. + * + * @todo DEADLINE: There should be a data structure representing + * {@link RunningQuery} having deadlines so we can + * {@link RunningQuery#cancel(boolean)} queries when their deadline + * expires. 
*/ final ConcurrentHashMap<Long/* queryId */, RunningQuery> runningQueries = new ConcurrentHashMap<Long, RunningQuery>(); @@ -480,20 +484,31 @@ * <p> * Handle priority for unselective queries based on the order in which * they are submitted? - * + * * @todo The approach taken by the {@link QueryEngine} executes one task per * pipeline bop per chunk. Outside of how the tasks are scheduled, * this corresponds closely to the historical pipeline query - * evaluation. The other difference is that there is less opportunity - * for concatenation of chunks. However, chunk concatenation could be - * performed here if we (a) mark the BindingSetChunk with a flag to - * indicate when it has been accepted; and (b) rip through the - * incoming chunks for the query for the target bop and combine them - * to feed the task. Chunks which have already been assigned would be - * dropped when take() discovers them above. [The chunk combination - * could also be done when we output the chunk if the sink has not - * been taken, e.g., by combining the chunk into the same target - * ByteBuffer, or when we add the chunk to the RunningQuery.] + * evaluation. + * <p> + * Chunk concatenation could be performed here if we (a) mark the + * {@link BindingSetChunk} with a flag to indicate when it has been + * accepted; and (b) rip through the incoming chunks for the query for + * the target bop and combine them to feed the task. Chunks which have + * already been assigned would be dropped when take() discovers them. + * [The chunk combination could also be done when we output the chunk + * if the sink has not been taken, e.g., by combining the chunk into + * the same target ByteBuffer, or when we add the chunk to the + * RunningQuery.] + * + * @todo SCALEOUT: High volume query operators must demand that their inputs + * are materialized before they can begin evaluation. 
Scaleout + * therefore requires a separate queue which looks at the metadata + * concerning chunks available on remote nodes for an operator which + * will run on this node and then demands the data either when the + * predecessors in the pipeline are done (operator at once evaluation) + * or when sufficient data are available to run the operator (mega + * chunk pipelining). Once the data are locally materialized, the + * operator may be queued for evaluation. */ private class QueryEngineTask implements Runnable { public void run() { @@ -778,17 +793,7 @@ } /** - * - * @todo if the top bop is an operation which writes on the database then it - * should swallow the binding sets from the pipeline and we should be - * able to pass along a <code>null</code> query buffer. - * - * @todo SCALEOUT: Return a proxy for the query buffer either here or when - * the query is sent along to another node for evaluation? - * <p> - * Actually, it would be nice if we could reuse the same NIO transfer - * of {@link ByteBuffer}s to move the final results back to the client - * rather than using a proxy object for the query buffer. + * Return a buffer onto which the solutions will be written. 
*/ protected IBlockingBuffer<IBindingSet[]> newQueryBuffer( final BindingSetPipelineOp query) { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-09-07 20:14:45 UTC (rev 3514) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-09-08 13:16:30 UTC (rev 3515) @@ -55,7 +55,6 @@ import com.bigdata.bop.IBindingSet; import com.bigdata.bop.NoSuchBOpException; import com.bigdata.bop.ap.Predicate; -import com.bigdata.bop.solutions.SliceOp; import com.bigdata.journal.IIndexManager; import com.bigdata.relation.accesspath.BlockingBuffer; import com.bigdata.relation.accesspath.IAsynchronousIterator; @@ -123,7 +122,11 @@ final private boolean controller; /** - * The client executing this query. + * The client executing this query (aka the query controller). + * <p> + * Note: The proxy is primarily for light weight RMI messages used to + * coordinate the distributed query evaluation. Ideally, all large objects + * will be transferred among the nodes of the cluster using NIO buffers. */ final private IQueryClient clientProxy; @@ -142,11 +145,11 @@ /** * An index from the {@link BOp.Annotations#BOP_ID} to the {@link BOp}. */ - private final Map<Integer, BOp> bopIndex; + protected final Map<Integer, BOp> bopIndex; /** - * A collection of the currently executing future for operators for this - * query. + * A collection of {@link Future}s for currently executing operators for + * this query. */ private final ConcurrentHashMap<BSBundle, Future<?>> operatorFutures = new ConcurrentHashMap<BSBundle, Future<?>>(); @@ -245,11 +248,6 @@ * Note: This is package private so it will be visible to the * {@link QueryEngine}. 
* - * @todo SCALEOUT: We need to model the chunks available before they are - * materialized locally such that (a) they can be materialized on - * demand (flow control); and (b) we can run the operator when there - * are sufficient chunks available without taking on too much data. - * * @todo It is likely that we can convert to the use of * {@link BlockingQueue} instead of {@link BlockingBuffer} in the * operators and then handle the logic for combining chunks inside of @@ -273,6 +271,19 @@ return queryEngine; } + + /** + * The client executing this query (aka the query controller). + * <p> + * Note: The proxy is primarily for light weight RMI messages used to + * coordinate the distributed query evaluation. Ideally, all large objects + * will be transfered among the nodes of the cluster using NIO buffers. + */ + public IQueryClient getQueryController() { + + return clientProxy; + + } /** * The unique identifier for this query. @@ -296,7 +307,7 @@ synchronized (queryRef) { - if (queryRef == null) { + if (queryRef.get() == null) { try { @@ -378,14 +389,16 @@ * rather than formatting it onto a {@link ByteBuffer}. * * @param sinkId + * The identifier of the target operator. * @param sink + * The intermediate results to be passed to that target operator. * * @return The #of chunks made available for consumption by the sink. This * will always be ONE (1) for scale-up. For scale-out, there will be * one chunk per index partition over which the intermediate results * were mapped. */ - protected int add(final int sinkId, + protected <E> int add(final int sinkId, final IBlockingBuffer<IBindingSet[]> sink) { /* @@ -394,23 +407,11 @@ final BindingSetChunk chunk = new BindingSetChunk(queryId, sinkId, -1/* partitionId */, sink.iterator()); - addChunkToQueryEngine(chunk); + queryEngine.add(chunk); return 1; } - - /** - * Adds a chunk to the local {@link QueryEngine}. - * - * @param chunk - * The chunk. 
- */ - protected void addChunkToQueryEngine(final BindingSetChunk chunk) { - - queryEngine.add(chunk); - - } /** * Make a chunk of binding sets available for consumption by the query. @@ -541,16 +542,6 @@ * * @throws UnsupportedOperationException * If this node is not the query coordinator. - * - * @todo Clone the {@link BOpStats} before reporting to avoid concurrent - * modification? - * - * @todo SCALEOUT: Do not release buffers backing the binding set chunks - * generated by an operator or the outputs of the final operator (the - * query results) until the sink has accepted those outputs. This - * means that we must not release the output buffers when the bop - * finishes but when its consumer finishes draining the {@link BOp}s - * outputs. */ public void haltOp(final HaltOpMessage msg) { if (!controller) @@ -768,11 +759,6 @@ final Runnable r = new Runnable() { public void run() { final UUID serviceId = queryEngine.getServiceId(); - /* - * @todo SCALEOUT: Combine chunks available on the queue for the - * current bop. This is not exactly "fan in" since multiple - * chunks could be available in scaleup as well. - */ int fanIn = 1; int sinkChunksOut = 0; int altSinkChunksOut = 0; @@ -822,17 +808,6 @@ * Return an iterator which will drain the solutions from the query. The * query will be cancelled if the iterator is * {@link ICloseableIterator#close() closed}. - * - * @return - * - * @todo Not all queries produce binding sets. For example, mutation - * operations. We could return the mutation count for mutation - * operators, which could be reported by {@link BOpStats} for that - * operator (unitsOut). - * - * @todo SCALEOUT: Track chunks consumed by the client so we do not release - * the backing {@link ByteBuffer} before the client is done draining - * the iterator. */ public IAsynchronousIterator<IBindingSet[]> iterator() { @@ -854,14 +829,17 @@ } /** - * @todo Cancelled queries must reject or drop new chunks, etc. 
- * <p> - * Queries must release all of their resources when they are done(). - * <p> - * Queries MUST NOT cause the solutions to be discarded before the - * client can consume them. This means that we have to carefully - * integrate {@link SliceOp} or just wrap the query buffer to impose - * the slice (simpler). + * {@inheritDoc} + * <p> + * Cancelled queries : + * <ul> + * <li>must reject new chunks</li> + * <li>must cancel any running operators</li> + * <li>must not begin to evaluate operators</li> + * <li>must release all of their resources</li> + * <li>must not cause the solutions to be discarded before the client can + * consume them.</li> + * </ul> */ final public boolean cancel(final boolean mayInterruptIfRunning) { // halt the query. Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/eval/JoinGraph.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/eval/JoinGraph.java 2010-09-07 20:14:45 UTC (rev 3514) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/eval/JoinGraph.java 2010-09-08 13:16:30 UTC (rev 3515) @@ -36,6 +36,7 @@ import com.bigdata.bop.BOp; import com.bigdata.bop.BOpContext; +import com.bigdata.bop.BOpEvaluationContext; import com.bigdata.bop.BindingSetPipelineOp; import com.bigdata.bop.IBindingSet; import com.bigdata.bop.IPredicate; @@ -262,4 +263,14 @@ } + /** + * This operator must be evaluated on the query controller. 
+ */ + @Override + public BOpEvaluationContext getEvaluationContext() { + + return BOpEvaluationContext.CONTROLLER; + + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java 2010-09-07 20:14:45 UTC (rev 3514) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java 2010-09-08 13:16:30 UTC (rev 3515) @@ -38,7 +38,11 @@ import com.bigdata.bop.engine.QueryEngine; import com.bigdata.bop.engine.RunningQuery; import com.bigdata.bop.join.PipelineJoin; +import com.bigdata.bop.solutions.SliceOp; import com.bigdata.journal.IIndexManager; +import com.bigdata.relation.accesspath.IAsynchronousIterator; +import com.bigdata.relation.accesspath.IBlockingBuffer; +import com.bigdata.relation.accesspath.IBuffer; import com.bigdata.service.DataService; import com.bigdata.service.IBigdataFederation; import com.bigdata.service.ManagedResourceService; @@ -110,10 +114,10 @@ * join in a query plan. */ private final IBigdataFederation<?> fed; - + /** - * A service used to expose {@link ByteBuffer}s and managed index resources - * for transfer to remote services in support of distributed query + * The service used to expose {@link ByteBuffer}s and managed index + * resources for transfer to remote services in support of distributed query * evaluation. */ private final ManagedResourceService resourceService; @@ -176,6 +180,17 @@ } + /** + * The service used to expose {@link ByteBuffer}s and managed index + * resources for transfer to remote services in support of distributed query + * evaluation. 
+ */ + public ManagedResourceService getResourceService() { + + return resourceService; + + } + @Override public void bufferReady(IQueryClient clientProxy, InetSocketAddress serviceAddr, long queryId, int bopId) { @@ -201,4 +216,34 @@ } + /** + * {@inheritDoc} + * + * @todo Historically, this has been a proxy object for an {@link IBuffer} + * on the {@link IQueryClient query controller}. However, it would be + * nice if we could reuse the same NIO transfer of {@link ByteBuffer}s + * to move the final results back to the client rather than using a + * proxy object for the query buffer. + * <p> + * In scale-out we must track chunks consumed by the client so we do + * not release the backing {@link ByteBuffer} on which the solutions + * are marshalled before the client is done draining the iterator. If + * the solutions are generated on the peers, then the peers must + * retain the data until the client has consumed them or have + * transferred the solutions to itself. + * <p> + * The places where this can show up as a problem are {@link SliceOp}, + * when a query deadline is reached, and when a query terminates + * normally. Also pay attention when the client closes the + * {@link IAsynchronousIterator} from which it is draining solutions + * early. 
+ */ + @Override + protected IBlockingBuffer<IBindingSet[]> newQueryBuffer( + final BindingSetPipelineOp query) { + + return query.newBuffer(); + + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java 2010-09-07 20:14:45 UTC (rev 3514) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java 2010-09-08 13:16:30 UTC (rev 3515) @@ -31,11 +31,16 @@ import com.bigdata.bop.BOp; import com.bigdata.bop.IBindingSet; -import com.bigdata.bop.engine.BindingSetChunk; +import com.bigdata.bop.IPredicate; import com.bigdata.bop.engine.IQueryClient; import com.bigdata.bop.engine.RunningQuery; +import com.bigdata.mdi.PartitionLocator; +import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.IBlockingBuffer; +import com.bigdata.relation.accesspath.IBuffer; import com.bigdata.service.IBigdataFederation; +import com.bigdata.service.jini.master.IAsynchronousClientTask; +import com.bigdata.striterator.IKeyOrder; /** * Extends {@link RunningQuery} to provide additional state and logic required @@ -43,8 +48,14 @@ * . * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ + * @version $Id: FederatedRunningQuery.java 3511 2010-09-06 20:45:37Z + * thompsonbry $ * + * @todo SCALEOUT: We need to model the chunks available before they are + * materialized locally such that (a) they can be materialized on demand + * (flow control); and (b) we can run the operator when there are + * sufficient chunks available without taking on too much data. 
+ * * @todo SCALEOUT: Life cycle management of the operators and the query implies * both a per-query bop:NodeList map on the query coordinator identifying * the nodes on which the query has been executed and a per-query @@ -76,57 +87,137 @@ } /** - * Create a {@link BindingSetChunk} from a sink and add it to the queue. - * <p> - * Note: If we are running standalone, then we leave the data on the heap - * rather than formatting it onto a {@link ByteBuffer}. + * {@inheritDoc} * - * @param sinkId - * @param sink - * * @return The #of chunks made available for consumption by the sink. This * will always be ONE (1) for scale-up. For scale-out, there will be * one chunk per index partition over which the intermediate results * were mapped. * - * @todo <p> - * For selective queries in s/o, first format the data onto a list of - * byte[]s, one per target shard/node. Then, using a lock, obtain a - * ByteBuffer if there is none associated with the query yet. - * Otherwise, using the same lock, obtain a slice onto that ByteBuffer - * and put as much of the byte[] as will fit, continuing onto a newly - * recruited ByteBuffer if necessary. Release the lock and notify the - * target of the ByteBuffer slice (buffer#, off, len). Consider - * pushing the data proactively for selective queries. - * <p> - * For unselective queries in s/o, proceed as above but we need to get - * the data off the heap and onto the {@link ByteBuffer}s quickly - * (incrementally) and we want the consumers to impose flow control on - * the producers to bound the memory demand (this needs to be - * coordinated carefully to avoid deadlocks). Typically, large result - * sets should result in multiple passes over the consumer's shard - * rather than writing the intermediate results onto the disk. + * FIXME SCALEOUT: This is where we need to map the binding sets + * over the shards for the target operator. 
Once they are mapped, + * write the binding sets onto an NIO buffer for the target node and + * then send an RMI message to the node telling it that there is a + * chunk available for the given (queryId,bopId,partitionId). + * <p> + * For selective queries in s/o, first format the data onto a list + * of byte[]s, one per target shard/node. Then, using a lock, obtain + * a ByteBuffer if there is none associated with the query yet. + * Otherwise, using the same lock, obtain a slice onto that + * ByteBuffer and put as much of the byte[] as will fit, continuing + * onto a newly recruited ByteBuffer if necessary. Release the lock + * and notify the target of the ByteBuffer slice (buffer#, off, + * len). Consider pushing the data proactively for selective + * queries. + * <p> + * For unselective queries in s/o, proceed as above but we need to + * get the data off the heap and onto the {@link ByteBuffer}s + * quickly (incrementally) and we want the consumers to impose flow + * control on the producers to bound the memory demand (this needs + * to be coordinated carefully to avoid deadlocks). Typically, large + * result sets should result in multiple passes over the consumer's + * shard rather than writing the intermediate results onto the disk. * - * FIXME SCALEOUT: This is where we need to map the binding sets over - * the shards for the target operator. Once they are mapped, write the - * binding sets onto an NIO buffer for the target node and then send - * an RMI message to the node telling it that there is a chunk - * available for the given (queryId,bopId,partitionId). - */ + * */ @Override - protected int add(final int sinkId, + protected <E> int add(final int sinkId, final IBlockingBuffer<IBindingSet[]> sink) { - /* - * Note: The partitionId will always be -1 in scale-up. 
- */ - final BindingSetChunk chunk = new BindingSetChunk(getQueryId(), sinkId, - -1/* partitionId */, sink.iterator()); + if (sink == null) + throw new IllegalArgumentException(); - addChunkToQueryEngine(chunk); + final BOp bop = bopIndex.get(sinkId); - return 1; + if (bop == null) + throw new IllegalArgumentException(); + switch (bop.getEvaluationContext()) { + case ANY: + return super.add(sinkId, sink); + case HASHED: { + /* + * FIXME The sink self describes the nodes over which the + * binding sets will be mapped and the hash function to be applied + * so we look up those metadata and apply them to distribute the + * binding sets across the nodes. + */ + throw new UnsupportedOperationException(); + } + case SHARDED: { + /* + * FIXME The sink must read or write on a shard so we map the + * binding sets across the access path for the sink. + * + * @todo For a pipeline join, the predicate is the right hand + * operator of the sink. This might be true for INSERT and DELETE + * operators as well. + * + * @todo IKeyOrder tells us which index will be used and should be + * set on the predicate by the join optimizer. + * + * @todo Use the read or write timestamp depending on whether the + * operator performs mutation [this must be part of the operator + * metadata.] + * + * @todo Set the capacity of the "map" buffer to the size of the + * data contained in the sink (in fact, we should just process the + * sink data in place). + */ + final IPredicate<E> pred = null; // @todo + final IKeyOrder<E> keyOrder = null; // @todo + final long timestamp = getReadTimestamp(); // @todo + final int capacity = 1000;// @todo + final MapBindingSetsOverShardsBuffer<IBindingSet, E> mapper = new MapBindingSetsOverShardsBuffer<IBindingSet, E>( + getFederation(), pred, keyOrder, timestamp, capacity) { + + @Override + IBuffer<IBindingSet> newBuffer(PartitionLocator locator) { + // TODO Auto-generated method stub + return null; + } + + }; + /* + * Map the binding sets over shards. 
+ * + * FIXME The buffers created above need to become associated with + * this query as resources of the query. Once we are done mapping + * the binding sets over the shards, the target node for each buffer + * needs to be sent an RMI message to let it know that there is a + * chunk available for it for the target operator. + */ + { + final IAsynchronousIterator<IBindingSet[]> itr = sink + .iterator(); + try { + while (itr.hasNext()) { + final IBindingSet[] chunk = itr.next(); + for (IBindingSet bset : chunk) { + mapper.add(bset); + } + } + } finally { + itr.close(); + sink.close(); + } + } + + throw new UnsupportedOperationException(); + } + case CONTROLLER: { + + final IQueryClient clientProxy = getQueryController(); + +// getQueryEngine().getResourceService().port; +// +// clientProxy.bufferReady(clientProxy, serviceAddr, getQueryId(), sinkId); + + throw new UnsupportedOperationException(); + } + default: + throw new AssertionError(bop.getEvaluationContext()); + } + } } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/MapBindingSetsOverShardsBuffer.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/MapBindingSetsOverShardsBuffer.java 2010-09-07 20:14:45 UTC (rev 3514) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/MapBindingSetsOverShardsBuffer.java 2010-09-08 13:16:30 UTC (rev 3515) @@ -84,9 +84,9 @@ extends AbstractUnsynchronizedArrayBuffer<E> { /** - * The predicate from which we generate the asBound predicates. This + * The predicate from which we generate the asBound binding sets. This * predicate and the {@link IKeyOrder} together determine the required - * access path. + * access path. */ private final IPredicate<F> pred; @@ -123,7 +123,12 @@ * @param fed * The federation. * @param pred - * The target predicate. + * The predicate associated with the target operator. 
The + * predicate identifies which variables and/or constants form the + * key for the access path and hence selects the shards on which + * the target operator must read or write. For example, when the + * target operator is a JOIN, this is the {@link IPredicate} + * associated with the right hand operator of the join. * @param keyOrder * Identifies the access path for the target predicate. * @param timestamp @@ -136,9 +141,10 @@ * The capacity of this buffer. */ public MapBindingSetsOverShardsBuffer( - final IBigdataFederation<?> fed, - final IPredicate<F> pred, final IKeyOrder<F> keyOrder, - final long timestamp, + final IBigdataFederation<?> fed,// + final IPredicate<F> pred, // + final IKeyOrder<F> keyOrder,// + final long timestamp,// final int capacity) { super(capacity); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2010-09-07 20:14:45 UTC (rev 3514) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2010-09-08 13:16:30 UTC (rev 3515) @@ -45,6 +45,7 @@ import com.bigdata.bop.BOp; import com.bigdata.bop.BOpContext; +import com.bigdata.bop.BOpEvaluationContext; import com.bigdata.bop.BindingSetPipelineOp; import com.bigdata.bop.IBindingSet; import com.bigdata.bop.IConstraint; @@ -1774,4 +1775,14 @@ }// class JoinTask + /** + * This is a shard wise operator. 
+ */ + @Override + public BOpEvaluationContext getEvaluationContext() { + + return BOpEvaluationContext.SHARDED; + + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/mutation/InsertOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/mutation/InsertOp.java 2010-09-07 20:14:45 UTC (rev 3514) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/mutation/InsertOp.java 2010-09-08 13:16:30 UTC (rev 3515) @@ -35,8 +35,10 @@ import com.bigdata.bop.BOp; import com.bigdata.bop.BOpContext; +import com.bigdata.bop.BOpEvaluationContext; import com.bigdata.bop.BindingSetPipelineOp; import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IPredicate; import com.bigdata.bop.IVariableOrConstant; import com.bigdata.bop.engine.BOpStats; import com.bigdata.btree.ILocalBTreeView; @@ -70,6 +72,9 @@ * An ordered {@link IVariableOrConstant}[]. Elements will be created * using the binding sets which flow through the operator and * {@link IRelation#newElement(java.util.List, IBindingSet)}. + * + * @todo This should be an {@link IPredicate} and should be the right + * hand operand just like for a JOIN. */ String SELECTED = InsertOp.class.getName() + ".selected"; @@ -267,5 +272,15 @@ // .getClass()); // // a[i] = e; - + + /** + * This is a shard wise operator. 
+ */ + @Override + public BOpEvaluationContext getEvaluationContext() { + + return BOpEvaluationContext.SHARDED; + + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ndx/AbstractSampleIndex.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ndx/AbstractSampleIndex.java 2010-09-07 20:14:45 UTC (rev 3514) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ndx/AbstractSampleIndex.java 2010-09-08 13:16:30 UTC (rev 3515) @@ -28,6 +28,7 @@ package com.bigdata.bop.ndx; +import com.bigdata.bop.BOpEvaluationContext; import com.bigdata.bop.PipelineOp; import com.bigdata.bop.BOp; import com.bigdata.bop.IBindingSet; @@ -103,4 +104,14 @@ } + /** + * This is a shard wise operator. + */ + @Override + public BOpEvaluationContext getEvaluationContext() { + + return BOpEvaluationContext.SHARDED; + + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java 2010-09-07 20:14:45 UTC (rev 3514) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java 2010-09-08 13:16:30 UTC (rev 3515) @@ -33,6 +33,7 @@ import com.bigdata.bop.BOp; import com.bigdata.bop.BOpContext; +import com.bigdata.bop.BOpEvaluationContext; import com.bigdata.bop.BindingSetPipelineOp; import com.bigdata.bop.IBindingSet; import com.bigdata.bop.PipelineOp; @@ -272,4 +273,14 @@ } + /** + * This operator must be evaluated on the query controller. 
+ */ + @Override + public BOpEvaluationContext getEvaluationContext() { + + return BOpEvaluationContext.CONTROLLER; + + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SortOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SortOp.java 2010-09-07 20:14:45 UTC (rev 3514) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SortOp.java 2010-09-08 13:16:30 UTC (rev 3515) @@ -30,6 +30,7 @@ import java.util.Map; import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpEvaluationContext; import com.bigdata.bop.BindingSetPipelineOp; import com.bigdata.bop.IBindingSet; import com.bigdata.bop.PipelineOp; @@ -83,4 +84,16 @@ } + /** + * This operator must be evaluated on the query controller. + * + * @todo Define a distributed (external) merge sort operator. + */ + @Override + public BOpEvaluationContext getEvaluationContext() { + + return BOpEvaluationContext.CONTROLLER; + + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/resources/MoveTask.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/resources/MoveTask.java 2010-09-07 20:14:45 UTC (rev 3514) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/resources/MoveTask.java 2010-09-08 13:16:30 UTC (rev 3515) @@ -31,7 +31,7 @@ import java.io.File; import java.io.IOException; import java.io.Serializable; -import java.net.InetAddress; +import java.net.InetSocketAddress; import java.util.Map; import java.util.UUID; import java.util.concurrent.ExecutionException; @@ -63,7 +63,6 @@ import com.bigdata.service.IMetadataService; import com.bigdata.service.MetadataService; import com.bigdata.service.ResourceService; -import com.bigdata.util.config.NicUtil; /** * Task moves an index partition to another {@link IDataService}. 
@@ -433,7 +432,7 @@ private final UUID targetDataServiceUUID; private final int targetIndexPartitionId; private final Event parentEvent; - private final InetAddress thisInetAddr; +// private final InetAddress thisInetAddr; /** * @@ -481,11 +480,11 @@ this.targetIndexPartitionId = targetIndexPartitionId; this.parentEvent = parentEvent; - try { - this.thisInetAddr = InetAddress.getByName(NicUtil.getIpAddress("default.nic", "default", false)); - } catch(Throwable t) { - throw new IllegalArgumentException(t.getMessage(), t); - } +// try { +// this.thisInetAddr = InetAddress.getByName(NicUtil.getIpAddress("default.nic", "default", false)); +// } catch(Throwable t) { +// throw new IllegalArgumentException(t.getMessage(), t); +// } } /** @@ -596,9 +595,8 @@ targetIndexPartitionId,// historicalWritesBuildResult.segmentMetadata,// bufferedWritesBuildResult.segmentMetadata,// - thisInetAddr, resourceManager - .getResourceServicePort()// + .getResourceService().getAddr()// )).get(); } catch (ExecutionException ex) { @@ -913,8 +911,8 @@ final private int targetIndexPartitionId; final private SegmentMetadata historyIndexSegmentMetadata; final private SegmentMetadata bufferedWritesIndexSegmentMetadata; - final private InetAddress addr; - final private int port; + final private InetSocketAddress addr; +// final private int port; /** * @param sourceIndexMetadata @@ -927,14 +925,14 @@ * Describes the {@link IndexSegmentStore} containing the * historical data for the source index partition. * @param bufferedWritesIndexSegmentMetadata - * Desribes the {@link IndexSegmentStore} containing the + * Describes the {@link IndexSegmentStore} containing the * buffered writes from the live journal for the source index * partition. * @param addr - * The {@link InetAddress} of the source data service. 
- * @param port - * The port at which the source data service has exposed its - * {@link ResourceService} + * The {@link InetSocketAddress} of the + * {@link ResourceService} running on the source data service + * (the one from which the resources will be copied during + * the move). */ ReceiveIndexPartitionTask(// final IndexMetadata sourceIndexMetadata,// @@ -942,8 +940,7 @@ final int targetIndexPartitionId,// final SegmentMetadata historyIndexSegmentMetadata,// final SegmentMetadata bufferedWritesIndexSegmentMetadata,// - final InetAddress addr, - final int port + final InetSocketAddress addr ) { this.sourceIndexMetadata = sourceIndexMetadata; @@ -952,7 +949,6 @@ this.historyIndexSegmentMetadata = historyIndexSegmentMetadata; this.bufferedWritesIndexSegmentMetadata = bufferedWritesIndexSegmentMetadata; this.addr = addr; - this.port = port; } // private transient DataService dataService; @@ -1000,8 +996,7 @@ targetIndexPartitionId,// historyIndexSegmentMetadata,// bufferedWritesIndexSegmentMetadata,// - addr,// - port// + addr// )).get(); // update the index partition receive counter. @@ -1058,8 +1053,8 @@ final private SegmentMetadata sourceBufferedWritesSegmentMetadata; final private Event parentEvent; final private String summary; - final InetAddress addr; - final int port; + final InetSocketAddress addr; +// final int port; /** * @param resourceManager @@ -1080,10 +1075,9 @@ * buffered writes from the live journal for the source index * partition. * @param addr - * The {@link InetAddress} of the source data service. - * @param port - * The port at which the source data service has exposed its - * {@link ResourceService} + * The {@link InetSocketAddress} of the + * {@link ResourceService} of the source data service (the + * one from which the resources will be copied). 
*/ InnerReceiveIndexPartitionTask(final ResourceManager resourceManager, final String targetIndexName, @@ -1092,8 +1086,7 @@ final int targetIndexPartitionId, final SegmentMetadata historyIndexSegmentMetadata, final SegmentMetadata bufferedWritesIndexSegmentMetadata, - final InetAddress addr, - final int port + final InetSocketAddress addr ) { super(resourceManager.getConcurrencyManager(), ITx.UNISOLATED, @@ -1124,7 +1117,6 @@ this.sourceHistorySegmentMetadata = historyIndexSegmentMetadata; this.sourceBufferedWritesSegmentMetadata = bufferedWritesIndexSegmentMetadata; this.addr = addr; - this.port = port; this.summary = OverflowActionEnum.Move + "(" + sourceIndexName + "->" + targetIndexName + ")"; @@ -1267,7 +1259,7 @@ try { // read the resource, writing onto that file. - new ResourceService.ReadResourceTask(addr, port, + new ResourceService.ReadResourceTask(addr, sourceSegmentMetadata.getUUID(), file).call(); } catch (Throwable t) { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/resources/StoreManager.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/resources/StoreManager.java 2010-09-07 20:14:45 UTC (rev 3514) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/resources/StoreManager.java 2010-09-08 13:16:30 UTC (rev 3515) @@ -104,7 +104,6 @@ import com.bigdata.service.IBigdataFederation; import com.bigdata.service.ManagedResourceService; import com.bigdata.service.MetadataService; -import com.bigdata.service.ResourceService; import com.bigdata.sparse.SparseRowStore; import com.bigdata.util.concurrent.DaemonThreadFactory; import com.ibm.icu.impl.ByteBuffer; @@ -657,24 +656,24 @@ } - /** - * The port at which you can connect to the {@link ResourceService}. This - * service provides remote access to resources hosted by the owning - * {@link DataService}. 
This is used for moving resources to other data - * services in the federation, including supporting service failover. - * - * @return The port used to connect to that service. - * - * @todo this could also be used for remote backup. however, note that you - * can not read the live journal using this object. - */ - public int getResourceServicePort() { - - assertRunning(); - - return resourceService.port; - - } +// /** +// * The port at which you can connect to the {@link ResourceService}. This +// * service provides remote access to resources hosted by the owning +// * {@link DataService}. This is used for moving resources to other data +// * services in the federation, including supporting service failover. +// * +// * @return The port used to connect to that service. +// * +// * @todo this could also be used for remote backup. however, note that you +// * can not read the live journal using this object. +// */ +// public int getResourceServicePort() { +// +// assertRunning(); +// +// return resourceService.port; +// +// } /** * @see Options#IGNORE_BAD_FILES @@ -1569,7 +1568,7 @@ resourceService = new ManagedResourceService() { @Override - protected File getResource(UUID uuid) throws Exception { + protected File getResource(final UUID uuid) throws Exception { if (!isRunning()) { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/ManagedResourceService.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/ManagedResourceService.java 2010-09-07 20:14:45 UTC (rev 3514) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/ManagedResourceService.java 2010-09-08 13:16:30 UTC (rev 3515) @@ -64,12 +64,17 @@ private final ConcurrentHashMap<UUID, ByteBuffer> buffers = new ConcurrentHashMap<UUID, ByteBuffer>(); /** + * Constructor uses the default nic, any free port, and the default request + * service pool size. 
+ * * @throws IOException */ public ManagedResourceService() throws IOException { + super(); } /** + * * @param port * @throws IOException */ @@ -82,8 +87,8 @@ * @param requestServicePoolSize * @throws IOException */ - public ManagedResourceService(int port, int requestServicePoolSize) - throws IOException { + public ManagedResourceService(final int port, + final int requestServicePoolSize) throws IOException { super(port, requestServicePoolSize); } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/ResourceService.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/ResourceService.java 2010-09-07 20:14:45 UTC (rev 3514) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/ResourceService.java 2010-09-08 13:16:30 UTC (rev 3515) @@ -39,6 +39,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.nio.ByteBuffer; @@ -70,11 +71,12 @@ import com.bigdata.rawstore.Bytes; import com.bigdata.util.concurrent.DaemonThreadFactory; import com.bigdata.util.concurrent.ShutdownHelper; +import com.bigdata.util.config.NicUtil; /** - * A class which permits buffers identified by a {@link UUID} to be read by a - * remote service. This class runs one thread to accept connections and thread - * pool to send data. + * A service which permits resources (managed files or buffers) identified by a + * {@link UUID} to be read by a remote service. This class runs one thread to + * accept connections and thread pool to send data. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ @@ -92,6 +94,12 @@ * overhead with establishing those connections and problems with * immediate reconnect under heavy load. 
* + * @todo Should have start() method to defer initialization of thread pools and + * the {@link ServerSocket} until outside of the constructor. Either + * handle visibility appropriately for those fields or move them and the + * shutdown protocol into an inner class which is only initialized when + * the service is running. + * * @todo Verify that multiple transfers may proceed in parallel to the from the * same source to the same receiver. This is important in order for low * latency queries to remain lively when we are moving a shard from one @@ -135,9 +143,15 @@ protected static final Logger log = Logger.getLogger(ResourceService.class); /** - * The port on which the service is accepting connections. + * The Internet socket address at which the service is accepting + * connections. */ - public final int port; + private final InetSocketAddress addr; + +// /** +// * The port on which the service is accepting connections. +// */ +// public final int port; /** * The server socket. @@ -150,163 +164,22 @@ private volatile boolean open = false; /** - * Performance counters for the {@link BufferService}. - * - * @todo could also monitor the accept and request thread pools. The latter - * is the more interesting from a workload perspective. + * Performance counters for this service. */ - static public class Counters { + public final Counters counters = new Counters(); - /** - * #of requests. - */ - public final CAT requestCount = new CAT(); - - /** - * #of requests which are denied. - */ - public final CAT denyCount = new CAT(); - - /** - * #of requests for which the resource was not found. - */ - public final CAT notFoundCount = new CAT(); - - /** - * #of requests which end in an internal error. - */ - public final CAT internalErrorCount = new CAT(); - - /** - * #of errors for responses where we attempt to write the requested data - * on the socket. 
- */ - public final CAT writeErrorCount = new CAT(); - - /** - * #of responses where we attempt to write the data on the socket. - */ - public final CAT nwrites = new CAT(); - - /** - * #of data bytes sent. - */ - public final CAT bytesWritten = new CAT(); - - /** - * The largest response written so far. - */ - public long maxWriteSize; - - /** - * A lock used to make updates to {@link #maxWriteSize} atomic. - */ - final private Object maxWriteSizeLock = new Object(); - - /** - * #of nanoseconds sending data (this will double count time for data - * that are served concurrently to different receivers). - */ - public final CAT elapsedWriteNanos = new CAT(); - - synchronized public CounterSet getCounters() { - - if (root == null) { - - root = new CounterSet(); - - /* - * #of requests and their status outcome counters. - */ - { - final CounterSet tmp = root.makePath("status"); - - tmp.addCounter("Request Count", new Instrument<Long>() { - public void sample() { - setValue(requestCount.get()); - } - }); - - tmp.addCounter("Deny Count", new Instrument<Long>() { - public void sample() { - setValue(denyCount.get()); - } - }); - - tmp.addCounter("Not Found Count", new Instrument<Long>() { - public void sample() { - setValue(notFoundCount.get()); - } - }); - - tmp.addCounter("Internal Error Count", new Instrument<Long>() { - public void sample() { - setValue(internalErrorCount.get()); - } - }); - - } - - /* - * writes (A write is a response where we try to write the file - * on the socket). - */ - { - - final CounterSet tmp = root.makePath("writes"); - tmp.addCounter("nwrites", new Instrument<Long>() { - public void sample() { - setValue(nwrites.get()); - } - }); - - tmp.addCounter("bytesWritten", new Instrument<Long>() { - public void sample() { - ... [truncated message content] |