From: <tho...@us...> - 2010-09-14 14:46:10
|
Revision: 3543 http://bigdata.svn.sourceforge.net/bigdata/?rev=3543&view=rev Author: thompsonbry Date: 2010-09-14 14:46:01 +0000 (Tue, 14 Sep 2010) Log Message: ----------- Refactored the RunState and ChunkTask out of the RunningQuery. Working on the federation-based unit test setup. We cannot use the EmbeddedFederation for this because the serviceId is shared by both data service instances. Unfortunately, we can no longer easily use the JiniServiceHelper either, due to things like the Jini group setup. I am going to tackle this next on a workstation with more RAM so I can attach to a running federation. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BindingSetChunk.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/HaltOpMessage.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkMessage.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryClient.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StartOpMessage.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/MapBindingSetsOverShardsBuffer.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/NIOChunkMessage.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/ThickChunkMessage.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/resources/logging/log4j.properties branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/ap/R.java 
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestMapBindingSetsOverShards.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestNIOChunkMessage.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestThickChunkMessage.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestSliceOp.java branches/QUADS_QUERY_BRANCH/bigdata-jini/src/java/com/bigdata/jini/start/config/JiniCoreServicesConfiguration.java branches/QUADS_QUERY_BRANCH/bigdata-jini/src/java/com/bigdata/jini/start/process/JiniCoreServicesProcessHelper.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkAccessor.java branches/QUADS_QUERY_BRANCH/bigdata-jini/src/test/com/bigdata/service/jini/util/ branches/QUADS_QUERY_BRANCH/bigdata-jini/src/test/com/bigdata/service/jini/util/JiniCoreServicesHelper.java branches/QUADS_QUERY_BRANCH/bigdata-jini/src/test/com/bigdata/service/jini/util/JiniServicesHelper.java Removed Paths: ------------- branches/QUADS_QUERY_BRANCH/bigdata-jini/src/java/com/bigdata/service/jini/util/JiniCoreServicesHelper.java branches/QUADS_QUERY_BRANCH/bigdata-jini/src/java/com/bigdata/service/jini/util/JiniServicesHelper.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BindingSetChunk.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BindingSetChunk.java 2010-09-14 13:50:31 UTC (rev 3542) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BindingSetChunk.java 2010-09-14 14:46:01 UTC (rev 3543) @@ -3,7 +3,6 @@ import java.io.Serializable; import com.bigdata.bop.BOp; -import com.bigdata.bop.IBindingSet; import 
com.bigdata.bop.fed.FederatedRunningQuery; import com.bigdata.relation.accesspath.IAsynchronousIterator; @@ -94,8 +93,16 @@ // NOP } - public IAsynchronousIterator<E[]> iterator() { - return source; + public IChunkAccessor<E> getChunkAccessor() { + return new ChunkAccessor(); } + private class ChunkAccessor implements IChunkAccessor<E> { + + public IAsynchronousIterator<E[]> iterator() { + return source; + } + + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/HaltOpMessage.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/HaltOpMessage.java 2010-09-14 13:50:31 UTC (rev 3542) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/HaltOpMessage.java 2010-09-14 14:46:01 UTC (rev 3543) @@ -18,68 +18,66 @@ private static final long serialVersionUID = 1L; /** The identifier of the query. */ - final long queryId; + final public long queryId; /** The identifier of the operator. */ - final int bopId; + final public int bopId; /** - * The index partition identifier against which the operator was - * executing. + * The index partition identifier against which the operator was executing. */ - final int partitionId; + final public int partitionId; /** * The identifier of the service on which the operator was executing. */ - final UUID serviceId; + final public UUID serviceId; /** * * The cause and <code>null</code> if the operator halted normally. */ - final Throwable cause; + final public Throwable cause; /** - * The operator identifier for the primary sink -or- <code>null</code> - * if there is no primary sink (for example, if this is the last - * operator in the pipeline). + * The operator identifier for the primary sink -or- <code>null</code> if + * there is no primary sink (for example, if this is the last operator in + * the pipeline). 
*/ - final Integer sinkId; + final public Integer sinkId; /** - * The number of the {@link BindingSetChunk}s that were output for the - * primary sink. (This information is used for the atomic termination - * decision.) + * The number of the {@link IChunkMessage}s that were output for the primary + * sink. (This information is used for the atomic termination decision.) * <p> * For a given downstream operator this is ONE (1) for scale-up. For - * scale-out, this is one per index partition over which the - * intermediate results were mapped. + * scale-out, this is one per index partition over which the intermediate + * results were mapped. */ - final int sinkChunksOut; + final public int sinkChunksOut; /** - * The operator identifier for the alternative sink -or- - * <code>null</code> if there is no alternative sink. + * The operator identifier for the alternative sink -or- <code>null</code> + * if there is no alternative sink. */ - final Integer altSinkId; + final public Integer altSinkId; /** - * The number of the {@link BindingSetChunk}s that were output for the - * alternative sink. (This information is used for the atomic - * termination decision.) + * The number of the {@link IChunkMessage}s that were output for the + * alternative sink. (This information is used for the atomic termination + * decision.) * <p> * For a given downstream operator this is ONE (1) for scale-up. For - * scale-out, this is one per index partition over which the - * intermediate results were mapped. It is zero if there was no - * alternative sink for the operator. + * scale-out, this is one per index partition over which the intermediate + * results were mapped. It is zero if there was no alternative sink for the + * operator. */ - final int altSinkChunksOut; + final public int altSinkChunksOut; /** - * The statistics for the execution of the bop against the partition on - * the service. + * The statistics for the execution of the bop against the partition on the + * service. 
*/ - final BOpStats taskStats; + final public BOpStats taskStats; /** * @param queryId @@ -88,19 +86,18 @@ * The operator whose execution phase has terminated for a * specific index partition and input chunk. * @param partitionId - * The index partition against which the operator was - * executed. + * The index partition against which the operator was executed. * @param serviceId * The node which executed the operator. * @param cause * <code>null</code> unless execution halted abnormally. * @param chunksOut - * A map reporting the #of binding set chunks which were - * output for each downstream operator for which at least one - * chunk of output was produced. + * A map reporting the #of binding set chunks which were output + * for each downstream operator for which at least one chunk of + * output was produced. * @param taskStats - * The statistics for the execution of that bop on that shard - * and service. + * The statistics for the execution of that bop on that shard and + * service. */ public HaltOpMessage( // @@ -110,17 +107,6 @@ final Integer altSinkId, final int altSinkChunksOut,// final BOpStats taskStats) { - if (altSinkId != null && sinkId == null) { - // The primary sink must be defined if the altSink is defined. - throw new IllegalArgumentException(); - } - - if (sinkId != null && altSinkId != null - && sinkId.intValue() == altSinkId.intValue()) { - // The primary and alternative sink may not be the same operator. 
- throw new IllegalArgumentException(); - } - this.queryId = queryId; this.bopId = bopId; this.partitionId = partitionId; @@ -132,4 +118,5 @@ this.altSinkChunksOut = altSinkChunksOut; this.taskStats = taskStats; } -} \ No newline at end of file + +} Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkAccessor.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkAccessor.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkAccessor.java 2010-09-14 14:46:01 UTC (rev 3543) @@ -0,0 +1,96 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Sep 13, 2010 + */ + +package com.bigdata.bop.engine; + +import java.nio.ByteBuffer; +import java.util.concurrent.BlockingQueue; + +import com.bigdata.bop.IBindingSet; +import com.bigdata.relation.accesspath.BlockingBuffer; +import com.bigdata.relation.accesspath.IAsynchronousIterator; +import com.bigdata.striterator.IChunkedIterator; + +/** + * API providing a variety of ways to access chunks of data (data are typically + * elements or binding sets). 
+ * + * @todo Expose an {@link IChunkedIterator}, which handles both element at a + * time and chunk at a time. + * + * @todo Expose a mechanism to visit the direct {@link ByteBuffer} slices in + * which the data are stored. For an operator which executes on a GPU, we + * want to transfer the data from the direct {@link ByteBuffer} in which + * it was received into a direct {@link ByteBuffer} which is a slice onto + * its VRAM. (And obviously we need to do the reverse with the outputs of + * a GPU operator). + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public interface IChunkAccessor<E> { + + /** + * Visit the binding sets in the chunk. + * + * @deprecated We do not need to use {@link IAsynchronousIterator} any more. + * This could be much more flexible and should be harmonized to + * support high volume operators, GPU operators, etc. probably + * the right thing to do is introduce another interface here + * with a getChunk():IChunk where IChunk let's you access the + * chunks data in different ways (and chunks can be both + * {@link IBindingSet}[]s and element[]s so we might need to + * raise that into the interfaces and/or generics as well). + * + * @todo It is likely that we can convert to the use of + * {@link BlockingQueue} instead of {@link BlockingBuffer} in the + * operators and then handle the logic for combining chunks inside of + * the {@link QueryEngine}. E.g., by scanning this list for chunks for + * the same bopId and combining them logically into a single chunk. + * <p> + * For scale-out, chunk combination will naturally occur when the node + * on which the operator will run requests the {@link ByteBuffer}s + * from the source nodes. Those will get wrapped up logically into a + * source for processing. For selective operators, those chunks can be + * combined before we execute the operator. For unselective operators, + * we are going to run over all the data anyway. 
+ */ + IAsynchronousIterator<E[]> iterator(); + +// /** +// * Chunked iterator pattern. The iterator may be used for element at a time +// * processing, but the underlying iterator operators in chunks. The size of +// * the chunks depends originally on the data producer, but smaller chunks +// * may be automatically combined into larger chunks both during production +// * and when data are buffered, whether to get them off of the heap or to +// * transfer them among nodes. +// * +// * @return +// */ +// IChunkedIterator<E> chunkedIterator(); + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkAccessor.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkMessage.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkMessage.java 2010-09-14 13:50:31 UTC (rev 3542) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkMessage.java 2010-09-14 14:46:01 UTC (rev 3543) @@ -57,35 +57,10 @@ * Discard the materialized data. */ void release(); - + /** - * Visit the binding sets in the chunk. - * - * @todo we do not need to use {@link IAsynchronousIterator} any more. This - * could be much more flexible and should be harmonized to support - * high volume operators, GPU operators, etc. probably the right thing - * to do is introduce another interface here with a getChunk():IChunk - * where IChunk let's you access the chunks data in different ways - * (and chunks can be both {@link IBindingSet}[]s and element[]s so we - * might need to raise that into the interfaces and/or generics as - * well). 
- * - * @todo It is likely that we can convert to the use of - * {@link BlockingQueue} instead of {@link BlockingBuffer} in the - * operators and then handle the logic for combining chunks inside of - * the {@link QueryEngine}. E.g., by scanning this list for chunks for - * the same bopId and combining them logically into a single chunk. - * <p> - * For scale-out, chunk combination will naturally occur when the node - * on which the operator will run requests the {@link ByteBuffer}s - * from the source nodes. Those will get wrapped up logically into a - * source for processing. For selective operators, those chunks can be - * combined before we execute the operator. For unselective operators, - * we are going to run over all the data anyway. - * - * @throws IllegalStateException - * if the payload is not materialized. + * Return an interface which may be used to access the chunk's data. */ - IAsynchronousIterator<E[]> iterator(); + IChunkAccessor<E> getChunkAccessor(); } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryClient.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryClient.java 2010-09-14 13:50:31 UTC (rev 3542) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryClient.java 2010-09-14 14:46:01 UTC (rev 3543) @@ -2,21 +2,24 @@ import java.rmi.RemoteException; +import com.bigdata.bop.BindingSetPipelineOp; + /** * Interface for a client executing queries (the query controller). */ public interface IQueryClient extends IQueryPeer { -// /** -// * Return the query. -// * -// * @param queryId -// * The query identifier. -// * @return The query. -// * -// * @throws RemoteException -// */ -// public BOp getQuery(long queryId) throws RemoteException; + /** + * Return the query. + * + * @param queryId + * The query identifier. + * @return The query. 
+ * + * @throws IllegalArgumentException + * if there is no such query. + */ + public BindingSetPipelineOp getQuery(long queryId) throws RemoteException; /** * Notify the client that execution has started for some query, operator, Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-09-14 13:50:31 UTC (rev 3542) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-09-14 14:46:01 UTC (rev 3543) @@ -45,7 +45,6 @@ import com.bigdata.bop.IBindingSet; import com.bigdata.bop.IPredicate; import com.bigdata.bop.bset.Union; -import com.bigdata.bop.fed.FederatedQueryEngine; import com.bigdata.btree.BTree; import com.bigdata.btree.IndexSegment; import com.bigdata.btree.view.FusedView; @@ -54,7 +53,6 @@ import com.bigdata.rdf.spo.SPORelation; import com.bigdata.relation.IMutableRelation; import com.bigdata.relation.IRelation; -import com.bigdata.relation.accesspath.IBlockingBuffer; import com.bigdata.relation.accesspath.IElementFilter; import com.bigdata.relation.rule.IRule; import com.bigdata.relation.rule.Program; @@ -413,7 +411,7 @@ /** * The currently executing queries. 
*/ - final ConcurrentHashMap<Long/* queryId */, RunningQuery> runningQueries = new ConcurrentHashMap<Long, RunningQuery>(); + final protected ConcurrentHashMap<Long/* queryId */, RunningQuery> runningQueries = new ConcurrentHashMap<Long, RunningQuery>(); /** * A priority queue of {@link RunningQuery}s having binding set chunks @@ -513,7 +511,8 @@ */ private class QueryEngineTask implements Runnable { public void run() { - System.err.println("QueryEngine running: " + this); + if(log.isInfoEnabled()) + log.info("running: " + this); while (true) { try { final RunningQuery q = priorityQueue.take(); @@ -522,11 +521,12 @@ continue; final IChunkMessage<IBindingSet> chunk = q.chunksIn.poll(); if (log.isTraceEnabled()) - log.trace("Accepted chunk: queryId=" + queryId - + ", bopId=" + chunk.getBOpId()); - // create task. + log.trace("Accepted chunk: " + chunk); try { + // create task. final FutureTask<?> ft = q.newChunkTask(chunk); + if (log.isDebugEnabled()) + log.debug("Running chunk: " + chunk); // execute task. localIndexManager.getExecutorService().execute(ft); } catch (RejectedExecutionException ex) { @@ -670,6 +670,9 @@ // remove from the set of running queries. runningQueries.remove(q.getQueryId(), q); + + if (log.isInfoEnabled()) + log.info("Removed entry for query: " + q.getQueryId()); } @@ -800,6 +803,17 @@ return runningQueries.get(queryId); } + + public BindingSetPipelineOp getQuery(final long queryId) { + + final RunningQuery q = getRunningQuery(queryId); + + if (q == null) + throw new IllegalArgumentException(); + + return q.getQuery(); + + } /** * Places the {@link RunningQuery} object into the internal map. @@ -827,30 +841,8 @@ final IQueryClient clientProxy, final BindingSetPipelineOp query) { return new RunningQuery(this, queryId, true/* controller */, - this/* clientProxy */, query, newQueryBuffer(query)); + this/* clientProxy */, query); } - /** - * Return a buffer onto which the solutions will be written. 
- * - * @todo This method is probably in the wrong place. We should use whatever - * is associated with the top-level {@link BOp} in the query and then - * rely on the NIO mechanisms to move the data around as necessary. - * - * @todo Could return a data structure which encapsulates the query results - * and could allow multiple results from a query, e.g., one per step - * in a program. - * - * @deprecated This is going away. - * - * @see FederatedQueryEngine#newQueryBuffer(BindingSetPipelineOp) - */ - protected IBlockingBuffer<IBindingSet[]> newQueryBuffer( - final BindingSetPipelineOp query) { - - return query.newBuffer(); - - } - } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-09-14 13:50:31 UTC (rev 3542) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-09-14 14:46:01 UTC (rev 3543) @@ -53,6 +53,7 @@ import com.bigdata.bop.BindingSetPipelineOp; import com.bigdata.bop.IBindingSet; import com.bigdata.bop.NoSuchBOpException; +import com.bigdata.bop.PipelineOp; import com.bigdata.bop.bset.CopyBindingSetOp; import com.bigdata.bop.solutions.SliceOp; import com.bigdata.journal.IIndexManager; @@ -72,6 +73,12 @@ .getLogger(RunningQuery.class); /** + * Logger for the {@link ChunkTask}. + */ + private final static Logger chunkTaskLog = Logger + .getLogger(ChunkTask.class); + + /** * The run state of the query and the result of the computation iff it * completes execution normally (without being interrupted, cancelled, etc). */ @@ -91,11 +98,6 @@ /** The unique identifier for this query. */ final private long queryId; -// /** -// * The timestamp when the query was accepted by this node (ms). -// */ -// final private long begin; - /** * The query deadline. 
The value is the system clock time in milliseconds * when the query is due and {@link Long#MAX_VALUE} if there is no deadline. @@ -105,12 +107,6 @@ final private AtomicLong deadline = new AtomicLong(Long.MAX_VALUE); /** - * How long the query is allowed to run (elapsed milliseconds) -or- - * {@link Long#MAX_VALUE} if there is no deadline. - */ - final private long timeout; - - /** * <code>true</code> iff the outer {@link QueryEngine} is the controller for * this query. */ @@ -126,7 +122,7 @@ final private IQueryClient clientProxy; /** The query. */ - final private BOp query; + final private BindingSetPipelineOp query; /** * The buffer used for the overall output of the query pipeline. @@ -153,59 +149,20 @@ private final ConcurrentHashMap<BSBundle, Future<?>> operatorFutures = new ConcurrentHashMap<BSBundle, Future<?>>(); /** - * A lock guarding {@link #runningTaskCount}, {@link #availableChunkCount}, - * {@link #availableChunkCountMap}. + * A lock guarding {@link RunState#runningTaskCount}, + * {@link RunState#availableChunkCount}, + * {@link RunState#availableChunkCountMap}. This is <code>null</code> unless + * this is the query controller. + * + * @see RunState */ - private final ReentrantLock runStateLock = new ReentrantLock(); + private final ReentrantLock runStateLock; /** - * The #of tasks for this query which have started but not yet halted and - * ZERO (0) if this is not the query coordinator. - * <p> - * This is guarded by the {@link #runningStateLock}. + * The run state of this query and <code>null</code> unless this is the + * query controller. */ - private long runningTaskCount = 0; - - /** - * The #of chunks for this query of which a running task has made available - * but which have not yet been accepted for processing by another task and - * ZERO (0) if this is not the query coordinator. - * <p> - * This is guarded by the {@link #runningStateLock}. 
- */ - private long availableChunkCount = 0; - - /** - * A map reporting the #of chunks available for each operator in the - * pipeline (we only report chunks for pipeline operators). The total #of - * chunks available for any given operator in the pipeline is reported by - * {@link #availableChunkCount}. - * <p> - * The movement of the intermediate binding set chunks forms an acyclic - * directed graph. This map is used to track the #of chunks available for - * each bop in the pipeline. When a bop has no more incoming chunks, we send - * an asynchronous message to all nodes on which that bop had executed - * informing the {@link QueryEngine} on that node that it should immediately - * release all resources associated with that bop. - * <p> - * This is guarded by the {@link #runningStateLock}. - */ - private final Map<Integer/* bopId */, AtomicLong/* availableChunkCount */> availableChunkCountMap = new LinkedHashMap<Integer, AtomicLong>(); - - /** - * A collection reporting on the #of instances of a given {@link BOp} which - * are concurrently executing. - * <p> - * This is guarded by the {@link #runningStateLock}. - */ - private final Map<Integer/*bopId*/, AtomicLong/*runningCount*/> runningCountMap = new LinkedHashMap<Integer, AtomicLong>(); - - /** - * A collection of the operators which have executed at least once. - * <p> - * This is guarded by the {@link #runningStateLock}. - */ - private final Set<Integer/*bopId*/> startedSet = new LinkedHashSet<Integer>(); + final private RunState runState; /** * The chunks available for immediate processing (they must have been @@ -285,7 +242,7 @@ /** * Return the operator tree for this query. 
*/ - public BOp getQuery() { + public BindingSetPipelineOp getQuery() { return query; } @@ -329,8 +286,8 @@ public RunningQuery(final QueryEngine queryEngine, final long queryId, // final long begin, final boolean controller, - final IQueryClient clientProxy, final BOp query, - final IBlockingBuffer<IBindingSet[]> queryBuffer) { + final IQueryClient clientProxy, final BindingSetPipelineOp query + ) { if (queryEngine == null) throw new IllegalArgumentException(); @@ -342,21 +299,41 @@ throw new IllegalArgumentException(); this.queryEngine = queryEngine; + this.queryId = queryId; -// this.begin = begin; + this.controller = controller; + this.clientProxy = clientProxy; + this.query = query; - this.queryBuffer = queryBuffer; + this.bopIndex = BOpUtility.getIndex(query); + this.statsMap = controller ? new ConcurrentHashMap<Integer, BOpStats>() : null; + + runStateLock = controller ? new ReentrantLock() : null; - this.timeout = query.getProperty(BOp.Annotations.TIMEOUT, - BOp.Annotations.DEFAULT_TIMEOUT); + runState = controller ? new RunState(this) : null; + + this.queryBuffer = newQueryBuffer(); + + } - if (timeout < 0) - throw new IllegalArgumentException(); + /** + * Return the buffer on which the solutions will be written (if any). This + * is based on the top-level operator in the query plan. + * + * @return The buffer for the solutions -or- <code>null</code> if the + * top-level operator in the query plan is a mutation operator. + */ + protected IBlockingBuffer<IBindingSet[]> newQueryBuffer() { + + if (query.isMutation()) + return null; + + return ((BindingSetPipelineOp) query).newBuffer(); } @@ -423,27 +400,88 @@ if (log.isDebugEnabled()) log.debug("queryId=" + queryId + ", chunksIn.size()=" - + chunksIn.size()); + + chunksIn.size() + ", msg=" + msg); } /** - * Invoked once by the query controller with the initial - * {@link BindingSetChunk} which gets the query moving. - * - * @todo this should reject multiple invocations for a given query instance. 
+ * The run state for the query. */ - public void startQuery(final IChunkMessage<IBindingSet> chunk) { - if (!controller) - throw new UnsupportedOperationException(); - if (chunk == null) - throw new IllegalArgumentException(); - if (chunk.getQueryId() != queryId) // @todo equals() if queryId is UUID. - throw new IllegalArgumentException(); - final int bopId = chunk.getBOpId(); - runStateLock.lock(); - try { - lifeCycleSetUpQuery(); + static private class RunState { + + /** + * The query. + */ + private final RunningQuery query; + + /** + * The query identifier. + */ + private final long queryId; + + /** + * The #of tasks for this query which have started but not yet halted + * and ZERO (0) if this is not the query coordinator. + * <p> + * This is guarded by the {@link #runningStateLock}. + */ + private long runningTaskCount = 0; + + /** + * The #of chunks for this query of which a running task has made + * available but which have not yet been accepted for processing by + * another task and ZERO (0) if this is not the query coordinator. + * <p> + * This is guarded by the {@link #runningStateLock}. + */ + private long availableChunkCount = 0; + + /** + * A map reporting the #of chunks available for each operator in the + * pipeline (we only report chunks for pipeline operators). The total + * #of chunks available across all operators in the pipeline is reported + * by {@link #availableChunkCount}. + * <p> + * The movement of the intermediate binding set chunks forms an acyclic + * directed graph. This map is used to track the #of chunks available + * for each bop in the pipeline. When a bop has no more incoming chunks, + * we send an asynchronous message to all nodes on which that bop had + * executed informing the {@link QueryEngine} on that node that it + * should immediately release all resources associated with that bop. + * <p> + * This is guarded by the {@link #runningStateLock}. 
+ */ + private final Map<Integer/* bopId */, AtomicLong/* availableChunkCount */> availableChunkCountMap = new LinkedHashMap<Integer, AtomicLong>(); + + /** + * A collection reporting on the #of instances of a given {@link BOp} + * which are concurrently executing. + * <p> + * This is guarded by the {@link #runningStateLock}. + */ + private final Map<Integer/* bopId */, AtomicLong/* runningCount */> runningCountMap = new LinkedHashMap<Integer, AtomicLong>(); + + /** + * A collection of the operators which have executed at least once. + * <p> + * This is guarded by the {@link #runningStateLock}. + */ + private final Set<Integer/* bopId */> startedSet = new LinkedHashSet<Integer>(); + + public RunState(final RunningQuery query) { + + this.query = query; + + this.queryId = query.queryId; + + } + + public void startQuery(final IChunkMessage<?> msg) { + + query.lifeCycleSetUpQuery(); + + final Integer bopId = Integer.valueOf(msg.getBOpId()); + availableChunkCount++; { AtomicLong n = availableChunkCountMap.get(bopId); @@ -451,114 +489,81 @@ availableChunkCountMap.put(bopId, n = new AtomicLong()); n.incrementAndGet(); } + if (log.isInfoEnabled()) log.info("queryId=" + queryId + ",runningTaskCount=" + runningTaskCount + ",availableChunks=" + availableChunkCount); + System.err.println("startQ : bopId=" + bopId + ",running=" + runningTaskCount + ",available=" + availableChunkCount); - queryEngine.acceptChunk(chunk); - } finally { - runStateLock.unlock(); + } - } - /** - * Message provides notice that the operator has started execution and will - * consume some specific number of binding set chunks. - * - * @param bopId - * The identifier of the operator. - * @param partitionId - * The index partition identifier against which the operator is - * executing. - * @param serviceId - * The identifier of the service on which the operator is - * executing. - * @param fanIn - * The #of chunks that will be consumed by the operator - * execution. 
- * - * @throws UnsupportedOperationException - * If this node is not the query coordinator. - */ - public void startOp(final StartOpMessage msg) { - if (!controller) - throw new UnsupportedOperationException(); - final Integer bopId = Integer.valueOf(msg.bopId); - runStateLock.lock(); - try { + public void startOp(final StartOpMessage msg) { + + final Integer bopId = Integer.valueOf(msg.bopId); + runningTaskCount++; { AtomicLong n = runningCountMap.get(bopId); if (n == null) runningCountMap.put(bopId, n = new AtomicLong()); n.incrementAndGet(); - if(startedSet.add(bopId)) { + if (startedSet.add(bopId)) { // first evaluation pass for this operator. - lifeCycleSetUpOperator(msg.bopId); + query.lifeCycleSetUpOperator(bopId); } } + availableChunkCount -= msg.nchunks; + { AtomicLong n = availableChunkCountMap.get(bopId); if (n == null) throw new AssertionError(); n.addAndGet(-msg.nchunks); } - System.err.println("startOp: bopId=" + msg.bopId + ",running=" + + System.err.println("startOp: bopId=" + bopId + ",running=" + runningTaskCount + ",available=" + availableChunkCount + ",fanIn=" + msg.nchunks); - if (deadline.get() < System.currentTimeMillis()) { + + // check deadline. + if (query.deadline.get() < System.currentTimeMillis()) { + if (log.isTraceEnabled()) - log.trace("queryId: deadline expired."); - future.halt(new TimeoutException()); - cancel(true/* mayInterruptIfRunning */); + log.trace("expired: queryId=" + queryId + ", deadline=" + + query.deadline); + + query.future.halt(new TimeoutException()); + + query.cancel(true/* mayInterruptIfRunning */); + } - } finally { - runStateLock.unlock(); + } - } - /** - * Message provides notice that the operator has ended execution. The - * termination conditions for the query are checked. (For scale-out, the - * node node controlling the query needs to be involved for each operator - * start/stop in order to make the termination decision atomic). 
- * - * @throws UnsupportedOperationException - * If this node is not the query coordinator. - */ - public void haltOp(final HaltOpMessage msg) { - if (!controller) - throw new UnsupportedOperationException(); - runStateLock.lock(); - try { - // update per-operator statistics. - { - final BOpStats stats = statsMap.get(msg.bopId); - if (stats == null) { - statsMap.put(msg.bopId, msg.taskStats); - } else { - stats.add(msg.taskStats); - } - } - /* - * Update termination criteria counters. - */ + /** + * Update termination criteria counters. + */ + public void haltOp(final HaltOpMessage msg) { + // chunks generated by this task. final int fanOut = msg.sinkChunksOut + msg.altSinkChunksOut; availableChunkCount += fanOut; if (msg.sinkId != null) { AtomicLong n = availableChunkCountMap.get(msg.sinkId); if (n == null) - availableChunkCountMap.put(msg.sinkId, n = new AtomicLong()); + availableChunkCountMap + .put(msg.sinkId, n = new AtomicLong()); n.addAndGet(msg.sinkChunksOut); } if (msg.altSinkId != null) { AtomicLong n = availableChunkCountMap.get(msg.altSinkId); if (n == null) - availableChunkCountMap.put(msg.altSinkId, n = new AtomicLong()); + availableChunkCountMap.put(msg.altSinkId, + n = new AtomicLong()); n.addAndGet(msg.altSinkChunksOut); } // one less task is running. @@ -575,7 +580,7 @@ * No more chunks can appear for this operator so invoke its end * of life cycle hook. 
*/ - lifeCycleTearDownOperator(msg.bopId); + query.lifeCycleTearDownOperator(msg.bopId); } System.err.println("haltOp : bopId=" + msg.bopId + ",running=" + runningTaskCount + ",available=" + availableChunkCount @@ -584,61 +589,156 @@ + runningTaskCount; assert availableChunkCount >= 0 : "availableChunkCount=" + availableChunkCount; -// final long elapsed = System.currentTimeMillis() - begin; if (log.isTraceEnabled()) - log.trace("bopId=" + msg.bopId + ",partitionId=" + msg.partitionId - + ",serviceId=" + queryEngine.getServiceUUID() - + ", nchunks=" + fanOut + " : runningTaskCount=" - + runningTaskCount + ", availableChunkCount=" - + availableChunkCount);// + ", elapsed=" + elapsed); + log.trace("bopId=" + msg.bopId + ",partitionId=" + + msg.partitionId + ",serviceId=" + + query.queryEngine.getServiceUUID() + ", nchunks=" + + fanOut + " : runningTaskCount=" + runningTaskCount + + ", availableChunkCount=" + availableChunkCount); // test termination criteria if (msg.cause != null) { // operator failed on this chunk. log.error("Error: Canceling query: queryId=" + queryId + ",bopId=" + msg.bopId + ",partitionId=" + msg.partitionId, msg.cause); - future.halt(msg.cause); - cancel(true/* mayInterruptIfRunning */); + query.future.halt(msg.cause); + query.cancel(true/* mayInterruptIfRunning */); } else if (runningTaskCount == 0 && availableChunkCount == 0) { // success (all done). 
- future.halt(getStats()); - cancel(true/* mayInterruptIfRunning */); - } else if (deadline.get() < System.currentTimeMillis()) { if (log.isTraceEnabled()) - log.trace("queryId: deadline expired."); - future.halt(new TimeoutException()); - cancel(true/* mayInterruptIfRunning */); + log.trace("success: queryId=" + queryId); + query.future.halt(query.getStats()); + query.cancel(true/* mayInterruptIfRunning */); + } else if (query.deadline.get() < System.currentTimeMillis()) { + if (log.isTraceEnabled()) + log.trace("expired: queryId=" + queryId + ", deadline=" + + query.deadline); + query.future.halt(new TimeoutException()); + query.cancel(true/* mayInterruptIfRunning */); } + } + + /** + * Return <code>true</code> the specified operator can no longer be + * triggered by the query. The specific criteria are that no operators + * which are descendants of the specified operator are running or have + * chunks available against which they could run. Under those conditions + * it is not possible for a chunk to show up which would cause the + * operator to be executed. + * + * @param bopId + * Some operator identifier. + * + * @return <code>true</code> if the operator can not be triggered given + * the current query activity. + * + * @throws IllegalMonitorStateException + * unless the {@link #runStateLock} is held by the caller. + */ + protected boolean isOperatorDone(final int bopId) { + + return PipelineUtility.isDone(bopId, query.getQuery(), + query.bopIndex, runningCountMap, availableChunkCountMap); + + } + + } // class RunState + + /** + * Invoked once by the query controller with the initial + * {@link BindingSetChunk} which gets the query moving. + * + * @todo this should reject multiple invocations for a given query instance. 
+ */ + public void startQuery(final IChunkMessage<IBindingSet> msg) { + + if (!controller) + throw new UnsupportedOperationException(); + + if (msg == null) + throw new IllegalArgumentException(); + + if (msg.getQueryId() != queryId) // @todo equals() if queryId is UUID. + throw new IllegalArgumentException(); + + runStateLock.lock(); + + try { + + runState.startQuery(msg); + + queryEngine.acceptChunk(msg); + } finally { + runStateLock.unlock(); + } + } /** - * Return <code>true</code> the specified operator can no longer be - * triggered by the query. The specific criteria are that no operators which - * are descendants of the specified operator are running or have chunks - * available against which they could run. Under those conditions it is not - * possible for a chunk to show up which would cause the operator to be - * executed. + * Message provides notice that the operator has started execution and will + * consume some specific number of binding set chunks. * - * @param bopId - * Some operator identifier. + * @param msg The {@link StartOpMessage}. * - * @return <code>true</code> if the operator can not be triggered given the - * current query activity. + * @throws UnsupportedOperationException + * If this node is not the query coordinator. + */ + public void startOp(final StartOpMessage msg) { + + if (!controller) + throw new UnsupportedOperationException(); + + runStateLock.lock(); + + try { + + runState.startOp(msg); + + } finally { + + runStateLock.unlock(); + + } + + } + + /** + * Message provides notice that the operator has ended execution. The + * termination conditions for the query are checked. (For scale-out, the + * node node controlling the query needs to be involved for each operator + * start/stop in order to make the termination decision atomic). * - * @throws IllegalMonitorStateException - * unless the {@link #runStateLock} is held by the caller. 
+ * @param msg The {@link HaltOpMessage} + * + * @throws UnsupportedOperationException + * If this node is not the query coordinator. */ - protected boolean isOperatorDone(final int bopId) { + public void haltOp(final HaltOpMessage msg) { + + if (!controller) + throw new UnsupportedOperationException(); - if (!runStateLock.isHeldByCurrentThread()) - throw new IllegalMonitorStateException(); + // update per-operator statistics. + final BOpStats tmp = statsMap.putIfAbsent(msg.bopId, msg.taskStats); - return PipelineUtility.isDone(bopId, query, bopIndex, runningCountMap, - availableChunkCountMap); + if (tmp != null) + tmp.add(msg.taskStats); + runStateLock.lock(); + + try { + + runState.haltOp(msg); + + } finally { + + runStateLock.unlock(); + + } + } /** @@ -703,100 +803,222 @@ * A chunk to be consumed. */ @SuppressWarnings("unchecked") - protected FutureTask<Void> newChunkTask(final IChunkMessage<IBindingSet> chunk) { - /* - * Look up the BOp in the index, create the BOpContext for that BOp, and - * return the value returned by BOp.eval(context). - */ - final int bopId = chunk.getBOpId(); - final int partitionId = chunk.getPartitionId(); - final BOp bop = bopIndex.get(bopId); - if (bop == null) { - throw new NoSuchBOpException(bopId); - } - if (!(bop instanceof BindingSetPipelineOp)) { - /* - * @todo evaluation of element[] pipelines needs to use pretty much - * the same code, but it needs to be typed for E[] rather than - * IBindingSet[]. - * - * @todo evaluation of Monet style BATs would also operate under - * different assumptions, closer to those of an element[]. - */ - throw new UnsupportedOperationException(bop.getClass().getName()); - } - // self - final BindingSetPipelineOp op = ((BindingSetPipelineOp) bop); - // parent (null if this is the root of the operator tree). - final BOp p = BOpUtility.getParent(query, op); - // sink (null unless parent is defined) - final Integer sinkId = p == null ? 
null : (Integer) p - .getProperty(BindingSetPipelineOp.Annotations.BOP_ID); - final IBlockingBuffer<IBindingSet[]> sink = (p == null ? queryBuffer - : op.newBuffer()); - // altSink (null when not specified). - final Integer altSinkId = (Integer) op - .getProperty(BindingSetPipelineOp.Annotations.ALT_SINK_REF); - if (altSinkId != null && !bopIndex.containsKey(altSinkId)) { - throw new NoSuchBOpException(altSinkId); - } - final IBlockingBuffer<IBindingSet[]> altSink = altSinkId == null ? null - : op.newBuffer(); - // context - final BOpContext context = new BOpContext(this, partitionId, op - .newStats(), chunk.iterator(), sink, altSink); - // FutureTask for operator execution (not running yet). - final FutureTask<Void> f = op.eval(context); - // Hook the FutureTask. - final Runnable r = new Runnable() { - public void run() { - final UUID serviceId = queryEngine.getServiceUUID(); - int fanIn = 1; - int sinkChunksOut = 0; - int altSinkChunksOut = 0; - try { - clientProxy.startOp(new StartOpMessage(queryId, - bopId, partitionId, serviceId, fanIn)); - if (log.isDebugEnabled()) - log.debug("Running chunk: queryId=" + queryId - + ", bopId=" + bopId + ", bop=" + bop); - f.run(); // run - f.get(); // verify success - if (sink != queryBuffer && !sink.isEmpty()) { - // handle output chunk. - sinkChunksOut += handleOutputChunk(sinkId, sink); - } - if (altSink != queryBuffer && altSink != null - && !altSink.isEmpty()) { - // handle alt sink output chunk. 
- altSinkChunksOut += handleOutputChunk(altSinkId, altSink); - } - clientProxy.haltOp(new HaltOpMessage(queryId, bopId, - partitionId, serviceId, null/* cause */, - sinkId, sinkChunksOut, altSinkId, - altSinkChunksOut, context.getStats())); - } catch (Throwable t) { - try { - clientProxy.haltOp(new HaltOpMessage(queryId, - bopId, partitionId, serviceId, - t/* cause */, sinkId, sinkChunksOut, altSinkId, - altSinkChunksOut, context.getStats())); - } catch (RemoteException e) { - cancel(true/* mayInterruptIfRunning */); - log.error("queryId=" + queryId, e); - } - } - } - }; + protected FutureTask<Void> newChunkTask( + final IChunkMessage<IBindingSet> chunk) { + + // create runnable to evaluate a chunk for an operator and partition. + final Runnable r = new ChunkTask(chunk); + // wrap runnable. final FutureTask<Void> f2 = new FutureTask(r, null/* result */); + // add to list of active futures for this query. - operatorFutures.put(new BSBundle(bopId, partitionId), f2); + operatorFutures.put(new BSBundle(chunk.getBOpId(), chunk + .getPartitionId()), f2); + // return : caller will execute. return f2; + } /** + * Runnable evaluates an operator for some chunk of inputs. In scale-out, + * the operator may be evaluated against some partition of a scale-out + * index. + */ + private class ChunkTask implements Runnable { + + /** Alias for the {@link ChunkTask}'s logger. */ + private final Logger log = chunkTaskLog; + + /** The index of the bop which is being evaluated. */ + private final int bopId; + + /** + * The index partition against which the operator is being evaluated and + * <code>-1</code> if the operator is not being evaluated against a + * shard. + */ + private final int partitionId; + + /** The operator which is being evaluated. */ + private final BOp bop; + + /** + * The index of the operator which is the default sink for outputs + * generated by this evaluation. This is the + * {@link BOp.Annotations#BOP_ID} of the parent of this operator. 
This + * will be <code>null</code> if the operator does not have a parent and + * is not a query since no outputs will be generated in that case. + */ + private final Integer sinkId; + + /** + * The index of the operator which is the alternative sink for outputs + * generated by this evaluation. This is <code>null</code> unless the + * operator explicitly specifies an alternative sink using + * {@link BindingSetPipelineOp.Annotations#ALT_SINK_REF}. + */ + private final Integer altSinkId; + + /** + * The sink on which outputs destined for the {@link #sinkId} operator + * will be written and <code>null</code> if {@link #sinkId} is + * <code>null</code>. + */ + private final IBlockingBuffer<IBindingSet[]> sink; + + /** + * The sink on which outputs destined for the {@link #altSinkId} + * operator will be written and <code>null</code> if {@link #altSinkId} + * is <code>null</code>. + */ + private final IBlockingBuffer<IBindingSet[]> altSink; + + /** + * The evaluation context for this operator. + */ + private final BOpContext<IBindingSet> context; + + /** + * {@link FutureTask} which evaluates the operator (evaluation is + * delegated to this {@link FutureTask}). + */ + private final FutureTask<Void> ft; + + /** + * Create a task to consume a chunk. This looks up the {@link BOp} which + * is the target for the message in the {@link RunningQuery#bopIndex}, + * creates the sink(s) for the {@link BOp}, creates the + * {@link BOpContext} for that {@link BOp}, and wraps the value returned + * by {@link PipelineOp#eval(BOpContext)} in order to handle the outputs + * written on those sinks. + * + * @param chunk + * A message containing the materialized chunk and metadata + * about the operator which will consume that chunk. 
+ */ + public ChunkTask(final IChunkMessage<IBindingSet> chunk) { + bopId = chunk.getBOpId(); + partitionId = chunk.getPartitionId(); + bop = bopIndex.get(bopId); + if (bop == null) { + throw new NoSuchBOpException(bopId); + } + if (!(bop instanceof BindingSetPipelineOp)) { + /* + * @todo evaluation of element[] pipelines needs to use pretty + * much the same code, but it needs to be typed for E[] rather + * than IBindingSet[]. + * + * @todo evaluation of Monet style BATs would also operate under + * different assumptions, closer to those of an element[]. + */ + throw new UnsupportedOperationException(bop.getClass() + .getName()); + } + + // self + final BindingSetPipelineOp op = ((BindingSetPipelineOp) bop); + + // parent (null if this is the root of the operator tree). + final BOp p = BOpUtility.getParent(query, op); + + // sink (null unless parent is defined) + sinkId = p == null ? null : (Integer) p + .getProperty(BindingSetPipelineOp.Annotations.BOP_ID); + + // altSink (null when not specified). + altSinkId = (Integer) op + .getProperty(BindingSetPipelineOp.Annotations.ALT_SINK_REF); + + if (altSinkId != null && !bopIndex.containsKey(altSinkId)) + throw new NoSuchBOpException(altSinkId); + + if (altSinkId != null && sinkId == null) { + throw new RuntimeException( + "The primary sink must be defined if the altSink is defined: " + + bop); + } + + if (sinkId != null && altSinkId != null + && sinkId.intValue() == altSinkId.intValue()) { + throw new RuntimeException( + "The primary and alternative sink may not be the same operator: " + + bop); + } + + sink = (p == null ? queryBuffer : op.newBuffer()); + + altSink = altSinkId == null ? null : op.newBuffer(); + + // context + context = new BOpContext<IBindingSet>(RunningQuery.this, + partitionId, op.newStats(), chunk.getChunkAccessor() + .iterator(), sink, altSink); + + // FutureTask for operator execution (not running yet). + ft = op.eval(context); + + } + + /** + * Evaluate the {@link IChunkMessage}. 
+ */ + public void run() { + final UUID serviceId = queryEngine.getServiceUUID(); + int fanIn = 1; + int sinkChunksOut = 0; + int altSinkChunksOut = 0; + try { + clientProxy.startOp(new StartOpMessage(queryId, + bopId, partitionId, serviceId, fanIn)); + if (log.isDebugEnabled()) + log.debug("Running chunk: queryId=" + queryId + ", bopId=" + + bopId + ", bop=" + bop); + ft.run(); // run + ft.get(); // verify success + if (sink != null && sink != queryBuffer && !sink.isEmpty()) { + /* + * Handle sink output, sending appropriate chunk + * message(s). + * + * Note: This maps output over shards/nodes in s/o. + ... [truncated message content] |