From: <tho...@us...> - 2010-09-11 22:53:49
|
Revision: 3532 http://bigdata.svn.sourceforge.net/bigdata/?rev=3532&view=rev Author: thompsonbry Date: 2010-09-11 22:53:40 +0000 (Sat, 11 Sep 2010) Log Message: ----------- Further reorganization of the federated query engine and its use of buffers. There are now thick (payload included with the RMI message) and thin (RMI message with payload via NIO over the resource service) messages for moving chunks around during distributed query processing. There are bare bones unit tests for these as well. (The ResourceServer does not actually use NIO yet, but it can be optimized later. Also, we are not yet applying compression suitable for binding sets, but again that can be an optimization.) Now that RMI messages and payload transfers are more or less in place, I am going to work through some unit tests of distributed query evaluation. To do that I still need to reconcile the concept of a "query buffer" where the final solutions are written with the new model for moving data around. I think that the query buffer will no longer be privileged (it used to be a proxy object for a buffer on the client). Instead, scale-out will require an operator at the top of the query plan whose evaluation context is the query controller. The mere presence of an operator which copies its inputs to its outputs whose evaluation context is the query controller is sufficient to do the trick. For practical purposes, this can be a SliceOp, since that already must run in the query controller context. If an offset/limit are not specified, then they can be set to 0L and MAX_LONG on the SliceOp which has the effect of turning it into a NOP (unless you are visiting an unbelievable #of results). 
Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BindingSetChunk.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkMessage.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryPeer.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/ThickChunkMessage.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/DirectBufferPoolAllocator.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/ManagedResourceService.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/striterator/Dechunkerator.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/mutation/TestDelete.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/mutation/TestInsert.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/NIOChunkMessage.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestNIOChunkMessage.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestThickChunkMessage.java Removed Paths: ------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/ChunkMessageWithNIOPayload.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BindingSetChunk.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BindingSetChunk.java 2010-09-10 20:47:27 UTC (rev 3531) +++ 
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BindingSetChunk.java 2010-09-11 22:53:40 UTC (rev 3532) @@ -12,7 +12,7 @@ * be consumed by some {@link BOp} in a specific query (this is only used in * query evaluation for the standalone database). */ -public class BindingSetChunk implements IChunkMessage { +public class BindingSetChunk<E> implements IChunkMessage<E> { /** The query controller. */ private final IQueryClient queryController; @@ -35,7 +35,7 @@ /** * The binding sets to be consumed by that {@link BOp}. */ - private IAsynchronousIterator<IBindingSet[]> source; + private IAsynchronousIterator<E[]> source; public IQueryClient getQueryController() { return queryController; @@ -59,7 +59,7 @@ public BindingSetChunk(final IQueryClient queryController, final long queryId, final int bopId, final int partitionId, - final IAsynchronousIterator<IBindingSet[]> source) { + final IAsynchronousIterator<E[]> source) { if (queryController == null) throw new IllegalArgumentException(); @@ -89,8 +89,12 @@ public void materialize(FederatedRunningQuery runningQuery) { // NOP } + + public void release() { + // NOP + } - public IAsynchronousIterator<IBindingSet[]> iterator() { + public IAsynchronousIterator<E[]> iterator() { return source; } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkMessage.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkMessage.java 2010-09-10 20:47:27 UTC (rev 3531) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkMessage.java 2010-09-11 22:53:40 UTC (rev 3532) @@ -15,8 +15,18 @@ * processing. There are several implementations of this interface supporting * same-JVM messages, thick RMI messages, and RMI messages where the payload is * materialized using NIO transfers from the {@link ResourceService}. 
+ * + * @param <E> + * The generic type of the elements in the chunk (binding sets, + * elements from a relation, etc). + * + * @todo Compressed representations of binding sets with the ability to read + * them in place or materialize them onto the java heap. The + * representation should be amenable to processing in C since we want to + * use them on GPUs as well. See {@link IChunkMessage} and perhaps + * {@link IRaba}. */ -public interface IChunkMessage { +public interface IChunkMessage<E> { /** The proxy for the query controller. */ IQueryClient getQueryController(); @@ -44,6 +54,11 @@ void materialize(FederatedRunningQuery runningQuery); /** + * Discard the materialized data. + */ + void release(); + + /** * Visit the binding sets in the chunk. * * @todo we do not need to use {@link IAsynchronousIterator} any more. This @@ -67,7 +82,10 @@ * source for processing. For selective operators, those chunks can be * combined before we execute the operator. For unselective operators, * we are going to run over all the data anyway. + * + * @throws IllegalStateException + * if the payload is not materialized. */ - IAsynchronousIterator<IBindingSet[]> iterator(); + IAsynchronousIterator<E[]> iterator(); } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryPeer.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryPeer.java 2010-09-10 20:47:27 UTC (rev 3531) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryPeer.java 2010-09-11 22:53:40 UTC (rev 3532) @@ -5,6 +5,7 @@ import java.util.UUID; import com.bigdata.bop.BOp; +import com.bigdata.bop.IBindingSet; import com.bigdata.service.IService; /** @@ -46,6 +47,6 @@ * @throws UnsupportedOperationException * unless running in scale-out. 
*/ - void bufferReady(IChunkMessage msg) throws RemoteException; + void bufferReady(IChunkMessage<IBindingSet> msg) throws RemoteException; } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-09-10 20:47:27 UTC (rev 3531) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-09-11 22:53:40 UTC (rev 3532) @@ -45,6 +45,7 @@ import com.bigdata.bop.IBindingSet; import com.bigdata.bop.IPredicate; import com.bigdata.bop.bset.Union; +import com.bigdata.bop.fed.FederatedQueryEngine; import com.bigdata.btree.BTree; import com.bigdata.btree.IndexSegment; import com.bigdata.btree.view.FusedView; @@ -411,11 +412,6 @@ /** * The currently executing queries. - * - * @todo DEADLINE: There should be a data structure representing - * {@link RunningQuery} having deadlines so we can - * {@link RunningQuery#cancel(boolean)} queries when their deadline - * expires. */ final ConcurrentHashMap<Long/* queryId */, RunningQuery> runningQueries = new ConcurrentHashMap<Long, RunningQuery>(); @@ -514,54 +510,39 @@ * if the sink has not been taken, e.g., by combining the chunk into * the same target ByteBuffer, or when we add the chunk to the * RunningQuery.] - * - * @todo SCALEOUT: High volume query operators must demand that their inputs - * are materialized before they can begin evaluation. Scaleout - * therefore requires a separate queue which looks at the metadata - * concerning chunks available on remote nodes for an operator which - * will run on this node and then demands the data either when the - * predecessors in the pipeline are done (operator at once evaluation) - * or when sufficient data are available to run the operator (mega - * chunk pipelining). Once the data are locally materialized, the - * operator may be queued for evaluation. 
*/ private class QueryEngineTask implements Runnable { public void run() { - try { - System.err.println("QueryEngine running: "+this); - while (true) { + System.err.println("QueryEngine running: " + this); + while (true) { + try { final RunningQuery q = priorityQueue.take(); final long queryId = q.getQueryId(); if (q.isCancelled()) continue; - final IChunkMessage chunk = q.chunksIn.poll(); - if (chunk == null) { - // not expected, but can't do anything without a chunk. - if (log.isDebugEnabled()) - log.debug("Dropping chunk: queryId=" + queryId); - continue; - } + final IChunkMessage<IBindingSet> chunk = q.chunksIn.poll(); if (log.isTraceEnabled()) log.trace("Accepted chunk: queryId=" + queryId + ", bopId=" + chunk.getBOpId()); + // create task. try { - // create task. final FutureTask<?> ft = q.newChunkTask(chunk); // execute task. localIndexManager.getExecutorService().execute(ft); } catch (RejectedExecutionException ex) { - // shutdown of the pool (should be an unbounded pool). + // shutdown of the pool (should be an unbounded + // pool). log.warn("Dropping chunk: queryId=" + queryId); continue; - } catch (Throwable ex) { - // log and continue - log.error(ex, ex); - continue; } + } catch (InterruptedException e) { + log.warn("Interrupted."); + return; + } catch (Throwable ex) { + // log and continue + log.error(ex, ex); + continue; } - } catch (InterruptedException e) { - log.warn("Interrupted."); - return; } } } // QueryEngineTask @@ -579,7 +560,7 @@ * @throws IllegalStateException * if the chunk is not materialized. */ - void acceptChunk(final IChunkMessage chunk) { + void acceptChunk(final IChunkMessage<IBindingSet> chunk) { if (chunk == null) throw new IllegalArgumentException(); @@ -625,9 +606,25 @@ } + // hook for subclasses. + didShutdown(); + + // stop the query engine. 
+ final Future<?> f = engineFuture.get(); + if (f != null) + f.cancel(true/* mayInterruptIfRunning */); + } /** + * Hook is notified by {@link #shutdown()} when all running queries have + * terminated. + */ + protected void didShutdown() { + + } + + /** * Do not accept new queries and halt any running binding set chunk tasks. */ public void shutdownNow() { @@ -686,7 +683,7 @@ } - public void bufferReady(IChunkMessage msg) { + public void bufferReady(IChunkMessage<IBindingSet> msg) { throw new UnsupportedOperationException(); @@ -844,6 +841,10 @@ * @todo Could return a data structure which encapsulates the query results * and could allow multiple results from a query, e.g., one per step * in a program. + * + * @deprecated This is going away. + * + * @see FederatedQueryEngine#newQueryBuffer(BindingSetPipelineOp) */ protected IBlockingBuffer<IBindingSet[]> newQueryBuffer( final BindingSetPipelineOp query) { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-09-10 20:47:27 UTC (rev 3531) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-09-11 22:53:40 UTC (rev 3532) @@ -48,13 +48,15 @@ import com.bigdata.bop.BOp; import com.bigdata.bop.BOpContext; +import com.bigdata.bop.BOpEvaluationContext; import com.bigdata.bop.BOpUtility; import com.bigdata.bop.BindingSetPipelineOp; import com.bigdata.bop.IBindingSet; import com.bigdata.bop.NoSuchBOpException; +import com.bigdata.bop.bset.CopyBindingSetOp; +import com.bigdata.bop.solutions.SliceOp; import com.bigdata.journal.IIndexManager; import com.bigdata.journal.ITx; -import com.bigdata.journal.TimestampUtility; import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.IBlockingBuffer; import com.bigdata.service.IBigdataFederation; @@ 
-90,24 +92,15 @@ final private long queryId; // /** -// * The timestamp or transaction identifier against which the query is -// * reading. -// */ -// final private long readTimestamp; -// -// /** -// * The timestamp or transaction identifier against which the query is -// * writing. -// */ -// final private long writeTimestamp; - -// /** // * The timestamp when the query was accepted by this node (ms). // */ // final private long begin; + /** * The query deadline. The value is the system clock time in milliseconds * when the query is due and {@link Long#MAX_VALUE} if there is no deadline. + * In order to have a guarantee of a consistent clock, the deadline is + * interpreted by the query controller. */ final private AtomicLong deadline = new AtomicLong(Long.MAX_VALUE); @@ -132,8 +125,6 @@ */ final private IQueryClient clientProxy; -// /** The query iff materialized on this node. */ -// final private AtomicReference<BOp> queryRef; /** The query. */ final private BOp query; @@ -141,7 +132,12 @@ * The buffer used for the overall output of the query pipeline. * * FIXME SCALEOUT: This should only exist on the query controller. Other - * nodes will send {@link IChunkMessage}s to the query controller. + * nodes will send {@link IChunkMessage}s to the query controller. s/o will + * use an operator with {@link BOpEvaluationContext#CONTROLLER} in order to + * ensure that the results are transferred to the query controller. When a + * {@link SliceOp} is used, this is redundant. The operator in other cases + * can be a {@link CopyBindingSetOp} whose {@link BOpEvaluationContext} has + * been overridden. */ final private IBlockingBuffer<IBindingSet[]> queryBuffer; @@ -218,7 +214,7 @@ * Note: This is package private so it will be visible to the * {@link QueryEngine}. 
*/ - final/* private */BlockingQueue<IChunkMessage> chunksIn = new LinkedBlockingDeque<IChunkMessage>(); + final/* private */BlockingQueue<IChunkMessage<IBindingSet>> chunksIn = new LinkedBlockingDeque<IChunkMessage<IBindingSet>>(); /** * Set the query deadline. The query will be cancelled when the deadline is @@ -286,45 +282,13 @@ } + /** + * Return the operator tree for this query. + */ public BOp getQuery() { return query; } -// /** -// * Return the operator tree for this query. If query processing is -// * distributed and the query has not been materialized on this node, then it -// * is materialized now. -// * -// * @return The query. -// */ -// public BOp getQuery() { -// -// if (queryRef.get() == null) { -// -// synchronized (queryRef) { -// -// if (queryRef.get() == null) { -// -// try { -// -// queryRef.set(clientProxy.getQuery(queryId)); -// -// } catch (RemoteException e) { -// -// throw new RuntimeException(e); -// -// } -// -// } -// -// } -// -// } -// -// return queryRef.get(); -// -// } - /** * Return <code>true</code> iff this is the query controller. */ @@ -361,8 +325,6 @@ * if the <i>writeTimestamp</i> is neither * {@link ITx#UNISOLATED} nor a read-write transaction * identifier. - * - * @todo is queryBuffer required? should it be allocated from the top bop? */ public RunningQuery(final QueryEngine queryEngine, final long queryId, // final long begin, @@ -390,42 +352,6 @@ this.statsMap = controller ? new ConcurrentHashMap<Integer, BOpStats>() : null; -// /* -// * @todo when making a per-bop annotation, queries must obtain a tx for -// * each timestamp up front on the controller and rewrite the bop to hold -// * the tx until it is done. -// * -// * @todo This is related to how we handle sequences of steps, parallel -// * steps, closure of steps, and join graphs. Those operations need to be -// * evaluated on the controller. 
We will have to model the relationship -// * between the subquery and the query in order to terminate the subquery -// * when the query halts and to terminate the query if the subquery -// * fails. -// * -// * @todo Closure operations must rewrite the query to update the -// * annotations. Each pass in a closure needs to be its own "subquery" -// * and will need to have a distinct queryId. -// */ -// final Long timestamp = query -// .getProperty(BOp.Annotations.TIMESTAMP); -// -// // @todo remove default when elevating to per-writable bop annotation. -// final long writeTimestamp = query.getProperty( -// BOp.Annotations.WRITE_TIMESTAMP, ITx.UNISOLATED); -// -// if (readTimestamp == null) -// throw new IllegalArgumentException(); -// -// if (readTimestamp.longValue() == ITx.UNISOLATED) -// throw new IllegalArgumentException(); -// -// if (TimestampUtility.isReadOnly(writeTimestamp)) -// throw new IllegalArgumentException(); -// -// this.readTimestamp = readTimestamp; -// -// this.writeTimestamp = writeTimestamp; - this.timeout = query.getProperty(BOp.Annotations.TIMEOUT, BOp.Annotations.DEFAULT_TIMEOUT); @@ -463,8 +389,9 @@ /* * Note: The partitionId will always be -1 in scale-up. */ - final BindingSetChunk chunk = new BindingSetChunk(clientProxy, queryId, - sinkId, -1/* partitionId */, sink.iterator()); + final BindingSetChunk<IBindingSet> chunk = new BindingSetChunk<IBindingSet>( + clientProxy, queryId, sinkId, -1/* partitionId */, sink + .iterator()); queryEngine.acceptChunk(chunk); @@ -475,12 +402,12 @@ /** * Make a chunk of binding sets available for consumption by the query. * <p> - * Note: this is invoked by {@link QueryEngine#add(BindingSetChunk)}. + * Note: this is invoked by {@link QueryEngine#acceptChunk(IChunkMessage)} * * @param msg * The chunk. 
*/ - protected void acceptChunk(final IChunkMessage msg) { + protected void acceptChunk(final IChunkMessage<IBindingSet> msg) { if (msg == null) throw new IllegalArgumentException(); @@ -506,7 +433,7 @@ * * @todo this should reject multiple invocations for a given query instance. */ - public void startQuery(final IChunkMessage chunk) { + public void startQuery(final IChunkMessage<IBindingSet> chunk) { if (!controller) throw new UnsupportedOperationException(); if (chunk == null) @@ -776,7 +703,7 @@ * A chunk to be consumed. */ @SuppressWarnings("unchecked") - protected FutureTask<Void> newChunkTask(final IChunkMessage chunk) { + protected FutureTask<Void> newChunkTask(final IChunkMessage<IBindingSet> chunk) { /* * Look up the BOp in the index, create the BOpContext for that BOp, and * return the value returned by BOp.eval(context). @@ -792,6 +719,9 @@ * @todo evaluation of element[] pipelines needs to use pretty much * the same code, but it needs to be typed for E[] rather than * IBindingSet[]. + * + * @todo evaluation of Monet style BATs would also operate under + * different assumptions, closer to those of an element[]. */ throw new UnsupportedOperationException(bop.getClass().getName()); } @@ -961,16 +891,4 @@ } -// public long getReadTimestamp() { -// -// return readTimestamp; -// -// } -// -// public long getWriteTimestamp() { -// -// return writeTimestamp; -// -// } - } Deleted: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/ChunkMessageWithNIOPayload.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/ChunkMessageWithNIOPayload.java 2010-09-10 20:47:27 UTC (rev 3531) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/ChunkMessageWithNIOPayload.java 2010-09-11 22:53:40 UTC (rev 3532) @@ -1,244 +0,0 @@ -/** - -Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. 
- -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -*/ -/* - * Created on Sep 10, 2010 - */ - -package com.bigdata.bop.fed; - -import java.io.Serializable; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.UUID; - -import com.bigdata.bop.IBindingSet; -import com.bigdata.bop.engine.IChunkMessage; -import com.bigdata.bop.engine.IQueryClient; -import com.bigdata.io.DirectBufferPoolAllocator.IAllocation; -import com.bigdata.io.DirectBufferPoolAllocator.IAllocationContext; -import com.bigdata.relation.accesspath.IAsynchronousIterator; -import com.bigdata.service.ResourceService; - -/** - * An {@link IChunkMessage} where the payload is made available to the receiving - * service using an NIO transfer against the sender's {@link ResourceService}. - * This is suitable for moving large blocks of data during query evaluation. - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ - */ -public class ChunkMessageWithNIOPayload implements IChunkMessage, Serializable { - - /** - * - */ - private static final long serialVersionUID = 1L; - - /** - * Metadata about an allocation to be retrieved from the sender's - * {@link ResourceService}. 
- */ - private final class A implements Serializable { - - /** - * - */ - private static final long serialVersionUID = 1L; - - /** - * The identifier of the resource on the sender's - * {@link ResourceService}. - */ - private final UUID bufferId; - - /** - * The size of that resource in bytes. - */ - private final int nbytes; - - /** - * - * @param bufferId - * The identifier of the resource on the sender's - * {@link ResourceService}. - * @param nbytes - * The size of that resource in bytes. - */ - public A(final UUID bufferId, final int nbytes) { - this.bufferId = bufferId; - this.nbytes = nbytes; - } - } - - final private IQueryClient queryController; - - final private long queryId; - - final private int bopId; - - final private int partitionId; - - final private int nbytes; - - /** - * Note: Even when we send one message per chunk, we can still have a list - * of {@link IAllocation}s if the chunk did not get formatted onto a single - * {@link IAllocation}. - */ - final private A[] allocations; - - /** - * The Internet address and port where the receiver can fetch the payload - * using the sender's {@link ResourceService}. - */ - final private InetSocketAddress addr; - - public IQueryClient getQueryController() { - return queryController; - } - - public long getQueryId() { - return queryId; - } - - public int getBOpId() { - return bopId; - } - - public int getPartitionId() { - return partitionId; - } - - /** The #of bytes of data which are available for that operator. */ - public int getBytesAvailable() { - return nbytes; - } - - /** - * The Internet address and port of a {@link ResourceService} from which the - * receiver may demand the data. - */ - public InetSocketAddress getServiceAddr() { - return addr; - } - - /** - * - * @param queryController - * @param queryId - * @param sinkId - * @param partitionId - * @param allocations - * The ordered list of {@link IAllocation}s comprising the chunk. 
- * @param addr - * The Internet address and port where the receiver can fetch the - * payload using the sender's {@link ResourceService}. - */ - public ChunkMessageWithNIOPayload(final IQueryClient queryController, - final long queryId, final int sinkId, final int partitionId, - final List<IAllocation> allocations, final InetSocketAddress addr) { - - if (queryController == null) - throw new IllegalArgumentException(); - - if (allocations == null) - throw new IllegalArgumentException(); - - if (addr == null) - throw new IllegalArgumentException(); - - this.queryController = queryController; - this.queryId = queryId; - this.bopId = sinkId; - this.partitionId = partitionId; - final int n = allocations.size(); - this.allocations = new A[n]; - int i = 0; - int nbytes = 0; - final Iterator<IAllocation> itr = allocations.iterator(); - while (itr.hasNext()) { - final IAllocation alloc = itr.next(); - final int len = alloc.getSlice().capacity(); - this.allocations[i++] = new A(alloc.getId(), len); - nbytes += len; - } - this.nbytes = nbytes; - this.addr = addr; - - } - - public boolean isMaterialized() { - return materialized; - } - private volatile boolean materialized = false; - - /** - * - * FIXME unit tests for materializing and visiting the chunk. - */ - synchronized public void materialize(FederatedRunningQuery runningQuery) { - - if (materialized) - return; - - final AllocationContextKey key = new ShardContext(queryId, bopId, - partitionId); - - final IAllocationContext allocationContext = runningQuery - .getAllocationContext(key); - - final ResourceService resourceService = runningQuery.getQueryEngine() - .getResourceService(); - -// for (A a : allocations) { -// -// /* -// * FIXME harmonize an IAllocation[] with a ByteBuffer for the {@link -// * ResourceService}. The problem is that an object to be sent across -// * the wire may span multiple ByteBuffers. 
-// */ -// final ByteBuffer tmp = allocationContext.alloc(a.nbytes); -// -// new ResourceService.ReadBufferTask(addr, a.bufferId, tmp); -// -// } - - throw new UnsupportedOperationException(); - - } - - public IAsynchronousIterator<IBindingSet[]> iterator() { - - if (!isMaterialized()) - throw new UnsupportedOperationException(); - - // TODO Auto-generated method stub - throw new UnsupportedOperationException(); - - } - -} Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java 2010-09-10 20:47:27 UTC (rev 3531) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java 2010-09-11 22:53:40 UTC (rev 3532) @@ -31,7 +31,13 @@ import java.nio.ByteBuffer; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.log4j.Logger; + import com.bigdata.bop.BindingSetPipelineOp; import com.bigdata.bop.IBindingSet; import com.bigdata.bop.engine.IChunkMessage; @@ -41,7 +47,6 @@ import com.bigdata.bop.engine.QueryEngine; import com.bigdata.bop.engine.RunningQuery; import com.bigdata.bop.solutions.SliceOp; -import com.bigdata.btree.raba.IRaba; import com.bigdata.journal.IIndexManager; import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.IBlockingBuffer; @@ -51,6 +56,7 @@ import com.bigdata.service.IDataService; import com.bigdata.service.ManagedResourceService; import com.bigdata.service.ResourceService; +import com.bigdata.util.InnerCause; /** * An {@link IBigdataFederation} aware {@link QueryEngine}. 
@@ -59,18 +65,13 @@ * @version $Id: FederatedQueryEngine.java 3508 2010-09-05 17:02:34Z thompsonbry * $ * - * @todo buffer management for s/o bindingSet[] movement - * - * @todo buffer management for s/o DHT element[] movement - * - * @todo Compressed representations of binding sets with the ability to read - * them in place or materialize them onto the java heap. The - * representation should be amenable to processing in C since we want to - * use them on GPUs as well. See {@link IChunkMessage} and perhaps - * {@link IRaba}. + * @todo DEFAULT_GRAPH_QUERY: buffer management for s/o DHT element[] movement */ public class FederatedQueryEngine extends QueryEngine { + private final static transient Logger log = Logger + .getLogger(FederatedQueryEngine.class); + /** * The {@link IBigdataFederation} iff running in scale-out. * <p> @@ -88,6 +89,17 @@ private final ManagedResourceService resourceService; /** + * A priority queue of {@link IChunkMessage}s which needs to have their data + * materialized so an operator can consume those data on this node. + */ + final private PriorityBlockingQueue<IChunkMessage<?>> chunkMaterializationQueue = new PriorityBlockingQueue<IChunkMessage<?>>(); + + /** + * The {@link Future} for the task draining the {@link #chunkMaterializationQueue}. + */ + private final AtomicReference<FutureTask<Void>> materializeChunksFuture = new AtomicReference<FutureTask<Void>>(); + + /** * Constructor used on a {@link DataService} (a query engine peer). * * @param dataService @@ -101,7 +113,44 @@ } + @Override + public UUID getServiceUUID() { + + return fed.getServiceUUID(); + + } + + @Override + public IBigdataFederation<?> getFederation() { + + return fed; + + } + /** + * The service used to expose {@link ByteBuffer}s and managed index + * resources for transfer to remote services in support of distributed query + * evaluation. 
+ */ + public ManagedResourceService getResourceService() { + + return resourceService; + + } + + /** + * Overridden to strengthen the return type. + * <p> + * {@inheritDoc} + */ + @Override + protected FederatedRunningQuery getRunningQuery(final long queryId) { + + return (FederatedRunningQuery) super.getRunningQuery(queryId); + + } + + /** * Constructor used on a non-{@link DataService} node to expose a query * controller. Since the query controller is not embedded within a data * service it needs to provide its own {@link ResourceService} and local @@ -131,43 +180,121 @@ } + /** + * {@inheritDoc} + * <p> + * Extended to also initialize a thread which will materialize + * {@link IChunkMessage} for consumption by this node. + * + * @todo ANALYTIC_QUERY: {@link IChunkMessage} are dropped onto a queue and + * materialized in order of arrival. This works fine for low latency + * pipelined query evaluation. + * <p> + * For analytic query, we (a) manage the #of high volume operators + * which run concurrently, presumably based on their demands on + * memory; and (b) model the chunks available before they are + * materialized locally such that (c) they can be materialized on + * demand (flow control); and (d) we can run the operator when there + * are sufficient chunks available without taking on too much data. + * <p> + * This requires a separate queue for executing high volume operators + * and also separate consideration of when chunks available on remote + * nodes should be materialized. 
+ */ @Override - public UUID getServiceUUID() { + public void init() { - return fed.getServiceUUID(); + final FutureTask<Void> ft = new FutureTask<Void>( + new MaterializeChunksTask(), (Void) null); + + if (materializeChunksFuture.compareAndSet(null/* expect */, ft)) { + + getIndexManager().getExecutorService().execute(ft); + + } else { + + throw new IllegalStateException("Already running"); + + } } + /** + * {@inheritDoc} + * <p> + * Extended to stop materializing chunks once all running queries are done. + */ @Override - public IBigdataFederation<?> getFederation() { + protected void didShutdown() { + + // stop materializing chunks. + final Future<?> f = materializeChunksFuture.get(); + if (f != null) + f.cancel(true/* mayInterruptIfRunning */); - return fed; - } - - /** - * The service used to expose {@link ByteBuffer}s and managed index - * resources for transfer to remote services in support of distributed query - * evaluation. - */ - public ManagedResourceService getResourceService() { - return resourceService; - - } - /** - * Overridden to strengthen the return type. + * {@inheritDoc} * <p> - * {@inheritDoc} + * Extended to stop materializing chunks. */ @Override - protected FederatedRunningQuery getRunningQuery(final long queryId) { + public void shutdownNow() { + + // stop materializing chunks. + final Future<?> f = materializeChunksFuture.get(); + if (f != null) + f.cancel(true/* mayInterruptIfRunning */); - return (FederatedRunningQuery) super.getRunningQuery(queryId); + super.shutdownNow(); } + /** + * Runnable materializes chunks and makes them available for further + * processing. 
+ */ + private class MaterializeChunksTask implements Runnable { + public void run() { + while (true) { + try { + final IChunkMessage<?> c = chunkMaterializationQueue.take(); + final long queryId = c.getQueryId(); + final FederatedRunningQuery q = getRunningQuery(queryId); + if (q.isCancelled()) + continue; + final IChunkMessage<?> msg = chunkMaterializationQueue + .poll(); + try { + msg.materialize(q); + /* + * @todo The type warning here is because the rest of + * the API does not know what to do with messages for + * chunks other than IBindingSet[], e.g., IElement[], + * etc. + */ + FederatedQueryEngine.this + .bufferReady((IChunkMessage) msg); + } catch(Throwable t) { + if(InnerCause.isInnerCause(t, InterruptedException.class)) { + log.warn("Interrupted."); + return; + } + throw new RuntimeException(t); + } + } catch (InterruptedException e) { + log.warn("Interrupted."); + return; + } catch (Throwable ex) { + // log and continue + log.error(ex, ex); + continue; + } + } + } + } // MaterializeChunksTask + public void declareQuery(final IQueryDecl queryDecl) { final long queryId = queryDecl.getQueryId(); @@ -179,7 +306,7 @@ } @Override - public void bufferReady(final IChunkMessage msg) { + public void bufferReady(final IChunkMessage<IBindingSet> msg) { if (msg == null) throw new IllegalArgumentException(); @@ -200,12 +327,6 @@ } else { /* - * FIXME SCALEOUT: We need to model the chunks available before they - * are materialized locally such that (a) they can be materialized - * on demand (flow control); and (b) we can run the operator when - * there are sufficient chunks available without taking on too much - * data. [For the sort term, they can be dropped onto a queue and - * materialized in order of arrival.] */ throw new UnsupportedOperationException("FIXME"); @@ -248,6 +369,8 @@ * normally. Also pay attention when the client closes the * {@link IAsynchronousIterator} from which it is draining solutions * early. + * + * @deprecated This is going away. 
*/ @Override protected IBlockingBuffer<IBindingSet[]> newQueryBuffer( Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java 2010-09-10 20:47:27 UTC (rev 3531) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java 2010-09-11 22:53:40 UTC (rev 3532) @@ -29,10 +29,7 @@ import java.nio.ByteBuffer; import java.rmi.RemoteException; -import java.util.Arrays; import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -47,9 +44,7 @@ import com.bigdata.bop.engine.IQueryClient; import com.bigdata.bop.engine.IQueryPeer; import com.bigdata.bop.engine.RunningQuery; -import com.bigdata.io.DirectBufferPoolAllocator; -import com.bigdata.io.SerializerUtil; -import com.bigdata.io.DirectBufferPoolAllocator.IAllocation; +import com.bigdata.io.DirectBufferPool; import com.bigdata.io.DirectBufferPoolAllocator.IAllocationContext; import com.bigdata.mdi.PartitionLocator; import com.bigdata.relation.accesspath.BlockingBuffer; @@ -58,7 +53,6 @@ import com.bigdata.relation.accesspath.IBuffer; import com.bigdata.resources.ResourceManager; import com.bigdata.service.IBigdataFederation; -import com.bigdata.service.ManagedResourceService; import com.bigdata.service.ResourceService; import com.bigdata.striterator.IKeyOrder; @@ -245,7 +239,7 @@ * {@inheritDoc} */ @Override - protected void acceptChunk(final IChunkMessage msg) { + protected void acceptChunk(final IChunkMessage<IBindingSet> msg) { super.acceptChunk(msg); @@ -475,11 +469,11 @@ * The identifier of the target {@link BOp}. * @param allocationContext * The allocation context within which the {@link ByteBuffer}s - * will be managed for this {@link ChunkMessageWithNIOPayload}. 
+ * will be managed for this {@link NIOChunkMessage}. * @param source * The binding sets to be formatted onto a buffer. * - * @return The {@link ChunkMessageWithNIOPayload}. + * @return The {@link NIOChunkMessage}. * * @todo This is basically a factory for creating {@link IChunkMessage}s. * That factory pattern in combined with the logic to send the message @@ -489,10 +483,6 @@ * could help to cut latency when an operator has a large fan out (in * scale-out when mapping over shards or nodes). * - * @todo We probably need to use the {@link DirectBufferPoolAllocator} to - * receive the chunks within the {@link ManagedResourceService} as - * well. - * * @todo Release the allocations associated with each output chunk once it * is received by the remote service. * <p> @@ -506,10 +496,32 @@ * closed, then the output chunks for the query controller should be * immediately dropped. * - * @todo There are a few things where the resource must be made available to - * more than one operator evaluation phase. The best examples are - * temporary graphs for parallel closure and large collections of - * graphIds for SPARQL "NAMED FROM DATA SET" extensions. + * @todo There are a few things for which the resource must be made + * available to more than one operator evaluation phase. The best + * examples are temporary graphs for parallel closure and large + * collections of graphIds for SPARQL "NAMED FROM DATA SET" + * extensions. + * + * @todo Rethink the multiplicity relationship between chunks output from an + * operator, chunks output from mapping the operator over shards or + * nodes, RMI messages concerning buffers available for the sink + * operator on the various nodes, and the #of allocations per RMI + * message on both the sender and the receiver. + * <p> + * I am pretty sure that none of these are strongly coupled, e.g., + * they are not 1:1. Some stages can combine chunks. 
Multiple + * allocations could be required on either the sender or the receiver + * purely due to where the slices fall on the backing direct + * {@link ByteBuffer}s in the {@link DirectBufferPool} and the sender + * and receiver do not need to use the same allocation context or have + * the same projection of slices onto the backing buffers. + * <p> + * The one thing which is critical is that the query controller is + * properly informed of the #of chunks made available to an operator + * and consumed by that operator, that those reports must be in the + * same units, and that the reports must be delivered back to the + * query controller in a manner which does not transiently violate the + * termination conditions of the query. */ protected void sendChunkMessage( final UUID serviceUUID, @@ -540,13 +552,15 @@ final boolean thisService = peerProxy == getQueryEngine(); if(thisService) { + /* * Leave the chunk as Java objects and drop it directly onto the * query engine. */ - final IChunkMessage msg = new BindingSetChunk(getQueryController(), - getQueryId(), sinkId, partitionId, source.iterator()); + final IChunkMessage<IBindingSet> msg = new BindingSetChunk<IBindingSet>( + getQueryController(), getQueryId(), sinkId, partitionId, + source.iterator()); getQueryEngine().bufferReady(msg); @@ -561,37 +575,23 @@ * RMI message or out of band using NIO. This decision effects how we * serialize the chunk. */ - final IChunkMessage msg; + final IChunkMessage<IBindingSet> msg; if (source.size() < 100) { - /* - * FIXME Send payload inline with the RMI message. - */ + msg = new ThickChunkMessage<IBindingSet>(getQueryController(), + getQueryId(), sinkId, partitionId, source); -// final byte[] data = SerializerUtil.serialize(obj); -// -// // @todo harmonize serialization and compression and ctors. 
-// msg = new ThickChunkMessage(getQueryController(), getQueryId(), -// sinkId, partitionId, data); - throw new UnsupportedOperationException(); + } else { - } else - { - /* * Marshall the data onto direct ByteBuffer(s) and send a thin * message by RMI. The receiver will retrieve the data using NIO * against the ResourceService. - * - * @todo harmonize serialization and compression and ctors. */ - final List<IAllocation> allocations = moveToNIOBuffers( - allocationContext, source); + msg = new NIOChunkMessage<IBindingSet>(getQueryController(), + getQueryId(), sinkId, partitionId, allocationContext, + source, getQueryEngine().getResourceService().getAddr()); - msg = new ChunkMessageWithNIOPayload(getQueryController(), - getQueryId(), sinkId, partitionId, allocations, - getQueryEngine().getResourceService().getAddr()); - } try { @@ -606,61 +606,4 @@ } - /** - * Chunk-wise serialization of the data onto allocations. - * @param allocationContext - * @param source - * @return - * - * @todo should be on message per chunk, right? - */ - private List<IAllocation> moveToNIOBuffers( - final IAllocationContext allocationContext, - final IBlockingBuffer<IBindingSet[]> source) { - - int nbytes = 0; - - final List<IAllocation> allocations = new LinkedList<IAllocation>(); - - final IAsynchronousIterator<IBindingSet[]> itr = source.iterator(); - - try { - - while (itr.hasNext()) { - - // Next chunk to be serialized. - final IBindingSet[] chunk = itr.next(); - - // serialize the chunk of binding sets. - final byte[] data = SerializerUtil.serialize(chunk); - - // track size of the allocations. - nbytes += data.length; - - // allocate enough space for those data. - final IAllocation[] tmp; - try { - tmp = allocationContext.alloc(data.length); - } catch (InterruptedException ex) { - throw new RuntimeException(ex); - } - - // copy the data into the allocations. - DirectBufferPoolAllocator.put(data, tmp); - - // append the new allocations. 
- allocations.addAll(Arrays.asList(tmp)); - - } - - return allocations; - - } finally { - - itr.close(); - - } - - } - } Copied: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/NIOChunkMessage.java (from rev 3531, branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/ChunkMessageWithNIOPayload.java) =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/NIOChunkMessage.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/NIOChunkMessage.java 2010-09-11 22:53:40 UTC (rev 3532) @@ -0,0 +1,502 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Sep 10, 2010 + */ + +package com.bigdata.bop.fed; + +import java.io.Serializable; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import com.bigdata.bop.engine.IChunkMessage; +import com.bigdata.bop.engine.IQueryClient; +import com.bigdata.io.DirectBufferPoolAllocator; +import com.bigdata.io.SerializerUtil; +import com.bigdata.io.DirectBufferPoolAllocator.IAllocation; +import com.bigdata.io.DirectBufferPoolAllocator.IAllocationContext; +import com.bigdata.relation.accesspath.IAsynchronousIterator; +import com.bigdata.relation.accesspath.IBlockingBuffer; +import com.bigdata.service.ManagedResourceService; +import com.bigdata.service.ResourceService; + +/** + * An {@link IChunkMessage} where the payload is made available to the receiving + * service using an NIO transfer against the sender's {@link ResourceService}. + * This is suitable for moving large blocks of data during query evaluation. 
+ * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class NIOChunkMessage<E> implements IChunkMessage<E>, Serializable { + + /** + * + */ + private static final long serialVersionUID = 1L; + + final private IQueryClient queryController; + + final private long queryId; + + final private int bopId; + + final private int partitionId; + + final private int solutionCount; + + final private int nbytes; + + /** + * Note: Even when we send one message per chunk, we can still have a list + * of {@link IAllocation}s if the chunk did not get formatted onto a single + * {@link IAllocation}. + */ + final private A[] allocations; + + /** + * The Internet address and port where the receiver can fetch the payload + * using the sender's {@link ResourceService}. + */ + final private InetSocketAddress addr; + + public IQueryClient getQueryController() { + return queryController; + } + + public long getQueryId() { + return queryId; + } + + public int getBOpId() { + return bopId; + } + + public int getPartitionId() { + return partitionId; + } + + /** + * The #of elements in this chunk. + * + * @todo we could track this in total and in {@link A} on a per-slice basis. + */ + public int getSolutionCount() { + return solutionCount; + } + + /** The #of bytes of data which are available for that operator. */ + public int getBytesAvailable() { + return nbytes; + } + + /** + * The Internet address and port of a {@link ResourceService} from which the + * receiver may demand the data. 
+ */ + public InetSocketAddress getServiceAddr() { + return addr; + } + + public String toString() { + + return getClass().getName() + "{queryId=" + queryId + ",bopId=" + bopId + + ",partitionId=" + partitionId + ", solutionCount=" + + solutionCount + ", bytesAvailable=" + nbytes + ", nslices=" + + allocations.length + ", serviceAddr=" + addr + "}"; + + } + + /** + * + * @param queryController + * @param queryId + * @param sinkId + * @param partitionId + * @param allocations + * The ordered list of {@link IAllocation}s comprising the chunk. + * @param addr + * The Internet address and port where the receiver can fetch the + * payload using the sender's {@link ResourceService}. + */ + public NIOChunkMessage(final IQueryClient queryController, + final long queryId, final int sinkId, final int partitionId, + final IAllocationContext allocationContext, + final IBlockingBuffer<E[]> source, + final InetSocketAddress addr) { + + if (queryController == null) + throw new IllegalArgumentException(); + + if (allocationContext == null) + throw new IllegalArgumentException(); + + if (source == null) + throw new IllegalArgumentException(); + + if (addr == null) + throw new IllegalArgumentException(); + + // format onto NIO buffers. 
+ final AtomicInteger nsolutions = new AtomicInteger(); + final List<IAllocation> allocations = moveToNIOBuffers( + allocationContext, source, nsolutions); + + this.queryController = queryController; + this.queryId = queryId; + this.bopId = sinkId; + this.partitionId = partitionId; + final int n = allocations.size(); + this.allocations = new A[n]; + int i = 0; + int nbytes = 0; + final Iterator<IAllocation> itr = allocations.iterator(); + while (itr.hasNext()) { + final IAllocation alloc = itr.next(); + final int len = alloc.getSlice().capacity(); + this.allocations[i++] = new A(alloc.getId(), len); + nbytes += len; + } + this.solutionCount = nsolutions.get(); + this.nbytes = nbytes; + this.addr = addr; + + } + + /** + * Chunk-wise serialization of the data onto allocations. + * + * @param allocationContext + * @param source + * @return + */ + static private <E> List<IAllocation> moveToNIOBuffers( + final IAllocationContext allocationContext, + final IBlockingBuffer<E[]> source, + final AtomicInteger nsolutions) { + + int nbytes = 0; + + int n = 0; + + final List<IAllocation> allocations = new LinkedList<IAllocation>(); + + final IAsynchronousIterator<E[]> itr = source.iterator(); + + try { + + while (itr.hasNext()) { + + // Next chunk to be serialized. + final E[] chunk = itr.next(); + + // track #of solutions. + n += chunk.length; + + // serialize the chunk of binding sets. + final byte[] data = SerializerUtil.serialize(chunk); + + // track size of the allocations. + nbytes += data.length; + + // allocate enough space for those data. + final IAllocation[] tmp; + try { + tmp = allocationContext.alloc(data.length); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + + // copy the data into the allocations. + DirectBufferPoolAllocator.put(data, tmp); + + for(IAllocation a : tmp) { + + // prepare for reading. + a.getSlice().flip(); + + // append the allocation. 
+ allocations.add(a); + + } + + } + + nsolutions.addAndGet(n); + + return allocations; + + } finally { + + itr.close(); + + } + + } + + /** + * Metadata about an allocation to be retrieved from the sender's + * {@link ResourceService}. + */ + private static final class A implements Serializable { + + /** + * + */ + private static final long serialVersionUID = 1L; + + /** + * The identifier of the resource on the sender's + * {@link ResourceService}. + */ + private final UUID bufferId; + + /** + * The size of that resource in bytes. + */ + private final int nbytes; + + /** + ... [truncated message content] |