From: <tho...@us...> - 2010-09-15 15:55:04
|
Revision: 3557 http://bigdata.svn.sourceforge.net/bigdata/?rev=3557&view=rev Author: thompsonbry Date: 2010-09-15 15:54:52 +0000 (Wed, 15 Sep 2010) Log Message: ----------- Changed queryId from 'long' to UUID, which is what the existing scale-out query code is using. Moved the 2DS distributed query test suite into the bigdata-jini module since it has a dependency on JiniClient. Extracted RunState from RunningQuery into its own class. Provided a logger for a table view of the RunState of a query as it evolves. Added some stress tests for concurrent queries. Currently working through a concurrency issue in com.bigdata.bop.engine.RunState. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/HaltOpMessage.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkMessage.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryClient.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryDecl.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/LocalChunkMessage.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryDecl.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StartOpMessage.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/NIOChunkMessage.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/QueryContext.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/ServiceContext.java 
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/ShardContext.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/ThickChunkMessage.java branches/QUADS_QUERY_BRANCH/bigdata/src/resources/logging/log4j.properties branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestNIOChunkMessage.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestThickChunkMessage.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java branches/QUADS_QUERY_BRANCH/bigdata-jini/src/test/com/bigdata/bop/ branches/QUADS_QUERY_BRANCH/bigdata-jini/src/test/com/bigdata/bop/fed/ branches/QUADS_QUERY_BRANCH/bigdata-jini/src/test/com/bigdata/bop/fed/jini/ branches/QUADS_QUERY_BRANCH/bigdata-jini/src/test/com/bigdata/bop/fed/jini/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata-jini/src/test/com/bigdata/bop/fed/jini/TestJiniFederatedQueryEngine.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/HaltOpMessage.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/HaltOpMessage.java 2010-09-15 14:30:14 UTC (rev 3556) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/HaltOpMessage.java 2010-09-15 15:54:52 UTC (rev 3557) @@ -18,7 +18,7 @@ private static final long serialVersionUID = 1L; /** The identifier of the query. */ - final public long queryId; + final public UUID queryId; /** The identifier of the operator. 
*/ final public int bopId; @@ -101,7 +101,7 @@ */ public HaltOpMessage( // - final long queryId, final int bopId, final int partitionId, + final UUID queryId, final int bopId, final int partitionId, final UUID serviceId, Throwable cause, // final Integer sinkId, final int sinkChunksOut,// final Integer altSinkId, final int altSinkChunksOut,// Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkMessage.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkMessage.java 2010-09-15 14:30:14 UTC (rev 3556) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkMessage.java 2010-09-15 15:54:52 UTC (rev 3557) @@ -1,13 +1,10 @@ package com.bigdata.bop.engine; -import java.nio.ByteBuffer; -import java.util.concurrent.BlockingQueue; +import java.util.UUID; import com.bigdata.bop.BOp; -import com.bigdata.bop.IBindingSet; import com.bigdata.bop.fed.FederatedRunningQuery; -import com.bigdata.relation.accesspath.BlockingBuffer; -import com.bigdata.relation.accesspath.IAsynchronousIterator; +import com.bigdata.btree.raba.IRaba; import com.bigdata.service.ResourceService; /** @@ -32,7 +29,7 @@ IQueryClient getQueryController(); /** The query identifier. */ - long getQueryId(); + UUID getQueryId(); /** The identifier for the target {@link BOp}. 
*/ int getBOpId(); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryClient.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryClient.java 2010-09-15 14:30:14 UTC (rev 3556) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryClient.java 2010-09-15 15:54:52 UTC (rev 3557) @@ -1,6 +1,7 @@ package com.bigdata.bop.engine; import java.rmi.RemoteException; +import java.util.UUID; import com.bigdata.bop.BindingSetPipelineOp; @@ -19,7 +20,7 @@ * @throws IllegalArgumentException * if there is no such query. */ - BindingSetPipelineOp getQuery(long queryId) throws RemoteException; + BindingSetPipelineOp getQuery(UUID queryId) throws RemoteException; /** * Notify the client that execution has started for some query, operator, Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryDecl.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryDecl.java 2010-09-15 14:30:14 UTC (rev 3556) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryDecl.java 2010-09-15 15:54:52 UTC (rev 3557) @@ -1,5 +1,7 @@ package com.bigdata.bop.engine; +import java.util.UUID; + import com.bigdata.bop.BindingSetPipelineOp; /** @@ -15,7 +17,7 @@ /** * The query identifier. */ - long getQueryId(); + UUID getQueryId(); /** * The query. 
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java 2010-09-15 14:30:14 UTC (rev 3556) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java 2010-09-15 15:54:52 UTC (rev 3557) @@ -55,24 +55,6 @@ */ IIndexManager getIndexManager(); -// /** -// * The timestamp or transaction identifier against which the query is -// * reading. -// * -// * @deprecated move into the individual operator. See -// * {@link BOp.Annotations#TIMESTAMP} -// */ -// long getReadTimestamp(); -// -// /** -// * The timestamp or transaction identifier against which the query is -// * writing. -// * -// * @deprecated moved into the individual operator. See -// * {@link BOp.Annotations#TIMESTAMP} -// */ -// long getWriteTimestamp(); - /** * Terminate query evaluation */ Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/LocalChunkMessage.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/LocalChunkMessage.java 2010-09-15 14:30:14 UTC (rev 3556) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/LocalChunkMessage.java 2010-09-15 15:54:52 UTC (rev 3557) @@ -1,6 +1,7 @@ package com.bigdata.bop.engine; import java.io.Serializable; +import java.util.UUID; import com.bigdata.bop.BOp; import com.bigdata.bop.fed.FederatedRunningQuery; @@ -21,7 +22,7 @@ /** * The query identifier. */ - private final long queryId; + private final UUID queryId; /** * The target {@link BOp}. 
@@ -42,7 +43,7 @@ return queryController; } - public long getQueryId() { + public UUID getQueryId() { return queryId; } @@ -59,12 +60,15 @@ } public LocalChunkMessage(final IQueryClient queryController, - final long queryId, final int bopId, final int partitionId, + final UUID queryId, final int bopId, final int partitionId, final IAsynchronousIterator<E[]> source) { if (queryController == null) throw new IllegalArgumentException(); + if (queryId == null) + throw new IllegalArgumentException(); + if (source == null) throw new IllegalArgumentException(); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryDecl.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryDecl.java 2010-09-15 14:30:14 UTC (rev 3556) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryDecl.java 2010-09-15 15:54:52 UTC (rev 3557) @@ -28,6 +28,7 @@ package com.bigdata.bop.engine; import java.io.Serializable; +import java.util.UUID; import com.bigdata.bop.BindingSetPipelineOp; @@ -44,18 +45,21 @@ */ private static final long serialVersionUID = 1L; - private final long queryId; + private final UUID queryId; private final IQueryClient clientProxy; private final BindingSetPipelineOp query; - public QueryDecl(final IQueryClient clientProxy, final long queryId, + public QueryDecl(final IQueryClient clientProxy, final UUID queryId, final BindingSetPipelineOp query) { if (clientProxy == null) throw new IllegalArgumentException(); + if (queryId == null) + throw new IllegalArgumentException(); + if (query == null) throw new IllegalArgumentException(); @@ -75,7 +79,7 @@ return clientProxy; } - public long getQueryId() { + public UUID getQueryId() { return queryId; } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java =================================================================== --- 
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-09-15 14:30:14 UTC (rev 3556) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-09-15 15:54:52 UTC (rev 3557) @@ -28,10 +28,13 @@ package com.bigdata.bop.engine; import java.rmi.RemoteException; +import java.util.Comparator; import java.util.UUID; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicReference; @@ -412,13 +415,23 @@ /** * The currently executing queries. */ - final protected ConcurrentHashMap<Long/* queryId */, RunningQuery> runningQueries = new ConcurrentHashMap<Long, RunningQuery>(); + final protected ConcurrentHashMap<UUID/* queryId */, RunningQuery> runningQueries = new ConcurrentHashMap<UUID, RunningQuery>(); /** - * A priority queue of {@link RunningQuery}s having binding set chunks - * available for consumption. + * A queue of {@link RunningQuery}s having binding set chunks available for + * consumption. + * + * @todo Be careful when testing out a {@link PriorityBlockingQueue} here. + * First, that collection is intrinsically bounded (it is backed by an + * array) so it will BLOCK under heavy load and could be expected to + * have some resize costs if the queue size becomes too large. Second, + * either {@link RunningQuery} needs to implement an appropriate + * {@link Comparator} or we need to pass one into the constructor for + * the queue. 
*/ - final private PriorityBlockingQueue<RunningQuery> priorityQueue = new PriorityBlockingQueue<RunningQuery>(); + final private BlockingQueue<RunningQuery> priorityQueue = new LinkedBlockingQueue<RunningQuery>(); +// final private BlockingQueue<RunningQuery> priorityQueue = new PriorityBlockingQueue<RunningQuery>( +// ); /** * @@ -480,7 +493,10 @@ * if the query engine is shutting down. */ protected void assertRunning() { - + + if (engineFuture.get() == null) + throw new IllegalStateException("Not initialized."); + if (shutdown) throw new IllegalStateException("Shutting down."); @@ -517,7 +533,7 @@ while (true) { try { final RunningQuery q = priorityQueue.take(); - final long queryId = q.getQueryId(); + final UUID queryId = q.getQueryId(); if (q.isCancelled()) continue; final IChunkMessage<IBindingSet> chunk = q.chunksIn.poll(); @@ -553,7 +569,7 @@ * chunk will be attached to the query and the query will be scheduled for * execution. * - * @param chunk + * @param msg * A chunk of intermediate results. * * @throws IllegalArgumentException @@ -561,25 +577,27 @@ * @throws IllegalStateException * if the chunk is not materialized. */ - void acceptChunk(final IChunkMessage<IBindingSet> chunk) { + protected void acceptChunk(final IChunkMessage<IBindingSet> msg) { - if (chunk == null) + if (msg == null) throw new IllegalArgumentException(); - if (!chunk.isMaterialized()) + if (!msg.isMaterialized()) throw new IllegalStateException(); - final RunningQuery q = runningQueries.get(chunk.getQueryId()); + final RunningQuery q = runningQueries.get(msg.getQueryId()); if(q == null) throw new IllegalStateException(); // add chunk to the query's input queue on this node. - q.acceptChunk(chunk); + q.acceptChunk(msg); + + assertRunning(); // add query to the engine's task queue. 
priorityQueue.add(q); - + } /** @@ -697,20 +715,6 @@ * IQueryClient */ -// public BOp getQuery(final long queryId) throws RemoteException { -// -// final RunningQuery q = runningQueries.get(queryId); -// -// if (q != null) { -// -// return q.getQuery(); -// -// } -// -// return null; -// -// } - public void startOp(final StartOpMessage msg) throws RemoteException { final RunningQuery q = runningQueries.get(msg.queryId); @@ -770,17 +774,20 @@ * needs to talk to a federation. There should be nothing DS * specific about the {@link FederatedQueryEngine}. */ - public RunningQuery eval(final long queryId, + public RunningQuery eval(final UUID queryId, final BindingSetPipelineOp query, final IChunkMessage<IBindingSet> msg) throws Exception { + if (queryId == null) + throw new IllegalArgumentException(); + if (query == null) throw new IllegalArgumentException(); if (msg == null) throw new IllegalArgumentException(); - if (queryId != msg.getQueryId()) // @todo use equals() to compare UUIDs. + if (!queryId.equals(msg.getQueryId())) throw new IllegalArgumentException(); final RunningQuery runningQuery = newRunningQuery(this, queryId, @@ -813,6 +820,8 @@ runningQuery.startQuery(msg); + acceptChunk(msg); + return runningQuery; } @@ -826,13 +835,13 @@ * @return The {@link RunningQuery} -or- <code>null</code> if there is no * query associated with that query identifier. */ - protected RunningQuery getRunningQuery(final long queryId) { + protected RunningQuery getRunningQuery(final UUID queryId) { return runningQueries.get(queryId); } - public BindingSetPipelineOp getQuery(final long queryId) { + public BindingSetPipelineOp getQuery(final UUID queryId) { final RunningQuery q = getRunningQuery(queryId); @@ -851,9 +860,12 @@ * @param runningQuery * The {@link RunningQuery}. 
*/ - protected void putRunningQuery(final long queryId, + protected void putRunningQuery(final UUID queryId, final RunningQuery runningQuery) { + if (queryId == null) + throw new IllegalArgumentException(); + if (runningQuery == null) throw new IllegalArgumentException(); @@ -865,7 +877,7 @@ * Factory for {@link RunningQuery}s. */ protected RunningQuery newRunningQuery(final QueryEngine queryEngine, - final long queryId, final boolean controller, + final UUID queryId, final boolean controller, final IQueryClient clientProxy, final BindingSetPipelineOp query) { return new RunningQuery(this, queryId, true/* controller */, Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java 2010-09-15 15:54:52 UTC (rev 3557) @@ -0,0 +1,543 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Sep 15, 2010 + */ + +package com.bigdata.bop.engine; + +import java.rmi.RemoteException; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.log4j.Logger; + +import com.bigdata.bop.BOp; + +/** + * The run state for a {@link RunningQuery}. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +class RunState { + + static private final Logger log = Logger.getLogger(RunState.class); + + /** + * Inner class provides a 2nd logger used for tabular representations. + */ + static private class TableLog { + + static private final Logger tableLog = Logger.getLogger(TableLog.class); + + } + + /** + * The query. + */ + private final RunningQuery query; + + /** + * The query identifier. + */ + private final UUID queryId; + + /** + * The #of run state transitions which have occurred for this query. + */ + private long nsteps = 0; + + /** + * The #of tasks for this query which have started but not yet halted and + * ZERO (0) if this is not the query coordinator. + * <p> + * This is guarded by the {@link #runningStateLock}. + */ + private long totalRunningTaskCount = 0; + + /** + * The #of chunks for this query of which a running task has made available + * but which have not yet been accepted for processing by another task and + * ZERO (0) if this is not the query coordinator. + * <p> + * This is guarded by the {@link #runningStateLock}. 
+ */ + private long totalAvailableChunkCount = 0; + + /** + * A map reporting the #of chunks available for each operator in the + * pipeline (we only report chunks for pipeline operators). The total #of + * chunks available across all operators in the pipeline is reported by + * {@link #totalAvailableChunkCount}. + * <p> + * The movement of the intermediate binding set chunks forms an acyclic + * directed graph. This map is used to track the #of chunks available for + * each bop in the pipeline. When a bop has no more incoming chunks, we send + * an asynchronous message to all nodes on which that bop had executed + * informing the {@link QueryEngine} on that node that it should immediately + * release all resources associated with that bop. + * <p> + * This is guarded by the {@link #runningStateLock}. + */ + private final Map<Integer/* bopId */, AtomicLong/* availableChunkCount */> availableChunkCountMap = new LinkedHashMap<Integer, AtomicLong>(); + + /** + * A collection reporting on the #of instances of a given {@link BOp} which + * are concurrently executing. + * <p> + * This is guarded by the {@link #runningStateLock}. + */ + private final Map<Integer/* bopId */, AtomicLong/* runningCount */> runningTaskCountMap = new LinkedHashMap<Integer, AtomicLong>(); + + /** + * A collection of the operators which have executed at least once. + * <p> + * This is guarded by the {@link #runningStateLock}. 
+ */ + private final Set<Integer/* bopId */> startedSet = new LinkedHashSet<Integer>(); + + public RunState(final RunningQuery query) { + + this.query = query; + + this.queryId = query.getQueryId(); + + // this.nops = query.bopIndex.size(); + + } + + public void startQuery(final IChunkMessage<?> msg) { + + nsteps++; + + // query.lifeCycleSetUpQuery(); + + final Integer bopId = Integer.valueOf(msg.getBOpId()); + + totalAvailableChunkCount++; + + assert totalAvailableChunkCount == 1 : "totalAvailableChunkCount=" + + totalAvailableChunkCount + " :: msg=" + msg; + + { + + AtomicLong n = availableChunkCountMap.get(bopId); + + if (n == null) + availableChunkCountMap.put(bopId, n = new AtomicLong()); + + final long tmp = n.incrementAndGet(); + + assert tmp == 1 : "availableChunkCount=" + tmp + " for bopId=" + + msg.getBOpId() + " :: msg=" + msg; + + } + + if (log.isInfoEnabled()) + log.info("queryId=" + queryId + ",totalRunningTaskCount=" + + totalRunningTaskCount + ",totalAvailableChunkCount=" + + totalAvailableChunkCount); + + if (TableLog.tableLog.isInfoEnabled()) { + /* + * Note: RunState is only used by the query controller so this will + * not do an RMI and the RemoteException will not be thrown. + */ + final UUID serviceId; + try { + serviceId = msg.getQueryController().getServiceUUID(); + } catch (RemoteException ex) { + throw new AssertionError(ex); + } + TableLog.tableLog.info("\n\nqueryId=" + queryId + "\n"); + // TableLog.tableLog.info(query.getQuery().toString()+"\n"); + TableLog.tableLog.info(getTableHeader()); + TableLog.tableLog.info(getTableRow("startQ", serviceId, + -1/* shardId */, 1/* fanIn */)); + } + + System.err.println("startQ : nstep="+nsteps+", bopId=" + bopId + + ",totalRunningTaskCount=" + totalRunningTaskCount + + ",totalAvailableTaskCount=" + totalAvailableChunkCount); + + } + + /** + * @return <code>true</code> if this is the first time we will evaluate the + * op. 
+ */ + public boolean startOp(final StartOpMessage msg) { + + nsteps++; + + if (log.isTraceEnabled()) + log.trace(msg.toString()); + + final Integer bopId = Integer.valueOf(msg.bopId); + + totalRunningTaskCount++; + + assert totalRunningTaskCount >= 1 : "runningTaskCount=" + + totalRunningTaskCount + " :: msg=" + msg; + final boolean firstTime; + { + + AtomicLong n = runningTaskCountMap.get(bopId); + + if (n == null) + runningTaskCountMap.put(bopId, n = new AtomicLong()); + + final long tmp = n.incrementAndGet(); + + assert tmp >= 0 : "runningTaskCount=" + tmp + " for bopId=" + + msg.bopId + " :: msg=" + msg; + + firstTime = startedSet.add(bopId); + // + // // first evaluation pass for this operator. + // query.lifeCycleSetUpOperator(bopId); + // + // } + + } + + totalAvailableChunkCount -= msg.nchunks; + + assert totalAvailableChunkCount >= 0 : "totalAvailableChunkCount=" + + totalAvailableChunkCount + " :: msg=" + msg; + + { + + AtomicLong n = availableChunkCountMap.get(bopId); + + if (n == null) + throw new AssertionError(); + + final long tmp = n.addAndGet(-msg.nchunks); + + assert tmp >= 0 : "availableChunkCount=" + tmp + " for bopId=" + + msg.bopId + " :: msg=" + msg; + + } + + System.err.println("startOp: nstep="+nsteps+", bopId=" + bopId + + ",totalRunningTaskCount=" + totalRunningTaskCount + + ",totalAvailableChunkCount=" + totalAvailableChunkCount + + ",fanIn=" + msg.nchunks); + + if (TableLog.tableLog.isInfoEnabled()) { + TableLog.tableLog.info(getTableRow("startOp", msg.serviceId, + msg.partitionId, msg.nchunks/* fanIn */)); + } + + // check deadline. + final long deadline = query.getDeadline(); + if (deadline < System.currentTimeMillis()) { + + if (log.isTraceEnabled()) + log.trace("expired: queryId=" + queryId + ", deadline=" + + deadline); + + query.future.halt(new TimeoutException()); + + query.cancel(true/* mayInterruptIfRunning */); + + } + return firstTime; + } + + /** + * Update termination criteria counters. 
@return <code>true</code> if the + * operator life cycle is over. + */ + public boolean haltOp(final HaltOpMessage msg) { + + nsteps++; + + if (log.isTraceEnabled()) + log.trace(msg.toString()); + + // chunks generated by this task. + final int fanOut = msg.sinkChunksOut + msg.altSinkChunksOut; + { + + totalAvailableChunkCount += fanOut; + + assert totalAvailableChunkCount >= 0 : "totalAvailableChunkCount=" + + totalAvailableChunkCount + " :: msg=" + msg; + + if (msg.sinkId != null) { + AtomicLong n = availableChunkCountMap.get(msg.sinkId); + if (n == null) + availableChunkCountMap + .put(msg.sinkId, n = new AtomicLong()); + + final long tmp = n.addAndGet(msg.sinkChunksOut); + + assert tmp >= 0 : "availableChunkCount=" + tmp + " for bopId=" + + msg.sinkId + " :: msg=" + msg; + + } + + if (msg.altSinkId != null) { + + AtomicLong n = availableChunkCountMap.get(msg.altSinkId); + + if (n == null) + availableChunkCountMap.put(msg.altSinkId, + n = new AtomicLong()); + + final long tmp = n.addAndGet(msg.altSinkChunksOut); + + assert tmp >= 0 : "availableChunkCount=" + tmp + " for bopId=" + + msg.altSinkId + " :: msg=" + msg; + + } + + } + + // one less task is running. + totalRunningTaskCount--; + + assert totalRunningTaskCount >= 0 : "runningTaskCount=" + + totalRunningTaskCount + " :: msg=" + msg; + + { + + final AtomicLong n = runningTaskCountMap.get(msg.bopId); + + if (n == null) + throw new AssertionError(); + + final long tmp = n.decrementAndGet(); + + assert tmp >= 0 : "runningTaskCount=" + tmp + " for bopId=" + + msg.bopId + " :: msg=" + msg; + + } + + // Figure out if this operator is done. 
+ final boolean isDone = isOperatorDone(msg.bopId); + + System.err.println("haltOp : nstep=" + nsteps + ", bopId=" + msg.bopId + + ",totalRunningTaskCount=" + totalRunningTaskCount + + ",totalAvailableTaskCount=" + totalAvailableChunkCount + + ",fanOut=" + fanOut); + + if (TableLog.tableLog.isInfoEnabled()) { + TableLog.tableLog.info(getTableRow("haltOp", msg.serviceId, + msg.partitionId, fanOut)); + } + + if (log.isTraceEnabled()) + log.trace("bopId=" + msg.bopId + ",partitionId=" + msg.partitionId + + ",serviceId=" + query.getQueryEngine().getServiceUUID() + + ", nchunks=" + fanOut + " : totalRunningTaskCount=" + + totalRunningTaskCount + ", totalAvailableChunkCount=" + + totalAvailableChunkCount); + + // test termination criteria + final long deadline = query.getDeadline(); + if (msg.cause != null) { + + // operator failed on this chunk. + log.error("Error: Canceling query: queryId=" + queryId + ",bopId=" + + msg.bopId + ",partitionId=" + msg.partitionId, msg.cause); + + query.future.halt(msg.cause); + + query.cancel(true/* mayInterruptIfRunning */); + + } else if (totalRunningTaskCount == 0 && totalAvailableChunkCount == 0) { + + // success (all done). + if (log.isTraceEnabled()) + log.trace("success: queryId=" + queryId); + + query.future.halt(query.getStats()); + + query.cancel(true/* mayInterruptIfRunning */); + + } else if (deadline < System.currentTimeMillis()) { + + if (log.isTraceEnabled()) + log.trace("expired: queryId=" + queryId + ", deadline=" + + deadline); + + query.future.halt(new TimeoutException()); + + query.cancel(true/* mayInterruptIfRunning */); + + } + return isDone; + } + + /** + * Return <code>true</code> the specified operator can no longer be + * triggered by the query. The specific criteria are that no operators which + * are descendants of the specified operator are running or have chunks + * available against which they could run. 
Under those conditions it is not + * possible for a chunk to show up which would cause the operator to be + * executed. + * + * @param bopId + * Some operator identifier. + * + * @return <code>true</code> if the operator can not be triggered given the + * current query activity. + * + * @throws IllegalMonitorStateException + * unless the {@link #runStateLock} is held by the caller. + */ + protected boolean isOperatorDone(final int bopId) { + + return PipelineUtility.isDone(bopId, query.getQuery(), query.bopIndex, + runningTaskCountMap, availableChunkCountMap); + + } + + /* + * Human readable representations of the query run state. + */ + + /** + * Human readable summary of the current {@link RunState}. + *<p> + * Note: You must holding the lock guarding the {@link RunState} to + * guarantee that will return a consistent representation. + */ + public String toString() { + + final StringBuilder sb = new StringBuilder(); + + sb.append(getClass().getName()); + sb.append("{nsteps=" + nsteps); + sb.append(",totalRunningTaskCount=" + totalRunningTaskCount); + sb.append(",totalAvailableTaskCount=" + totalAvailableChunkCount); + sb.append("}"); + + return sb.toString(); + + } + + private String getTableHeader() { + + final StringBuilder sb = new StringBuilder(); + + final Integer[] bopIds = query.bopIndex.keySet() + .toArray(new Integer[0]); + + Arrays.sort(bopIds); + + // header 2. + sb.append("step\tlabel\tshardId\tfanIO\tavail\trun"); + + for (int i = 0; i < bopIds.length; i++) { + + final Integer id = bopIds[i]; + + sb.append("\trun#" + id + "\tavail#" + id); + + } + + sb.append("\tserviceId"); + + sb.append('\n'); + + return sb.toString(); + + } + + /** + * Return a tabular representation of the query {@link RunState}. + *<p> + * Note: You must holding the lock guarding the {@link RunState} to + * guarantee that will return a consistent representation. + * + * @param label + * The state change level (startQ, startOp, haltOp). 
+ * @param serviceId + * The node on which the operator is/was executed. + * @param shardId + * The index partition against which the operator was running and + * <code>-1</code> if the operator was not evaluated against a + * specific index partition. + * @param + * @param fanIO + * The fanIn (startQ,startOp) or fanOut (haltOp). + */ + private String getTableRow(final String label, final UUID serviceId, + final int shardId, final int fanIO) { + + final StringBuilder sb = new StringBuilder(); + + sb.append(Long.toString(nsteps)); + sb.append('\t'); + sb.append(label); + sb.append('\t'); + sb.append(Integer.toString(shardId)); + sb.append('\t'); + sb.append(Integer.toString(fanIO)); + sb.append('\t'); + sb.append(Long.toString(totalAvailableChunkCount)); + sb.append('\t'); + sb.append(Long.toString(totalRunningTaskCount)); + + final Integer[] bopIds = query.bopIndex.keySet() + .toArray(new Integer[0]); + + Arrays.sort(bopIds); + + for (int i = 0; i < bopIds.length; i++) { + + final Integer id = bopIds[i]; + + final AtomicLong nrunning = runningTaskCountMap.get(id); + + final AtomicLong navailable = availableChunkCountMap.get(id); + + sb.append("\t" + (navailable == null ? "N/A" : navailable.get())); + + sb.append("\t" + (nrunning == null ? "N/A" : nrunning.get())); + + } + + // Note: At the end to keep the table pretty. Will be null unless s/o. + sb.append('\t'); + sb.append(serviceId == null ? 
"N/A" : serviceId.toString()); + + sb.append('\n'); + + return sb.toString(); + + } + +} // class RunState Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-09-15 14:30:14 UTC (rev 3556) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-09-15 15:54:52 UTC (rev 3557) @@ -28,10 +28,7 @@ package com.bigdata.bop.engine; import java.rmi.RemoteException; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; import java.util.Map; -import java.util.Set; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; @@ -81,8 +78,10 @@ /** * The run state of the query and the result of the computation iff it * completes execution normally (without being interrupted, cancelled, etc). + * <p> + * Note: Package private in order to expose this field to {@link RunState}. */ - final private Haltable<Map<Integer,BOpStats>> future = new Haltable<Map<Integer,BOpStats>>(); + final /*private*/ Haltable<Map<Integer,BOpStats>> future = new Haltable<Map<Integer,BOpStats>>(); /** * The runtime statistics for each {@link BOp} in the query and @@ -96,7 +95,7 @@ final private QueryEngine queryEngine; /** The unique identifier for this query. */ - final private long queryId; + final private UUID queryId; /** * The query deadline. 
The value is the system clock time in milliseconds @@ -149,8 +148,8 @@ private final ConcurrentHashMap<BSBundle, Future<?>> operatorFutures = new ConcurrentHashMap<BSBundle, Future<?>>(); /** - * A lock guarding {@link RunState#runningTaskCount}, - * {@link RunState#availableChunkCount}, + * A lock guarding {@link RunState#totalRunningTaskCount}, + * {@link RunState#totalAvailableChunkCount}, * {@link RunState#availableChunkCountMap}. This is <code>null</code> unless * this is the query controller. * @@ -209,6 +208,19 @@ } /** + * Return the query deadline (the time at which it will terminate regardless + * of its run state). + * + * @return The query deadline (milliseconds since the epoch) and + * {@link Long#MAX_VALUE} if no explicit deadline was specified. + */ + public long getDeadline() { + + return deadline.get(); + + } + + /** * The class executing the query on this node. */ public QueryEngine getQueryEngine() { @@ -233,7 +245,7 @@ /** * The unique identifier for this query. */ - public long getQueryId() { + public UUID getQueryId() { return queryId; @@ -283,15 +295,16 @@ * {@link ITx#UNISOLATED} nor a read-write transaction * identifier. */ - public RunningQuery(final QueryEngine queryEngine, final long queryId, -// final long begin, - final boolean controller, - final IQueryClient clientProxy, final BindingSetPipelineOp query - ) { + public RunningQuery(final QueryEngine queryEngine, final UUID queryId, + final boolean controller, final IQueryClient clientProxy, + final BindingSetPipelineOp query) { if (queryEngine == null) throw new IllegalArgumentException(); + if (queryId == null) + throw new IllegalArgumentException(); + if (clientProxy == null) throw new IllegalArgumentException(); @@ -392,6 +405,12 @@ if (!msg.isMaterialized()) throw new IllegalStateException(); + if (isCancelled()) + throw new IllegalStateException("Cancelled"); + + if (isDone()) + throw new IllegalStateException("Done"); + // verify still running. 
future.halted(); @@ -399,252 +418,11 @@ chunksIn.add(msg); if (log.isDebugEnabled()) - log.debug("queryId=" + queryId + ", chunksIn.size()=" - + chunksIn.size() + ", msg=" + msg); + log.debug("chunksIn.size()=" + chunksIn.size() + ", msg=" + msg); } /** - * The run state for the query. - */ - static private class RunState { - - /** - * The query. - */ - private final RunningQuery query; - - /** - * The query identifier. - */ - private final long queryId; - - /** - * The #of tasks for this query which have started but not yet halted - * and ZERO (0) if this is not the query coordinator. - * <p> - * This is guarded by the {@link #runningStateLock}. - */ - private long runningTaskCount = 0; - - /** - * The #of chunks for this query of which a running task has made - * available but which have not yet been accepted for processing by - * another task and ZERO (0) if this is not the query coordinator. - * <p> - * This is guarded by the {@link #runningStateLock}. - */ - private long availableChunkCount = 0; - - /** - * A map reporting the #of chunks available for each operator in the - * pipeline (we only report chunks for pipeline operators). The total - * #of chunks available across all operators in the pipeline is reported - * by {@link #availableChunkCount}. - * <p> - * The movement of the intermediate binding set chunks forms an acyclic - * directed graph. This map is used to track the #of chunks available - * for each bop in the pipeline. When a bop has no more incoming chunks, - * we send an asynchronous message to all nodes on which that bop had - * executed informing the {@link QueryEngine} on that node that it - * should immediately release all resources associated with that bop. - * <p> - * This is guarded by the {@link #runningStateLock}. 
- */ - private final Map<Integer/* bopId */, AtomicLong/* availableChunkCount */> availableChunkCountMap = new LinkedHashMap<Integer, AtomicLong>(); - - /** - * A collection reporting on the #of instances of a given {@link BOp} - * which are concurrently executing. - * <p> - * This is guarded by the {@link #runningStateLock}. - */ - private final Map<Integer/* bopId */, AtomicLong/* runningCount */> runningCountMap = new LinkedHashMap<Integer, AtomicLong>(); - - /** - * A collection of the operators which have executed at least once. - * <p> - * This is guarded by the {@link #runningStateLock}. - */ - private final Set<Integer/* bopId */> startedSet = new LinkedHashSet<Integer>(); - - public RunState(final RunningQuery query) { - - this.query = query; - - this.queryId = query.queryId; - - } - - public void startQuery(final IChunkMessage<?> msg) { - - query.lifeCycleSetUpQuery(); - - final Integer bopId = Integer.valueOf(msg.getBOpId()); - - availableChunkCount++; - { - AtomicLong n = availableChunkCountMap.get(bopId); - if (n == null) - availableChunkCountMap.put(bopId, n = new AtomicLong()); - n.incrementAndGet(); - } - - if (log.isInfoEnabled()) - log.info("queryId=" + queryId + ",runningTaskCount=" - + runningTaskCount + ",availableChunks=" - + availableChunkCount); - - System.err.println("startQ : bopId=" + bopId + ",running=" - + runningTaskCount + ",available=" + availableChunkCount); - - } - - public void startOp(final StartOpMessage msg) { - - final Integer bopId = Integer.valueOf(msg.bopId); - - runningTaskCount++; - { - AtomicLong n = runningCountMap.get(bopId); - if (n == null) - runningCountMap.put(bopId, n = new AtomicLong()); - n.incrementAndGet(); - if (startedSet.add(bopId)) { - // first evaluation pass for this operator. 
- query.lifeCycleSetUpOperator(bopId); - } - } - - availableChunkCount -= msg.nchunks; - - { - AtomicLong n = availableChunkCountMap.get(bopId); - if (n == null) - throw new AssertionError(); - n.addAndGet(-msg.nchunks); - } - - System.err.println("startOp: bopId=" + bopId + ",running=" - + runningTaskCount + ",available=" + availableChunkCount - + ",fanIn=" + msg.nchunks); - - // check deadline. - if (query.deadline.get() < System.currentTimeMillis()) { - - if (log.isTraceEnabled()) - log.trace("expired: queryId=" + queryId + ", deadline=" - + query.deadline); - - query.future.halt(new TimeoutException()); - - query.cancel(true/* mayInterruptIfRunning */); - - } - - } - - /** - * Update termination criteria counters. - */ - public void haltOp(final HaltOpMessage msg) { - - // chunks generated by this task. - final int fanOut = msg.sinkChunksOut + msg.altSinkChunksOut; - availableChunkCount += fanOut; - if (msg.sinkId != null) { - AtomicLong n = availableChunkCountMap.get(msg.sinkId); - if (n == null) - availableChunkCountMap - .put(msg.sinkId, n = new AtomicLong()); - n.addAndGet(msg.sinkChunksOut); - } - if (msg.altSinkId != null) { - AtomicLong n = availableChunkCountMap.get(msg.altSinkId); - if (n == null) - availableChunkCountMap.put(msg.altSinkId, - n = new AtomicLong()); - n.addAndGet(msg.altSinkChunksOut); - } - // one less task is running. - runningTaskCount--; - { - final AtomicLong n = runningCountMap.get(msg.bopId); - if (n == null) - throw new AssertionError(); - n.decrementAndGet(); - } - // Figure out if this operator is done. - if (isOperatorDone(msg.bopId)) { - /* - * No more chunks can appear for this operator so invoke its end - * of life cycle hook. 
- */ - query.lifeCycleTearDownOperator(msg.bopId); - } - System.err.println("haltOp : bopId=" + msg.bopId + ",running=" - + runningTaskCount + ",available=" + availableChunkCount - + ",fanOut=" + fanOut); - assert runningTaskCount >= 0 : "runningTaskCount=" - + runningTaskCount; - assert availableChunkCount >= 0 : "availableChunkCount=" - + availableChunkCount; - if (log.isTraceEnabled()) - log.trace("bopId=" + msg.bopId + ",partitionId=" - + msg.partitionId + ",serviceId=" - + query.queryEngine.getServiceUUID() + ", nchunks=" - + fanOut + " : runningTaskCount=" + runningTaskCount - + ", availableChunkCount=" + availableChunkCount); - // test termination criteria - if (msg.cause != null) { - // operator failed on this chunk. - log.error("Error: Canceling query: queryId=" + queryId - + ",bopId=" + msg.bopId + ",partitionId=" - + msg.partitionId, msg.cause); - query.future.halt(msg.cause); - query.cancel(true/* mayInterruptIfRunning */); - } else if (runningTaskCount == 0 && availableChunkCount == 0) { - // success (all done). - if (log.isTraceEnabled()) - log.trace("success: queryId=" + queryId); - query.future.halt(query.getStats()); - query.cancel(true/* mayInterruptIfRunning */); - } else if (query.deadline.get() < System.currentTimeMillis()) { - if (log.isTraceEnabled()) - log.trace("expired: queryId=" + queryId + ", deadline=" - + query.deadline); - query.future.halt(new TimeoutException()); - query.cancel(true/* mayInterruptIfRunning */); - } - } - - /** - * Return <code>true</code> the specified operator can no longer be - * triggered by the query. The specific criteria are that no operators - * which are descendants of the specified operator are running or have - * chunks available against which they could run. Under those conditions - * it is not possible for a chunk to show up which would cause the - * operator to be executed. - * - * @param bopId - * Some operator identifier. 
- * - * @return <code>true</code> if the operator can not be triggered given - * the current query activity. - * - * @throws IllegalMonitorStateException - * unless the {@link #runStateLock} is held by the caller. - */ - protected boolean isOperatorDone(final int bopId) { - - return PipelineUtility.isDone(bopId, query.getQuery(), - query.bopIndex, runningCountMap, availableChunkCountMap); - - } - - } // class RunState - - /** * Invoked once by the query controller with the initial * {@link IChunkMessage} which gets the query moving. */ @@ -656,17 +434,17 @@ if (msg == null) throw new IllegalArgumentException(); - if (msg.getQueryId() != queryId) // @todo equals() if queryId is UUID. + if (!queryId.equals(msg.getQueryId())) throw new IllegalArgumentException(); runStateLock.lock(); try { - + + lifeCycleSetUpQuery(); + runState.startQuery(msg); - queryEngine.acceptChunk(msg); - } finally { runStateLock.unlock(); @@ -693,8 +471,9 @@ try { - runState.startOp(msg); - + if (runState.startOp(msg)) + lifeCycleSetUpOperator(msg.bopId); + } finally { runStateLock.unlock(); @@ -729,7 +508,16 @@ try { - runState.haltOp(msg); + if (runState.haltOp(msg)) { + + /* + * No more chunks can appear for this operator so invoke its end + * of life cycle hook. 
+ */ + + lifeCycleTearDownOperator(msg.bopId); + + } } finally { @@ -996,19 +784,49 @@ altSinkChunksOut += handleOutputChunk(altSinkId, altSink); } - clientProxy.haltOp(new HaltOpMessage(queryId, bopId, - partitionId, serviceId, null/* cause */, - sinkId, sinkChunksOut, altSinkId, - altSinkChunksOut, context.getStats())); + final HaltOpMessage msg = new HaltOpMessage(queryId, bopId, + partitionId, serviceId, null/* cause */, sinkId, + sinkChunksOut, altSinkId, altSinkChunksOut, context + .getStats()); + clientProxy.haltOp(msg); } catch (Throwable t) { - try { - clientProxy.haltOp(new HaltOpMessage(queryId, - bopId, partitionId, serviceId, - t/* cause */, sinkId, sinkChunksOut, altSinkId, - altSinkChunksOut, context.getStats())); - } catch (RemoteException e) { - cancel(true/* mayInterruptIfRunning */); - log.error("queryId=" + queryId + ", bopId=" + bopId, e); + /* + * Mark the query as halted on this node regardless of whether + * we are able to communicate with the query controller. + * + * Note: Invoking halt(t) here will log an error. This logged + * error message is necessary in order to catch errors in + * clientProxy.haltOp() (above and below). + */ + // Note: uncomment if paranoid about masked errors after the 1st reported error. +// log.error("queryId=" + queryId + ", bopId=" + bopId, t); + + if (t == future.halt(t)) { + /* + * Send the halt message to the query controller. + * + * Note: Since the exception returned from halt(t) is our + * exception, we are responsible for communicating this + * exception to the query controller. If that message does + * not arrive then the query controller will not know that + * we have terminated the query. This can result in a long + * running query which must be explicitly cancelled on the + * query controller. + * + * @todo if we are unable to send the message to the query + * controller then we could retry each time an error is + * thrown for this query.
+ */ + final HaltOpMessage msg = new HaltOpMessage(queryId, bopId, + partitionId, serviceId, t/* cause */, sinkId, + sinkChunksOut, altSinkId, altSinkChunksOut, context + .getStats()); + try { + clientProxy.haltOp(msg); + } catch (RemoteException e) { + cancel(true/* mayInterruptIfRunning */); + log.error("queryId=" + queryId + ", bopId=" + bopId, e); + } } } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StartOpMessage.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StartOpMessage.java 2010-09-15 14:30:14 UTC (rev 3556) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StartOpMessage.java 2010-09-15 15:54:52 UTC (rev 3557) @@ -22,7 +22,7 @@ private static final long serialVersionUID = 1L; /** The query identifier. */ - final public long queryId; + final public UUID queryId; /** The operator identifier. */ final public int bopId; @@ -39,7 +39,7 @@ */ final public int nchunks; - public StartOpMessage(final long queryId, final int opId, + public StartOpMessage(final UUID queryId, final int opId, final int partitionId, final UUID serviceId, final int nchunks) { this.queryId = queryId; this.bopId = opId; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java 2010-09-15 14:30:14 UTC (rev 3556) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java 2010-09-15 15:54:52 UTC (rev 3557) @@ -70,6 +70,11 @@ .getLogger(FederatedQueryEngine.class); /** + * The {@link UUID} associated with this service. + */ + private final UUID serviceUUID; + + /** * The {@link IBigdataFederation} iff running in scale-out. 
* <p> * Note: The {@link IBigdataFederation} is required in scale-out in order to @@ -99,7 +104,7 @@ @Override public UUID getServiceUUID() { - return fed.getServiceUUID(); + return serviceUUID; } @@ -127,7 +132,7 @@ * {@inheritDoc} */ @Override - protected FederatedRunningQuery getRunningQuery(final long queryId) { + protected FederatedRunningQuery getRunningQuery(final UUID queryId) { return (FederatedRunningQuery) super.getRunningQuery(queryId); @@ -147,10 +152,10 @@ */ public FederatedQueryEngine(final DataService dataService) { - this(dataService.getFederation(), + this(dataService.getServiceUUID(), dataService.getFederation(), new DelegateIndexManager(dataService), dataService .getResourceManager().getResourceService()); - + } /** @@ -164,6 +169,7 @@ * @param resourceService */ public FederatedQueryEngine(// + final UUID thisService, final IBigdataFederation<?> fed,// final IIndexManager indexManager,// final ManagedResourceService resourceService// @@ -179,6 +185,8 @@ this.fed = fed; + this.serviceUUID = thisService; + this.resourceService = resourceService; } @@ -277,6 +285,7 @@ if(!accept(msg)) { if(log.isDebugEnabled()) log.debug("dropping: " + msg); + continue; } if(log.isDebugEnabled()) log.debug("accepted: " + msg); @@ -287,7 +296,7 @@ * etc. */ FederatedQueryEngine.this - .bufferReady((IChunkMessage) msg); + .acceptChunk((IChunkMessage) msg); } catch(Throwable t) { if(InnerCause.isInnerCause(t, InterruptedException.class)) { log.warn("Interrupted."); @@ -318,7 +327,7 @@ */ private boolean accept(final IChunkMessage<?> msg) throws RemoteException { - final long queryId = msg.getQueryId(); + final UUID queryId = msg.getQueryId(); // lookup query by id. 
FederatedRunningQuery q = getRunningQuery(queryId); @@ -385,7 +394,7 @@ public void declareQuery(final IQueryDecl queryDecl) { - final long queryId = queryDecl.getQueryId(); + final UUID queryId = queryDecl.getQueryId(); putRunningQuery(queryId, newRunningQuery(this, queryId, false/* controller */, queryDecl.getQueryController(), @@ -411,7 +420,7 @@ */ @Override protected FederatedRunningQuery newRunningQuery( - final QueryEngine queryEngine, final long queryId, + final QueryEngine queryEngine, final UUID queryId, final boolean controller, final IQueryC... [truncated message content] |