From: <tho...@us...> - 2014-01-04 18:30:36
|
Revision: 7718 http://bigdata.svn.sourceforge.net/bigdata/?rev=7718&view=rev Author: thompsonbry Date: 2014-01-04 18:30:29 +0000 (Sat, 04 Jan 2014) Log Message: ----------- ChunkedRunningQuery:: Slight re-ordering of acceptChunk(). Now tests for an empty queue before testing for atOnce evaluation. Also computes the number of solutions across the messages and then conditionally logs that information. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java 2014-01-04 18:29:11 UTC (rev 7717) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java 2014-01-04 18:30:29 UTC (rev 7718) @@ -521,22 +521,6 @@ // } // // } - if (!pipelined && !queue.isEmpty() && !isAtOnceReady(bundle.bopId)) { - /* - * This operator is not pipelined, so we need to wait until all - * of its input solutions have been materialized (no prior - * operator in the pipeline is running or has inputs available - * which could cause it to run). - * - * TODO This is where we should examine MAX_MEMORY and the - * buffered data to see whether or not to trigger an evaluation - * pass for the operator based on the data already materialized - * for that operator. - */ - if (log.isDebugEnabled()) - log.debug("Waiting on producer(s): bopId=" + bundle.bopId); - return false; - } if (queue.isEmpty()) { // No work, so remove work queue for (bopId,partitionId). if(removeMapOperatorQueueEntries) @@ -544,6 +528,31 @@ return false; } /* + * true iff operator requires at once evaluation and all solutions + * are now available for that operator. + */ + boolean atOnceReady = false; + if (!pipelined) { + if (!isAtOnceReady(bundle.bopId)) { + /* + * This operator is not pipelined, so we need to wait until + * all of its input solutions have been materialized (no + * prior operator in the pipeline is running or has inputs + * available which could cause it to run). + * + * TODO This is where we should examine MAX_MEMORY and the + * buffered data to see whether or not to trigger an + * evaluation pass for the operator based on the data + * already materialized for that operator. + */ + if (log.isDebugEnabled()) + log.debug("Waiting on producer(s): bopId=" + + bundle.bopId); + return false; + } + atOnceReady = true; + } + /* * Drain the work queue for that (bopId,partitionId). * * Note: If the operator is pipelined, then we do not drain more @@ -553,104 +562,116 @@ */ final List<IChunkMessage<IBindingSet>> accepted = new LinkedList<IChunkMessage<IBindingSet>>(); try { - /* - * Note: Once we drain these messages from the work queue we are - * responsible for calling release() on them. - */ - queue.drainTo(accepted, pipelined ? maxMessagesPerTask - : Integer.MAX_VALUE); - // #of messages accepted from the work queue. - final int naccepted = accepted.size(); - // #of messages remaining on the work queue. - final int nremaining = queue.size(); - if(nremaining == 0) { - // Remove the work queue for that (bopId,partitionId). - if(removeMapOperatorQueueEntries) - if(queue != operatorQueues.remove(bundle)) throw new AssertionError(); - } else if(pipelined) { /* - * After removing the maximum amount from a pipelined operator, - * the work queue is still not empty. + * Note: Once we drain these messages from the work queue we are + * responsible for calling release() on them. */ - if (log.isInfoEnabled()) - log.info("Work queue is over capacity: bundle=" + bundle - + ", naccepted=" + naccepted + ", nremaining=" - + nremaining + ", maxMessagesPerTask=" - + maxMessagesPerTask + ", runState=" - + runStateString()); - } - /* - * Combine the messages into a single source to be consumed by a - * task. - */ - int nassigned = 1; - final Iterator<IChunkMessage<IBindingSet>> mitr = accepted.iterator(); - final IChunkMessage<IBindingSet> firstChunk = mitr.next(); - // See BOpContext#isLastInvocation() - final boolean isLastInvocation = pipelined -// && nremaining == 0 -// && maxParallel == 1 -// && isOperatorDone(bundle.bopId) - && firstChunk.isLastInvocation() - ; - /* - * Note: There is no longer any reliance on the IAsynchronous - * Iterator API here. It is perfectly sufficient to only - * implement ICloseableIterator. Query operator and chunk - * message implementations should be revisited with this - * simplifying assumption in mind. - * - * @see https://sourceforge.net/apps/trac/bigdata/ticket/475 - */ - final IMultiSourceCloseableIterator<IBindingSet[]> source = new MultiSourceSequentialCloseableIterator<IBindingSet[]>(// -// accepted.remove(0).getChunkAccessor().iterator()// - firstChunk.getChunkAccessor().iterator()// - ); -// for (IChunkMessage<IBindingSet> msg : accepted) { -// source.add(msg.getChunkAccessor().iterator()); - while(mitr.hasNext()) { - source.add(mitr.next().getChunkAccessor().iterator()); - nassigned++; - } - if (nassigned != naccepted) - throw new AssertionError(); - /* - * Create task to consume that source. - */ - final ChunkFutureTask cft; - try { - cft = new ChunkFutureTask( - new ChunkTask(bundle.bopId, bundle.shardId, - naccepted, isLastInvocation, source)); - } catch (Throwable t2) { - // Ensure accepted messages are released(); - safeRelease(accepted); - halt(t2); // ensure query halts. - if (getCause() != null) { - // Abnormal termination - wrap and rethrow. - throw new RuntimeException(t2); + queue.drainTo(accepted, pipelined ? maxMessagesPerTask + : Integer.MAX_VALUE); + // #of messages accepted from the work queue. + final int naccepted = accepted.size(); + // #of messages remaining on the work queue. + final int nremaining = queue.size(); + if (nremaining == 0) { + // Remove the work queue for that (bopId,partitionId). + if(removeMapOperatorQueueEntries) + if(queue != operatorQueues.remove(bundle)) throw new AssertionError(); + } else if (pipelined) { + /* + * After removing the maximum amount from a pipelined operator, + * the work queue is still not empty. + */ + if (log.isInfoEnabled()) + log.info("Work queue is over capacity: bundle=" + bundle + + ", naccepted=" + naccepted + ", nremaining=" + + nremaining + ", maxMessagesPerTask=" + + maxMessagesPerTask + ", runState=" + + runStateString()); } - // normal termination - swallow the exception. - return false; - } - /* - * Save the Future for this task. Together with the logic above this - * may be used to limit the #of concurrent tasks per (bopId,shardId) - * to one for a given query. - */ - if (map == null) { - map = new ConcurrentHashMap<ChunkFutureTask, ChunkFutureTask>(); - operatorFutures.put(bundle, map); - } - map.put(cft, cft); - /* - * Submit task for execution (asynchronous). - */ - if (log.isDebugEnabled()) - log.debug("Running task: bop=" + bundle.bopId + ", naccepted=" - + naccepted+", runState="+runStateString()); - getQueryEngine().execute(cft); - return true; + /* + * Combine the messages into a single source to be consumed by a + * task. + */ + int nassigned = 1; + final Iterator<IChunkMessage<IBindingSet>> mitr = accepted.iterator(); + final IChunkMessage<IBindingSet> firstChunk = mitr.next(); + // See BOpContext#isLastInvocation() + final boolean isLastInvocation = pipelined + // && nremaining == 0 + // && maxParallel == 1 + // && isOperatorDone(bundle.bopId) + && firstChunk.isLastInvocation() + ; + /* + * Note: There is no longer any reliance on the IAsynchronous + * Iterator API here. It is perfectly sufficient to only + * implement ICloseableIterator. Query operator and chunk + * message implementations should be revisited with this + * simplifying assumption in mind. + * + * @see https://sourceforge.net/apps/trac/bigdata/ticket/475 + */ + final IMultiSourceCloseableIterator<IBindingSet[]> source = new MultiSourceSequentialCloseableIterator<IBindingSet[]>(// + // accepted.remove(0).getChunkAccessor().iterator()// + firstChunk.getChunkAccessor().iterator()// + ); + // for (IChunkMessage<IBindingSet> msg : accepted) { + // source.add(msg.getChunkAccessor().iterator()); + // #of solutions accepted across those chunk messages. + final long solutionsAccepted; + { + long na = firstChunk.getSolutionCount(); + while (mitr.hasNext()) { + final IChunkMessage<IBindingSet> msg = mitr.next(); + na += msg.getSolutionCount(); + source.add(msg.getChunkAccessor().iterator()); + nassigned++; + } + solutionsAccepted = na; + } + if (nassigned != naccepted) + throw new AssertionError(); + /* + * Create task to consume that source. + */ + final ChunkFutureTask cft; + try { + cft = new ChunkFutureTask( + new ChunkTask(bundle.bopId, bundle.shardId, + naccepted, isLastInvocation, source)); + } catch (Throwable t2) { + // Ensure accepted messages are released(); + safeRelease(accepted); + halt(t2); // ensure query halts. + if (getCause() != null) { + // Abnormal termination - wrap and rethrow. + throw new RuntimeException(t2); + } + // normal termination - swallow the exception. + return false; + } + /* + * Save the Future for this task. Together with the logic above this + * may be used to limit the #of concurrent tasks per (bopId,shardId) + * to one for a given query. + */ + if (map == null) { + map = new ConcurrentHashMap<ChunkFutureTask, ChunkFutureTask>(); + operatorFutures.put(bundle, map); + } + map.put(cft, cft); + /* + * Submit task for execution (asynchronous). + */ + if (log.isDebugEnabled()) { + log.debug("Running task: bop=" + bundle.bopId + + ", atOnceReady=" + atOnceReady + ", bop=" + + bop.toShortString() + ", messages=" + naccepted + + ", solutions=" + solutionsAccepted + + ", runState=" + runStateString()); + } + getQueryEngine().execute(cft); + return true; } catch(Throwable t) { // Ensure accepted messages are released(); safeRelease(accepted); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |