From: <tho...@us...> - 2011-01-15 14:53:34
|
Revision: 4101 http://bigdata.svn.sourceforge.net/bigdata/?rev=4101&view=rev Author: thompsonbry Date: 2011-01-15 14:53:26 +0000 (Sat, 15 Jan 2011) Log Message: ----------- Working through a problem with SubqueryOp leading to non-termination of some queries in BSBM. This appears to be fixed at this checkpoint. I am not sure yet if the fix was the -XX:+UseMembar JVM argument (I was running with JDK 1.6.0_17 which can loose monitor wake up signals) or the modifications to SubqueryOp. I have made several changes to improve the trapping of "normal" query termination exceptions (InterruptedException, BufferClosedException, and ClosedByInterruptException). All of these can arise in response to an interrupt triggered when a LIMIT is satisfied on the query. The RWStore and WORMStore properties files were modified to turn on the new query evaluation strategy impl which handles optional join groups. The data need to be loaded with that option enabled and also with dataTime inlining enabled. Added more reporting capabilities to the NanoSparqlServer, including reporting of the BOPs for the currently executing queries. This required adding a public method to IQueryClient to report on the UUIDs of the active IRunningQuery instances and making getRunningQuery(UUID) public so we can inspect the query by its UUID. BlockingBuffer was generating a log message even when it would not log the message. This is fixed. RWStore was logging an error in getData() when there are several non-error reasons why that method could throw an exception (an interrupt leading to a ClosedByInterrupt exception is the main one). Modified SOp2BOpUtility to layer the slice over the subquery op and not the other way around. The current BSBM performance status is excellent for the reduced query mix (without Q5 or Q6). Q5 appears to have a bad join plan which causes high CPU utilization. Q6 is visiting much more data than is otherwise required in order to satisfy a regex without a prefix anchor. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryClient.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StandaloneChainedRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/BlockingBuffer.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/striterator/AbstractChunkedResolverator.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestRunState.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestNIOChunkMessage.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestThickChunkMessage.java branches/QUADS_QUERY_BRANCH/bigdata-perf/bsbm/RWStore.properties branches/QUADS_QUERY_BRANCH/bigdata-perf/bsbm/WORMStore.properties branches/QUADS_QUERY_BRANCH/bigdata-perf/bsbm/build.xml branches/QUADS_QUERY_BRANCH/bigdata-perf/bsbm/src/resources/logging/log4j.properties branches/QUADS_QUERY_BRANCH/bigdata-perf/lubm/src/resources/logging/log4j.properties branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl3.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSailGraphQuery.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/NanoSparqlServer.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sop/SOp2BOpUtility.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/RunningQueryCloseableIteration.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryOp.java 2011-01-14 18:39:44 UTC (rev 4100) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryOp.java 2011-01-15 14:53:26 UTC (rev 4101) @@ -27,6 +27,7 @@ package com.bigdata.bop.controller; +import java.nio.channels.ClosedByInterruptException; import java.util.Map; import java.util.UUID; import java.util.concurrent.Callable; @@ -46,7 +47,6 @@ import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.ThickAsynchronousIterator; import com.bigdata.util.InnerCause; -import com.bigdata.util.concurrent.LatchedExecutor; /** * For each binding set presented, this operator executes a subquery. Any @@ -56,7 +56,7 @@ * semantics). Each subquery is run as a separate query but will be cancelled if * the parent query is cancelled. * - * FIXME Parallel evaluation of subqueries is not implemented. What is the + * @todo Parallel evaluation of subqueries is not implemented. What is the * appropriate parallelism for this operator? More parallelism should reduce * latency but could increase the memory burden. Review this decision once we * have the RWStore operating as a binding set buffer on the Java process heap. @@ -83,30 +83,30 @@ /** * When <code>true</code> the subquery has optional semantics (if the * subquery fails, the original binding set will be passed along to the - * downstream sink anyway). + * downstream sink anyway) (default {@value #DEFAULT_OPTIONAL}). */ String OPTIONAL = SubqueryOp.class.getName() + ".optional"; boolean DEFAULT_OPTIONAL = false; - /** - * The maximum parallelism with which the subqueries will be evaluated - * (default {@value #DEFAULT_MAX_PARALLEL}). - */ - String MAX_PARALLEL = SubqueryOp.class.getName() - + ".maxParallel"; +// /** +// * The maximum parallelism with which the subqueries will be evaluated +// * (default {@value #DEFAULT_MAX_PARALLEL}). +// */ +// String MAX_PARALLEL = SubqueryOp.class.getName() +// + ".maxParallel"; +// +// int DEFAULT_MAX_PARALLEL = 1; - int DEFAULT_MAX_PARALLEL = 1; - } - /** - * @see Annotations#MAX_PARALLEL - */ - public int getMaxParallel() { - return getProperty(Annotations.MAX_PARALLEL, - Annotations.DEFAULT_MAX_PARALLEL); - } +// /** +// * @see Annotations#MAX_PARALLEL +// */ +// public int getMaxParallel() { +// return getProperty(Annotations.MAX_PARALLEL, +// Annotations.DEFAULT_MAX_PARALLEL); +// } /** * Deep copy constructor. @@ -171,14 +171,14 @@ */ private static class ControllerTask implements Callable<Void> { - private final SubqueryOp controllerOp; +// private final SubqueryOp controllerOp; private final BOpContext<IBindingSet> context; // private final List<FutureTask<IRunningQuery>> tasks = new LinkedList<FutureTask<IRunningQuery>>(); // private final CountDownLatch latch; private final boolean optional; - private final int nparallel; +// private final int nparallel; private final PipelineOp subquery; - private final Executor executor; +// private final Executor executor; public ControllerTask(final SubqueryOp controllerOp, final BOpContext<IBindingSet> context) { @@ -188,21 +188,21 @@ if (context == null) throw new IllegalArgumentException(); - this.controllerOp = controllerOp; +// this.controllerOp = controllerOp; this.context = context; this.optional = controllerOp.getProperty(Annotations.OPTIONAL, Annotations.DEFAULT_OPTIONAL); - this.nparallel = controllerOp.getProperty(Annotations.MAX_PARALLEL, - Annotations.DEFAULT_MAX_PARALLEL); +// this.nparallel = controllerOp.getProperty(Annotations.MAX_PARALLEL, +// Annotations.DEFAULT_MAX_PARALLEL); this.subquery = (PipelineOp) controllerOp .getRequiredProperty(Annotations.SUBQUERY); - this.executor = new LatchedExecutor(context.getIndexManager() - .getExecutorService(), nparallel); +// this.executor = new LatchedExecutor(context.getIndexManager() +// .getExecutorService(), nparallel); // this.latch = new CountDownLatch(controllerOp.arity()); @@ -258,26 +258,37 @@ for(IBindingSet bset : chunk) { - FutureTask<IRunningQuery> ft = new FutureTask<IRunningQuery>( - new SubqueryTask(bset, subquery, context)); + final IRunningQuery runningSubquery = new SubqueryTask( + bset, subquery, context).call(); - // run the subquery. - executor.execute(ft); + if (!runningSubquery.isDone()) { - try { + throw new AssertionError("Future not done: " + + runningSubquery.toString()); + + } - // wait for the outcome. - ft.get(); - - } finally { - - /* - * Ensure that the inner task is cancelled if the - * outer task is interrupted. - */ - ft.cancel(true/* mayInterruptIfRunning */); - - } +// Note: Variant using executor, but still does not support parallel evaluation of subqueries. +// final FutureTask<IRunningQuery> ft = new FutureTask<IRunningQuery>( +// new SubqueryTask(bset, subquery, context)); +// +// try { +// +// // run the subquery. +// executor.execute(ft); +// +// // wait for the outcome. +// ft.get(); +// +// } finally { +// +// /* +// * Ensure that the inner task is cancelled if the +// * outer task is interrupted. +// */ +// ft.cancel(true/* mayInterruptIfRunning */); +// +// } } @@ -362,16 +373,15 @@ public IRunningQuery call() throws Exception { + // The subquery + IRunningQuery runningSubquery = null; + // The iterator draining the subquery IAsynchronousIterator<IBindingSet[]> subquerySolutionItr = null; - IRunningQuery runningQuery = null; try { final QueryEngine queryEngine = parentContext.getRunningQuery() .getQueryEngine(); -// final IRunningQuery runningQuery = queryEngine -// .eval(subQueryOp); - final BOp startOp = BOpUtility.getPipelineStart(subQueryOp); final int startId = startOp.getId(); @@ -379,7 +389,7 @@ final UUID queryId = UUID.randomUUID(); // execute the subquery, passing in the source binding set. - runningQuery = queryEngine + runningSubquery = queryEngine .eval( queryId, (PipelineOp) subQueryOp, @@ -391,40 +401,58 @@ new ThickAsynchronousIterator<IBindingSet[]>( new IBindingSet[][] { new IBindingSet[] { bset } }))); - // Iterator visiting the subquery solutions. - subquerySolutionItr = runningQuery.iterator(); + long ncopied = 0L; + try { + + // Iterator visiting the subquery solutions. + subquerySolutionItr = runningSubquery.iterator(); - // Copy solutions from the subquery to the query. - final long ncopied = BOpUtility.copy(subquerySolutionItr, - parentContext.getSink(), null/* sink2 */, - null/* constraints */, null/* stats */); - - // wait for the subquery. - runningQuery.get(); + // Copy solutions from the subquery to the query. + ncopied = BOpUtility.copy(subquerySolutionItr, + parentContext.getSink(), null/* sink2 */, + null/* constraints */, null/* stats */); + // wait for the subquery to halt / test for errors. + runningSubquery.get(); + + } catch (InterruptedException ex) { + + // this thread was interrupted, so cancel the subquery. + runningSubquery + .cancel(true/* mayInterruptIfRunning */); + + // rethrow the exception. + throw ex; + + } + if (ncopied == 0L && optional) { /* * Since there were no solutions for the subquery, copy * the original binding set to the default sink. */ - parentContext.getSink().add(new IBindingSet[]{bset}); + + parentContext.getSink().add(new IBindingSet[]{bset}); } // done. - return runningQuery; + return runningSubquery; } catch (Throwable t) { - /* - * Note: SliceOp will cause other operators to be - * interrupted during normal evaluation but we do not want - * to terminate the parent query when this occurs. - */ - if (!InnerCause.isInnerCause(t, InterruptedException.class) - && !InnerCause.isInnerCause(t, BufferClosedException.class)) { - + /* + * Note: SliceOp will cause other operators to be + * interrupted during normal evaluation. Therefore, while + * these exceptions should cause the subquery to terminate, + * they should not be reported as errors to the parent + * query. + */ + if (!InnerCause.isInnerCause(t, InterruptedException.class) + && !InnerCause.isInnerCause(t, BufferClosedException.class) + && !InnerCause.isInnerCause(t, ClosedByInterruptException.class)) { + /* * If a subquery fails, then propagate the error to the * parent and rethrow the first cause error out of the @@ -435,13 +463,25 @@ } - return runningQuery; + return runningSubquery; } finally { - if (subquerySolutionItr != null) - subquerySolutionItr.close(); + try { + // ensure subquery is halted. + if (runningSubquery != null) + runningSubquery + .cancel(true/* mayInterruptIfRunning */); + + } finally { + + // ensure the subquery solution iterator is closed. + if (subquerySolutionItr != null) + subquerySolutionItr.close(); + + } + } } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java 2011-01-14 18:39:44 UTC (rev 4100) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java 2011-01-15 14:53:26 UTC (rev 4101) @@ -28,6 +28,7 @@ package com.bigdata.bop.engine; import java.nio.ByteBuffer; +import java.nio.channels.ClosedByInterruptException; import java.util.Collections; import java.util.Map; import java.util.UUID; @@ -799,19 +800,25 @@ try { - /* - * Note: SliceOp will cause other operators to be interrupted - * during normal evaluation so it is not useful to log an - * InterruptedException @ ERROR. - */ - if (!InnerCause.isInnerCause(t, InterruptedException.class) - && !InnerCause.isInnerCause(t, BufferClosedException.class)) - log.error(toString(), t); - try { - // signal error condition. - return future.halt(t); + /* + * Note: SliceOp will cause other operators to be interrupted + * during normal evaluation so it is not useful to log an + * InterruptedException @ ERROR. + */ + if (!InnerCause.isInnerCause(t, InterruptedException.class) + && !InnerCause.isInnerCause(t, BufferClosedException.class) + && !InnerCause.isInnerCause(t, ClosedByInterruptException.class)) { + log.error(toString(), t); + // signal error condition. + return future.halt(t); + } else { + // normal termination. + future.halt((Void)null/* result */); + // the caller's cause. + return t; + } } finally { @@ -990,16 +997,28 @@ public String toString() { final StringBuilder sb = new StringBuilder(getClass().getName()); - sb.append("{queryId=" + queryId); - sb.append(",deadline=" + deadline.get()); - sb.append(",isDone=" + isDone()); - sb.append(",isCancelled=" + isCancelled()); - sb.append(",runState=" + runState); - sb.append(",controller=" + controller); - sb.append(",clientProxy=" + clientProxy); - sb.append(",query=" + query); - sb.append("}"); - return sb.toString(); + sb.append("{queryId=" + queryId); + /* + * Note: Obtaining the lock here is required to avoid concurrent + * modification exception in RunState's toString() when there is a + * concurrent change in the RunState. It also makes the isDone() and + * isCancelled() reporting atomic. + */ + lock.lock(); + try { + sb.append(",elapsed=" + getElapsed()); + sb.append(",deadline=" + deadline.get()); + sb.append(",isDone=" + isDone()); + sb.append(",isCancelled=" + isCancelled()); + sb.append(",runState=" + runState); + } finally { + lock.unlock(); + } + sb.append(",controller=" + controller); + sb.append(",clientProxy=" + clientProxy); + sb.append(",query=" + query); + sb.append("}"); + return sb.toString(); } // abstract protected IChunkHandler getChunkHandler(); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java 2011-01-14 18:39:44 UTC (rev 4100) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java 2011-01-15 14:53:26 UTC (rev 4101) @@ -27,6 +27,7 @@ */ package com.bigdata.bop.engine; +import java.nio.channels.ClosedByInterruptException; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -783,26 +784,33 @@ public void run() { - super.run(); + try { - /* - * This task is done executing so remove its Future before we - * attempt to schedule another task for the same - * (bopId,partitionId). - */ - final ConcurrentHashMap<ChunkFutureTask, ChunkFutureTask> map = operatorFutures - .get(new BSBundle(t.bopId, t.partitionId)); + super.run(); + + } finally { - if (map != null) { + /* + * This task is done executing so remove its Future before we + * attempt to schedule another task for the same + * (bopId,partitionId). + */ - map.remove(this, this); + final ConcurrentHashMap<ChunkFutureTask, ChunkFutureTask> map = operatorFutures + .get(new BSBundle(t.bopId, t.partitionId)); - } + if (map != null) { - // Schedule another task if any messages are waiting. - ChunkedRunningQuery.this.scheduleNext(new BSBundle( - t.bopId, t.partitionId)); - + map.remove(this, this); + + } + + } + + // Schedule another task if any messages are waiting. + ChunkedRunningQuery.this.scheduleNext(new BSBundle(t.bopId, + t.partitionId)); + } } @@ -852,6 +860,21 @@ final long begin = System.currentTimeMillis(); try { t.call(); + } catch(Throwable t) { + /* + * Note: SliceOp will cause other operators to be + * interrupted during normal evaluation. Therefore, while + * these exceptions should cause the query to terminate, + * they should not be reported as errors to the query + * controller. + */ + if (!InnerCause.isInnerCause(t, InterruptedException.class) + && !InnerCause.isInnerCause(t, BufferClosedException.class) + && !InnerCause.isInnerCause(t, ClosedByInterruptException.class) + ) { + // Not an error that we should ignore. + throw t; + } } finally { t.context.getStats().elapsed.add(System.currentTimeMillis() - begin); @@ -876,19 +899,10 @@ } catch (Throwable ex1) { - /* - * Note: SliceOp will cause other operators to be interrupted - * during normal evaluation so it is not useful to log an - * InterruptedException @ ERROR. - */ - if (!InnerCause.isInnerCause(ex1, InterruptedException.class) - && !InnerCause.isInnerCause(ex1, BufferClosedException.class) - ) { - // Log an error. - log.error("queryId=" + getQueryId() + ", bopId=" + t.bopId - + ", bop=" + t.bop, ex1); - } - + // Log an error. + log.error("queryId=" + getQueryId() + ", bopId=" + t.bopId + + ", bop=" + t.bop, ex1); + /* * Mark the query as halted on this node regardless of whether * we are able to communicate with the query controller. Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryClient.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryClient.java 2011-01-14 18:39:44 UTC (rev 4100) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryClient.java 2011-01-15 14:53:26 UTC (rev 4101) @@ -10,6 +10,13 @@ */ public interface IQueryClient extends IQueryPeer { + /** + * Return the set of queries which are running as of the moment when the + * request was processed. Queries reported in the returned array may + * terminate at any time. + */ + UUID[] getRunningQueries(); + /** * Return the query. * Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2011-01-14 18:39:44 UTC (rev 4100) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2011-01-15 14:53:26 UTC (rev 4101) @@ -58,6 +58,7 @@ import com.bigdata.btree.IndexSegment; import com.bigdata.btree.view.FusedView; import com.bigdata.journal.IIndexManager; +import com.bigdata.rdf.sail.bench.NanoSparqlClient; import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.ThickAsynchronousIterator; import com.bigdata.resources.IndexManager; @@ -967,17 +968,20 @@ } } - + /** - * Return the {@link AbstractRunningQuery} associated with that query identifier. + * Return the {@link AbstractRunningQuery} associated with that query + * identifier. * * @param queryId * The query identifier. * - * @return The {@link AbstractRunningQuery} -or- <code>null</code> if there is no - * query associated with that query identifier. + * @return The {@link AbstractRunningQuery} -or- <code>null</code> if there + * is no query associated with that query identifier. + * + * @todo Exposed to {@link NanoSparqlServer} */ - protected AbstractRunningQuery getRunningQuery(final UUID queryId) { + public /*protected*/ AbstractRunningQuery getRunningQuery(final UUID queryId) { if(queryId == null) throw new IllegalArgumentException(); @@ -1164,4 +1168,10 @@ } + public UUID[] getRunningQueries() { + + return runningQueries.keySet().toArray(new UUID[0]); + + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StandaloneChainedRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StandaloneChainedRunningQuery.java 2011-01-14 18:39:44 UTC (rev 4100) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StandaloneChainedRunningQuery.java 2011-01-15 14:53:26 UTC (rev 4101) @@ -27,6 +27,7 @@ package com.bigdata.bop.engine; +import java.nio.channels.ClosedByInterruptException; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; @@ -440,6 +441,21 @@ final long begin = System.currentTimeMillis(); try { t.call(); + } catch(Throwable t) { + /* + * Note: SliceOp will cause other operators to be + * interrupted during normal evaluation. Therefore, while + * these exceptions should cause the query to terminate, + * they should not be reported as errors to the query + * controller. + */ + if (!InnerCause.isInnerCause(t, InterruptedException.class) + && !InnerCause.isInnerCause(t, BufferClosedException.class) + && !InnerCause.isInnerCause(t, ClosedByInterruptException.class) + ) { + // Not an error that we should ignore. + throw t; + } } finally { t.context.getStats().elapsed.add(System.currentTimeMillis() - begin); @@ -457,20 +473,11 @@ StandaloneChainedRunningQuery.this.haltOp(msg); } catch (Throwable ex1) { -log.fatal(ex1,ex1); // FIXME remove log stmt. - /* - * Note: SliceOp will cause other operators to be interrupted - * during normal evaluation so it is not useful to log an - * InterruptedException @ ERROR. - */ - if (!InnerCause.isInnerCause(ex1, InterruptedException.class) - && !InnerCause.isInnerCause(ex1, BufferClosedException.class) - ) { - // Log an error. - log.error("queryId=" + getQueryId() + ", bopId=" + t.bopId - + ", bop=" + t.bop, ex1); - } + // Log an error. + log.error("queryId=" + getQueryId() + ", bopId=" + t.bopId + + ", bop=" + t.bop, ex1); + /* * Mark the query as halted on this node regardless of whether * we are able to communicate with the query controller. Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java 2011-01-14 18:39:44 UTC (rev 4100) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java 2011-01-15 14:53:26 UTC (rev 4101) @@ -163,7 +163,7 @@ * {@inheritDoc} */ @Override - protected FederatedRunningQuery getRunningQuery(final UUID queryId) { + public /*protected*/ FederatedRunningQuery getRunningQuery(final UUID queryId) { return (FederatedRunningQuery) super.getRunningQuery(queryId); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/BlockingBuffer.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/BlockingBuffer.java 2011-01-14 18:39:44 UTC (rev 4100) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/BlockingBuffer.java 2011-01-15 14:53:26 UTC (rev 4101) @@ -1074,6 +1074,8 @@ logTimeout += Math.min(maxLogTimeout, logTimeout); + if(log.isInfoEnabled()) { + final String msg = "blocked: ntries=" + ntries + ", elapsed=" @@ -1096,6 +1098,8 @@ // issue warning. log.warn(msg); } + + } } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2011-01-14 18:39:44 UTC (rev 4100) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2011-01-15 14:53:26 UTC (rev 4101) @@ -1432,7 +1432,13 @@ } } } catch (Throwable e) { - log.error(e,e); + /* + * Note: ClosedByInterruptException can be thrown out of + * FileChannelUtility.readAll(), typically because the LIMIT on + * a query was satisified, but we do not want to log that as an + * error. + */ +// log.error(e,e); throw new IllegalArgumentException("Unable to read data", e); } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/striterator/AbstractChunkedResolverator.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/striterator/AbstractChunkedResolverator.java 2011-01-14 18:39:44 UTC (rev 4100) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/striterator/AbstractChunkedResolverator.java 2011-01-15 14:53:26 UTC (rev 4101) @@ -239,9 +239,11 @@ } finally { - src.close(); - - buffer.close(); + try { + src.close(); + } finally { + buffer.close(); + } } @@ -336,7 +338,11 @@ log.info("lastIndex=" + lastIndex + ", chunkSize=" + (chunk != null ? "" + chunk.length : "N/A")); - // asynchronous close by the consumer of the producer's buffer. + /* + * Asynchronous close by the consumer of the producer's buffer. This + * will cause the ChunkConsumerTask to abort if it is still running and + * that will cause the [src] to be closed. + */ buffer.close(); chunk = null; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestRunState.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestRunState.java 2011-01-14 18:39:44 UTC (rev 4100) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestRunState.java 2011-01-15 14:53:26 UTC (rev 4101) @@ -952,6 +952,10 @@ throws RemoteException { } + public UUID[] getRunningQueries() { + return null; + } + } /** Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestNIOChunkMessage.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestNIOChunkMessage.java 2011-01-14 18:39:44 UTC (rev 4100) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestNIOChunkMessage.java 2011-01-15 14:53:26 UTC (rev 4101) @@ -259,6 +259,10 @@ throws RemoteException { } + public UUID[] getRunningQueries() { + return null; + } + } private static class MyNIOChunkMessage<E> extends NIOChunkMessage<E> { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestThickChunkMessage.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestThickChunkMessage.java 2011-01-14 18:39:44 UTC (rev 4100) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestThickChunkMessage.java 2011-01-15 14:53:26 UTC (rev 4101) @@ -232,6 +232,10 @@ throws RemoteException { } + public UUID[] getRunningQueries() { + return null; + } + } } Modified: branches/QUADS_QUERY_BRANCH/bigdata-perf/bsbm/RWStore.properties =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-perf/bsbm/RWStore.properties 2011-01-14 18:39:44 UTC (rev 4100) +++ branches/QUADS_QUERY_BRANCH/bigdata-perf/bsbm/RWStore.properties 2011-01-15 14:53:26 UTC (rev 4101) @@ -24,6 +24,17 @@ com.bigdata.namespace.BSBM_284826.spo.SPO.com.bigdata.btree.BTree.branchingFactor=512 com.bigdata.namespace.BSBM_284826.spo.OSP.com.bigdata.btree.BTree.branchingFactor=470 +# Reduce the branching factor for the lexicon since BSBM uses a lot of long +# literals. Note that you have to edit this override to specify the namespace +# into which the BSBM data will be loaded. +com.bigdata.namespace.BSBM_566496.lex.TERM2ID.com.bigdata.btree.BTree.branchingFactor=32 +com.bigdata.namespace.BSBM_566496.lex.ID2TERM.com.bigdata.btree.BTree.branchingFactor=32 + +# 4k pages. +com.bigdata.namespace.BSBM_566496.spo.POS.com.bigdata.btree.BTree.branchingFactor=970 +com.bigdata.namespace.BSBM_566496.spo.SPO.com.bigdata.btree.BTree.branchingFactor=512 +com.bigdata.namespace.BSBM_566496.spo.OSP.com.bigdata.btree.BTree.branchingFactor=470 + # Override the #of write cache buffers. com.bigdata.journal.AbstractJournal.writeCacheBufferCount=12 @@ -59,3 +70,6 @@ # 10000 is default. com.bigdata.rdf.sail.bufferCapacity=100000 + +# direct sesame to bop translation. +com.bigdata.rdf.sail.newEvalStrategy=true Modified: branches/QUADS_QUERY_BRANCH/bigdata-perf/bsbm/WORMStore.properties =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-perf/bsbm/WORMStore.properties 2011-01-14 18:39:44 UTC (rev 4100) +++ branches/QUADS_QUERY_BRANCH/bigdata-perf/bsbm/WORMStore.properties 2011-01-15 14:53:26 UTC (rev 4101) @@ -38,3 +38,6 @@ # 10000 is default. com.bigdata.rdf.sail.bufferCapacity=100000 + +# direct sesame to bop translation. +com.bigdata.rdf.sail.newEvalStrategy=true Modified: branches/QUADS_QUERY_BRANCH/bigdata-perf/bsbm/build.xml =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-perf/bsbm/build.xml 2011-01-14 18:39:44 UTC (rev 4100) +++ branches/QUADS_QUERY_BRANCH/bigdata-perf/bsbm/build.xml 2011-01-15 14:53:26 UTC (rev 4101) @@ -51,14 +51,14 @@ <exclude name="**/*.java" /> <exclude name="**/package.html" /> </fileset> - <!-- copy log4j configuration file. --> - <fileset dir="${bsbm.dir}/src/resources/logging" /> </copy> <copy toDir="${build.dir}/bin"> <!-- copy benchmark data and queries. --> <fileset dir="${bsbm.dir}/src/resources/bsbm-data" /> <!-- copy the journal configuration file. --> <fileset file="${bsbm.dir}/*.properties" /> + <!-- copy log4j configuration file. --> + <fileset dir="${bsbm.dir}/src/resources/logging" /> </copy> </target> @@ -143,7 +143,7 @@ <!-- delete file if it exists so we load into a new journal. --> <delete file="${bsbm.journalFile}" /> <java classname="com.bigdata.rdf.store.DataLoader" fork="true" failonerror="true" dir="${build.dir}/bin"> - <arg line="-namespace ${bsbm.namespace} ${bsbm.journalPropertyFile} ${bsbm.outputFile}.${bsbm.outputType}${bsbm.compressType}" /> + <arg line="-verbose -namespace ${bsbm.namespace} ${bsbm.journalPropertyFile} ${bsbm.outputFile}.${bsbm.outputType}${bsbm.compressType}" /> <!-- specify/override the journal file name. --> <jvmarg line="${queryJvmArgs} -Dcom.bigdata.journal.AbstractJournal.file=${bsbm.journalFile} -Dcom.bigdata.rdf.store.DataLoader.bufferCapacity=1000000 Modified: branches/QUADS_QUERY_BRANCH/bigdata-perf/bsbm/src/resources/logging/log4j.properties =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-perf/bsbm/src/resources/logging/log4j.properties 2011-01-14 18:39:44 UTC (rev 4100) +++ branches/QUADS_QUERY_BRANCH/bigdata-perf/bsbm/src/resources/logging/log4j.properties 2011-01-15 14:53:26 UTC (rev 4101) @@ -244,7 +244,7 @@ log4j.additivity.com.bigdata.bop.engine.RunState$TableLog=false log4j.appender.queryRunStateLog=org.apache.log4j.FileAppender log4j.appender.queryRunStateLog.Threshold=ALL -log4j.appender.queryRunStateLog.File=queryRunState.log +log4j.appender.queryRunStateLog.File=queryRunState.csv log4j.appender.queryRunStateLog.Append=true # I find that it is nicer to have this unbuffered since you can see what # is going on and to make sure that I have complete rule evaluation logs Modified: branches/QUADS_QUERY_BRANCH/bigdata-perf/lubm/src/resources/logging/log4j.properties =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-perf/lubm/src/resources/logging/log4j.properties 2011-01-14 18:39:44 UTC (rev 4100) +++ branches/QUADS_QUERY_BRANCH/bigdata-perf/lubm/src/resources/logging/log4j.properties 2011-01-15 14:53:26 UTC (rev 4101) @@ -156,6 +156,8 @@ #log4j.logger.com.bigdata.relation.accesspath.IAccessPath=DEBUG #log4j.logger.com.bigdata.rdf.sail.BigdataSail=DEBUG +#log4j.logger.com.bigdata.rdf.sail.Rule2BOpUtility=INFO +log4j.logger.com.bigdata.bop.controller.JoinGraph=INFO #log4j.logger.com.bigdata.rdf.sail.TestNamedGraphs=DEBUG log4j.logger.com.bigdata.rdf.sail.QuadsTestCase=DEBUG #log4j.logger.com.bigdata.relation.rule.eval.NestedSubqueryWithJoinThreadsTask=DEBUG @@ -178,7 +180,7 @@ log4j.logger.com.bigdata.rdf.store.DataLoader=INFO # Test suite logger. -log4j.logger.junit=INFO +#log4j.logger.junit=INFO #log4j.logger.junit=DEBUG log4j.logger.com.bigdata.btree.AbstractBTreeTestCase=INFO @@ -202,6 +204,7 @@ ## # Summary query evaluation log (tab delimited file). +# Uncomment the next line to enable. #log4j.logger.com.bigdata.bop.engine.QueryLog=INFO,queryLog log4j.additivity.com.bigdata.bop.engine.QueryLog=false log4j.appender.queryLog=org.apache.log4j.FileAppender Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl.java 2011-01-14 18:39:44 UTC (rev 4100) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl.java 2011-01-15 14:53:26 UTC (rev 4101) @@ -1725,6 +1725,11 @@ new BigdataBindingSetResolverator(database, it2).start(database .getExecutorService())); + /* + * FIXME This will deadlock in the buffer fills - see + * BigdataEvaluationStrategyImpl3 which contains a new code pattern for + * this. + */ try { // Wait for the Future (checks for errors). runningQuery.get(); Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl3.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl3.java 2011-01-14 18:39:44 UTC (rev 4100) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl3.java 2011-01-15 14:53:26 UTC (rev 4101) @@ -711,24 +711,34 @@ final Collection<Filter> sesameFilters) throws QueryEvaluationException { + IRunningQuery runningQuery = null; try { - final IRunningQuery runningQuery = queryEngine.eval(query); - + // Submit query for evaluation. + runningQuery = queryEngine.eval(query); + + // Iterator draining the query results. final IAsynchronousIterator<IBindingSet[]> it1 = runningQuery.iterator(); + // De-chunk the IBindingSet[] visited by that iterator. final IChunkedOrderedIterator<IBindingSet> it2 = new ChunkedWrappedIterator<IBindingSet>( new Dechunkerator<IBindingSet>(it1)); - - CloseableIteration<BindingSet, QueryEvaluationException> result = + + // Materialize IVs as RDF Values. + CloseableIteration<BindingSet, QueryEvaluationException> result = + // Monitor IRunningQuery and cancel if Sesame iterator is closed. + new RunningQueryCloseableIteration<BindingSet, QueryEvaluationException>(runningQuery, + // Convert bigdata binding sets to Sesame binding sets. new Bigdata2Sesame2BindingSetIterator<QueryEvaluationException>( + // Materialize IVs as RDF Values. new BigdataBindingSetResolverator(database, it2).start( - database.getExecutorService())); + database.getExecutorService()))); - // Wait for the Future (checks for errors). - runningQuery.get(); +// No - will deadlock if buffer fills up +// // Wait for the Future (checks for errors). +// runningQuery.get(); // use the basic filter iterator for remaining filters if (sesameFilters != null) { @@ -740,13 +750,13 @@ } } - return result; - - } catch (QueryEvaluationException ex) { - throw ex; - } catch (Exception ex) { - throw new QueryEvaluationException(ex); - } + return result; + + } catch (Throwable t) { + if (runningQuery != null) + runningQuery.cancel(true/* mayInterruptIfRunning */); + throw new QueryEvaluationException(t); + } } Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSailGraphQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSailGraphQuery.java 2011-01-14 18:39:44 UTC (rev 4100) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSailGraphQuery.java 2011-01-15 14:53:26 UTC (rev 4101) @@ -228,8 +228,8 @@ try { - TupleExpr tupleExpr = getParsedQuery().getTupleExpr(); - BigdataSailConnection sailCon = + final TupleExpr tupleExpr = getParsedQuery().getTupleExpr(); + final BigdataSailConnection sailCon = (BigdataSailConnection) getConnection().getSailConnection(); CloseableIteration<? extends BindingSet, QueryEvaluationException> bindingsIter = sailCon.evaluate( @@ -242,7 +242,7 @@ bindingsIter) { @Override protected boolean accept(BindingSet bindingSet) { - Value context = bindingSet.getValue("context"); + final Value context = bindingSet.getValue("context"); return bindingSet.getValue("subject") instanceof Resource && bindingSet.getValue("predicate") instanceof URI && bindingSet.getValue("object") instanceof Value @@ -254,15 +254,15 @@ // Convert the BindingSet objects to actual RDF statements final ValueFactory vf = getConnection().getRepository().getValueFactory(); - CloseableIteration<Statement, QueryEvaluationException> stIter; + final CloseableIteration<Statement, QueryEvaluationException> stIter; stIter = new ConvertingIteration<BindingSet, Statement, QueryEvaluationException>(bindingsIter) { @Override protected Statement convert(BindingSet bindingSet) { - Resource subject = (Resource)bindingSet.getValue("subject"); - URI predicate = (URI)bindingSet.getValue("predicate"); - Value object = bindingSet.getValue("object"); - Resource context = (Resource)bindingSet.getValue("context"); + final Resource subject = (Resource)bindingSet.getValue("subject"); + final URI predicate = (URI)bindingSet.getValue("predicate"); + final Value object = bindingSet.getValue("object"); + final Resource context = (Resource)bindingSet.getValue("context"); if (context == null) { return vf.createStatement(subject, predicate, object); @@ -277,10 +277,11 @@ return new GraphQueryResultImpl(getParsedQuery().getQueryNamespaces(), stIter); } else { + // native construct. // Convert the BindingSet objects to actual RDF statements final ValueFactory vf = getConnection().getRepository().getValueFactory(); - CloseableIteration<? extends Statement, QueryEvaluationException> stIter; + final CloseableIteration<? extends Statement, QueryEvaluationException> stIter; stIter = new BigdataConstructIterator(sailCon.getTripleStore(), bindingsIter, vf); return new GraphQueryResultImpl(getParsedQuery() .getQueryNamespaces(), stIter); Added: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/RunningQueryCloseableIteration.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/RunningQueryCloseableIteration.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/RunningQueryCloseableIteration.java 2011-01-15 14:53:26 UTC (rev 4101) @@ -0,0 +1,63 @@ +package com.bigdata.rdf.sail; + +import info.aduna.iteration.CloseableIteration; + +import java.util.concurrent.ExecutionException; + +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; + +import com.bigdata.bop.engine.IRunningQuery; + +/** + * Iteration construct wraps an {@link IRunningQuery} with logic to (a) verify + * that the {@link IRunningQuery} has not encountered an error; and (b) to cancel + * the {@link IRunningQuery} when the iteration is {@link #close() closed}. + * @author thompsonbry + * + * @param <E> + * @param <X> + */ +public class RunningQueryCloseableIteration<E extends BindingSet, X extends QueryEvaluationException> + implements CloseableIteration<E, X> { + + private final IRunningQuery runningQuery; + private final CloseableIteration<E, X> src; + private boolean checkedFuture = false; + + public RunningQueryCloseableIteration(final IRunningQuery runningQuery, + final CloseableIteration<E, X> src) { + + this.runningQuery = runningQuery; + this.src = src; + + } + + public void close() throws X { + runningQuery.cancel(true/* mayInterruptIfRunning */); + src.close(); + } + + public boolean hasNext() throws X { + return src.hasNext(); + } + + public E next() throws X { + if (!checkedFuture && runningQuery.isDone()) { + try { + runningQuery.get(); + } catch (InterruptedException e) { + throw (X) new QueryEvaluationException(e); + } catch (ExecutionException e) { + throw (X) new QueryEvaluationException(e); + } + checkedFuture = true; + } + return src.next(); + } + + public void remove() throws X { + src.remove(); + } + +} Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/NanoSparqlServer.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/NanoSparqlServer.java 2011-01-14 18:39:44 UTC (rev 4100) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/NanoSparqlServer.java 2011-01-15 14:53:26 UTC (rev 4101) @@ -49,6 +49,7 @@ import java.util.Map; import java.util.Properties; import java.util.TreeMap; +import java.util.UUID; import java.util.Vector; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; @@ -74,8 +75,12 @@ import org.openrdf.sail.SailException; import com.bigdata.LRUNexus; +import com.bigdata.bop.BOpUtility; import com.bigdata.bop.BufferAnnotations; import com.bigdata.bop.IPredicate; +import com.bigdata.bop.engine.IRunningQuery; +import com.bigdata.bop.engine.QueryEngine; +import com.bigdata.bop.fed.QueryEngineFactory; import com.bigdata.bop.join.PipelineJoin; import com.bigdata.btree.IndexMetadata; import com.bigdata.journal.AbstractJournal; @@ -709,7 +714,7 @@ return config.timestamp; } - + /** * Respond to a status request. * @@ -719,15 +724,27 @@ * @param params * @return * @throws Exception + * + * @todo add statistics for top-N queries based on query template + * identifiers, which can be communicated using query hints. See // + * wait for the subquery. + * @todo Report on the average query latency, average concurrency of query + * evaluation, etc. */ public Response doStatus(final String uri, final String method, final Properties header, final LinkedHashMap<String, Vector<String>> params) throws Exception { + // SPARQL queries accepted by the SPARQL end point. final boolean showQueries = params.get("showQueries") != null; + // IRunningQuery objects currently running on the query controller. + final boolean showRunningQueries = params.get("showRunningQueries") != null; + + // Information about the KB (stats, properties). final boolean showKBInfo = params.get("showKBInfo") != null; + // bigdata namespaces known to the index manager. final boolean showNamespaces = params.get("showNamespaces") != null; final StringBuilder sb = new StringBuilder(); @@ -789,15 +806,17 @@ } // show the disk access details. - sb.append(jnl.getBufferStrategy().getCounters().toString()+"\n\n"); + sb.append(jnl.getBufferStrategy().getCounters().toString()+"\n"); } if(showQueries) { /* - * Show the queries which are currently executing. + * Show the queries which are currently executing (accepted by the NanoSparqlServer). */ + + sb.append("\n"); final long now = System.nanoTime(); @@ -850,6 +869,86 @@ } + if(showRunningQueries) { + + /* + * Show the queries which are currently executing (actually running + * on the QueryEngine). + */ + + sb.append("\n"); + + final QueryEngine queryEngine = (QueryEngine) QueryEngineFactory + .getQueryController(indexManager); + + final UUID[] queryIds = queryEngine.getRunningQueries(); + +// final long now = System.nanoTime(); + + final TreeMap<Long, IRunningQuery> ages = new TreeMap<Long, IRunningQuery>(new Comparator<Long>() { + /** + * Comparator puts the entries into descending order by the query + * execution time (longest running queries are first). + */ + public int compare(final Long o1, final Long o2) { + if(o1.longValue()<o2.longValue()) return 1; + if(o1.longValue()>o2.longValue()) return -1; + return 0; + } + }); + + for(UUID queryId : queryIds) { + + final IRunningQuery query = queryEngine + .getRunningQuery(queryId); + + if (query == null) { + // Already terminated. + continue; + } + + ages.put(query.getElapsed(), query); + + } + + { + + final Iterator<IRunningQuery> itr = ages.values().iterator(); + + while (itr.hasNext()) { + + final IRunningQuery query = itr.next(); + + if (query.isDone() && query.getCause() != null) { + // Already terminated (normal completion). + continue; + } + + /* + * @todo The runstate and stats could be formatted into an + * HTML table ala QueryLog or RunState. + */ + sb.append("age=" + query.getElapsed() + "ms\n"); + sb.append("queryId=" + query.getQueryId() + "\n"); + sb.append(query.toString()); + sb.append("\n"); + sb.append(BOpUtility.toString(query.getQuery())); + sb.append("\n"); + sb.append("\n"); + +// final long age = query.getElapsed(); +// sb.append("age=" +// + java.util.concurrent.TimeUnit.NANOSECONDS +// .toMillis(age) + "ms, queryId=" +// + query.getQueryId() + "\nquery=" +// + BOpUtility.toString(query.getQuery()) + "\n"); + + } + + } + + } + return new Response(HTTP_OK, MIME_TEXT_PLAIN, sb.toString()); } Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sop/SOp2BOpUtility.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sop/SOp2BOpUtility.java 2011-01-14 18:39:44 UTC (rev 4100) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sop/SOp2BOpUtility.java 2011-01-15 14:53:26 UTC (rev 4101) @@ -169,20 +169,6 @@ PipelineOp left = Rule2BOpUtility.convert( rule, conditionals, idFactory, db, queryEngine, queryHints); - if (!left.getEvaluationContext().equals( - BOpEvaluationContext.CONTROLLER)) { - /* - * Wrap with an operator which will be evaluated on the query - * controller so the results will be streamed back to the query - * controller in scale-out. - */ - left = new SliceOp(new BOp[] { left }, NV.asMap(// - new NV(BOp.Annotations.BOP_ID, idFactory - .incrementAndGet()), // - new NV(BOp.Annotations.EVALUATION_CONTEXT, - BOpEvaluationContext.CONTROLLER))); - } - /* * Start with left=<this join group> and add a SubqueryOp for each * sub group. @@ -207,6 +193,33 @@ } } + if (!left.getEvaluationContext() + .equals(BOpEvaluationContext.CONTROLLER) + && !(left instanceof SubqueryOp)) { + /* + * Wrap with an operator which will be evaluated on the query + * controller so the results will be streamed back to the query + * controller in scale-out. + * + * @todo For scale-out, we probably need to stream the results back + * to the node from which the subquery was issued. If the subquery + * is issued against the local query engine where the IBindingSet + * was produced, then the that query engine is the query controller + * for the subquery and a SliceOp on the subquery would bring the + * results for the subquery back to that query controller. There is + * no requirement that the query controller for the subquery and the + * query controller for the parent query be the same node. [I am not + * doing this currently in order to test whether there is a problem + * with SliceOp which interactions with SubqueryOp to allow + * incorrect termination under some circumstances. + */ + left = new SliceOp(new BOp[] { left }, NV.asMap(// + new NV(BOp.Annotations.BOP_ID, idFactory + .incrementAndGet()), // + new NV(BOp.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.CONTROLLER))); + } + return left; } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |