From: <tho...@us...> - 2010-09-09 17:17:30
|
Revision: 3526 http://bigdata.svn.sourceforge.net/bigdata/?rev=3526&view=rev Author: thompsonbry Date: 2010-09-09 17:17:21 +0000 (Thu, 09 Sep 2010) Log Message: ----------- Working through support for moving bindingSet chunks around in scale-out and life cycle management of buffers in scale-out. I've raised the read/write timestamp into operator annotations. This might turn into a single "timestamp" operator and a BOp#isMutationOp() method to mark operators which write data rather than reading data. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BSBundle.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BindingSetChunk.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryClient.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryPeer.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StartOpMessage.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/ResourceService.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkMessage.java 
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryDecl.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryDecl.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/AllocationContextKey.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/QueryContext.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/ServiceContext.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/ShardContext.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/ThickChunkMessage.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java 2010-09-09 13:50:53 UTC (rev 3525) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java 2010-09-09 17:17:21 UTC (rev 3526) @@ -147,13 +147,6 @@ */ public interface Annotations { -// /** -// * A cross reference to the query identifier. This is required on -// * operators which associate distributed state with a query. [We can -// * probably get this from the evaluation context.] -// */ -// String QUERY_REF = "queryRef"; - /** * The unique identifier within a query for a specific {@link BOp}. The * {@link #QUERY_ID} and the {@link #BOP_ID} together provide a unique @@ -161,11 +154,19 @@ * query. */ String BOP_ID = "bopId"; - + /** * The timeout for the operator evaluation (milliseconds). * * @see #DEFAULT_TIMEOUT + * + * @todo Probably support both deadlines and timeouts. A deadline + * expresses when the query must be done while a timeout expresses + * how long it may run. A deadline may be imposed as soon as the + * query plan is formulated and could even be communicated from a + * remote client (e.g., as an httpd header). A timeout will always + * be interpreted with respect to the time when the query began to + * execute. 
*/ String TIMEOUT = "timeout"; @@ -175,12 +176,30 @@ long DEFAULT_TIMEOUT = Long.MAX_VALUE; /** + * The timestamp (or transaction identifier) associated with a read from + * the database. + * + * @todo Combine the read and write timestamps as a single + * <code>TX</code> value and require this on any operator which + * reads or writes on the database. + */ + String READ_TIMESTAMP = BOp.class.getName() + ".readTimestamp"; + + /** + * The timestamp (or transaction identifier) associated with a write on + * the database. + */ + String WRITE_TIMESTAMP = BOp.class.getName() + ".writeTimestamp"; + + /** * For hash partitioned operators, this is the set of the member nodes * for the operator. * <p> * This annotation is required for such operators since the set of known * nodes of a given type (such as all data services) can otherwise * change at runtime. + * + * @todo Move onto an interface parallel to {@link IShardwisePipelineOp} */ String MEMBER_SERVICES = "memberServices"; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-09-09 13:50:53 UTC (rev 3525) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-09-09 17:17:21 UTC (rev 3526) @@ -30,6 +30,7 @@ import org.apache.log4j.Logger; import com.bigdata.bop.engine.BOpStats; +import com.bigdata.bop.engine.IChunkMessage; import com.bigdata.bop.engine.IRunningQuery; import com.bigdata.bop.engine.QueryEngine; import com.bigdata.bop.engine.RunningQuery; @@ -227,6 +228,10 @@ * source if the source will be ignored). * @throws IllegalArgumentException * if the <i>sink</i> is <code>null</code> + * + * @todo modify to accept {@link IChunkMessage} or an interface available + * from getChunk() on {@link IChunkMessage} which provides us with + * flexible mechanisms for accessing the chunk data. 
*/ // * @throws IllegalArgumentException // * if the <i>indexManager</i> is <code>null</code> Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BSBundle.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BSBundle.java 2010-09-09 13:50:53 UTC (rev 3525) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BSBundle.java 2010-09-09 17:17:21 UTC (rev 3526) @@ -50,8 +50,6 @@ /** * {@inheritDoc} - * - * @todo verify that this is a decent hash function. */ public int hashCode() { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BindingSetChunk.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BindingSetChunk.java 2010-09-09 13:50:53 UTC (rev 3525) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BindingSetChunk.java 2010-09-09 17:17:21 UTC (rev 3526) @@ -1,44 +1,87 @@ package com.bigdata.bop.engine; +import java.io.Serializable; + import com.bigdata.bop.BOp; import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.fed.FederatedRunningQuery; import com.bigdata.relation.accesspath.IAsynchronousIterator; /** - * A chunk of intermediate results which are ready to be consumed by some - * {@link BOp} in a specific query. + * An non-{@link Serializable} chunk of intermediate results which are ready to + * be consumed by some {@link BOp} in a specific query (this is only used in + * query evaluation for the standalone database). */ -public class BindingSetChunk { +public class BindingSetChunk implements IChunkMessage { + /** The query controller. */ + private final IQueryClient clientProxy; + /** * The query identifier. */ - final long queryId; + private final long queryId; /** * The target {@link BOp}. 
*/ - final int bopId; + private final int bopId; /** * The index partition which is being targeted for that {@link BOp}. */ - final int partitionId; + private final int partitionId; /** * The binding sets to be consumed by that {@link BOp}. */ - final IAsynchronousIterator<IBindingSet[]> source; + private IAsynchronousIterator<IBindingSet[]> source; - public BindingSetChunk(final long queryId, final int bopId, - final int partitionId, + public IQueryClient getQueryController() { + return clientProxy; + } + + public long getQueryId() { + return queryId; + } + + public int getBOpId() { + return bopId; + } + + public int getPartitionId() { + return partitionId; + } + + public boolean isMaterialized() { + return true; + } + + /** + * + * @todo constructor to accept the BlockingBuffer instead as part of + * {@link IChunkMessage} harmonization (or an "IChunk" API). + */ + public BindingSetChunk(final IQueryClient clientProxy, final long queryId, + final int bopId, final int partitionId, final IAsynchronousIterator<IBindingSet[]> source) { + + if (clientProxy == null) + throw new IllegalArgumentException(); + if (source == null) throw new IllegalArgumentException(); + + this.clientProxy = clientProxy; + this.queryId = queryId; + this.bopId = bopId; + this.partitionId = partitionId; + this.source = source; + } public String toString() { @@ -47,5 +90,13 @@ + ",partitionId=" + partitionId + "}"; } + + public void materialize(FederatedRunningQuery runningQuery) { + // NOP + } + public IAsynchronousIterator<IBindingSet[]> iterator() { + return source; + } + } Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkMessage.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkMessage.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkMessage.java 2010-09-09 17:17:21 UTC (rev 3526) @@ -0,0 +1,93 @@ +package 
com.bigdata.bop.engine; + +import java.nio.ByteBuffer; +import java.util.concurrent.BlockingQueue; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.fed.FederatedRunningQuery; +import com.bigdata.relation.accesspath.BlockingBuffer; +import com.bigdata.relation.accesspath.IAsynchronousIterator; +import com.bigdata.service.ResourceService; + +/** + * A message describing a chunk of intermediate results which are available for + * processing. There are several implementations of this interface supporting + * same-JVM messages, thick RMI messages, and RMI messages where the payload is + * materialized using NIO transfers from the {@link ResourceService}. + */ +public interface IChunkMessage { + + /** The proxy for the query controller. */ + IQueryClient getQueryController(); + + /** The query identifier. */ + long getQueryId(); + + /** The identifier for the target {@link BOp}. */ + int getBOpId(); + + /** The identifier for the target index partition. */ + int getPartitionId(); + + /* + * @todo Report the #of bytes available with this message. However, first + * figure out whether that is the #of bytes in this {@link OutputChunk} or across + * all {@link OutputChunk}s available for the target service and sink. + */ + // @todo move to concrete subclass or allow ZERO if data are in memory (no RMI). +// /** The #of bytes of data which are available for that operator. */ +// int getBytesAvailable(); + + /** + * Return <code>true</code> if the chunk is materialized on the receiver. + */ + boolean isMaterialized(); + + /** + * Materialize the chunk on the receiver. + * + * @param runningQuery + * The running query. + */ + void materialize(FederatedRunningQuery runningQuery); + + /** + * Visit the binding sets in the chunk. + * + * @todo we do not need to use {@link IAsynchronousIterator} any more. This + * could be much more flexible and should be harmonized to support + * high volume operators, GPU operators, etc. 
Probably the right thing + * to do is introduce another interface here with a getChunk():IChunk + * where IChunk lets you access the chunk's data in different ways + * (and chunks can be both {@link IBindingSet}[]s and element[]s so we + * might need to raise that into the interfaces and/or generics as + * well). + * + * @todo It is likely that we can convert to the use of + * {@link BlockingQueue} instead of {@link BlockingBuffer} in the + * operators and then handle the logic for combining chunks inside of + * the {@link QueryEngine}. E.g., by scanning this list for chunks for + * the same bopId and combining them logically into a single chunk. + * <p> + * For scale-out, chunk combination will naturally occur when the node + * on which the operator will run requests the {@link ByteBuffer}s + * from the source nodes. Those will get wrapped up logically into a + * source for processing. For selective operators, those chunks can be + * combined before we execute the operator. For unselective operators, + * we are going to run over all the data anyway. + */ + IAsynchronousIterator<IBindingSet[]> iterator(); + + // /** + // * The Internet address and port of a {@link ResourceService} from which + // * the receiver may demand the data. + // */ + // InetSocketAddress getServiceAddr(); + // + // /** + // * The set of resources on the sender which comprise the data. 
+ // */ + // Iterator<UUID> getChunkIds(); + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkMessage.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryClient.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryClient.java 2010-09-09 13:50:53 UTC (rev 3525) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryClient.java 2010-09-09 17:17:21 UTC (rev 3526) @@ -1,76 +1,24 @@ package com.bigdata.bop.engine; -import java.rmi.Remote; import java.rmi.RemoteException; -import com.bigdata.bop.BOp; - /** * Interface for a client executing queries (the query controller). */ public interface IQueryClient extends IQueryPeer { - /* - * @todo Could return a data structure which encapsulates the query results - * and could allow multiple results from a query, e.g., one per step in a - * program. - */ - // /** -// * Evaluate a query which materializes elements, such as an -// * {@link IPredicate}. +// * Return the query. // * // * @param queryId -// * The unique identifier for the query. -// * @param timestamp -// * The timestamp or transaction against which the query will run. -// * @param query -// * The query to evaluate. -// * @param source -// * The initial binding sets to get the query going (this is -// * typically an iterator visiting a single empty binding set). +// * The query identifier. +// * @return The query. // * -// * @return An iterator visiting the elements materialized by the query. 
-// * -// * @throws Exception +// * @throws RemoteException // */ -// public IChunkedIterator<?> eval(long queryId, long timestamp, BOp query) -// throws Exception; +// public BOp getQuery(long queryId) throws RemoteException; -// /** -// * Evaluate a query which visits {@link IBindingSet}s, such as a join. -// * -// * @param queryId -// * The unique identifier for the query. -// * @param timestamp -// * The timestamp or transaction against which the query will run. -// * @param query -// * The query to evaluate. -// * @param source -// * The initial binding sets to get the query going (this is -// * typically an iterator visiting a single empty binding set). -// * -// * @return An iterator visiting {@link IBindingSet}s which result from -// * evaluating the query. -// * -// * @throws Exception -// */ -// public IChunkedIterator<IBindingSet> eval(long queryId, long timestamp, -// BOp query, IAsynchronousIterator<IBindingSet[]> source) -// throws Exception; - /** - * Return the query. - * - * @param queryId - * The query identifier. - * @return The query. - * - * @throws RemoteException - */ - public BOp getQuery(long queryId) throws RemoteException; - - /** * Notify the client that execution has started for some query, operator, * node, and index partition. */ @@ -84,22 +32,4 @@ */ public void haltOp(HaltOpMessage msg) throws RemoteException; -// /** -// * Notify the query controller that a chunk of intermediate results is -// * available for the query. -// * -// * @param queryId -// * The query identifier. -// */ -// public void addChunk(long queryId) throws RemoteException; -// -// /** -// * Notify the query controller that a chunk of intermediate results was -// * taken for processing by the query. -// * -// * @param queryId -// * The query identifier. 
-// */ -// public void takeChunk(long queryId) throws RemoteException; - } Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryDecl.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryDecl.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryDecl.java 2010-09-09 17:17:21 UTC (rev 3526) @@ -0,0 +1,25 @@ +package com.bigdata.bop.engine; + +import com.bigdata.bop.BindingSetPipelineOp; + +/** + * A query declaration. + */ +public interface IQueryDecl { + + /** + * The proxy for the query controller. + */ + IQueryClient getQueryController(); + + /** + * The query identifier. + */ + long getQueryId(); + + /** + * The query. + */ + BindingSetPipelineOp getQuery(); + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryDecl.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryPeer.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryPeer.java 2010-09-09 13:50:53 UTC (rev 3525) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryPeer.java 2010-09-09 17:17:21 UTC (rev 3526) @@ -1,6 +1,5 @@ package com.bigdata.bop.engine; -import java.net.InetSocketAddress; import java.rmi.Remote; import java.rmi.RemoteException; import java.util.UUID; @@ -23,26 +22,30 @@ UUID getServiceUUID() throws RemoteException; /** + * Declare a query to a peer. This message is sent to the peer before any + * other message for that query and declares the query and the query + * controller with which the peer must communicate during query evaluation. + * + * @param queryDecl + * The query declaration. 
+ * + * @throws UnsupportedOperationException + * unless running in scale-out. + */ + void declareQuery(IQueryDecl queryDecl); + + /** * Notify a service that a buffer having data for some {@link BOp} in some * running query is available. The receiver may request the data when they * are ready. If the query is cancelled, then the sender will drop the * buffer. * - * @param clientProxy - * proxy used to communicate with the client running the query. - * @param serviceAddr - * address which may be used to demand the data. - * @param queryId - * the unique query identifier. - * @param bopId - * the identifier for the target {@link BOp}. + * @param msg + * The message. * - * @return <code>true</code> unless the receiver knows that the query has - * already been cancelled. + * @throws UnsupportedOperationException + * unless running in scale-out. */ -// * @param nbytes -// * The #of bytes of data which are available for that operator. - void bufferReady(IQueryClient clientProxy, InetSocketAddress serviceAddr, - long queryId, int bopId/*, int nbytes*/) throws RemoteException; + void bufferReady(IChunkMessage msg) throws RemoteException; } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java 2010-09-09 13:50:53 UTC (rev 3525) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java 2010-09-09 17:17:21 UTC (rev 3526) @@ -27,6 +27,7 @@ package com.bigdata.bop.engine; +import com.bigdata.bop.BOp; import com.bigdata.btree.ILocalBTreeView; import com.bigdata.journal.IIndexManager; import com.bigdata.service.IBigdataFederation; @@ -58,12 +59,18 @@ /** * The timestamp or transaction identifier against which the query is * reading. + * + * @todo may be moved into the individual operator. 
See + * {@link BOp.Annotations#READ_TIMESTAMP} */ long getReadTimestamp(); /** * The timestamp or transaction identifier against which the query is * writing. + * + * @todo may be moved into the individual operator. See + * {@link BOp.Annotations#WRITE_TIMESTAMP} */ long getWriteTimestamp(); Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryDecl.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryDecl.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryDecl.java 2010-09-09 17:17:21 UTC (rev 3526) @@ -0,0 +1,82 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Sep 9, 2010 + */ + +package com.bigdata.bop.engine; + +import java.io.Serializable; + +import com.bigdata.bop.BindingSetPipelineOp; + +/** + * Default implementation. 
+ * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class QueryDecl implements IQueryDecl, Serializable { + + /** + * + */ + private static final long serialVersionUID = 1L; + + private final long queryId; + + private final IQueryClient clientProxy; + + private final BindingSetPipelineOp query; + + public QueryDecl(final IQueryClient clientProxy, final long queryId, + final BindingSetPipelineOp query) { + + if (clientProxy == null) + throw new IllegalArgumentException(); + + if (query == null) + throw new IllegalArgumentException(); + + this.clientProxy = clientProxy; + + this.queryId = queryId; + + this.query = query; + + } + + public BindingSetPipelineOp getQuery() { + return query; + } + + public IQueryClient getQueryController() { + return clientProxy; + } + + public long getQueryId() { + return queryId; + } + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryDecl.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-09-09 13:50:53 UTC (rev 3525) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-09-09 17:17:21 UTC (rev 3526) @@ -27,7 +27,6 @@ package com.bigdata.bop.engine; -import java.net.InetSocketAddress; import java.rmi.RemoteException; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -50,8 +49,6 @@ import com.bigdata.btree.IndexSegment; import com.bigdata.btree.view.FusedView; import com.bigdata.journal.IIndexManager; -import com.bigdata.journal.ITx; -import com.bigdata.journal.TimestampUtility; import com.bigdata.rdf.internal.IV; import com.bigdata.rdf.spo.SPORelation; 
import com.bigdata.relation.IMutableRelation; @@ -328,6 +325,11 @@ .getLogger(QueryEngine.class); /** + * Error message used if a query is not running. + */ + protected static final transient String ERR_QUERY_NOT_RUNNING = "Query is not running:"; + + /** * Access to the indices. * <p> * Note: You MUST NOT use unisolated indices without obtaining the necessary @@ -477,6 +479,19 @@ private volatile boolean shutdown = false; /** + * Return if the query engine is running. + * + * @throws IllegalStateException + * if the query engine is shutting down. + */ + protected void assertRunning() { + + if (shutdown) + throw new IllegalStateException("Shutting down."); + + } + + /** * Runnable submits chunks available for evaluation against running queries. * * @todo Handle priority for selective queries based on the time remaining @@ -519,7 +534,7 @@ final long queryId = q.getQueryId(); if (q.isCancelled()) continue; - final BindingSetChunk chunk = q.chunksIn.poll(); + final IChunkMessage chunk = q.chunksIn.poll(); if (chunk == null) { // not expected, but can't do anything without a chunk. if (log.isDebugEnabled()) @@ -528,7 +543,7 @@ } if (log.isTraceEnabled()) log.trace("Accepted chunk: queryId=" + queryId - + ", bopId=" + chunk.bopId); + + ", bopId=" + chunk.getBOpId()); try { // create task. final FutureTask<?> ft = q.newChunkTask(chunk); @@ -558,19 +573,27 @@ * * @param chunk * A chunk of intermediate results. + * + * @throws IllegalArgumentException + * if the chunk is <code>null</code>. + * @throws IllegalStateException + * if the chunk is not materialized. 
*/ - void add(final BindingSetChunk chunk) { + void acceptChunk(final IChunkMessage chunk) { if (chunk == null) throw new IllegalArgumentException(); - final RunningQuery q = runningQueries.get(chunk.queryId); + if (!chunk.isMaterialized()) + throw new IllegalStateException(); + + final RunningQuery q = runningQueries.get(chunk.getQueryId()); if(q == null) throw new IllegalStateException(); // add chunk to the query's input queue on this node. - q.add(chunk); + q.acceptChunk(chunk); // add query to the engine's task queue. priorityQueue.add(q); @@ -657,33 +680,35 @@ * IQueryPeer */ - public void bufferReady(IQueryClient clientProxy, - InetSocketAddress serviceAddr, long queryId, int bopId) { - // NOP + public void declareQuery(final IQueryDecl queryDecl) { + + throw new UnsupportedOperationException(); + } + public void bufferReady(IChunkMessage msg) { + + throw new UnsupportedOperationException(); + + } + /* * IQueryClient */ - /** - * @todo Define the behavior for these methods if the queryId is not found - * whether because the caller has the wrong value or because the query - * has terminated. - */ - public BOp getQuery(final long queryId) throws RemoteException { - - final RunningQuery q = runningQueries.get(queryId); - - if (q != null) { - - return q.getQuery(); - - } - - return null; - - } +// public BOp getQuery(final long queryId) throws RemoteException { +// +// final RunningQuery q = runningQueries.get(queryId); +// +// if (q != null) { +// +// return q.getQuery(); +// +// } +// +// return null; +// +// } public void startOp(final StartOpMessage msg) throws RemoteException { @@ -715,85 +740,110 @@ * * @param queryId * The unique identifier for the query. - * @param readTimestamp - * The timestamp or transaction against which the query will run. - * @param writeTimestamp - * The timestamp or transaction against which the query will - * write. * @param query * The query to evaluate. 
* * @return An iterator visiting {@link IBindingSet}s which result from * evaluating the query. * - * @throws IllegalArgumentException - * if the <i>readTimestamp</i> is {@link ITx#UNISOLATED} - * (queries may not read on the unisolated indices). - * @throws IllegalArgumentException - * if the <i>writeTimestamp</i> is neither - * {@link ITx#UNISOLATED} nor a read-write transaction - * identifier. * @throws IllegalStateException * if the {@link QueryEngine} has been {@link #shutdown()}. * @throws Exception - * - * @todo Consider elevating the read/write timestamps into the query plan as - * annotations. Closure would then rewrite the query plan for each - * pass, replacing the readTimestamp with the new read-behind - * timestamp. [This is related to how we will handle sequences of - * steps, parallel steps, and closure of steps.] */ - public RunningQuery eval(final long queryId, final long readTimestamp, - final long writeTimestamp, final BindingSetPipelineOp query) - throws Exception { + public RunningQuery eval(final long queryId, + final BindingSetPipelineOp query) throws Exception { if (query == null) throw new IllegalArgumentException(); - - if (readTimestamp == ITx.UNISOLATED) - throw new IllegalArgumentException(); - - if (TimestampUtility.isReadOnly(writeTimestamp)) - throw new IllegalArgumentException(); + final RunningQuery runningQuery = newRunningQuery(this, queryId, +// System.currentTimeMillis()/* begin */, + true/* controller */, this/* clientProxy */, query); + + assertRunning(); + final long timeout = query.getProperty(BOp.Annotations.TIMEOUT, BOp.Annotations.DEFAULT_TIMEOUT); - final RunningQuery runningQuery = newRunningQuery(this, queryId, - readTimestamp, writeTimestamp, - System.currentTimeMillis()/* begin */, timeout, - true/* controller */, this/* clientProxy */, query); + if (timeout < 0) + throw new IllegalArgumentException(BOp.Annotations.TIMEOUT); - if (shutdown) { + if (timeout != Long.MAX_VALUE) { - throw new 
IllegalStateException("Shutting down."); + // Compute the deadline (may overflow if timeout is very large). + final long deadline = System.currentTimeMillis() + timeout; + if (deadline > 0) { + /* + * Impose a deadline on the query. + */ + runningQuery.setDeadline(deadline); + + } + } - runningQueries.put(queryId, runningQuery); + putRunningQuery(queryId, runningQuery); return runningQuery; } /** + * Return the {@link RunningQuery} associated with that query identifier. + * + * @param queryId + * The query identifier. + * + * @return The {@link RunningQuery} -or- <code>null</code> if there is no + * query associated with that query identifier. + */ + protected RunningQuery getRunningQuery(final long queryId) { + + return runningQueries.get(queryId); + + } + + /** + * Places the {@link RunningQuery} object into the internal map. + * + * @param queryId + * The query identifier. + * @param runningQuery + * The {@link RunningQuery}. + */ + protected void putRunningQuery(final long queryId, + final RunningQuery runningQuery) { + + if (runningQuery == null) + throw new IllegalArgumentException(); + + runningQueries.put(queryId, runningQuery); + + } + + /** * Factory for {@link RunningQuery}s. */ protected RunningQuery newRunningQuery(final QueryEngine queryEngine, - final long queryId, final long readTimestamp, - final long writeTimestamp, final long begin, final long timeout, - final boolean controller, final IQueryClient clientProxy, - final BindingSetPipelineOp query) { + final long queryId, final boolean controller, + final IQueryClient clientProxy, final BindingSetPipelineOp query) { - return new RunningQuery(this, queryId, readTimestamp, writeTimestamp, - System.currentTimeMillis()/* begin */, timeout, - true/* controller */, this/* clientProxy */, query, - newQueryBuffer(query)); + return new RunningQuery(this, queryId, true/* controller */, + this/* clientProxy */, query, newQueryBuffer(query)); } /** * Return a buffer onto which the solutions will be written. 
+ * + * @todo This method is probably in the wrong place. We should use whatever + * is associated with the top-level {@link BOp} in the query and then + * rely on the NIO mechanisms to move the data around as necessary. + * + * @todo Could return a data structure which encapsulates the query results + * and could allow multiple results from a query, e.g., one per step + * in a program. */ protected IBlockingBuffer<IBindingSet[]> newQueryBuffer( final BindingSetPipelineOp query) { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-09-09 13:50:53 UTC (rev 3525) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-09-09 17:17:21 UTC (rev 3526) @@ -27,7 +27,6 @@ */ package com.bigdata.bop.engine; -import java.nio.ByteBuffer; import java.rmi.RemoteException; import java.util.LinkedHashMap; import java.util.LinkedHashSet; @@ -43,7 +42,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; import org.apache.log4j.Logger; @@ -54,9 +52,9 @@ import com.bigdata.bop.BindingSetPipelineOp; import com.bigdata.bop.IBindingSet; import com.bigdata.bop.NoSuchBOpException; -import com.bigdata.bop.ap.Predicate; import com.bigdata.journal.IIndexManager; -import com.bigdata.relation.accesspath.BlockingBuffer; +import com.bigdata.journal.ITx; +import com.bigdata.journal.TimestampUtility; import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.IBlockingBuffer; import com.bigdata.service.IBigdataFederation; @@ -103,10 +101,15 @@ */ final private long writeTimestamp; +// /** +// * The timestamp when the query was accepted 
by this node (ms). +// */ +// final private long begin; /** - * The timestamp when the query was accepted by this node (ms). + * The query deadline. The value is the system clock time in milliseconds + * when the query is due and {@link Long#MAX_VALUE} if there is no deadline. */ - final private long begin; + final private AtomicLong deadline = new AtomicLong(Long.MAX_VALUE); /** * How long the query is allowed to run (elapsed milliseconds) -or- @@ -129,15 +132,16 @@ */ final private IQueryClient clientProxy; - /** The query iff materialized on this node. */ - final private AtomicReference<BOp> queryRef; +// /** The query iff materialized on this node. */ +// final private AtomicReference<BOp> queryRef; + /** The query. */ + final private BOp query; /** * The buffer used for the overall output of the query pipeline. * - * @todo How does the pipeline get attached to this buffer? Via a special - * operator? Or do we just target the coordinating {@link QueryEngine} - * as the sink of the last operator so we can use NIO transfers? + * FIXME SCALEOUT: This should only exist on the query controller. Other + * nodes will send {@link IChunkMessage}s to the query controller. */ final private IBlockingBuffer<IBindingSet[]> queryBuffer; @@ -208,26 +212,49 @@ private final Set<Integer/*bopId*/> startedSet = new LinkedHashSet<Integer>(); /** - * The chunks available for immediate processing. + * The chunks available for immediate processing (they must have been + * materialized). * <p> * Note: This is package private so it will be visible to the * {@link QueryEngine}. + */ + final/* private */BlockingQueue<IChunkMessage> chunksIn = new LinkedBlockingDeque<IChunkMessage>(); + + /** + * Set the query deadline. The query will be cancelled when the deadline is + * passed. If the deadline is passed, the query is immediately cancelled. 
* - * @todo It is likely that we can convert to the use of - * {@link BlockingQueue} instead of {@link BlockingBuffer} in the - * operators and then handle the logic for combining chunks inside of - * the {@link QueryEngine}. E.g., by scanning this list for chunks for - * the same bopId and combining them logically into a single chunk. - * <p> - * For scale-out, chunk combination will naturally occur when the node - * on which the operator will run requests the {@link ByteBuffer}s - * from the source nodes. Those will get wrapped up logically into a - * source for processing. For selective operators, those chunks can be - * combined before we execute the operator. For unselective operators, - * we are going to run over all the data anyway. + * @param deadline + * The deadline. + * @throws IllegalArgumentException + * if the deadline is non-positive. + * @throws IllegalStateException + * if the deadline was already set. + * @throws UnsupportedOperationException + * unless node is the query controller. */ - final /*private*/ BlockingQueue<BindingSetChunk> chunksIn = new LinkedBlockingDeque<BindingSetChunk>(); + public void setDeadline(final long deadline) { + if(!controller) + throw new UnsupportedOperationException(); + + if (deadline <= 0) + throw new IllegalArgumentException(); + + // set the deadline. + if (!this.deadline + .compareAndSet(Long.MAX_VALUE/* expect */, deadline/* update */)) { + // the deadline is already set. + throw new IllegalStateException(); + } + + if (deadline < System.currentTimeMillis()) { + // deadline has already expired. + cancel(true/* mayInterruptIfRunning */); + } + + } + /** * The class executing the query on this node. */ @@ -259,41 +286,45 @@ } - /** - * Return the operator tree for this query. If query processing is - * distributed and the query has not been materialized on this node, then it - * is materialized now. - * - * @return The query. 
- */ public BOp getQuery() { + return query; + } + +// /** +// * Return the operator tree for this query. If query processing is +// * distributed and the query has not been materialized on this node, then it +// * is materialized now. +// * +// * @return The query. +// */ +// public BOp getQuery() { +// +// if (queryRef.get() == null) { +// +// synchronized (queryRef) { +// +// if (queryRef.get() == null) { +// +// try { +// +// queryRef.set(clientProxy.getQuery(queryId)); +// +// } catch (RemoteException e) { +// +// throw new RuntimeException(e); +// +// } +// +// } +// +// } +// +// } +// +// return queryRef.get(); +// +// } - if (queryRef.get() == null) { - - synchronized (queryRef) { - - if (queryRef.get() == null) { - - try { - - queryRef.set(clientProxy.getQuery(queryId)); - - } catch (RemoteException e) { - - throw new RuntimeException(e); - - } - - } - - } - - } - - return queryRef.get(); - - } - /** * Return <code>true</code> iff this is the query controller. */ @@ -305,11 +336,8 @@ /** * Return the current statistics for the query and <code>null</code> unless - * this is the query controller. - * - * @todo When the query is done, there will be one entry in this map for - * each operator in the pipeline. Non-pipeline operators such as - * {@link Predicate}s do not currently make it into this map. + * this is the query controller. For {@link BindingSetPipelineOp} operator + * which is evaluated there will be a single entry in this map. */ public Map<Integer/*bopId*/,BOpStats> getStats() { @@ -323,35 +351,100 @@ * @param begin * @param clientProxy * @param query - * The query (optional). + * + * @throws IllegalArgumentException + * if any argument is <code>null</code>. + * @throws IllegalArgumentException + * if the <i>readTimestamp</i> is {@link ITx#UNISOLATED} + * (queries may not read on the unisolated indices). 
+ * @throws IllegalArgumentException + * if the <i>writeTimestamp</i> is neither + * {@link ITx#UNISOLATED} nor a read-write transaction + * identifier. + * + * @todo is queryBuffer required? should it be allocated from the top bop? */ public RunningQuery(final QueryEngine queryEngine, final long queryId, - final long readTimestamp, final long writeTimestamp, - final long begin, final long timeout, final boolean controller, +// final long begin, + final boolean controller, final IQueryClient clientProxy, final BOp query, final IBlockingBuffer<IBindingSet[]> queryBuffer) { + + if (queryEngine == null) + throw new IllegalArgumentException(); + + if (clientProxy == null) + throw new IllegalArgumentException(); + + if (query == null) + throw new IllegalArgumentException(); + this.queryEngine = queryEngine; this.queryId = queryId; - this.readTimestamp = readTimestamp; - this.writeTimestamp = writeTimestamp; - this.begin = begin; - this.timeout = timeout; +// this.begin = begin; this.controller = controller; this.clientProxy = clientProxy; - this.queryRef = new AtomicReference<BOp>(query); - if (controller && query == null) - throw new IllegalArgumentException(); + this.query = query; this.queryBuffer = queryBuffer; this.bopIndex = BOpUtility.getIndex(query); this.statsMap = controller ? new ConcurrentHashMap<Integer, BOpStats>() : null; + /* + * @todo when making a per-bop annotation, queries must obtain a tx for + * each timestamp up front on the controller and rewrite the bop to hold + * the tx until it is done. + * + * @todo This is related to how we handle sequences of steps, parallel + * steps, closure of steps, and join graphs. Those operations need to be + * evaluated on the controller. We will have to model the relationship + * between the subquery and the query in order to terminate the subquery + * when the query halts and to terminate the query if the subquery + * fails. + * + * @todo Closure operations must rewrite the query to update the + * annotations. 
Each pass in a closure needs to be its own "subquery" + * and will need to have a distinct queryId. + */ + final Long readTimestamp = query + .getProperty(BOp.Annotations.READ_TIMESTAMP); + + // @todo remove default when elevating to per-writable bop annotation. + final long writeTimestamp = query.getProperty( + BOp.Annotations.WRITE_TIMESTAMP, ITx.UNISOLATED); + + if (readTimestamp == null) + throw new IllegalArgumentException(); + + if (readTimestamp.longValue() == ITx.UNISOLATED) + throw new IllegalArgumentException(); + + if (TimestampUtility.isReadOnly(writeTimestamp)) + throw new IllegalArgumentException(); + + this.readTimestamp = readTimestamp; + + this.writeTimestamp = writeTimestamp; + + this.timeout = query.getProperty(BOp.Annotations.TIMEOUT, + BOp.Annotations.DEFAULT_TIMEOUT); + + if (timeout < 0) + throw new IllegalArgumentException(); + } /** - * Create a {@link BindingSetChunk} from a sink and add it to the queue. + * Take a chunk generated by some pass over an operator and make it + * available to the target operator. How this is done depends on whether the + * query is running against a standalone database or the scale-out database. * <p> - * Note: If we are running standalone, then we leave the data on the heap - * rather than formatting it onto a {@link ByteBuffer}. + * Note: The return value is used as part of the termination criteria for + * the query. + * <p> + * The default implementation supports a standalone database. The generated + * chunk is left on the Java heap and handed off synchronously using + * {@link QueryEngine#add(IChunkMessage)}. That method will queue the chunk + * for asynchronous processing. * * @param sinkId * The identifier of the target operator. @@ -363,39 +456,42 @@ * one chunk per index partition over which the intermediate results * were mapped. 
*/ - protected <E> int add(final int sinkId, + protected <E> int handleOutputChunk(final int sinkId, final IBlockingBuffer<IBindingSet[]> sink) { /* * Note: The partitionId will always be -1 in scale-up. */ - final BindingSetChunk chunk = new BindingSetChunk(queryId, sinkId, - -1/* partitionId */, sink.iterator()); + final BindingSetChunk chunk = new BindingSetChunk(clientProxy, queryId, + sinkId, -1/* partitionId */, sink.iterator()); - queryEngine.add(chunk); + queryEngine.acceptChunk(chunk); return 1; - } + } /** * Make a chunk of binding sets available for consumption by the query. * <p> * Note: this is invoked by {@link QueryEngine#add(BindingSetChunk)}. * - * @param chunk + * @param msg * The chunk. */ - void add(final BindingSetChunk chunk) { + protected void acceptChunk(final IChunkMessage msg) { - if (chunk == null) + if (msg == null) throw new IllegalArgumentException(); + if (!msg.isMaterialized()) + throw new IllegalStateException(); + // verify still running. future.halted(); // add chunk to be consumed. - chunksIn.add(chunk); + chunksIn.add(msg); if (log.isDebugEnabled()) log.debug("queryId=" + queryId + ", chunksIn.size()=" @@ -409,30 +505,31 @@ * * @todo this should reject multiple invocations for a given query instance. */ - public void startQuery(final BindingSetChunk chunk) { + public void startQuery(final IChunkMessage chunk) { if (!controller) throw new UnsupportedOperationException(); if (chunk == null) throw new IllegalArgumentException(); - if (chunk.queryId != queryId) // @todo equals() if queryId is UUID. + if (chunk.getQueryId() != queryId) // @todo equals() if queryId is UUID. 
throw new IllegalArgumentException(); + final int bopId = chunk.getBOpId(); runStateLock.lock(); try { lifeCycleSetUpQuery(); availableChunkCount++; { - AtomicLong n = availableChunkCountMap.get(chunk.bopId); + AtomicLong n = availableChunkCountMap.get(bopId); if (n == null) - availableChunkCountMap.put(chunk.bopId, n = new AtomicLong()); + availableChunkCountMap.put(bopId, n = new AtomicLong()); n.incrementAndGet(); } if (log.isInfoEnabled()) log.info("queryId=" + queryId + ",runningTaskCount=" + runningTaskCount + ",availableChunks=" + availableChunkCount); - System.err.println("startQ : bopId=" + chunk.bopId + ",running=" + System.err.println("startQ : bopId=" + bopId + ",running=" + runningTaskCount + ",available=" + availableChunkCount); - queryEngine.add(chunk); + queryEngine.acceptChunk(chunk); } finally { runStateLock.unlock(); } @@ -484,13 +581,9 @@ System.err.println("startOp: bopId=" + msg.bopId + ",running=" + runningTaskCount + ",available=" + availableChunkCount + ",fanIn=" + msg.nchunks); - final long elapsed = System.currentTimeMillis() - begin; - if (log.isTraceEnabled()) - log.trace("bopId=" + msg.bopId + ",partitionId=" + msg.partitionId - + ",serviceId=" + msg.serviceId + " : runningTaskCount=" - + runningTaskCount + ", availableChunkCount=" - + availableChunkCount + ", elapsed=" + elapsed); - if (elapsed > timeout) { + if (deadline.get() < System.currentTimeMillis()) { + if (log.isTraceEnabled()) + log.trace("queryId: deadline expired."); future.halt(new TimeoutException()); cancel(true/* mayInterruptIfRunning */); } @@ -563,13 +656,13 @@ + runningTaskCount; assert availableChunkCount >= 0 : "availableChunkCount=" + availableChunkCount; - final long elapsed = System.currentTimeMillis() - begin; +// final long elapsed = System.currentTimeMillis() - begin; if (log.isTraceEnabled()) log.trace("bopId=" + msg.bopId + ",partitionId=" + msg.partitionId + ",serviceId=" + queryEngine.getServiceUUID() + ", nchunks=" + fanOut + " : runningTaskCount=" + 
runningTaskCount + ", availableChunkCount=" - + availableChunkCount + ", elapsed=" + elapsed); + + availableChunkCount);// + ", elapsed=" + elapsed); // test termination criteria if (msg.cause != null) { // operator failed on this chunk. @@ -582,8 +675,9 @@ // success (all done). future.halt(getStats()); cancel(true/* mayInterruptIfRunning */); - } else if (elapsed > timeout) { - // timeout + } else if (deadline.get() < System.currentTimeMillis()) { + if (log.isTraceEnabled()) + log.trace("queryId: deadline expired."); future.halt(new TimeoutException()); cancel(true/* mayInterruptIfRunning */); } @@ -614,8 +708,8 @@ if (!runStateLock.isHeldByCurrentThread()) throw new IllegalMonitorStateException(); - return PipelineUtility.isDone(bopId, queryRef.get(), bopIndex, - runningCountMap, availableChunkCountMap); + return PipelineUtility.isDone(bopId, query, bopIndex, runningCountMap, + availableChunkCountMap); } @@ -681,14 +775,16 @@ * A chunk to be consumed. */ @SuppressWarnings("unchecked") - protected FutureTask<Void> newChunkTask(final BindingSetChunk chunk) { + protected FutureTask<Void> newChunkTask(final IChunkMessage chunk) { /* * Look up the BOp in the index, create the BOpContext for that BOp, and * return the value returned by BOp.eval(context). */ - final BOp bop = bopIndex.get(chunk.bopId); + final int bopId = chunk.getBOpId(); + final int partitionId = chunk.getPartitionId(); + final BOp bop = bopIndex.get(bopId); if (bop == null) { - throw new NoSuchBOpException(chunk.bopId); + throw new NoSuchBOpException(bopId); } if (!(bop instanceof BindingSetPipelineOp)) { /* @@ -701,7 +797,7 @@ // self final BindingSetPipelineOp op = ((BindingSetPipelineOp) bop); // parent (null if this is the root of the operator tree). - final BOp p = BOpUtility.getParent(queryRef.get(), op); + final BOp p = BOpUtility.getParent(query, op); // sink (null unless parent is defined) final Integer sinkId = p == null ? 
null : (Integer) p .getProperty(BindingSetPipelineOp.Annotations.BOP_ID); @@ -716,8 +812,8 @@ final IBlockingBuffer<IBindingSet[]> altSink = altSinkId == null ? null : op.newBuffer(); // context - final BOpContext context = new BOpContext(this, chunk.partitionId, op - .newStats(), chunk.source, sink, altSink); + final BOpContext context = new BOpContext(this, partitionId, op + .newStats(), chunk.iterator(), sink, altSink); // FutureTask for operator execution (not running yet). final FutureTask<Void> f = op.eval(context); // Hook the FutureTask. @@ -729,29 +825,29 @@ int altSinkChunksOut = 0; try { clientProxy.startOp(new StartOpMessage(queryId, - chunk.bopId, chunk.partitionId, serviceId, fanIn)); + bopId, partitionId, serviceId, fanIn)); if (log.isDebugEnabled()) log.debug("Running chunk: queryId=" + queryId - + ", bopId=" + chunk.bopId + ", bop=" + bop); + + ", bopId=" + bopId + ", bop=" + bop); f.run(); // run f.get(); // verify success if (sink != queryBuffer && !sink.isEmpty()) { // handle output chunk. - sinkChunksOut += add(sinkId, sink); + sinkChunksOut += handleOutputChunk(sinkId, sink); } if (altSink != queryBuffer && altSink != null && !altSink.isEmpty()) { // handle alt sink output chunk. - altSinkChunksOut += add(altSinkId, altSink); + altSinkChunksOut += handleOutputChunk(altSinkId, altSink); } - clientProxy.haltOp(new HaltOpMessage(queryId, chunk.bopId, - chunk.partitionId, serviceId, null/* cause */, + clientProxy.haltOp(new HaltOpMessage(queryId, bopId, + partitionId, serviceId, null/* cause */, sinkId, sinkChunksOut, altSinkId, altSinkChunksOut, context.getStats())); } catch (Throwable t) { try { clientProxy.haltOp(new HaltOpMessage(queryId, - chunk.bopId, chunk.partitionId, serviceId, + bopId, partitionId, serviceId, t/* cause */, sinkId, sinkChunksOut, altSinkId, altSinkChunksOut, context.getStats())); } catch (RemoteE... [truncated message content] |