From: <tho...@us...> - 2013-11-13 21:50:30
|
Revision: 7548 http://bigdata.svn.sourceforge.net/bigdata/?rev=7548&view=rev Author: thompsonbry Date: 2013-11-13 21:50:23 +0000 (Wed, 13 Nov 2013) Log Message: ----------- Added @Override annotations and final attributes. Added checkDeadline() method on AbstractRunningQuery. This performs a non-blocking test of the RunState to determine whether a deadline (if one exists) has expired. If so, it halts the query. checkDeadline() is intended to provide a hook that can be used to force timely termination of queries that miss their deadline and do not terminate because some operator is compute bound. The current logic only checks the deadline in startOp() and haltOp(). We will have to add additional logic to call checkDeadline() at other times, e.g., from a scheduled executor task, in order to ensure timely termination. See #772. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/RunState.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java 2013-11-13 17:42:46 UTC (rev 7547) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java 2013-11-13 21:50:23 UTC (rev 7548) @@ -340,24 +340,56 @@ } + /** + * If the query deadline has expired, then halt the query. + * + * @throws QueryTimeoutException + * if the query deadline has expired. + * + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/772"> + * Query timeout only checked at operator start/stop. 
</a> + */ + final protected void checkDeadline() { + + try { + + runState.checkDeadline(); + + } catch (QueryTimeoutException ex) { + + halt(ex); + + /* + * Note: The exception is not rethrown when the query halts for a + * deadline. See startOp() and haltOp() for the standard behavior. + */ + + } + + } + + @Override final public long getDeadline() { return runState.getDeadline(); } + @Override final public long getStartTime() { return startTime.get(); } + @Override final public long getDoneTime() { return doneTime.get(); } + @Override final public long getElapsed() { long mark = doneTime.get(); @@ -379,37 +411,28 @@ } + @Override public QueryEngine getQueryEngine() { return queryEngine; } - /** - * The client executing this query (aka the query controller). - * <p> - * Note: The proxy is primarily for light weight RMI messages used to - * coordinate the distributed query evaluation. Ideally, all large objects - * will be transfered among the nodes of the cluster using NIO buffers. - */ + @Override final public IQueryClient getQueryController() { return clientProxy; } - /** - * The unique identifier for this query. - */ + @Override final public UUID getQueryId() { return queryId; } - /** - * Return the operator tree for this query. - */ + @Override final public PipelineOp getQuery() { return query; @@ -425,6 +448,7 @@ } + @Override final public Map<Integer/* bopId */, BOpStats> getStats() { return Collections.unmodifiableMap(statsMap); @@ -744,6 +768,11 @@ halt(ex); + /* + * Note: The exception is not rethrown when the query halts for a + * deadline. + */ + } finally { lock.unlock(); @@ -830,6 +859,11 @@ halt(t); + /* + * Note: The exception is not rethrown when the query halts for a + * deadline. 
+ */ + } finally { lock.unlock(); @@ -1149,6 +1183,7 @@ */ abstract protected void consumeChunk(); + @Override final public ICloseableIterator<IBindingSet[]> iterator() { if (!controller) @@ -1161,6 +1196,7 @@ } + @Override final public void halt(final Void v) { lock.lock(); @@ -1181,6 +1217,7 @@ } + @Override final public <T extends Throwable> T halt(final T t) { if (t == null) @@ -1223,6 +1260,7 @@ * consume them.</li> * </ul> */ + @Override final public boolean cancel(final boolean mayInterruptIfRunning) { /* * Set if we notice an interrupt during clean up of the query and then @@ -1397,43 +1435,50 @@ } + @Override final public Void get() throws InterruptedException, ExecutionException { return future.get(); } - final public Void get(long arg0, TimeUnit arg1) + @Override + final public Void get(final long arg0, final TimeUnit arg1) throws InterruptedException, ExecutionException, TimeoutException { return future.get(arg0, arg1); } + @Override final public boolean isCancelled() { return future.isCancelled(); } + @Override final public boolean isDone() { return future.isDone(); } + @Override final public Throwable getCause() { return future.getCause(); } + @Override public IBigdataFederation<?> getFederation() { return queryEngine.getFederation(); } + @Override public IIndexManager getLocalIndexManager() { return queryEngine.getIndexManager(); @@ -1526,6 +1571,7 @@ * buffered on the native heap) rather than as a limit to the among of * native memory the operator may use while it is running. 
*/ + @Override public IMemoryManager getMemoryManager() { IMemoryManager memoryManager = this.memoryManager.get(); if (memoryManager == null) { @@ -1545,6 +1591,7 @@ private final AtomicReference<IMemoryManager> memoryManager = new AtomicReference<IMemoryManager>(); + @Override final public IQueryAttributes getAttributes() { return queryAttributes; @@ -1618,6 +1665,7 @@ } } + @Override public String toString() { final StringBuilder sb = new StringBuilder(getClass().getName()); sb.append("{queryId=" + queryId); Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java 2013-11-13 17:42:46 UTC (rev 7547) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java 2013-11-13 21:50:23 UTC (rev 7548) @@ -85,9 +85,13 @@ /** * The client coordinate the evaluation of this query (aka the query * controller). For a standalone database, this will be the - * {@link QueryEngine}. For scale-out, this will be the RMI proxy for the - * {@link QueryEngine} instance to which the query was submitted for - * evaluation by the application. + * {@link QueryEngine}. + * <p> + * For scale-out, this will be the RMI proxy for the {@link QueryEngine} + * instance to which the query was submitted for evaluation by the + * application. The proxy is primarily for light weight RMI messages used to + * coordinate the distributed query evaluation. Ideally, all large objects + * will be transfered among the nodes of the cluster using NIO buffers. 
*/ IQueryClient getQueryController(); Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2013-11-13 17:42:46 UTC (rev 7547) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2013-11-13 21:50:23 UTC (rev 7548) @@ -722,6 +722,7 @@ } + @Override public void run() { if(log.isInfoEnabled()) log.info("Running: " + this); Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/RunState.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/RunState.java 2013-11-13 17:42:46 UTC (rev 7547) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/RunState.java 2013-11-13 21:50:23 UTC (rev 7548) @@ -606,8 +606,7 @@ if (innerState.allDone.get()) throw new IllegalStateException(ERR_QUERY_HALTED); - if (innerState.deadline.get() < System.currentTimeMillis()) - throw new QueryTimeoutException(ERR_DEADLINE); + checkDeadline(); if (!innerState.started.compareAndSet(false/* expect */, true/* update */)) throw new IllegalStateException(ERR_QUERY_STARTED); @@ -704,11 +703,9 @@ if (innerState.allDone.get()) throw new IllegalStateException(ERR_QUERY_HALTED); -// + " bopId="+msg.bopId+" : msg="+msg); - if (innerState.deadline.get() < System.currentTimeMillis()) - throw new QueryTimeoutException(ERR_DEADLINE); - + checkDeadline(); + innerState.stepCount.incrementAndGet(); final boolean firstTime = _startOp(msg); @@ -777,6 +774,19 @@ } // RunStateEnum /** + * Check the query to see whether its deadline has expired. + * + * @throws QueryTimeoutException + * if the query deadline has expired. 
+ */ + protected void checkDeadline() throws QueryTimeoutException { + + if (innerState.deadline.get() < System.currentTimeMillis()) + throw new QueryTimeoutException(ERR_DEADLINE); + + } + + /** * Update the {@link RunState} to reflect the post-condition of the * evaluation of an operator against one or more {@link IChunkMessage}s, * adjusting the #of messages available for consumption by the operator @@ -809,8 +819,7 @@ if (innerState.allDone.get()) throw new IllegalStateException(ERR_QUERY_HALTED); - if (innerState.deadline.get() < System.currentTimeMillis()) - throw new QueryTimeoutException(ERR_DEADLINE); + checkDeadline(); innerState.stepCount.incrementAndGet(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |