From: <tho...@us...> - 2010-09-08 20:52:16
|
Revision: 3524 http://bigdata.svn.sourceforge.net/bigdata/?rev=3524&view=rev Author: thompsonbry Date: 2010-09-08 20:52:07 +0000 (Wed, 08 Sep 2010) Log Message: ----------- Added a class (DirectBufferPoolAllocator) to manage allocations of direct ByteBuffer slices against the DirectBufferPool for use with the FederatedQueryEngine. There is a limited test suite for this class. Partly integrated the DirectBufferPoolAllocator with the FederatedQueryEngine. The generated intermediate results are now written onto direct ByteBuffer slices allocated on the DirectBufferPool and notification messages are sent to the receiving services. I have not yet modified the receiving FederatedQueryEngine to demand the data from the remote query engine. Harmonized InsertOp and PipelineJoin as IShardwiseBindingSetOps. This makes it possible to find the predicate for the access path on which we need to read/write and map the results to the appropriate shards. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryPeer.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/mutation/InsertOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/IRelation.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/RelationFusedView.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/resources/StoreManager.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/DataService.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/IDataService.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/ManagedResourceService.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/ResourceService.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/ap/R.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/io/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/resources/AbstractResourceManagerTestCase.java branches/QUADS_QUERY_BRANCH/bigdata-jini/src/java/com/bigdata/service/jini/DataServer.java branches/QUADS_QUERY_BRANCH/bigdata-jini/src/java/com/bigdata/service/jini/JiniFederation.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/lexicon/LexiconRelation.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/magic/MagicRelation.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPORelation.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IShardwisePipelineOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/DirectBufferPoolAllocator.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/io/TestDirectBufferPoolAllocator.java Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IShardwisePipelineOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IShardwisePipelineOp.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IShardwisePipelineOp.java 2010-09-08 20:52:07 UTC (rev 3524) @@ -0,0 +1,48 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Sep 8, 2010 + */ + +package com.bigdata.bop; + +/** + * An interface for {@link BindingSetPipelineOp}s which are mapped across + * shards. + * + * @param <E> + * The generic type of the elements in the relation on which the + * predicate will read or write. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public interface IShardwisePipelineOp<E> { + + /** + * The predicate which reads or writes on the shard. + */ + IPredicate<E> getPredicate(); + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IShardwisePipelineOp.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryPeer.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryPeer.java 2010-09-08 20:11:49 UTC (rev 3523) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryPeer.java 2010-09-08 20:52:07 UTC (rev 3524) @@ -3,8 +3,10 @@ import java.net.InetSocketAddress; import java.rmi.Remote; import java.rmi.RemoteException; +import java.util.UUID; import com.bigdata.bop.BOp; +import com.bigdata.service.IService; /** * Interface for a node participating in the exchange of NIO buffers to @@ -13,6 +15,14 @@ public interface IQueryPeer extends Remote { /** + * The {@link UUID} of the service within which the {@link IQueryPeer} is + * running. + * + * @see IService#getServiceUUID() + */ + UUID getServiceUUID() throws RemoteException; + + /** * Notify a service that a buffer having data for some {@link BOp} in some * running query is available. The receiver may request the data when they * are ready. If the query is cancelled, then the sender will drop the @@ -30,7 +40,9 @@ * @return <code>true</code> unless the receiver knows that the query has * already been cancelled. */ +// * @param nbytes +// * The #of bytes of data which are available for that operator. void bufferReady(IQueryClient clientProxy, InetSocketAddress serviceAddr, - long queryId, int bopId) throws RemoteException; + long queryId, int bopId/*, int nbytes*/) throws RemoteException; } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-09-08 20:11:49 UTC (rev 3523) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-09-08 20:52:07 UTC (rev 3524) @@ -372,7 +372,7 @@ * is running -or- <code>null</code> if the {@link QueryEngine} is * not running against an {@link IBigdataFederation}. */ - protected UUID getServiceId() { + public UUID getServiceUUID() { return null; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-09-08 20:11:49 UTC (rev 3523) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-09-08 20:52:07 UTC (rev 3524) @@ -59,7 +59,6 @@ import com.bigdata.relation.accesspath.BlockingBuffer; import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.IBlockingBuffer; -import com.bigdata.resources.ResourceManager; import com.bigdata.service.IBigdataFederation; import com.bigdata.striterator.ICloseableIterator; import com.bigdata.util.concurrent.Haltable; @@ -209,40 +208,6 @@ private final Set<Integer/*bopId*/> startedSet = new LinkedHashSet<Integer>(); /** - * A map associating resources with running queries. When a query halts, the - * resources listed in its resource map are released. Resources can include - * {@link ByteBuffer}s backing either incoming or outgoing - * {@link BindingSetChunk}s, temporary files associated with the query, hash - * tables, etc. - * - * @todo Cache any resources materialized for the query on this node (e.g., - * temporary graphs materialized from a peer or the client). A bop - * should be able to demand those data from the cache and otherwise - * have them be materialized. - * - * @todo only use the values in the map for transient objects, such as a - * hash table which is not backed by the disk. For {@link ByteBuffer}s - * we want to make the references go through the {@link BufferService} - * . For files, through the {@link ResourceManager}. - * - * @todo We need to track the resources in use by the query so they can be - * released when the query terminates. This includes: buffers; joins - * for which there is a chunk of binding sets that are currently being - * executed; downstream joins (they depend on the source joins to - * notify them when they are complete in order to decide their own - * termination condition); local hash tables which are part of a DHT - * (especially when they are persistent); buffers and disk resources - * allocated to N-way merge sorts, etc. - * - * @todo The set of buffers having data which has been accepted for this - * query. - * - * @todo The set of buffers having data which has been generated for this - * query. - */ - private final ConcurrentHashMap<UUID, Object> resourceMap = new ConcurrentHashMap<UUID, Object>(); - - /** * The chunks available for immediate processing. * <p> * Note: This is package private so it will be visible to the @@ -261,7 +226,7 @@ * combined before we execute the operator. For unselective operators, * we are going to run over all the data anyway. */ - final BlockingQueue<BindingSetChunk> chunksIn = new LinkedBlockingDeque<BindingSetChunk>(); + final /*private*/ BlockingQueue<BindingSetChunk> chunksIn = new LinkedBlockingDeque<BindingSetChunk>(); /** * The class executing the query on this node. @@ -601,7 +566,7 @@ final long elapsed = System.currentTimeMillis() - begin; if (log.isTraceEnabled()) log.trace("bopId=" + msg.bopId + ",partitionId=" + msg.partitionId - + ",serviceId=" + queryEngine.getServiceId() + + ",serviceId=" + queryEngine.getServiceUUID() + ", nchunks=" + fanOut + " : runningTaskCount=" + runningTaskCount + ", availableChunkCount=" + availableChunkCount + ", elapsed=" + elapsed); @@ -742,7 +707,7 @@ .getProperty(BindingSetPipelineOp.Annotations.BOP_ID); final IBlockingBuffer<IBindingSet[]> sink = (p == null ? queryBuffer : op.newBuffer()); - // altSink [@todo altSink=null or sink when not specified?] + // altSink (null when not specified). final Integer altSinkId = (Integer) op .getProperty(BindingSetPipelineOp.Annotations.ALT_SINK_REF); if (altSinkId != null && !bopIndex.containsKey(altSinkId)) { @@ -758,7 +723,7 @@ // Hook the FutureTask. final Runnable r = new Runnable() { public void run() { - final UUID serviceId = queryEngine.getServiceId(); + final UUID serviceId = queryEngine.getServiceUUID(); int fanIn = 1; int sinkChunksOut = 0; int altSinkChunksOut = 0; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java 2010-09-08 20:11:49 UTC (rev 3523) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java 2010-09-08 20:52:07 UTC (rev 3524) @@ -27,17 +27,18 @@ package com.bigdata.bop.fed; +import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import com.bigdata.bop.BindingSetPipelineOp; import com.bigdata.bop.IBindingSet; -import com.bigdata.bop.bset.ConditionalRoutingOp; import com.bigdata.bop.engine.IQueryClient; +import com.bigdata.bop.engine.IQueryPeer; import com.bigdata.bop.engine.QueryEngine; import com.bigdata.bop.engine.RunningQuery; -import com.bigdata.bop.join.PipelineJoin; import com.bigdata.bop.solutions.SliceOp; import com.bigdata.journal.IIndexManager; import com.bigdata.relation.accesspath.IAsynchronousIterator; @@ -45,6 +46,7 @@ import com.bigdata.relation.accesspath.IBuffer; import com.bigdata.service.DataService; import com.bigdata.service.IBigdataFederation; +import com.bigdata.service.IDataService; import com.bigdata.service.ManagedResourceService; import com.bigdata.service.ResourceService; @@ -55,50 +57,14 @@ * @version $Id: FederatedQueryEngine.java 3508 2010-09-05 17:02:34Z thompsonbry * $ * - * @todo Modify the {@link FederatedQueryEngine} to actually run a distributed - * query. Since we are in the same JVM, the {@link IBindingSet} chunks can - * be used directly without being marshalled onto {@link ByteBuffer}s and - * transferred over the network. - * <p> - * Distributed query will fail until each {@link FederatedQueryEngine} is - * receiving chunks and running operators against its local - * {@link IIndexManager}. This requires that we map the output chunks for - * an operator over the shards for the next operator, that we send the - * appropriate messages to the query engine peers, that they demand the - * necessary data from their peers, etc. - * <p> - * Once distributed query is running, begin to marshall the chunks onto - * buffers [this might have to be done immediately to get the notification - * protocol working]. + * @todo buffer management for s/o including bindingSet[] movement for the + * pipeline and element[] movement for DHT on access path. * - * @todo buffer management for s/o, including binding sets movement, element - * chunk movement for DHT on access path, and on demand materialization of - * large query resources for large data sets, parallel closure, etc.; - * grouping operators which will run locally (such as a pipeline join plus - * a conditional routing operator) so we do not marshall binding sets - * between operators when they will not cross a network boundary. Also, - * handle mutation, programs and closure operators. - * - * @todo I have not yet figured out how to mark operators to indicate when their - * output should be mapped across shards or handled locally. It would - * appear that this is a concern of their parent in the operator tree. For - * example, the {@link ConditionalRoutingOp} would be applied to transform - * the output of a {@link PipelineJoin} before mapping the output over the - * shards. - * <p> - * The operator themselves could carry this information either as a Java - * method or as an annotation. - * <p> - * This could interact with how we combine {@link RunningQuery#chunksIn}. - * * @todo Override to release buffers associated with chunks buffered for a query * when it terminates (buffers may be for received chunks or chunks which * are awaiting transfer to another node). [This might be handled by a * {@link RunningQuery} override.] * - * @todo Override protocol hooks for moving data around among the - * {@link QueryEngine}s - * * @todo Compressed representations of binding sets with the ability to read * them in place or materialize them onto the java heap. The * representation should be ammenable to processing in C since we want to @@ -167,7 +133,7 @@ } @Override - protected UUID getServiceId() { + public UUID getServiceUUID() { return fed.getServiceUUID(); @@ -190,13 +156,18 @@ return resourceService; } - + + /** + * FIXME Once buffers are ready their data needs to be materialized on this + * node and the chunks queued for processing. + * + * @todo What is the cost of passing the proxy around like this? Should it + * be discovered instead from a registrar? + */ @Override public void bufferReady(IQueryClient clientProxy, InetSocketAddress serviceAddr, long queryId, int bopId) { - // @todo notify peer when a buffer is ready. - } /** @@ -246,4 +217,47 @@ } + /** + * Resolve an {@link IQueryPeer}. + * <p> + * Note: This only resolves the peers running on the {@link IDataService}s. + * It will not resolve a query controller unless an {@link IDataService} is + * being used as the query controller. + * + * @param serviceUUID + * The service {@link UUID}. + * + * @return The proxy for the query peer. + */ + protected IQueryPeer getQueryPeer(final UUID serviceUUID) { + + IQueryPeer proxy = proxyMap.get(serviceUUID); + + if (proxy == null) { + + final IDataService dataService = getFederation().getDataService( + serviceUUID); + + if (dataService == null) + throw new RuntimeException("No such service: " + serviceUUID); + + try { + proxy = dataService.getQueryEngine(); + } catch (IOException e) { + throw new RuntimeException(e); + } + + proxyMap.put(serviceUUID, proxy); + + } + + return proxy; + + } + + /** + * Cache for {@link #getQueryPeer(UUID)}. + */ + private final ConcurrentHashMap<UUID, IQueryPeer> proxyMap = new ConcurrentHashMap<UUID, IQueryPeer>(); + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java 2010-09-08 20:11:49 UTC (rev 3523) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java 2010-09-08 20:52:07 UTC (rev 3524) @@ -27,19 +27,39 @@ package com.bigdata.bop.fed; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.rmi.RemoteException; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpEvaluationContext; import com.bigdata.bop.IBindingSet; import com.bigdata.bop.IPredicate; +import com.bigdata.bop.IShardwisePipelineOp; +import com.bigdata.bop.engine.BindingSetChunk; import com.bigdata.bop.engine.IQueryClient; +import com.bigdata.bop.engine.IQueryPeer; +import com.bigdata.bop.engine.QueryEngine; import com.bigdata.bop.engine.RunningQuery; +import com.bigdata.io.DirectBufferPoolAllocator; +import com.bigdata.io.SerializerUtil; +import com.bigdata.io.DirectBufferPoolAllocator.IAllocation; +import com.bigdata.io.DirectBufferPoolAllocator.IAllocationContext; import com.bigdata.mdi.PartitionLocator; +import com.bigdata.relation.accesspath.BlockingBuffer; import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.IBlockingBuffer; import com.bigdata.relation.accesspath.IBuffer; +import com.bigdata.resources.ResourceManager; import com.bigdata.service.IBigdataFederation; -import com.bigdata.service.jini.master.IAsynchronousClientTask; +import com.bigdata.service.ManagedResourceService; +import com.bigdata.service.ResourceService; import com.bigdata.striterator.IKeyOrder; /** @@ -69,6 +89,59 @@ * */ public class FederatedRunningQuery extends RunningQuery { + /** + * The {@link UUID} of the service which is the {@link IQueryClient} running + * this query. + */ + private final UUID queryControllerUUID; + + /** + * A map associating resources with running queries. When a query halts, the + * resources listed in its resource map are released. Resources can include + * {@link ByteBuffer}s backing either incoming or outgoing + * {@link BindingSetChunk}s, temporary files associated with the query, hash + * tables, etc. + * + * @todo This map will eventually need to be moved into {@link RunningQuery} + * in order to support temporary graphs or other disk-backed resources + * associated with the evaluation of a query against a standalone + * database. However, the main use case are the resources associated + * with query against an {@link IBigdataFederation} which it why it is + * being developed in the {@link FederatedRunningQuery} class. + * + * @todo Cache any resources materialized for the query on this node (e.g., + * temporary graphs materialized from a peer or the client). A bop + * should be able to demand those data from the cache and otherwise + * have them be materialized. + * + * @todo Only use the values in the map for transient objects, such as a + * hash table which is not backed by the disk. For {@link ByteBuffer}s + * we want to make the references go through the {@link ResourceService} + * . For files, through the {@link ResourceManager}. + * + * @todo We need to track the resources in use by the query so they can be + * released when the query terminates. This includes: buffers; joins + * for which there is a chunk of binding sets that are currently being + * executed; downstream joins (they depend on the source joins to + * notify them when they are complete in order to decide their own + * termination condition); local hash tables which are part of a DHT + * (especially when they are persistent); buffers and disk resources + * allocated to N-way merge sorts, etc. + * + * @todo The set of buffers having data which has been accepted for this + * query. + * + * @todo The set of buffers having data which has been generated for this + * query. + */ + private final ConcurrentHashMap<UUID, Object> resourceMap = new ConcurrentHashMap<UUID, Object>(); + + /** + * @todo Maintain multiple allocation contexts. Some can be query wide. + * Others might be specific to a serviceId and/or sinkId. + */ + private final ConcurrentHashMap<Object/* key */, IAllocationContext> allocationContexts = new ConcurrentHashMap<Object, IAllocationContext>(); + public FederatedRunningQuery(FederatedQueryEngine queryEngine, long queryId, long readTimestamp, long writeTimestamp, long begin, long timeout, boolean controller, IQueryClient clientProxy, @@ -76,6 +149,17 @@ super(queryEngine, queryId, readTimestamp, writeTimestamp, begin, timeout, controller, clientProxy, query, queryBuffer); + + /* + * Note: getServiceUUID() should be a smart proxy method and thus not + * actually do RMI here. However, it is resolved eagerly and cached + * anyway. + */ + try { + this.queryControllerUUID = getQueryController().getServiceUUID(); + } catch (RemoteException e) { + throw new RuntimeException(e); + } } @@ -87,38 +171,148 @@ } /** + * The allocation context key groups together allocations onto the same + * direct {@link ByteBuffer}s. There are different implementations depending + * on how it makes sense to group data data for a given query. + */ + static abstract private class AllocationContextKey { + + /** + * Must be overridden. The queryId must be a part of each hashCode() in + * order to ensure that the hash codes are well distributed across + * different queries on the same node. + */ + @Override + abstract public int hashCode(); + + /** + * Must be overridden. + */ + @Override + abstract public boolean equals(Object o); + + } + + /** + * An allocation context which is shared by all operators running in the + * same query. + */ + static private class QueryContext extends AllocationContextKey { + private final Long queryId; + + QueryContext(final Long queryId) { + this.queryId = Long.valueOf(queryId); + } + + public int hashCode() { + return queryId.hashCode(); + } + + public boolean equals(final Object o) { + if (this == o) + return true; + if (!(o instanceof QueryContext)) + return false; + if (!queryId.equals(((QueryContext) o).queryId)) + return false; + return true; + } + } + + /** + * An allocation context which is shared by all operators running in the + * same query which target the same service. + */ + static private class ServiceContext extends AllocationContextKey { + private final Long queryId; + + private final UUID serviceUUID; + + ServiceContext(final Long queryId, final UUID serviceUUID) { + this.queryId = queryId; + this.serviceUUID = serviceUUID; + } + + public int hashCode() { + return queryId.hashCode() * 31 + serviceUUID.hashCode(); + } + + public boolean equals(final Object o) { + if (this == o) + return true; + if (!(o instanceof ServiceContext)) + return false; + if (!queryId.equals(((ServiceContext) o).queryId)) + return false; + if (!serviceUUID.equals(((ServiceContext) o).serviceUUID)) + return false; + return true; + } + } + + /** + * An allocation context which is shared by all operators running in the + * same query which target the same shard (the same shard implies the same + * service, at least until we have HA with shard affinity). + */ + static private class ShardContext extends AllocationContextKey { + + private final Long queryId; + + private final int partitionId; + + ShardContext(final Long queryId, final int partitionId) { + this.queryId = queryId; + this.partitionId = partitionId; + } + + public int hashCode() { + return queryId.hashCode() * 31 + partitionId; + } + + public boolean equals(final Object o) { + if (this == o) + return true; + if (!(o instanceof ShardContext)) + return false; + if (!queryId.equals(((ShardContext) o).queryId)) + return false; + if (partitionId != partitionId) + return false; + return true; + } + } + + /** + * Return the {@link IAllocationContext} for the given key. + * + * @param key + * The key. + * + * @return The allocation context. + */ + private IAllocationContext getAllocationContext( + final AllocationContextKey key) { + + return getQueryEngine().getResourceService().getAllocator() + .getAllocationContext(key); + + } + + /** * {@inheritDoc} - * - * @return The #of chunks made available for consumption by the sink. This - * will always be ONE (1) for scale-up. For scale-out, there will be - * one chunk per index partition over which the intermediate results - * were mapped. - * - * FIXME SCALEOUT: This is where we need to map the binding sets - * over the shards for the target operator. Once they are mapped, - * write the binding sets onto an NIO buffer for the target node and - * then send an RMI message to the node telling it that there is a - * chunk available for the given (queryId,bopId,partitionId). - * <p> - * For selective queries in s/o, first format the data onto a list - * of byte[]s, one per target shard/node. Then, using a lock, obtain - * a ByteBuffer if there is none associated with the query yet. - * Otherwise, using the same lock, obtain a slice onto that - * ByteBuffer and put as much of the byte[] as will fit, continuing - * onto a newly recruited ByteBuffer if necessary. Release the lock - * and notify the target of the ByteBuffer slice (buffer#, off, - * len). Consider pushing the data proactively for selective - * queries. - * <p> - * For unselective queries in s/o, proceed as above but we need to - * get the data off the heap and onto the {@link ByteBuffer}s - * quickly (incrementally) and we want the consumers to impose flow - * control on the producers to bound the memory demand (this needs - * to be coordinated carefully to avoid deadlocks). Typically, large - * result sets should result in multiple passes over the consumer's - * shard rather than writing the intermediate results onto the disk. - * - * */ + * <p> + * This method is overridden to organize the output from one operator so in + * order to make it available to another operator running on a different + * node. There are several cases which have to be handled and which are + * identified by the {@link BOp#getEvaluationContext()}. In addition, we + * need to handle low latency and high data volume queries somewhat + * differently. Except for {@link BOpEvaluationContext#ANY}, all of these + * cases wind up writing the intermediate results onto a direct + * {@link ByteBuffer} and notifying the receiving service that there are + * intermediate results which it can pull when it is ready to process them. + * This pattern allows the receiver to impose flow control on the producer. + */ @Override protected <E> int add(final int sinkId, final IBlockingBuffer<IBindingSet[]> sink) { @@ -132,27 +326,31 @@ throw new IllegalArgumentException(); switch (bop.getEvaluationContext()) { - case ANY: + case ANY: { return super.add(sinkId, sink); + } case HASHED: { /* - * FIXME The sink self describes the nodes over which the - * binding sets will be mapped and the hash function to be applied - * so we look up those metadata and apply them to distributed the - * binding sets across the nodes. + * @todo The sink must use annotations to describe the nodes over + * which the binding sets will be mapped and the hash function to be + * applied. Look up those annotations and apply them to distribute + * the binding sets across the nodes. */ throw new UnsupportedOperationException(); } case SHARDED: { /* - * FIXME The sink must read or write on a shard so we map the - * binding sets across the access path for the sink. + * The sink must read or write on a shard so we map the binding sets + * across the access path for the sink. * * @todo For a pipeline join, the predicate is the right hand * operator of the sink. This might be true for INSERT and DELETE - * operators as well. + * operators as well. [It is not, but make it so and document this + * pattern or have a common interface method which returns the + * IPredicate regardless of whether it is an operand or an + * annotation.] * - * @todo IKeyOrder tells us which index will be used and should be + * Note: IKeyOrder tells us which index will be used and should be * set on the predicate by the join optimizer. * * @todo Use the read or write timestamp depending on whether the @@ -163,28 +361,21 @@ * data contained in the sink (in fact, we should just process the * sink data in place). */ - final IPredicate<E> pred = null; // @todo - final IKeyOrder<E> keyOrder = null; // @todo - final long timestamp = getReadTimestamp(); // @todo + @SuppressWarnings("unchecked") + final IPredicate<E> pred = ((IShardwisePipelineOp) bop).getPredicate(); + final IKeyOrder<E> keyOrder = pred.getKeyOrder(); + final long timestamp = getReadTimestamp(); // @todo read vs write timestamp. final int capacity = 1000;// @todo + final int capacity2 = 1000;// @todo final MapBindingSetsOverShardsBuffer<IBindingSet, E> mapper = new MapBindingSetsOverShardsBuffer<IBindingSet, E>( getFederation(), pred, keyOrder, timestamp, capacity) { - - @Override - IBuffer<IBindingSet> newBuffer(PartitionLocator locator) { - // TODO Auto-generated method stub - return null; - } - + @Override + IBuffer<IBindingSet> newBuffer(PartitionLocator locator) { + return new BlockingBuffer<IBindingSet>(capacity2); + } }; /* * Map the binding sets over shards. - * - * FIXME The buffers created above need to become associated with - * this query as resources of the query. Once we are done mapping - * the binding sets over the shards, the target node for each buffer - * needs to be set an RMI message to let it know that there is a - * chunk available for it for the target operator. */ { final IAsynchronousIterator<IBindingSet[]> itr = sink @@ -201,18 +392,52 @@ sink.close(); } } + /* + * The allocation context. + * + * @todo use (queryId, serviceId, sinkId) when the target bop is + * high volume operator (this requires annotation by the query + * planner of the operator tree). + */ + final IAllocationContext allocationContext = getAllocationContext(new QueryContext( + getQueryId())); + + /* + * Generate the output chunks and notify the receivers. + * + * @todo This stage should probably be integrated with the stage + * which maps the binding sets over the shards (immediately above) + * to minimize copying or visiting in the data. + */ + for (Map.Entry<PartitionLocator, IBuffer<IBindingSet>> e : mapper + .getSinks().entrySet()) { + + final PartitionLocator locator = e.getKey(); + + final IBuffer<IBindingSet> shardSink = e.getValue(); + + // FIXME harmonize IBuffer<IBindingSet> vs IBuffer<IBindingSet[]> +// sendOutputChunkReadyMessage(newOutputChunk(locator +// .getDataServiceUUID(), sinkId, allocationContext, +// shardSink)); + throw new UnsupportedOperationException(); + } - throw new UnsupportedOperationException(); } case CONTROLLER: { - final IQueryClient clientProxy = getQueryController(); + /* + * Format the binding sets onto a ByteBuffer and publish that + * ByteBuffer as a manager resource for the query and notify the + * query controller that data is available for it. + */ -// getQueryEngine().getResourceService().port; -// -// clientProxy.bufferReady(clientProxy, serviceAddr, getQueryId(), sinkId); + final IAllocationContext allocationContext = getAllocationContext(new QueryContext( + getQueryId())); - throw new UnsupportedOperationException(); + sendOutputChunkReadyMessage(newOutputChunk(queryControllerUUID, + sinkId, allocationContext, sink)); + } default: throw new AssertionError(bop.getEvaluationContext()); @@ -220,4 +445,193 @@ } + /** + * Create an {@link OutputChunk} from some intermediate results. + * + * @param serviceUUID + * The {@link UUID} of the {@link IQueryPeer} who is the + * recipient. + * @param sinkId + * The identifier of the target {@link BOp}. + * @param allocationContext + * The allocation context within which the {@link ByteBuffer}s + * will be managed for this {@link OutputChunk}. + * @param source + * The binding sets to be formatted onto a buffer. + * + * @return The {@link OutputChunk}. + */ + protected OutputChunk newOutputChunk( + final UUID serviceUUID, + final int sinkId, + final IAllocationContext allocationContext, + final IBlockingBuffer<IBindingSet[]> source) { + + if (serviceUUID == null) + throw new IllegalArgumentException(); + + if (allocationContext == null) + throw new IllegalArgumentException(); + + if (source == null) + throw new IllegalArgumentException(); + + int nbytes = 0; + + final List<IAllocation> allocations = new LinkedList<IAllocation>(); + + final IAsynchronousIterator<IBindingSet[]> itr = source.iterator(); + + try { + + while (itr.hasNext()) { + + // Next chunk to be serialized. + final IBindingSet[] chunk = itr.next(); + + // serialize the chunk of binding sets. + final byte[] data = SerializerUtil.serialize(chunk); + + // track size of the allocations. + nbytes += data.length; + + // allocate enough space for those data. + final IAllocation[] tmp; + try { + tmp = allocationContext.alloc(data.length); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + + // copy the data into the allocations. + DirectBufferPoolAllocator.put(data, tmp); + + // append the new allocations. + allocations.addAll(Arrays.asList(tmp)); + + } + + } finally { + + itr.close(); + + } + + return new OutputChunk(getQueryId(), serviceUUID, sinkId, nbytes, + allocations); + + } + + protected IQueryPeer getQueryPeer(final UUID serviceUUID) { + + if (serviceUUID == null) + throw new IllegalArgumentException(); + + final IQueryPeer queryPeer; + + if (serviceUUID.equals(queryControllerUUID)) { + + // The target is the query controller. + queryPeer = getQueryController(); + + } else { + + // The target is some data service. + queryPeer = getQueryEngine().getQueryPeer(serviceUUID); + + } + + return queryPeer; + + } + + /** + * Notify a remote {@link IQueryPeer} that data is available for it. + * + * @todo If the target for the {@link OutputChunk} is this node then just + * drop it onto the {@link QueryEngine}. + * + * @todo Report the #of bytes available with this message. However, first + * figure out if that if the #of bytes in this {@link OutputChunk} or + * across all {@link OutputChunk}s available for the target service + * and sink. + * + * @todo Consider a fast path with inline RMI based transfer for small sets + * of data. We might just serialize to a byte[] and send that directly + * using a different message to notify the {@link IQueryPeer}. + */ + protected void sendOutputChunkReadyMessage(final OutputChunk outputChunk) { + + try { + + // The peer to be notified. + final IQueryPeer peerProxy = getQueryPeer(outputChunk.serviceId); + + // The Internet address and port where the peer can read the data + // from this node. + final InetSocketAddress serviceAddr = getQueryEngine() + .getResourceService().getAddr(); + + peerProxy.bufferReady(getQueryController(), serviceAddr, + getQueryId(), outputChunk.sinkId); + + } catch (RemoteException e) { + + throw new RuntimeException(e); + + } + + } + + /** + * A chunk of outputs. + * + * @todo We probably need to use the {@link DirectBufferPoolAllocator} to + * receive the chunks within the {@link ManagedResourceService} as + * well. + * + * @todo Release the allocations associated with each output chunk once it + * is received by the remote service. + * <p> + * When the query terminates all output chunks targeting any node + * EXCEPT the query controller should be immediately dropped. + * <p> + * If there is an error during query evaluation, then the output + * chunks for the query controller should be immediately dropped. + * <p> + * If the iterator draining the results on the query controller is + * closed, then the output chunks for the query controller should be + * immediately dropped. + * + * @todo There are a few things where the resource must be made available to + * more than one operator evaluation phase. The best examples are + * temporary graphs for parallel closure and large collections of + * graphIds for SPARQL "NAMED FROM DATA SET" extensions. + */ + private static class OutputChunk { + + final long queryId; + + final UUID serviceId; + + final int sinkId; + + final int nbytes; + + final List<IAllocation> allocations; + + public OutputChunk(final long queryId, final UUID serviceId, + final int sinkId, final int nbytes, + final List<IAllocation> allocations) { + + this.queryId = queryId; + this.serviceId = serviceId; + this.sinkId = sinkId; + this.nbytes = nbytes; + this.allocations = allocations; + + } + + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2010-09-08 20:11:49 UTC (rev 3523) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2010-09-08 20:52:07 UTC (rev 3524) @@ -50,6 +50,7 @@ import com.bigdata.bop.IBindingSet; import com.bigdata.bop.IConstraint; import com.bigdata.bop.IPredicate; +import com.bigdata.bop.IShardwisePipelineOp; import com.bigdata.bop.IVariable; import com.bigdata.bop.engine.BOpStats; import com.bigdata.btree.BytesUtil; @@ -93,7 +94,8 @@ * @todo Break the star join logic out into its own join operator and test * suite. */ -public class PipelineJoin extends BindingSetPipelineOp { +public class PipelineJoin<E> extends BindingSetPipelineOp implements + IShardwisePipelineOp<E> { static private final Logger log = Logger.getLogger(PipelineJoin.class); @@ -256,22 +258,36 @@ } - protected BindingSetPipelineOp left() { + /** + * The left hand operator, which is the previous join in the pipeline join + * path. + */ + public BindingSetPipelineOp left() { return (BindingSetPipelineOp) get(0); } - protected IPredicate<?> right() { + /** + * The right hand operator, which is the {@link IPredicate}. + */ + @SuppressWarnings("unchecked") + public IPredicate<E> right() { - return (IPredicate<?>) get(1); + return (IPredicate<E>) get(1); } + + public IPredicate<E> getPredicate() { + + return right(); + + } /** * @see Annotations#CONSTRAINTS */ - protected IConstraint[] constraints() { + public IConstraint[] constraints() { return getProperty(Annotations.CONSTRAINTS, null/* defaultValue */); @@ -280,7 +296,7 @@ /** * @see Annotations#OPTIONAL */ - protected boolean isOptional() { + public boolean isOptional() { return getProperty(Annotations.OPTIONAL, Annotations.DEFAULT_OPTIONAL); @@ -289,7 +305,7 @@ /** * @see Annotations#MAX_PARALLEL */ - protected int getMaxParallel() { + public int getMaxParallel() { return getProperty(Annotations.MAX_PARALLEL, Annotations.DEFAULT_MAX_PARALLEL); @@ -298,7 +314,7 @@ /** * @see Annotations#SELECT */ - protected IVariable<?>[] variablesToKeep() { + public IVariable<?>[] variablesToKeep() { return getProperty(Annotations.SELECT, null/* defaultValue */); @@ -325,7 +341,7 @@ /** * The join that is being executed. */ - final private PipelineJoin joinOp; + final private PipelineJoin<?> joinOp; /** * The constraint (if any) specified for the join operator. @@ -450,7 +466,7 @@ * @param context */ public JoinTask(// - final PipelineJoin joinOp,// + final PipelineJoin<?> joinOp,// final BOpContext<IBindingSet> context ) { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/mutation/InsertOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/mutation/InsertOp.java 2010-09-08 20:11:49 UTC (rev 3523) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/mutation/InsertOp.java 2010-09-08 20:52:07 UTC (rev 3524) @@ -27,8 +27,6 @@ package com.bigdata.bop.mutation; -import java.util.Arrays; -import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; @@ -39,7 +37,7 @@ import com.bigdata.bop.BindingSetPipelineOp; import com.bigdata.bop.IBindingSet; import com.bigdata.bop.IPredicate; -import com.bigdata.bop.IVariableOrConstant; +import com.bigdata.bop.IShardwisePipelineOp; import com.bigdata.bop.engine.BOpStats; import com.bigdata.btree.ILocalBTreeView; import com.bigdata.btree.ITupleSerializer; @@ -59,7 +57,8 @@ * @param <E> * The generic type of the elements written onto the index. */ -public class InsertOp<E> extends BindingSetPipelineOp { +public class InsertOp<E> extends BindingSetPipelineOp implements + IShardwisePipelineOp<E> { /** * @@ -69,12 +68,12 @@ public interface Annotations extends BindingSetPipelineOp.Annotations { /** - * An ordered {@link IVariableOrConstant}[]. Elements will be created - * using the binding sets which flow through the operator and - * {@link IRelation#newElement(java.util.List, IBindingSet)}. + * An {@link IPredicate}. The {@link IPredicate#asBound(IBindingSet)} + * predicate will be used to create the elements to be inserted into + * the relation. * - * @todo This should be an {@link IPredicate} and should be the right - * hand operand just like for a JOIN. + * @see IPredicate#asBound(IBindingSet) + * @see IRelation#newElement(java.util.List, IBindingSet) */ String SELECTED = InsertOp.class.getName() + ".selected"; @@ -116,9 +115,8 @@ /** * @see Annotations#SELECTED */ - public IVariableOrConstant<?>[] getSelected() { + public IPredicate<E> getPredicate() { -// return (IVariableOrConstant<?>[]) getProperty(Annotations.SELECTED); return getRequiredProperty(Annotations.SELECTED); } @@ -164,7 +162,7 @@ */ private final IBlockingBuffer<IBindingSet[]> sink; - private List<IVariableOrConstant<?>> selected; + private IPredicate<E> predicate; private final IRelation<E> relation; @@ -181,7 +179,7 @@ sink = context.getSink(); - selected = Arrays.asList(op.getSelected()); + predicate = op.getPredicate(); relation = context.getWriteRelation(op.getRelation()); @@ -229,7 +227,7 @@ final IBindingSet bset = chunk[i]; - final E e = relation.newElement(selected, bset); + final E e = relation.newElement(predicate.args(), bset); final byte[] key = keyOrder.getKey(keyBuilder, e); Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/DirectBufferPoolAllocator.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/DirectBufferPoolAllocator.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/io/DirectBufferPoolAllocator.java 2010-09-08 20:52:07 UTC (rev 3524) @@ -0,0 +1,653 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Sep 8, 2010 + */ + +package com.bigdata.io; + +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; + +import com.bigdata.service.ResourceService; +import com.bigdata.util.concurrent.Haltable; + +/** + * An allocator for {@link ByteBuffer} slices backed by direct + * {@link ByteBuffer}s allocated against a {@link DirectBufferPool}. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + * + * @todo Make the type of the identifier for the {@link IAllocation} generic + * using a factory pattern (we need {@link UUID} for scale-out, but the + * class could be reused for other purposes as well). [The allocation + * context identifier should continue to be an application specified + * object.] + */ +public class DirectBufferPoolAllocator { + + /** + * The pool from which the direct {@link ByteBuffer}s are allocated. + */ + private final DirectBufferPool directBufferPool; + + /** + * The set of allocation contexts. + */ + private final ConcurrentHashMap<Object/* key */, AllocationContext> allocationContexts = new ConcurrentHashMap<Object, AllocationContext>(); + + /** + * The set of {@link IAllocation} outstanding against the + * {@link #directBufferPool}. + */ + private final ConcurrentHashMap<UUID, Allocation> allocations = new ConcurrentHashMap<UUID, Allocation>(); + + /** + * @todo Maybe replace this with a private {@link Haltable} (or extend + * {@link Haltable}) so we can test {@link Haltable#halted()} in + * critical methods? If we expose the {@link Haltable} then the + * {@link ResourceService} can also check it to see whether all + * allocations have been invalidated. However, that will not help us + * to invalidate a specific {@link IAllocationContext}. For that + * purpose we would need to do pretty much the same thing recursively. + */ + private final AtomicBoolean open = new AtomicBoolean(true); + + /** + * + * @param pool + * The pool from which the direct {@link ByteBuffer}s are + * allocated. + */ + public DirectBufferPoolAllocator(final DirectBufferPool pool) { + + this.directBufferPool = pool; + + } + + /** + * Extended to {@link #close()} the allocator. + */ + @Override + protected void finalize() throws Throwable { + + close(); + + super.finalize(); + + } + + /** + * Releases all {@link AllocationContext}s and all direct {@link ByteBuffer} + * s which they are using. + */ + public void close() { + + if (open.compareAndSet(true/* expect */, false/* update */)) { + + for (AllocationContext c : allocationContexts.values()) { + + c.release(); + + } + + } + + } + + /** + * The maximum #of bytes in a single {@link IAllocation}. + */ + public int getMaxSlotSize() { + + return directBufferPool.getBufferCapacity(); + + } + + /** + * Return an allocation context for the key. If none exists for that key, + * then one is atomically created and returned. + * + * @param key + * A key which uniquely identifies that context. The key will be + * inserted into a hash table and therefore must have appropriate + * hashCode() and equals() methods. + * + * @return The allocation context. + */ + public IAllocationContext getAllocationContext(final Object key) { + + AllocationContext c = allocationContexts.get(key); + + if (c == null) { + + final AllocationContext t = allocationContexts.putIfAbsent(key, + c = new AllocationContext(key)); + + if (t != null) { + + // lost the race to another thread. + c = t; + + } + + } + + return c; + + } + + /** + * Return the allocation associated with that id. + * + * @param id + * The allocation identifier. + * + * @return The allocation -or- <code>null</code> if there is no such + * allocation. + */ + public IAllocation getAllocation(final UUID id) { + + return allocations.get(id); + + } + +// /** +// * A direct {@link ByteBuffer} allocated from the {@link #directBufferPool} +// * together with the identifier assigned to that {@link ByteBuffer} (we can +// * not directly insert {@link ByteBuffer}s into the keys of a hash map since +// * their hash code is a function of their content). +// */ +// private class DirectBufferAllocation { +// +// private final Long id; +// +// private final ByteBuffer directBuffer; +// +// public DirectBufferAllocation(final Long id, +// final ByteBuffer directBuffer) { +// +// if (id == null) +// throw new IllegalArgumentException(); +// +// if (directBuffer == null) +// throw new IllegalArgumentException(); +// +// this.id = id; +// +// this.directBuffer = directBuffer; +// +// } +// +// } + + /** + * An allocation context links some application specified key with a list + * of direct {@link ByteBuffer}s on which allocations have been made by + * the application. + */ + public interface IAllocationContext { + + /** + * Allocate a series of {@link ByteBuffer} slices on which the + * application may write data. The application is encouraged to maintain + * the order of the allocations in the array in order to preserve the + * ordering of data written onto those allocation. + * + * @param nbytes + * The #of bytes required. + * + * @return The {@link UUID}s of those allocations. + * + * @throws InterruptedException + */ + IAllocation[] alloc(int nbytes) throws InterruptedException; + + /** + * Release all allocations made against this allocation context. + */ + void release(); + + } + + /** + * An allocation against a direct {@link ByteBuffer}. + */ + public interface IAllocation { + + /** The allocation identifier. */ + public UUID getId(); + + /** + * The allocated {@link ByteBuffer#slice()}. + */ + public ByteBuffer getSlice(); + + /** + * Release this allocation. + * <p> + * Note: The implementation is encouraged to release the associated + * direct {@link ByteBuffer} if there are no remaining allocations + * against it and MAY made the slice of the buffer available for + * reallocation. + * <p> + * Note: An {@link InterruptedException} MAY be thrown. This allows us + * to handle cases where a concurrent process (such as a query) was + * halted and its component threads were interrupted. By looking for the + * interrupt, we can avoid attempts to release an allocation in some + * thread where the entire {@link IAllocationContext} has already been + * released by another thread. + * + * @throws InterruptedException + */ + public void release() throws InterruptedException; + + } + + /** + * An allocation against a direct {@link ByteBuffer}. + */ + // Note: package private for the unit tests. + /*private*/ class Allocation implements IAll... [truncated message content] |