From: <tho...@us...> - 2010-09-16 10:55:33
|
Revision: 3564 http://bigdata.svn.sourceforge.net/bigdata/?rev=3564&view=rev Author: thompsonbry Date: 2010-09-16 10:55:26 +0000 (Thu, 16 Sep 2010) Log Message: ----------- Working through SliceOp integration for standalone and scale-out. It currently cancels the query, which results in an exception being reported by RunningQuery.get(). That might be Ok, but the unit test needs to be updated and we need to report out the statistics anyway. I am also looking at termination conditions when a message is routed from a query peer to the query controller in scale-out, which is what happens for a SliceOp since it is evaluated at the query controller. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkAccessor.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/MapBindingSetsOverShardsBuffer.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/NIOChunkMessage.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/ThickChunkMessage.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/resources/logging/log4j.properties branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestThickChunkMessage.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestSliceOp.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-09-15 23:01:17 UTC (rev 3563) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-09-16 10:55:26 UTC (rev 3564) @@ -168,7 +168,7 @@ * Perhaps the right thing is to expose an object with a richer API * for obtaining various kinds of iterators or even access to the * direct {@link ByteBuffer}s backing the data (for high volume joins, - * exernal merge sorts, etc). + * external merge sorts, etc). */ public final IAsynchronousIterator<E[]> getSource() { return source; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java 2010-09-15 23:01:17 UTC (rev 3563) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java 2010-09-16 10:55:26 UTC (rev 3564) @@ -33,10 +33,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.log4j.Logger; - import com.bigdata.bop.BOp.Annotations; import com.bigdata.bop.engine.BOpStats; import com.bigdata.btree.AbstractNode; @@ -54,7 +51,7 @@ */ public class BOpUtility { - private static final Logger log = Logger.getLogger(BOpUtility.class); +// private static final Logger log = Logger.getLogger(BOpUtility.class); /** * Pre-order recursive visitation of the operator tree (arguments only, no Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkAccessor.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkAccessor.java 2010-09-15 23:01:17 UTC (rev 3563) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkAccessor.java 2010-09-16 10:55:26 UTC (rev 3564) @@ -57,14 +57,14 @@ /** * Visit the binding sets in the chunk. * - * @deprecated We do not need to use {@link IAsynchronousIterator} any more. - * This could be much more flexible and should be harmonized to - * support high volume operators, GPU operators, etc. probably - * the right thing to do is introduce another interface here - * with a getChunk():IChunk where IChunk let's you access the - * chunks data in different ways (and chunks can be both - * {@link IBindingSet}[]s and element[]s so we might need to - * raise that into the interfaces and/or generics as well). + * @todo We do not need to use {@link IAsynchronousIterator} any more. This + * could be much more flexible and should be harmonized to support + * high volume operators, GPU operators, etc. probably the right thing + * to do is introduce another interface here with a getChunk():IChunk + * where IChunk let's you access the chunks data in different ways + * (and chunks can be both {@link IBindingSet}[]s and element[]s so we + * might need to raise that into the interfaces and/or generics as + * well). * * @todo It is likely that we can convert to the use of * {@link BlockingQueue} instead of {@link BlockingBuffer} in the Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java 2010-09-15 23:01:17 UTC (rev 3563) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java 2010-09-16 10:55:26 UTC (rev 3564) @@ -179,9 +179,9 @@ TableLog.tableLog.info("\n\nqueryId=" + queryId + "\n"); // TableLog.tableLog.info(query.getQuery().toString()+"\n"); TableLog.tableLog.info(getTableHeader()); - TableLog.tableLog - .info(getTableRow("startQ", serviceId, msg.getBOpId(), - -1/* shardId */, 1/* fanIn */, null/* stats */)); + TableLog.tableLog.info(getTableRow("startQ", serviceId, msg + .getBOpId(), -1/* shardId */, 1/* fanIn */, + null/* cause */, null/* stats */)); } // System.err.println("startQ : nstep="+nsteps+", bopId=" + bopId @@ -258,8 +258,9 @@ if (TableLog.tableLog.isInfoEnabled()) { TableLog.tableLog - .info(getTableRow("startOp", msg.serviceId, msg.bopId, - msg.partitionId, msg.nchunks/* fanIn */, null/* stats */)); +.info(getTableRow("startOp", msg.serviceId, + msg.bopId, msg.partitionId, msg.nchunks/* fanIn */, + null/* cause */, null/* stats */)); } // check deadline. @@ -363,7 +364,8 @@ if (TableLog.tableLog.isInfoEnabled()) { TableLog.tableLog.info(getTableRow("haltOp", msg.serviceId, - msg.bopId, msg.partitionId, fanOut, msg.taskStats)); + msg.bopId, msg.partitionId, fanOut, msg.cause, + msg.taskStats)); } // if (log.isTraceEnabled()) @@ -409,6 +411,7 @@ query.cancel(true/* mayInterruptIfRunning */); } + return isDone; } @@ -484,6 +487,8 @@ sb.append("\tbop"); + sb.append("\tcause"); + sb.append("\tstats"); sb.append('\n'); @@ -510,13 +515,18 @@ * specific index partition. * @param fanIO * The fanIn (startQ,startOp) or fanOut (haltOp). + * @param cause + * The {@link Throwable} in a {@link HaltOpMessage} and + * <code>null</code> for other messages or if the + * {@link Throwable} was null. * @param stats * The statistics from the operator evaluation and - * <code>null</code> unless {@link #haltOp(HaltOpMessage)} is - * the invoker. + * <code>null</code> unless {@link #haltOp(HaltOpMessage)} is the + * invoker. */ private String getTableRow(final String label, final UUID serviceId, final int bopId, final int shardId, final int fanIO, + final Throwable cause, final BOpStats stats) { final StringBuilder sb = new StringBuilder(); @@ -558,12 +568,19 @@ sb.append('\t'); sb.append(serviceId == null ? "N/A" : serviceId.toString()); + // the operator. sb.append('\t'); sb.append(query.bopIndex.get(bopId)); + + // the thrown cause. + sb.append('\t'); + if (cause != null) + sb.append(cause.getLocalizedMessage()); + // the statistics. + sb.append('\t'); if (stats != null) { // @todo use a multi-column version of stats. - sb.append('\t'); sb.append(stats.toString()); } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-09-15 23:01:17 UTC (rev 3563) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-09-16 10:55:26 UTC (rev 3564) @@ -51,7 +51,6 @@ import com.bigdata.bop.IBindingSet; import com.bigdata.bop.NoSuchBOpException; import com.bigdata.bop.PipelineOp; -import com.bigdata.bop.bset.CopyBindingSetOp; import com.bigdata.bop.solutions.SliceOp; import com.bigdata.journal.IIndexManager; import com.bigdata.journal.ITx; @@ -125,14 +124,12 @@ /** * The buffer used for the overall output of the query pipeline. - * - * FIXME SCALEOUT: This should only exist on the query controller. Other - * nodes will send {@link IChunkMessage}s to the query controller. s/o will - * use an operator with {@link BOpEvaluationContext#CONTROLLER} in order to - * ensure that the results are transferred to the query controller. When a - * {@link SliceOp} is used, this is redundant. The operator in other cases - * can be a {@link CopyBindingSetOp} whose {@link BOpEvaluationContext} has - * been overridden. + * <p> + * Note: In scale out, this only exists on the query controller. In order to + * ensure that the results are transferred to the query controller, the + * top-level operator in the query plan must specify + * {@link BOpEvaluationContext#CONTROLLER}. For example, {@link SliceOp} + * uses this {@link BOpEvaluationContext}. */ final private IBlockingBuffer<IBindingSet[]> queryBuffer; @@ -330,8 +327,15 @@ runState = controller ? new RunState(this) : null; - this.queryBuffer = newQueryBuffer(); + // Note: only exists on the query controller. + this.queryBuffer = controller ? newQueryBuffer() : null; +// System.err +// .println("new RunningQuery:: queryId=" + queryId +// + ", isController=" + controller + ", queryController=" +// + clientProxy + ", queryEngine=" +// + queryEngine.getServiceUUID()); + } /** @@ -619,6 +623,12 @@ /** Alias for the {@link ChunkTask}'s logger. */ private final Logger log = chunkTaskLog; + /** + * The message with the materialized chunk to be consumed by the + * operator. + */ + final IChunkMessage<IBindingSet> msg; + /** The index of the bop which is being evaluated. */ private final int bopId; @@ -682,13 +692,20 @@ * by {@link PipelineOp#eval(BOpContext)} in order to handle the outputs * written on those sinks. * - * @param chunk + * @param msg * A message containing the materialized chunk and metadata * about the operator which will consume that chunk. + * + * @throws IllegalStateException + * unless {@link IChunkMessage#isMaterialized()} is + * <code>true</code>. */ - public ChunkTask(final IChunkMessage<IBindingSet> chunk) { - bopId = chunk.getBOpId(); - partitionId = chunk.getPartitionId(); + public ChunkTask(final IChunkMessage<IBindingSet> msg) { + if(!msg.isMaterialized()) + throw new IllegalStateException(); + this.msg = msg; + bopId = msg.getBOpId(); + partitionId = msg.getPartitionId(); bop = bopIndex.get(bopId); if (bop == null) { throw new NoSuchBOpException(bopId); @@ -740,9 +757,9 @@ altSink = altSinkId == null ? null : op.newBuffer(); - // context + // context : @todo pass in IChunkMessage or IChunkAccessor context = new BOpContext<IBindingSet>(RunningQuery.this, - partitionId, op.newStats(), chunk.getChunkAccessor() + partitionId, op.newStats(), msg.getChunkAccessor() .iterator(), sink, altSink); // FutureTask for operator execution (not running yet). @@ -762,8 +779,7 @@ clientProxy.startOp(new StartOpMessage(queryId, bopId, partitionId, serviceId, fanIn)); if (log.isDebugEnabled()) - log.debug("Running chunk: queryId=" + queryId + ", bopId=" - + bopId + ", bop=" + bop); + log.debug("Running chunk: " + msg); ft.run(); // run ft.get(); // verify success if (sink != null && sink != queryBuffer && !sink.isEmpty()) { @@ -835,14 +851,20 @@ } // run() } // class ChunkTask - + /** * Return an iterator which will drain the solutions from the query. The * query will be cancelled if the iterator is * {@link ICloseableIterator#close() closed}. + * + * @throws UnsupportedOperationException + * if this is not the query controller. */ public IAsynchronousIterator<IBindingSet[]> iterator() { + if(!controller) + throw new UnsupportedOperationException(); + return queryBuffer.iterator(); } @@ -872,11 +894,35 @@ * <li>must not cause the solutions to be discarded before the client can * consume them.</li> * </ul> + * + * FIXME SCALEOUT: Each query engine peer touched by the running query (or + * known to have an operator task running at the time that the query was + * halted) must be notified that the query has been terminated and the + * receiving query engines must interrupt any running tasks which they have + * locally for that query. + * <p> + * Since this involves RMI to the nodes, we should not issue those RMIs + * while holding the {@link #runStateLock} (and this could even deadlock + * with callback from those nodes). Perhaps + * {@link RunState#haltOp(HaltOpMessage)} should throw back the + * {@link HaltOpMessage} or a {@link TimeoutException} if the deadline has + * expired and then let {@link RunningQuery#haltOp(HaltOpMessage)} handle + * the termination of the query, which it can do without holding the lock. + * <p> + * When the controller sends a node a terminate signal for an operator, it + * should not bother to RMI back to the controller (unless this is done for + * the purposes of confirmation, which is available from the RMI return in + * any case). + * + * FIXME SCALEOUT: Life cycle methods for operators must have hooks for the + * operator implementations which are evaluated on the query controller + * (here) but also on the nodes on which the query will run (for hash + * partitioned operators). */ final public boolean cancel(final boolean mayInterruptIfRunning) { // halt the query. boolean cancelled = future.cancel(mayInterruptIfRunning); - // cancel any running operators for this query. + // cancel any running operators for this query on this node. for (Future<?> f : operatorFutures.values()) { if (f.cancel(mayInterruptIfRunning)) cancelled = true; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java 2010-09-15 23:01:17 UTC (rev 3563) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java 2010-09-16 10:55:26 UTC (rev 3564) @@ -359,18 +359,18 @@ + getServiceUUID() + ", msg=" + msg); } - // request from the query controller. + // request from the query controller (RMI). final BindingSetPipelineOp query = msg.getQueryController() .getQuery(msg.getQueryId()); q = newRunningQuery(FederatedQueryEngine.this, queryId, - isController, msg.getQueryController(), query); + false/* controller */, msg.getQueryController(), query); final RunningQuery tmp = runningQueries.putIfAbsent(queryId, q); if(tmp != null) { - // another thread won this race. + // another thread won this race : @todo memoize, RMI is too expensive. q = (FederatedRunningQuery) tmp; } @@ -424,8 +424,8 @@ final boolean controller, final IQueryClient clientProxy, final BindingSetPipelineOp query) { - return new FederatedRunningQuery(this, queryId, true/* controller */, - this/* clientProxy */, query); + return new FederatedRunningQuery(this, queryId, controller, + clientProxy, query); } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java 2010-09-15 23:01:17 UTC (rev 3563) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java 2010-09-16 10:55:26 UTC (rev 3564) @@ -42,13 +42,14 @@ import com.bigdata.bop.IBindingSet; import com.bigdata.bop.IPredicate; import com.bigdata.bop.IShardwisePipelineOp; -import com.bigdata.bop.engine.LocalChunkMessage; import com.bigdata.bop.engine.IChunkMessage; import com.bigdata.bop.engine.IQueryClient; import com.bigdata.bop.engine.IQueryPeer; +import com.bigdata.bop.engine.LocalChunkMessage; import com.bigdata.bop.engine.RunningQuery; import com.bigdata.io.DirectBufferPool; import com.bigdata.io.DirectBufferPoolAllocator.IAllocationContext; +import com.bigdata.journal.TemporaryStoreFactory; import com.bigdata.mdi.PartitionLocator; import com.bigdata.relation.accesspath.BlockingBuffer; import com.bigdata.relation.accesspath.IAsynchronousIterator; @@ -89,14 +90,24 @@ * this query. */ private final UUID queryControllerUUID; - + /** * A map associating resources with running queries. When a query halts, the * resources listed in its resource map are released. Resources can include * {@link ByteBuffer}s backing either incoming or outgoing - * {@link LocalChunkMessage}s, temporary files associated with the query, hash - * tables, etc. + * {@link LocalChunkMessage}s, temporary files associated with the query, + * hash tables, etc. * + * @todo The {@link IAllocationContext} allows us to automatically release + * native {@link ByteBuffer}s used by the query. Such buffers do not + * need to be part of this map. This means that the only real use for + * the map will be temporary persistent resources, such as graphs or + * hash tables backed by a local file or the intermediate outputs of a + * sort operator. We may be able to manage the local persistent data + * structures using the {@link TemporaryStoreFactory} and manage the + * life cycle of the intermediate results for sort within its operator + * implementation. + * * @todo This map will eventually need to be moved into {@link RunningQuery} * in order to support temporary graphs or other disk-backed resources * associated with the evaluation of a query against a standalone @@ -111,8 +122,9 @@ * * @todo Only use the values in the map for transient objects, such as a * hash table which is not backed by the disk. For {@link ByteBuffer}s - * we want to make the references go through the {@link ResourceService} - * . For files, through the {@link ResourceManager}. + * we want to make the references go through the + * {@link ResourceService} . For files, through the + * {@link ResourceManager}. * * @todo We need to track the resources in use by the query so they can be * released when the query terminates. This includes: buffers; joins @@ -292,7 +304,7 @@ if(serviceUUID.equals(getQueryEngine().getServiceUUID())) { - // Return a hard reference to the query engine (NOT a proxy). + // Return a hard reference to this query engine (NOT a proxy). return getQueryEngine(); } else if (serviceUUID.equals(queryControllerUUID)) { @@ -369,6 +381,9 @@ switch (bop.getEvaluationContext()) { case ANY: { + /* + * This operator may be evaluated anywhere. + */ return super.handleOutputChunk(sinkId, sink); } case HASHED: { @@ -490,11 +505,7 @@ sendChunkMessage(queryControllerUUID, sinkId, -1/* partitionId */, allocationContext, sink); - /* - * Chunks send to the query controller do not keep the query - * running. - */ - return 0; + return 1; } default: @@ -588,7 +599,7 @@ if (source.isEmpty()) throw new RuntimeException(); - // The peer to be notified. + // The peer to whom we send the message. final IQueryPeer peerProxy = getQueryPeer(serviceUUID); if (peerProxy == null) Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/MapBindingSetsOverShardsBuffer.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/MapBindingSetsOverShardsBuffer.java 2010-09-15 23:01:17 UTC (rev 3563) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/MapBindingSetsOverShardsBuffer.java 2010-09-16 10:55:26 UTC (rev 3564) @@ -395,8 +395,10 @@ slice[j] = bset; if (log.isTraceEnabled()) - log.trace("Mapping: keyOrder=" + keyOrder + ",bset=" + bset - + " onto " + split.pmd.getPartitionId()); + log + .trace("Mapping: keyOrder=" + keyOrder + ",bset=" + + bset + " onto partitionId=" + + split.pmd.getPartitionId()); } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/NIOChunkMessage.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/NIOChunkMessage.java 2010-09-15 23:01:17 UTC (rev 3563) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/NIOChunkMessage.java 2010-09-16 10:55:26 UTC (rev 3564) @@ -511,7 +511,7 @@ } public boolean isExhausted() { - return hasNext(); + return !hasNext(); } public E[] next(long timeout, TimeUnit unit) Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/ThickChunkMessage.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/ThickChunkMessage.java 2010-09-15 23:01:17 UTC (rev 3563) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/ThickChunkMessage.java 2010-09-16 10:55:26 UTC (rev 3564) @@ -235,6 +235,7 @@ } + @SuppressWarnings("unchecked") public boolean hasNext() { if (current != null) @@ -293,7 +294,7 @@ } public boolean isExhausted() { - return hasNext(); + return !hasNext(); } public E[] next(long timeout, TimeUnit unit) Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2010-09-15 23:01:17 UTC (rev 3563) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2010-09-16 10:55:26 UTC (rev 3564) @@ -97,7 +97,7 @@ public class PipelineJoin<E> extends BindingSetPipelineOp implements IShardwisePipelineOp<E> { - static private final Logger log = Logger.getLogger(PipelineJoin.class); + static private final transient Logger log = Logger.getLogger(PipelineJoin.class); /** * Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java 2010-09-15 23:01:17 UTC (rev 3563) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java 2010-09-16 10:55:26 UTC (rev 3564) @@ -132,25 +132,25 @@ // // if (!(args[0] instanceof BindingSetPipelineOp)) // throw new IllegalArgumentException(); - + } /** - * @see Annotations#OFFSET + * @see Annotations#OFFSET */ public long getOffset() { - - return (Long) getRequiredProperty(Annotations.OFFSET); - + + return getProperty(Annotations.OFFSET, Annotations.DEFAULT_OFFSET); + } /** - * @see Annotations#LIMIT + * @see Annotations#LIMIT */ public long getLimit() { + + return getProperty(Annotations.LIMIT, Annotations.DEFAULT_LIMIT); - return (Long) getRequiredProperty(Annotations.LIMIT); - } public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/resources/logging/log4j.properties =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/resources/logging/log4j.properties 2010-09-15 23:01:17 UTC (rev 3563) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/resources/logging/log4j.properties 2010-09-16 10:55:26 UTC (rev 3564) @@ -89,7 +89,8 @@ #log4j.logger.com.bigdata.service=ALL #log4j.logger.com.bigdata.bop=ALL -#log4j.logger.com.bigdata.bop.join.PipelineJoin=ALL +log4j.logger.com.bigdata.bop.join.PipelineJoin=ALL + log4j.logger.com.bigdata.bop.engine=ALL log4j.logger.com.bigdata.bop.engine.QueryEngine=ALL log4j.logger.com.bigdata.bop.engine.RunningQuery=ALL @@ -98,6 +99,7 @@ log4j.logger.com.bigdata.bop.fed.FederatedQueryEngine=ALL log4j.logger.com.bigdata.bop.fed.FederatedRunningQuery=ALL log4j.logger.com.bigdata.bop.fed.MapBindingSetsOverShardsBuffer=ALL + #log4j.logger.com.bigdata.relation.rule.eval.RuleLog=INFO #log4j.logger.com.bigdata.relation.rule.eval=INFO #log4j.logger.com.bigdata.relation.rule.eval.RuleState=DEBUG @@ -212,7 +214,7 @@ ## # BOp run state trace (tab delimited file). Uncomment the next line to enable. -#log4j.logger.com.bigdata.bop.engine.RunState$TableLog=INFO,queryRunStateLog +log4j.logger.com.bigdata.bop.engine.RunState$TableLog=INFO,queryRunStateLog log4j.additivity.com.bigdata.bop.engine.RunState$TableLog=false log4j.appender.queryRunStateLog=org.apache.log4j.FileAppender log4j.appender.queryRunStateLog.Threshold=ALL Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java 2010-09-15 23:01:17 UTC (rev 3563) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java 2010-09-16 10:55:26 UTC (rev 3564) @@ -61,6 +61,7 @@ import com.bigdata.bop.bset.StartOp; import com.bigdata.bop.fed.TestFederatedQueryEngine; import com.bigdata.bop.join.PipelineJoin; +import com.bigdata.bop.solutions.SliceOp; import com.bigdata.journal.BufferMode; import com.bigdata.journal.ITx; import com.bigdata.journal.Journal; @@ -139,12 +140,12 @@ final R rel = new R(store, namespace, ITx.UNISOLATED, new Properties()); rel.create(); - // data to insert. + // data to insert (in key order for convenience). final E[] a = {// - new E("John", "Mary"),// - new E("Mary", "Paul"),// - new E("Paul", "Leon"),// - new E("Leon", "Paul"),// + new E("John", "Mary"),// [0] + new E("Leon", "Paul"),// [1] + new E("Mary", "Paul"),// [2] + new E("Paul", "Leon"),// [3] }; // insert data (the records are not pre-sorted). @@ -270,7 +271,7 @@ final int predId = 3; final BindingSetPipelineOp query = new PipelineJoin<E>( // left - new CopyBindingSetOp(new BOp[] {}, NV.asMap(new NV[] {// + new StartOp(new BOp[] {}, NV.asMap(new NV[] {// new NV(Predicate.Annotations.BOP_ID, startId),// })), // right @@ -373,23 +374,141 @@ } /** - * @todo Test ability to impose a limit/offset slice on a query. - * <p> - * Note: While the logic for visiting only the solutions selected by - * the slice can be tested against a mock object, the integration by - * which a slice halts a query when it is satisfied has to be tested - * against a {@link QueryEngine}. - * <p> - * This must also be tested in scale-out to make sure that the data - * backing the solutions is not discarded before the caller can use - * those data. [This could be handled by materializing binding set - * objects out of a {@link ByteBuffer} rather than using a live decode - * of the data in that {@link ByteBuffer}.] + * Run a join with a slice. The slice is always evaluated on the query + * controller so adding it to the query plan touches a slightly different + * code path from adding another join (joins are evaluated shardwise, at + * least in scale-out). + * <p> + * Note: While the logic for visiting only the solutions selected by the + * slice can be tested against a mock object, the integration by which a + * slice halts a query when it is satisfied has to be tested against a + * {@link QueryEngine}. + * <p> + * This must also be tested in scale-out to make sure that the data backing + * the solutions is not discarded before the caller can use those data. + * [This could be handled by materializing binding set objects out of a + * {@link ByteBuffer} rather than using a live decode of the data in that + * {@link ByteBuffer}.] */ - public void test_query_slice() { + public void test_query_slice() throws Exception { - fail("write test"); + final Var<?> x = Var.var("x"); + final Var<?> y = Var.var("y"); + final int startId = 1; + final int joinId = 2; + final int predId = 3; + final int sliceId = 4; + + final StartOp startOp = new StartOp(new BOp[] {}, NV.asMap(new NV[] {// + new NV(Predicate.Annotations.BOP_ID, startId),// + })); + + final Predicate<E> predOp = new Predicate<E>(new IVariableOrConstant[] { + x, y }, NV.asMap(new NV[] {// + new NV(Predicate.Annotations.RELATION_NAME, + new String[] { namespace }),// + new NV(Predicate.Annotations.PARTITION_ID, Integer + .valueOf(-1)),// + new NV(Predicate.Annotations.OPTIONAL, Boolean.FALSE),// + new NV(Predicate.Annotations.CONSTRAINT, null),// + new NV(Predicate.Annotations.EXPANDER, null),// + new NV(Predicate.Annotations.BOP_ID, predId),// + new NV(Predicate.Annotations.TIMESTAMP, + ITx.READ_COMMITTED),// + })); + + final PipelineJoin<E> joinOp = new PipelineJoin<E>(startOp/* left */, + predOp/* right */, + // join annotations + NV.asMap(new NV[] { // + new NV(Predicate.Annotations.BOP_ID, joinId),// + })// + ); + + final BindingSetPipelineOp query = new SliceOp(new BOp[] { joinOp }, + // slice annotations + NV.asMap(new NV[] { // + new NV(BOp.Annotations.BOP_ID, sliceId),// + new NV(SliceOp.Annotations.OFFSET, 0L),// + new NV(SliceOp.Annotations.LIMIT, 2L),// + })// + ); + + // the expected solutions. + final IBindingSet[] expected = new IBindingSet[] {// + new ArrayBindingSet(// + new IVariable[] { x, y },// + new IConstant[] { new Constant<String>("John"), + new Constant<String>("Mary") }// + ),// + new ArrayBindingSet(// + new IVariable[] { x, y },// + new IConstant[] { new Constant<String>("Leon"), + new Constant<String>("Paul") }// + ) }; + + final UUID queryId = UUID.randomUUID(); + final RunningQuery runningQuery = queryEngine.eval(queryId, query, + new LocalChunkMessage<IBindingSet>(queryEngine, queryId, + startId, -1 /* partitionId */, + newBindingSetIterator(new HashBindingSet()))); + + // verify solutions. + assertSameSolutions(expected, runningQuery.iterator()); + + // Wait until the query is done. + final Map<Integer, BOpStats> statsMap = runningQuery.get(); + { + // validate the stats map. + assertNotNull(statsMap); + assertEquals(3, statsMap.size()); + if (log.isInfoEnabled()) + log.info(statsMap.toString()); + } + + // validate the stats for the start operator. + { + final BOpStats stats = statsMap.get(startId); + assertNotNull(stats); + if (log.isInfoEnabled()) + log.info("start: " + stats.toString()); + + // verify query solution stats details. + assertEquals(1L, stats.chunksIn.get()); + assertEquals(1L, stats.unitsIn.get()); + assertEquals(1L, stats.unitsOut.get()); + assertEquals(1L, stats.chunksOut.get()); + } + + // validate the stats for the join operator. + { + final BOpStats stats = statsMap.get(joinId); + assertNotNull(stats); + if (log.isInfoEnabled()) + log.info("join : " + stats.toString()); + + // verify query solution stats details. + assertEquals(1L, stats.chunksIn.get()); + assertEquals(1L, stats.unitsIn.get()); + assertEquals(4L, stats.unitsOut.get()); + assertEquals(1L, stats.chunksOut.get()); + } + + // validate the stats for the slice operator. + { + final BOpStats stats = statsMap.get(sliceId); + assertNotNull(stats); + if (log.isInfoEnabled()) + log.info("slice: " + stats.toString()); + + // verify query solution stats details. + assertEquals(1L, stats.chunksIn.get()); + assertEquals(4L, stats.unitsIn.get()); + assertEquals(2L, stats.unitsOut.get()); + assertEquals(1L, stats.chunksOut.get()); + } + } /** @@ -788,7 +907,7 @@ if (!actual.hasNext()) { fail(msg - + ": Index exhausted while expecting more object(s)" + + ": Iterator exhausted while expecting more object(s)" + ": index=" + j); } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java 2010-09-15 23:01:17 UTC (rev 3563) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java 2010-09-16 10:55:26 UTC (rev 3564) @@ -509,6 +509,20 @@ assertEquals(1L, stats.chunksOut.get()); // @todo this depends on which index partitions we read on. } + // validate the stats for the slice operator. + { + final BOpStats stats = statsMap.get(sliceId); + assertNotNull(stats); + if (log.isInfoEnabled()) + log.info("slice: "+stats.toString()); + + // verify query solution stats details. + assertEquals(1L, stats.chunksIn.get()); + assertEquals(2L, stats.unitsIn.get()); + assertEquals(2L, stats.unitsOut.get()); + assertEquals(1L, stats.chunksOut.get()); + } + } /** Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestThickChunkMessage.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestThickChunkMessage.java 2010-09-15 23:01:17 UTC (rev 3563) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestThickChunkMessage.java 2010-09-16 10:55:26 UTC (rev 3564) @@ -70,6 +70,52 @@ } /** + * Unit test for a message with a single chunk containing a single empty + * binding set. + */ + public void test_oneChunkWithEmptyBindingSet() { + + final List<IBindingSet> data = new LinkedList<IBindingSet>(); + { + data.add(new HashBindingSet()); + } + + final IQueryClient queryController = new MockQueryController(); + final UUID queryId = UUID.randomUUID(); + final int bopId = 1; + final int partitionId = 2; + final IBlockingBuffer<IBindingSet[]> source = new BlockingBuffer<IBindingSet[]>( + 10); + + // populate the source. + source.add(data.toArray(new IBindingSet[0])); + + // close the source. + source.close(); + + // build the chunk. + final IChunkMessage<IBindingSet> msg = new ThickChunkMessage<IBindingSet>( + queryController, queryId, bopId, partitionId, source); + + assertTrue(queryController == msg.getQueryController()); + + assertEquals(queryId, msg.getQueryId()); + + assertEquals(bopId, msg.getBOpId()); + + assertEquals(partitionId, msg.getPartitionId()); + + // the data is inline with the message. + assertTrue(msg.isMaterialized()); + + // verify the iterator. + assertSameIterator(data.toArray(new IBindingSet[0]), + new Dechunkerator<IBindingSet>(msg.getChunkAccessor() + .iterator())); + + } + + /** * Unit test for a message with a single chunk of binding sets. */ public void test_oneChunk() { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestSliceOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestSliceOp.java 2010-09-15 23:01:17 UTC (rev 3563) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestSliceOp.java 2010-09-16 10:55:26 UTC (rev 3564) @@ -178,6 +178,10 @@ new NV(SliceOp.Annotations.LIMIT, 3L),// })); + assertEquals("offset", 1L, query.getOffset()); + + assertEquals("limit", 3L, query.getLimit()); + // the expected solutions final IBindingSet[] expected = new IBindingSet[] {// new ArrayBindingSet(// This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |