From: <tho...@us...> - 2013-11-15 17:02:04
Revision: 7554
          http://bigdata.svn.sourceforge.net/bigdata/?rev=7554&view=rev
Author:   thompsonbry
Date:     2013-11-15 17:01:58 +0000 (Fri, 15 Nov 2013)

Log Message:
-----------
@Override annotations

Modified Paths:
--------------
    branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java	2013-11-15 17:00:14 UTC (rev 7553)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java	2013-11-15 17:01:58 UTC (rev 7554)
@@ -314,6 +314,7 @@
      * available and no task is currently running, then drain the work queue and
      * submit a task to consume that work.
      */
+    @Override
     protected void consumeChunk() {
         lock.lock();
         try {
@@ -722,6 +723,7 @@

         }

+        @Override
         public void run() {
             try {

@@ -791,6 +793,7 @@

         }

+        @Override
         public void run() {

             final QueryEngine queryEngine = getQueryEngine();
@@ -1015,6 +1018,7 @@
         /**
          * A human readable representation of the {@link ChunkTask}'s state.
          */
+        @Override
         public String toString() {
             return "ChunkTask" + //
                     "{query=" + getQueryId() + //
@@ -1269,6 +1273,7 @@
         /**
          * Evaluate the {@link IChunkMessage}.
          */
+        @Override
         public Void call() throws Exception {
             if (log.isDebugEnabled())
                 log.debug("Running chunk: " + this);
@@ -1355,6 +1360,7 @@

         }

+        @Override
         public void add(final E e) {
             super.add(e);
             if (SolutionsLog.solutionsLog.isInfoEnabled()) {
@@ -1609,7 +1615,8 @@
             chunkSize = 0;
             smallChunks = null;
         }
-
+
+        @Override
         synchronized // Note: possible side-effect on internal buffer.
         public long flush() {
             if (open)
@@ -1618,46 +1625,55 @@
                 // return sink.flush();
         }

+        @Override
         public void abort(final Throwable cause) {
             open = false;
             q.halt(cause);
             // sink.abort(cause);
         }

+        @Override
         public void close() {
             // sink.close();
             open = false;
         }

+        @Override
         public Future getFuture() {
             // return sink.getFuture();
             return null;
         }

+        @Override
         public boolean isEmpty() {
             return true;
             // return sink.isEmpty();
         }

+        @Override
         public boolean isOpen() {
             return open && !q.isDone();
             // return sink.isOpen();
         }

+        @Override
         public IAsynchronousIterator<IBindingSet[]> iterator() {
             throw new UnsupportedOperationException();
             // return sink.iterator();
         }

+        @Override
         public void reset() {
             // sink.reset();
         }

+        @Override
         public void setFuture(Future future) {
             throw new UnsupportedOperationException();
             // sink.setFuture(future);
         }

+        @Override
         public int size() {
             return 0;
             // return sink.size();
@@ -1702,6 +1718,7 @@

         }

+        @Override
         public void run() {
             try {
                 if (q.isController()) {
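Editor's note: r7554 is purely mechanical, but it is worth spelling out what the annotation buys. @Override makes the compiler verify that a method really overrides (or implements) a supertype method, so a signature drift becomes a compile error instead of a silently unused method. A minimal sketch, not taken from the bigdata sources (the Sink, LoggingSink, and ChunkCallable names below are invented for illustration):

import java.util.concurrent.Callable;

// Invented types for illustration only; not part of com.bigdata.bop.engine.
class Sink {
    public void add(final Object e) { /* deliver e downstream */ }
}

class LoggingSink extends Sink {
    @Override // compiler now checks that Sink really declares add(Object)
    public void add(final Object e) {
        System.out.println("add: " + e);
        super.add(e);
    }

    // @Override
    // public void add(final String e) { ... }
    // ^ uncommented, this would fail to compile: the parameter type differs,
    //   so the method would NOT override Sink.add(Object).
}

class ChunkCallable implements Callable<Void> {
    @Override // since Java 6 the annotation is also legal on interface methods
    public Void call() {
        return null;
    }
}

Compiling a version with the mismatched signature uncommented fails with "method does not override or implement a method from a supertype", which is exactly the safety net the commit adds across ChunkedRunningQuery's run(), call(), toString(), and sink methods.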
From: <tho...@us...> - 2014-01-04 18:30:36
Revision: 7718
          http://bigdata.svn.sourceforge.net/bigdata/?rev=7718&view=rev
Author:   thompsonbry
Date:     2014-01-04 18:30:29 +0000 (Sat, 04 Jan 2014)

Log Message:
-----------
ChunkedRunningQuery:: Slight re-ordering of acceptChunk(). Now tests for an empty queue before testing for atOnce evaluation. Also computes the number of solutions across the messages and then conditionally logs that information.

Modified Paths:
--------------
    branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java	2014-01-04 18:29:11 UTC (rev 7717)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java	2014-01-04 18:30:29 UTC (rev 7718)
@@ -521,22 +521,6 @@
 //            }
 //
 //        }
-        if (!pipelined && !queue.isEmpty() && !isAtOnceReady(bundle.bopId)) {
-            /*
-             * This operator is not pipelined, so we need to wait until all
-             * of its input solutions have been materialized (no prior
-             * operator in the pipeline is running or has inputs available
-             * which could cause it to run).
-             *
-             * TODO This is where we should examine MAX_MEMORY and the
-             * buffered data to see whether or not to trigger an evaluation
-             * pass for the operator based on the data already materialized
-             * for that operator.
-             */
-            if (log.isDebugEnabled())
-                log.debug("Waiting on producer(s): bopId=" + bundle.bopId);
-            return false;
-        }
         if (queue.isEmpty()) {
             // No work, so remove work queue for (bopId,partitionId).
             if(removeMapOperatorQueueEntries)
@@ -544,6 +528,31 @@
             return false;
         }
         /*
+         * true iff operator requires at once evaluation and all solutions
+         * are now available for that operator.
+         */
+        boolean atOnceReady = false;
+        if (!pipelined) {
+            if (!isAtOnceReady(bundle.bopId)) {
+                /*
+                 * This operator is not pipelined, so we need to wait until
+                 * all of its input solutions have been materialized (no
+                 * prior operator in the pipeline is running or has inputs
+                 * available which could cause it to run).
+                 *
+                 * TODO This is where we should examine MAX_MEMORY and the
+                 * buffered data to see whether or not to trigger an
+                 * evaluation pass for the operator based on the data
+                 * already materialized for that operator.
+                 */
+                if (log.isDebugEnabled())
+                    log.debug("Waiting on producer(s): bopId="
+                            + bundle.bopId);
+                return false;
+            }
+            atOnceReady = true;
+        }
+        /*
          * Drain the work queue for that (bopId,partitionId).
          *
          * Note: If the operator is pipelined, then we do not drain more
@@ -553,104 +562,116 @@
          */
        final List<IChunkMessage<IBindingSet>> accepted = new LinkedList<IChunkMessage<IBindingSet>>();
        try {
-            /*
-             * Note: Once we drain these messages from the work queue we are
-             * responsible for calling release() on them.
-             */
-            queue.drainTo(accepted, pipelined ? maxMessagesPerTask
-                    : Integer.MAX_VALUE);
-            // #of messages accepted from the work queue.
-            final int naccepted = accepted.size();
-            // #of messages remaining on the work queue.
-            final int nremaining = queue.size();
-            if(nremaining == 0) {
-                // Remove the work queue for that (bopId,partitionId).
-                if(removeMapOperatorQueueEntries)
-                    if(queue != operatorQueues.remove(bundle)) throw new AssertionError();
-            } else if(pipelined) {
                /*
-                 * After removing the maximum amount from a pipelined operator,
-                 * the work queue is still not empty.
+                 * Note: Once we drain these messages from the work queue we are
+                 * responsible for calling release() on them.
                  */
-                if (log.isInfoEnabled())
-                    log.info("Work queue is over capacity: bundle=" + bundle
-                            + ", naccepted=" + naccepted + ", nremaining="
-                            + nremaining + ", maxMessagesPerTask="
-                            + maxMessagesPerTask + ", runState="
-                            + runStateString());
-            }
-            /*
-             * Combine the messages into a single source to be consumed by a
-             * task.
-             */
-            int nassigned = 1;
-            final Iterator<IChunkMessage<IBindingSet>> mitr = accepted.iterator();
-            final IChunkMessage<IBindingSet> firstChunk = mitr.next();
-            // See BOpContext#isLastInvocation()
-            final boolean isLastInvocation = pipelined
-//                    && nremaining == 0
-//                    && maxParallel == 1
-//                    && isOperatorDone(bundle.bopId)
-                    && firstChunk.isLastInvocation()
-                    ;
-            /*
-             * Note: There is no longer any reliance on the IAsynchronous
-             * Iterator API here. It is perfectly sufficient to only
-             * implement ICloseableIterator. Query operator and chunk
-             * message implementations should be revisited with this
-             * simplifying assumption in mind.
-             *
-             * @see https://sourceforge.net/apps/trac/bigdata/ticket/475
-             */
-            final IMultiSourceCloseableIterator<IBindingSet[]> source = new MultiSourceSequentialCloseableIterator<IBindingSet[]>(//
-//                    accepted.remove(0).getChunkAccessor().iterator()//
-                    firstChunk.getChunkAccessor().iterator()//
-            );
-//            for (IChunkMessage<IBindingSet> msg : accepted) {
-//                source.add(msg.getChunkAccessor().iterator());
-            while(mitr.hasNext()) {
-                source.add(mitr.next().getChunkAccessor().iterator());
-                nassigned++;
-            }
-            if (nassigned != naccepted)
-                throw new AssertionError();
-            /*
-             * Create task to consume that source.
-             */
-            final ChunkFutureTask cft;
-            try {
-                cft = new ChunkFutureTask(
-                        new ChunkTask(bundle.bopId, bundle.shardId,
-                                naccepted, isLastInvocation, source));
-            } catch (Throwable t2) {
-                // Ensure accepted messages are released();
-                safeRelease(accepted);
-                halt(t2); // ensure query halts.
-                if (getCause() != null) {
-                    // Abnormal termination - wrap and rethrow.
-                    throw new RuntimeException(t2);
+                queue.drainTo(accepted, pipelined ? maxMessagesPerTask
+                        : Integer.MAX_VALUE);
+                // #of messages accepted from the work queue.
+                final int naccepted = accepted.size();
+                // #of messages remaining on the work queue.
+                final int nremaining = queue.size();
+                if (nremaining == 0) {
+                    // Remove the work queue for that (bopId,partitionId).
+                    if(removeMapOperatorQueueEntries)
+                        if(queue != operatorQueues.remove(bundle)) throw new AssertionError();
+                } else if (pipelined) {
+                    /*
+                     * After removing the maximum amount from a pipelined operator,
+                     * the work queue is still not empty.
+                     */
+                    if (log.isInfoEnabled())
+                        log.info("Work queue is over capacity: bundle=" + bundle
+                                + ", naccepted=" + naccepted + ", nremaining="
+                                + nremaining + ", maxMessagesPerTask="
+                                + maxMessagesPerTask + ", runState="
+                                + runStateString());
                }
-                // normal termination - swallow the exception.
-                return false;
-            }
-            /*
-             * Save the Future for this task. Together with the logic above this
-             * may be used to limit the #of concurrent tasks per (bopId,shardId)
-             * to one for a given query.
-             */
-            if (map == null) {
-                map = new ConcurrentHashMap<ChunkFutureTask, ChunkFutureTask>();
-                operatorFutures.put(bundle, map);
-            }
-            map.put(cft, cft);
-            /*
-             * Submit task for execution (asynchronous).
-             */
-            if (log.isDebugEnabled())
-                log.debug("Running task: bop=" + bundle.bopId + ", naccepted="
-                        + naccepted+", runState="+runStateString());
-            getQueryEngine().execute(cft);
-            return true;
+                /*
+                 * Combine the messages into a single source to be consumed by a
+                 * task.
+                 */
+                int nassigned = 1;
+                final Iterator<IChunkMessage<IBindingSet>> mitr = accepted.iterator();
+                final IChunkMessage<IBindingSet> firstChunk = mitr.next();
+                // See BOpContext#isLastInvocation()
+                final boolean isLastInvocation = pipelined
+                // && nremaining == 0
+                // && maxParallel == 1
+                // && isOperatorDone(bundle.bopId)
+                        && firstChunk.isLastInvocation()
+                        ;
+                /*
+                 * Note: There is no longer any reliance on the IAsynchronous
+                 * Iterator API here. It is perfectly sufficient to only
+                 * implement ICloseableIterator. Query operator and chunk
+                 * message implementations should be revisited with this
+                 * simplifying assumption in mind.
+                 *
+                 * @see https://sourceforge.net/apps/trac/bigdata/ticket/475
+                 */
+                final IMultiSourceCloseableIterator<IBindingSet[]> source = new MultiSourceSequentialCloseableIterator<IBindingSet[]>(//
+                // accepted.remove(0).getChunkAccessor().iterator()//
+                        firstChunk.getChunkAccessor().iterator()//
+                );
+                // for (IChunkMessage<IBindingSet> msg : accepted) {
+                //     source.add(msg.getChunkAccessor().iterator());
+                // #of solutions accepted across those chunk messages.
+                final long solutionsAccepted;
+                {
+                    long na = firstChunk.getSolutionCount();
+                    while (mitr.hasNext()) {
+                        final IChunkMessage<IBindingSet> msg = mitr.next();
+                        na += msg.getSolutionCount();
+                        source.add(msg.getChunkAccessor().iterator());
+                        nassigned++;
+                    }
+                    solutionsAccepted = na;
+                }
+                if (nassigned != naccepted)
+                    throw new AssertionError();
+                /*
+                 * Create task to consume that source.
+                 */
+                final ChunkFutureTask cft;
+                try {
+                    cft = new ChunkFutureTask(
+                            new ChunkTask(bundle.bopId, bundle.shardId,
+                                    naccepted, isLastInvocation, source));
+                } catch (Throwable t2) {
+                    // Ensure accepted messages are released();
+                    safeRelease(accepted);
+                    halt(t2); // ensure query halts.
+                    if (getCause() != null) {
+                        // Abnormal termination - wrap and rethrow.
+                        throw new RuntimeException(t2);
+                    }
+                    // normal termination - swallow the exception.
+                    return false;
+                }
+                /*
+                 * Save the Future for this task. Together with the logic above this
+                 * may be used to limit the #of concurrent tasks per (bopId,shardId)
+                 * to one for a given query.
+                 */
+                if (map == null) {
+                    map = new ConcurrentHashMap<ChunkFutureTask, ChunkFutureTask>();
+                    operatorFutures.put(bundle, map);
+                }
+                map.put(cft, cft);
+                /*
+                 * Submit task for execution (asynchronous).
+                 */
+                if (log.isDebugEnabled()) {
+                    log.debug("Running task: bop=" + bundle.bopId
+                            + ", atOnceReady=" + atOnceReady + ", bop="
+                            + bop.toShortString() + ", messages=" + naccepted
+                            + ", solutions=" + solutionsAccepted
+                            + ", runState=" + runStateString());
+                }
+                getQueryEngine().execute(cft);
+                return true;
        } catch(Throwable t) {
            // Ensure accepted messages are released();
            safeRelease(accepted);
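Editor's note: stripped of the bigdata-specific types, the reordered acceptChunk() in r7718 does four things in sequence: bail out on an empty work queue, gate non-pipelined (at-once) operators on isAtOnceReady(), drain a bounded number of messages for pipelined operators (everything for at-once ones), and total the solutions across the drained messages so the decision can be logged. A minimal sketch of that control flow follows, assuming simplified stand-in types (Msg and the method parameters below are invented; only getSolutionCount() mirrors the real IChunkMessage API):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Simplified stand-in for IChunkMessage; name and shape are illustrative only.
interface Msg { long getSolutionCount(); }

final class AcceptChunkSketch {

    /**
     * Mirrors the control flow after r7718: empty-queue check first, then the
     * at-once gate for non-pipelined operators, then drain and total the
     * solutions so the decision can be logged in one line.
     */
    static boolean acceptChunk(final BlockingQueue<Msg> queue,
            final boolean pipelined, final boolean atOnceInputsReady,
            final int maxMessagesPerTask) {

        if (queue.isEmpty()) {
            return false; // nothing to do; caller may drop the work queue entry
        }

        boolean atOnceReady = false;
        if (!pipelined) {
            if (!atOnceInputsReady) {
                return false; // still waiting on producer(s)
            }
            atOnceReady = true;
        }

        // Drain: pipelined operators take at most maxMessagesPerTask messages,
        // at-once operators take everything that has been buffered.
        final List<Msg> accepted = new ArrayList<>();
        queue.drainTo(accepted, pipelined ? maxMessagesPerTask : Integer.MAX_VALUE);

        // Total the solutions across the accepted messages (new in r7718).
        long solutionsAccepted = 0L;
        for (Msg m : accepted) {
            solutionsAccepted += m.getSolutionCount();
        }

        System.out.println("Running task: atOnceReady=" + atOnceReady
                + ", messages=" + accepted.size()
                + ", solutions=" + solutionsAccepted);
        return true; // the real class builds and submits a ChunkTask here
    }
}

The real method additionally combines the drained messages into a MultiSourceSequentialCloseableIterator and wraps them in a ChunkTask/ChunkFutureTask before handing them to the QueryEngine; the sketch stops at the counting and logging step that this commit adds.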
From: <tho...@us...> - 2014-01-04 20:04:03
Revision: 7721
          http://bigdata.svn.sourceforge.net/bigdata/?rev=7721&view=rev
Author:   thompsonbry
Date:     2014-01-04 20:03:56 +0000 (Sat, 04 Jan 2014)

Log Message:
-----------
logging change.

Modified Paths:
--------------
    branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java	2014-01-04 19:48:22 UTC (rev 7720)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java	2014-01-04 20:03:56 UTC (rev 7721)
@@ -663,13 +663,12 @@
                 /*
                  * Submit task for execution (asynchronous).
                  */
-                if (log.isDebugEnabled()) {
-                    log.debug("Running task: bop=" + bundle.bopId
-                            + ", atOnceReady=" + atOnceReady + ", bop="
+                if (log.isInfoEnabled())
+                    log.info("Running task: bop=" + bundle.bopId
+                            + (pipelined?"":", atOnceReady=" + atOnceReady) + ", bop="
                             + bop.toShortString() + ", messages=" + naccepted
                             + ", solutions=" + solutionsAccepted
-                            + ", runState=" + runStateString());
-                }
+                            + (log.isDebugEnabled()?", runState=" + runStateString():""));
                getQueryEngine().execute(cft);
                return true;
        } catch(Throwable t) {
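Editor's note: r7721 keeps the single "Running task" line but promotes it from DEBUG to INFO, drops the atOnceReady flag for pipelined operators (where it only adds noise), and appends the relatively expensive runStateString() output only when DEBUG is also enabled. A sketch of that guarded-logging pattern, assuming a log4j Logger (an assumption; any logger exposing isInfoEnabled()/isDebugEnabled() guards works the same way) and invented field values:

import org.apache.log4j.Logger;

// Hedged sketch of the pattern the commit moves to: one INFO line, with the
// atOnceReady flag shown only for non-pipelined operators and the run-state
// string appended only when DEBUG is on. Parameters are placeholders, not the
// real ChunkedRunningQuery fields.
final class TaskLogSketch {

    private static final Logger log = Logger.getLogger(TaskLogSketch.class);

    static void logRunningTask(final int bopId, final boolean pipelined,
            final boolean atOnceReady, final int messages,
            final long solutions, final String runState) {
        if (log.isInfoEnabled())
            log.info("Running task: bop=" + bopId
                    + (pipelined ? "" : ", atOnceReady=" + atOnceReady)
                    + ", messages=" + messages
                    + ", solutions=" + solutions
                    + (log.isDebugEnabled() ? ", runState=" + runState : ""));
    }
}

The net effect is that normal INFO-level operation now records how many messages and solutions each task consumed, without paying for the run-state string unless DEBUG is enabled as well.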