From: <tho...@us...> - 2010-09-25 00:31:41
|
Revision: 3630 http://bigdata.svn.sourceforge.net/bigdata/?rev=3630&view=rev Author: thompsonbry Date: 2010-09-25 00:31:32 +0000 (Sat, 25 Sep 2010) Log Message: ----------- Added support for RMI access paths. All operators now assume "ANY" as their evaluation context. This means that you must explicitly override the evaluation context for scale-out JOINS in order to use shard-partitioned access paths versus remote access paths. The default of "ANY" is not valid for some operators. Such operators now check in their shallow copy constructor to verify that a legal evaluation context was explicitly set. At this point we have the tools to evaluate a default graph query in scale-out. The Rule2BOpUtility needs to be modified to become aware of standalone versus scale-out and to use the appropriate operator patterns for standalone and scale-out. The correct operator pattern depends on a number of factors. I will be working up a cost model and pulling the logic out of DefaultGraphSolutionExpander and NamedGraphSolutionExpander. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/AbstractChunkedOrderedIteratorOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpEvaluationContext.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ChunkedOrderedIteratorOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IPredicate.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/StartOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/Rule2BOpUtility.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/notes.txt branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/eval/JoinGraph.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/mutation/InsertOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ndx/AbstractSampleIndex.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/MemorySortOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SliceOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SortOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/AbstractRelation.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/ap/TestPredicateAccessPath.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestPipelineUtility.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine_Slice.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestRunState.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestMemorySortOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestSliceOp.java branches/QUADS_QUERY_BRANCH/bigdata-jini/src/test/com/bigdata/bop/fed/jini/TestJiniFederatedQueryEngine.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BufferAnnotations.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestRemoteAccessPath.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/AbstractChunkedOrderedIteratorOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/AbstractChunkedOrderedIteratorOp.java 2010-09-24 19:37:50 UTC (rev 3629) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/AbstractChunkedOrderedIteratorOp.java 2010-09-25 00:31:32 UTC (rev 3630) @@ -90,13 +90,13 @@ } - protected int getFullyBufferedReadThreshold() { +// protected int getFullyBufferedReadThreshold() { +// +// return getProperty(Annotations.FULLY_BUFFERED_READ_THRESHOLD, +// Annotations.DEFAULT_FULLY_BUFFERED_READ_THRESHOLD); +// +// } - return getProperty(Annotations.FULLY_BUFFERED_READ_THRESHOLD, - Annotations.DEFAULT_FULLY_BUFFERED_READ_THRESHOLD); - - } - protected long getChunkTimeout() { return getProperty(Annotations.CHUNK_TIMEOUT, Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java 2010-09-24 19:37:50 UTC (rev 3629) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java 2010-09-25 00:31:32 UTC (rev 3630) @@ -161,10 +161,8 @@ int getId(); /** - * Return the evaluation context for the operator. The default is - * {@link BOpEvaluationContext#ANY}. Operators which must be mapped against - * shards, mapped against nodes, or evaluated on the query controller must - * override this method. + * Return the evaluation context for the operator as specified by + * {@link Annotations#EVALUATION_CONTEXT}. */ BOpEvaluationContext getEvaluationContext(); @@ -245,6 +243,14 @@ String TIMESTAMP = BOp.class.getName() + ".timestamp"; /** + * This annotation determines where an operator will be evaluated + * (default {@value #DEFAULT_EVALUATION_CONTEXT}). + */ + String EVALUATION_CONTEXT = BOp.class.getName() + ".evaluationContext"; + + BOpEvaluationContext DEFAULT_EVALUATION_CONTEXT = BOpEvaluationContext.ANY; + + /** * For hash partitioned operators, this is the set of the member nodes * for the operator. * <p> Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2010-09-24 19:37:50 UTC (rev 3629) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2010-09-25 00:31:32 UTC (rev 3630) @@ -362,16 +362,11 @@ } - /** - * The default implementation returns {@link BOpEvaluationContext#ANY} and - * must be overridden by operators which have a different {@link BOpEvaluationContext}. - * <p> - * {@inheritDoc} - */ - public BOpEvaluationContext getEvaluationContext() { - - return BOpEvaluationContext.ANY; - + final public BOpEvaluationContext getEvaluationContext() { + + return getProperty(Annotations.EVALUATION_CONTEXT, + Annotations.DEFAULT_EVALUATION_CONTEXT); + } public final boolean isMutation() { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java 2010-09-24 19:37:50 UTC (rev 3629) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java 2010-09-25 00:31:32 UTC (rev 3630) @@ -38,6 +38,7 @@ import com.bigdata.btree.IRangeQuery; import com.bigdata.journal.IIndexManager; import com.bigdata.journal.TimestampUtility; +import com.bigdata.relation.AbstractRelation; import com.bigdata.relation.IRelation; import com.bigdata.relation.accesspath.AccessPath; import com.bigdata.relation.accesspath.IAccessPath; @@ -50,7 +51,7 @@ import com.bigdata.striterator.IKeyOrder; /** - * The evaluation context for the operator (NOT serializable). + * Base class for the bigdata operation evaluation context (NOT serializable). * * @param <E> * The generic type of the objects processed by the operator. @@ -147,12 +148,13 @@ * order to support mutation operator we will also have to pass in the * {@link #writeTimestamp} or differentiate this in the method name. */ - public IRelation getRelation(final IPredicate<?> pred) { + @SuppressWarnings("unchecked") + public <E> IRelation<E> getRelation(final IPredicate<E> pred) { /* * Note: This uses the federation as the index manager when locating a - * resource for scale-out. However, s/o reads must use the local index - * manager when actually obtaining the index view for the relation. + * resource for scale-out since that let's us look up the relation in + * the global row store, which is being used as a catalog. */ final IIndexManager tmp = getFederation() == null ? getIndexManager() : getFederation(); @@ -160,7 +162,7 @@ final long timestamp = (Long) pred .getRequiredProperty(BOp.Annotations.TIMESTAMP); - return (IRelation<?>) tmp.getResourceLocator().locate( + return (IRelation<E>) tmp.getResourceLocator().locate( pred.getOnlyRelationName(), timestamp); } @@ -194,11 +196,25 @@ * Obtain an access path reading from relation for the specified predicate * (from the tail of some rule). * <p> - * Note that passing in the {@link IRelation} is important since it - * otherwise must be discovered using the {@link IResourceLocator}. By - * requiring the caller to resolve it before hand and pass it into this - * method the contention and demand on the {@link IResourceLocator} cache is - * reduced. + * Note: Passing in the {@link IRelation} is important since it otherwise + * must be discovered using the {@link IResourceLocator}. By requiring the + * caller to resolve it before hand and pass it into this method the + * contention and demand on the {@link IResourceLocator} cache is reduced. + * <p> + * <h2>Scale-Out</h2> + * <p> + * Note: You MUST be extremely careful when using expanders with a local + * access path for a shared-partitioned or hash-partitioned index. Only + * expanders whose semantics remain valid with a partial view of the index + * will behave as expected. Here are some examples that DO NOT work: + * <ul> + * <li>"DISTINCT" on a partitioned local access path is not coherent</li> + * <li>Expanders which generate reads against keys not found on that shard + * are not coherent.</li> + * </ul> + * If you have requirements such as these, then either use a remote access + * path or change your query plan design more radically to take advantage of + * efficient shard-wise scans in scale-out. * * @param relation * The relation. @@ -211,19 +227,39 @@ * * @todo replaces * {@link IJoinNexus#getTailAccessPath(IRelation, IPredicate)}. + * + * @todo Reconcile with IRelation#getAccessPath(IPredicate) once the bop + * conversion is done. It has much of the same logic (this also + * handles remote access paths now). + * + * @todo Support mutable relation views. */ - @SuppressWarnings("unchecked") - public IAccessPath<?> getAccessPath(final IRelation<?> relation, - final IPredicate<?> predicate) { +// @SuppressWarnings("unchecked") + public <E> IAccessPath<E> getAccessPath(final IRelation<E> relation, + final IPredicate<E> predicate) { if (relation == null) throw new IllegalArgumentException(); if (predicate == null) throw new IllegalArgumentException(); - // FIXME This should be as assigned by the query planner so the query is fully declarative. - final IKeyOrder keyOrder = relation.getKeyOrder((IPredicate) predicate); + /* + * FIXME This should be as assigned by the query planner so the query is + * fully declarative. + */ + final IKeyOrder<E> keyOrder; + { + final IKeyOrder<E> tmp = predicate.getKeyOrder(); + if (tmp != null) { + // use the specified index. + keyOrder = tmp; + } else { + // ask the relation for the best index. + keyOrder = relation.getKeyOrder(predicate); + } + } + if (keyOrder == null) throw new RuntimeException("No access path: " + predicate); @@ -233,26 +269,24 @@ .getRequiredProperty(BOp.Annotations.TIMESTAMP); final int flags = predicate.getProperty( - PipelineOp.Annotations.FLAGS, - PipelineOp.Annotations.DEFAULT_FLAGS) + IPredicate.Annotations.FLAGS, + IPredicate.Annotations.DEFAULT_FLAGS) | (TimestampUtility.isReadOnly(timestamp) ? IRangeQuery.READONLY : 0); final int chunkOfChunksCapacity = predicate.getProperty( - PipelineOp.Annotations.CHUNK_OF_CHUNKS_CAPACITY, - PipelineOp.Annotations.DEFAULT_CHUNK_OF_CHUNKS_CAPACITY); + BufferAnnotations.CHUNK_OF_CHUNKS_CAPACITY, + BufferAnnotations.DEFAULT_CHUNK_OF_CHUNKS_CAPACITY); final int chunkCapacity = predicate.getProperty( - PipelineOp.Annotations.CHUNK_CAPACITY, - PipelineOp.Annotations.DEFAULT_CHUNK_CAPACITY); + BufferAnnotations.CHUNK_CAPACITY, + BufferAnnotations.DEFAULT_CHUNK_CAPACITY); final int fullyBufferedReadThreshold = predicate.getProperty( - PipelineOp.Annotations.FULLY_BUFFERED_READ_THRESHOLD, - PipelineOp.Annotations.DEFAULT_FULLY_BUFFERED_READ_THRESHOLD); + IPredicate.Annotations.FULLY_BUFFERED_READ_THRESHOLD, + IPredicate.Annotations.DEFAULT_FULLY_BUFFERED_READ_THRESHOLD); - final IIndexManager indexManager = getIndexManager(); - - if (predicate.getPartitionId() != -1) { + if (partitionId != -1) { /* * Note: This handles a read against a local index partition. For @@ -269,12 +303,14 @@ // return ((AbstractRelation<?>) relation) // .getAccessPathForIndexPartition(indexManager, // (IPredicate) predicate); + /* - * @todo This condition should probably be an error since the expander - * will be ignored. + * @todo This is an error since expanders are currently ignored on + * shard-wise access paths. While it is possible to enable expanders + * for shard-wise access paths. */ -// if (predicate.getSolutionExpander() != null) -// throw new IllegalArgumentException(); + if (predicate.getSolutionExpander() != null) + throw new IllegalArgumentException(); final String namespace = relation.getNamespace();//predicate.getOnlyRelationName(); @@ -286,60 +322,70 @@ final ILocalBTreeView ndx = (ILocalBTreeView) indexManager .getIndex(name, timestamp); - return new AccessPath(relation, indexManager, timestamp, + return new AccessPath<E>(relation, indexManager, timestamp, predicate, keyOrder, ndx, flags, chunkOfChunksCapacity, chunkCapacity, fullyBufferedReadThreshold).init(); } - /* - * Find the best access path for the predicate for that relation. - * - * @todo Replace this with IRelation#getAccessPath(IPredicate) once the - * bop conversion is done. It is the same logic. - */ - IAccessPath accessPath; - { - // accessPath = relation.getAccessPath((IPredicate) predicate); - final IIndex ndx = relation.getIndex(keyOrder); + // Decide on a local or remote view of the index. + final IIndexManager indexManager; + if (predicate.isRemoteAccessPath()) { + // use federation in scale-out for a remote access path. + indexManager = fed != null ? fed : this.indexManager; + } else { + indexManager = this.indexManager; + } - if (ndx == null) { - - throw new IllegalArgumentException("no index? relation=" - + relation.getNamespace() + ", timestamp=" - + timestamp + ", keyOrder=" + keyOrder + ", pred=" - + predicate + ", indexManager=" + getIndexManager()); + // Obtain the index. + final String fqn = AbstractRelation.getFQN(relation, keyOrder); + final IIndex ndx = AbstractRelation.getIndex(indexManager, fqn, timestamp); - } + if (ndx == null) { - accessPath = new AccessPath((IRelation) relation, indexManager, - timestamp, (IPredicate) predicate, - (IKeyOrder) keyOrder, ndx, flags, chunkOfChunksCapacity, - chunkCapacity, fullyBufferedReadThreshold).init(); + throw new IllegalArgumentException("no index? relation=" + + relation.getNamespace() + ", timestamp=" + timestamp + + ", keyOrder=" + keyOrder + ", pred=" + predicate + + ", indexManager=" + getIndexManager()); } - - /* - * @todo No expander's for bops, at least not right now. They could be - * added in easily enough, which would support additional features for - * standalone query evaluation (runtime materialization of some - * entailments). - * - * FIXME temporarily enabled expanders (mikep) - */ - final ISolutionExpander<?> expander = predicate.getSolutionExpander(); - - if (expander != null) { - - // allow the predicate to wrap the access path - accessPath = expander.getAccessPath(accessPath); - - } - // return that access path. + // Obtain the access path for that relation and index. + final IAccessPath<E> accessPath = new AccessPath<E>( + relation, indexManager, timestamp, + predicate, keyOrder, ndx, flags, + chunkOfChunksCapacity, chunkCapacity, + fullyBufferedReadThreshold).init(); + + // optionally wrap with an expander pattern. + return expander(predicate, accessPath); + + } + + /** + * Optionally wrap with an expander pattern. + * + * @param predicate + * @param accessPath + * @return + * @param <E> + */ + private <E> IAccessPath<E> expander(final IPredicate<E> predicate, + final IAccessPath<E> accessPath) { + + final ISolutionExpander<E> expander = predicate.getSolutionExpander(); + + if (expander != null) { + + // allow the predicate to wrap the access path + return expander.getAccessPath(accessPath); + + } + return accessPath; + } } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpEvaluationContext.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpEvaluationContext.java 2010-09-24 19:37:50 UTC (rev 3629) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpEvaluationContext.java 2010-09-25 00:31:32 UTC (rev 3630) @@ -22,7 +22,11 @@ * The operator may be evaluated anywhere, including piecewise evaluation on * any node of the cluster where its inputs are available. This is used for * operators which do not need to concentrate or coordinate their inputs - * such as {@link ConditionalRoutingOp}. + * such as {@link ConditionalRoutingOp}. It may also be used in combination + * with a remote access path to impose a DISTINCT filter across one or more + * shards or nodes. + * + * @see IPredicate.Annotations#REMOTE_ACCESS_PATH */ ANY, /** Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BufferAnnotations.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BufferAnnotations.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BufferAnnotations.java 2010-09-25 00:31:32 UTC (rev 3630) @@ -0,0 +1,84 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Sep 24, 2010 + */ + +package com.bigdata.bop; + +import com.bigdata.relation.accesspath.BlockingBuffer; +import com.bigdata.relation.accesspath.IBuffer; + +/** + * Annotations for {@link BlockingBuffer} as used by various kinds of operators. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public interface BufferAnnotations { + + /** + * The maximum #of chunks that can be buffered before an the producer would + * block (default {@value #DEFAULT_CHUNK_OF_CHUNKS_CAPACITY}). Note that + * partial chunks may be combined into full chunks whose nominal capacity is + * specified by {@link #CHUNK_CAPACITY}. + */ + String CHUNK_OF_CHUNKS_CAPACITY = BlockingBuffer.class.getName() + + ".chunkOfChunksCapacity"; + + /** + * Default for {@link #CHUNK_OF_CHUNKS_CAPACITY} + */ + int DEFAULT_CHUNK_OF_CHUNKS_CAPACITY = 100; + + /** + * Sets the capacity of the {@link IBuffer}s used to accumulate a chunk of + * {@link IBindingSet}s (default {@value #CHUNK_CAPACITY}). Partial chunks + * may be automatically combined into full chunks. + * + * @see #CHUNK_OF_CHUNKS_CAPACITY + */ + String CHUNK_CAPACITY = IBuffer.class.getName() + ".chunkCapacity"; + + /** + * Default for {@link #CHUNK_CAPACITY} + */ + int DEFAULT_CHUNK_CAPACITY = 100; + + /** + * The timeout in milliseconds that the {@link BlockingBuffer} will wait for + * another chunk to combine with the current chunk before returning the + * current chunk (default {@value #DEFAULT_CHUNK_TIMEOUT}). This may be ZERO + * (0) to disable the chunk combiner. + */ + String CHUNK_TIMEOUT = BlockingBuffer.class.getName() + ".chunkTimeout"; + + /** + * The default for {@link #CHUNK_TIMEOUT}. + * + * @todo this is probably much larger than we want. Try 10ms. + */ + int DEFAULT_CHUNK_TIMEOUT = 20; + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BufferAnnotations.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ChunkedOrderedIteratorOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ChunkedOrderedIteratorOp.java 2010-09-24 19:37:50 UTC (rev 3629) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ChunkedOrderedIteratorOp.java 2010-09-25 00:31:32 UTC (rev 3630) @@ -1,17 +1,7 @@ package com.bigdata.bop; -import com.bigdata.btree.ILocalBTreeView; -import com.bigdata.journal.IIndexManager; -import com.bigdata.rawstore.Bytes; -import com.bigdata.relation.accesspath.AccessPath; -import com.bigdata.relation.accesspath.BlockingBuffer; import com.bigdata.relation.accesspath.IAccessPath; -import com.bigdata.relation.accesspath.IBuffer; -import com.bigdata.relation.rule.eval.IJoinNexus; -import com.bigdata.service.IBigdataFederation; -import com.bigdata.service.IDataService; import com.bigdata.striterator.IChunkedOrderedIterator; -import com.bigdata.striterator.ICloseableIterator; /** * Interface for evaluating operations producing chunks of elements (tuples @@ -25,96 +15,11 @@ public interface ChunkedOrderedIteratorOp<E> extends BOp { /** - * Well known annotations pertaining to the binding set pipeline. + * Well known annotations. */ - public interface Annotations extends BOp.Annotations { + public interface Annotations extends BOp.Annotations, BufferAnnotations { - /** - * The maximum #of chunks that can be buffered before an the producer - * would block (default {@value #DEFAULT_CHUNK_OF_CHUNKS_CAPACITY}). - * Note that partial chunks may be combined into full chunks whose - * nominal capacity is specified by {@link #CHUNK_CAPACITY}. - */ - String CHUNK_OF_CHUNKS_CAPACITY = BlockingBuffer.class.getName() - + ".chunkOfChunksCapacity"; - /** - * Default for {@link #CHUNK_OF_CHUNKS_CAPACITY} - */ - int DEFAULT_CHUNK_OF_CHUNKS_CAPACITY = 1000; - - /** - * Sets the capacity of the {@link IBuffer}s used to accumulate a chunk - * of {@link IBindingSet}s (default {@value #CHUNK_CAPACITY}). Partial - * chunks may be automatically combined into full chunks. - * - * @see #CHUNK_OF_CHUNKS_CAPACITY - */ - String CHUNK_CAPACITY = IBuffer.class.getName() + ".chunkCapacity"; - - /** - * Default for {@link #CHUNK_CAPACITY} - */ - int DEFAULT_CHUNK_CAPACITY = 100; - - /** - * The timeout in milliseconds that the {@link BlockingBuffer} will wait - * for another chunk to combine with the current chunk before returning - * the current chunk (default {@value #DEFAULT_CHUNK_TIMEOUT}). This may - * be ZERO (0) to disable the chunk combiner. - */ - String CHUNK_TIMEOUT = BlockingBuffer.class.getName() + ".chunkTimeout"; - - /** - * The default for {@link #CHUNK_TIMEOUT}. - * - * @todo this is probably much larger than we want. Try 10ms. - */ - int DEFAULT_CHUNK_TIMEOUT = 1000; - - /** - * If the estimated rangeCount for an - * {@link AccessPath#iterator()} is LTE this threshold then use - * a fully buffered (synchronous) iterator. Otherwise use an - * asynchronous iterator whose capacity is governed by - * {@link #CHUNK_OF_CHUNKS_CAPACITY}. - */ - String FULLY_BUFFERED_READ_THRESHOLD = AccessPath.class - .getName() - + ".fullyBufferedReadThreadshold"; - - /** - * Default for {@link #FULLY_BUFFERED_READ_THRESHOLD} - */ - int DEFAULT_FULLY_BUFFERED_READ_THRESHOLD = 20*Bytes.kilobyte32; - } - /** - * Execute the operator, returning an iterator from which the element may be - * read. Operator evaluation may be halted using - * {@link ICloseableIterator#close()}. - * - * @param fed - * The {@link IBigdataFederation} IFF the operator is being - * evaluated on an {@link IBigdataFederation}. When evaluating - * operations against an {@link IBigdataFederation}, this - * reference provides access to the scale-out view of the indices - * and to other bigdata services. - * @param joinNexus - * An evaluation context with hooks for the <em>local</em> - * execution environment. When evaluating operators against an - * {@link IBigdataFederation} the {@link IJoinNexus} MUST be - * formulated with the {@link IIndexManager} of the local - * {@link IDataService} order perform efficient reads against the - * shards views as {@link ILocalBTreeView}s. It is an error if - * the {@link IJoinNexus#getIndexManager()} returns the - * {@link IBigdataFederation} since each read would use RMI. This - * condition should be checked by the operator implementation. - * - * @return An iterator from which the elements may be read. - */ - IChunkedOrderedIterator<E> eval(IBigdataFederation<?> fed, - IJoinNexus joinNexus); - } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IPredicate.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IPredicate.java 2010-09-24 19:37:50 UTC (rev 3629) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IPredicate.java 2010-09-25 00:31:32 UTC (rev 3630) @@ -30,6 +30,7 @@ import java.io.Serializable; +import com.bigdata.btree.IRangeQuery; import com.bigdata.mdi.PartitionLocator; import com.bigdata.relation.IMutableRelation; import com.bigdata.relation.IRelation; @@ -61,7 +62,7 @@ /** * Interface declaring well known annotations. */ - public interface Annotations extends BOp.Annotations { + public interface Annotations extends BOp.Annotations, BufferAnnotations { /** * The name of the relation on which the predicate will read. @@ -104,6 +105,85 @@ * not address a specific shard. */ String PARTITION_ID = "partitionId"; + + int DEFAULT_PARTITION_ID = -1; + + /** + * Boolean option determines whether the predicate will use a local + * access path or a remote access path (default + * {@value #DEFAULT_REMOTE_ACCESS_PATH}). + * <p> + * <em>Local access paths</em> are much more efficient and should be + * used for most purposes. However, it is not possible to impose certain + * kinds of filters on a sharded or hash partitioned operations across + * local access paths. In particular, a DISTINCT filter can not be + * imposed using sharded or hash partitioned. + * <p> + * When the access path is local, the parent operator must be annotated + * to use a {@link BOpEvaluationContext#SHARDED shard wise} or + * {@link BOpEvaluationContext#HASHED node-wise} mapping of the binding + * sets. + * <p> + * <em>Remote access paths</em> use a scale-out index view. This view + * makes the scale-out index appear as if it were monolithic rather than + * sharded or hash partitioned. The monolithic view of a scale-out index + * can be used to impose a DISTINCT filter since all tuples will flow + * back to the caller. + * <p> + * When the access path is remote, the parent operator should use + * {@link BOpEvaluationContext#ANY} to prevent the binding sets from + * being moved around when the access path is remote. + * + * @see BOpEvaluationContext + */ + String REMOTE_ACCESS_PATH = "remoteAccessPath"; + + boolean DEFAULT_REMOTE_ACCESS_PATH = false; + + /** + * If the estimated rangeCount for an {@link AccessPath#iterator()} is + * LTE this threshold then use a fully buffered (synchronous) iterator. + * Otherwise use an asynchronous iterator whose capacity is governed by + * {@link #CHUNK_OF_CHUNKS_CAPACITY}. + * + * @see #DEFAULT_FULLY_BUFFERED_READ_THRESHOLD + */ + String FULLY_BUFFERED_READ_THRESHOLD = PipelineOp.class.getName() + + ".fullyBufferedReadThreshold"; + + /** + * Default for {@link #FULLY_BUFFERED_READ_THRESHOLD}. + * + * @todo Experiment with this. It should probably be something close to + * the branching factor, e.g., 100. + */ + int DEFAULT_FULLY_BUFFERED_READ_THRESHOLD = 100; + + /** + * Flags for the iterator ({@link IRangeQuery#KEYS}, + * {@link IRangeQuery#VALS}, {@link IRangeQuery#PARALLEL}). + * <p> + * Note: The {@link IRangeQuery#PARALLEL} flag here is an indication + * that the iterator may run in parallel across the index partitions. + * This only effects scale-out and only for simple triple patterns since + * the pipeline join does something different (it runs inside the index + * partition using the local index, not the client's view of a + * distributed index). + * + * @see #DEFAULT_FLAGS + */ + String FLAGS = PipelineOp.class.getName() + ".flags"; + + /** + * The default flags will visit the keys and values of the non-deleted + * tuples and allows parallelism in the iterator (when supported). + * + * @todo consider making parallelism something that the query planner + * must specify explicitly. + */ + final int DEFAULT_FLAGS = IRangeQuery.KEYS | IRangeQuery.VALS + | IRangeQuery.PARALLEL; + } /** @@ -275,6 +355,13 @@ public int getVariableCount(IKeyOrder<E> keyOrder); /** + * Return <code>true</code> if this is a remote access path. + * + * @see Annotations#REMOTE_ACCESS_PATH + */ + public boolean isRemoteAccessPath(); + + /** * Return the variable or constant at the specified index. * * @param index Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2010-09-24 19:37:50 UTC (rev 3629) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2010-09-25 00:31:32 UTC (rev 3630) @@ -30,16 +30,10 @@ import java.util.Map; import java.util.concurrent.TimeUnit; -import org.apache.log4j.Level; -import org.apache.log4j.Priority; - import com.bigdata.bop.engine.BOpStats; import com.bigdata.bop.engine.QueryEngine; -import com.bigdata.btree.IRangeQuery; -import com.bigdata.relation.accesspath.AccessPath; import com.bigdata.relation.accesspath.BlockingBuffer; import com.bigdata.relation.accesspath.IBlockingBuffer; -import com.bigdata.relation.accesspath.IBuffer; /** * An pipeline operator reads from a source and writes on a sink. This is an @@ -62,103 +56,8 @@ /** * Well known annotations pertaining to the binding set pipeline. */ - public interface Annotations extends BOp.Annotations { + public interface Annotations extends BOp.Annotations, BufferAnnotations { - /** - * The maximum #of chunks that can be buffered before an the producer - * would block (default {@value #DEFAULT_CHUNK_OF_CHUNKS_CAPACITY}). - * Note that partial chunks may be combined into full chunks whose - * nominal capacity is specified by {@link #CHUNK_CAPACITY}. - * - * @see #DEFAULT_CHUNK_OF_CHUNKS_CAPACITY - */ - String CHUNK_OF_CHUNKS_CAPACITY = PipelineOp.class.getName() - + ".chunkOfChunksCapacity"; - - /** - * Default for {@link #CHUNK_OF_CHUNKS_CAPACITY} - * - * @todo was 100. dialed down to reduce heap consumption for arrays. - * test performance @ 100 and 1000. - */ - int DEFAULT_CHUNK_OF_CHUNKS_CAPACITY = 100; - - /** - * Sets the capacity of the {@link IBuffer}s used to accumulate a chunk - * of {@link IBindingSet}s (default {@value #CHUNK_CAPACITY}). Partial - * chunks may be automatically combined into full chunks. - * - * @see #DEFAULT_CHUNK_CAPACITY - * @see #CHUNK_OF_CHUNKS_CAPACITY - */ - String CHUNK_CAPACITY = PipelineOp.class.getName() + ".chunkCapacity"; - - /** - * Default for {@link #CHUNK_CAPACITY} - */ - int DEFAULT_CHUNK_CAPACITY = 100; - - /** - * The timeout in milliseconds that the {@link BlockingBuffer} will wait - * for another chunk to combine with the current chunk before returning - * the current chunk (default {@value #DEFAULT_CHUNK_TIMEOUT}). This may - * be ZERO (0) to disable the chunk combiner. - * - * @see #DEFAULT_CHUNK_TIMEOUT - */ - String CHUNK_TIMEOUT = PipelineOp.class.getName() + ".chunkTimeout"; - - /** - * The default for {@link #CHUNK_TIMEOUT}. - * - * @todo Experiment with values for this. Low values will push chunks - * through quickly. High values will cause chunks to be combined - * and move larger chunks around. [But if we factor BlockingBuffer - * out of the query engine then this will go away]. - */ - int DEFAULT_CHUNK_TIMEOUT = 20; - - /** - * If the estimated rangeCount for an {@link AccessPath#iterator()} is - * LTE this threshold then use a fully buffered (synchronous) iterator. - * Otherwise use an asynchronous iterator whose capacity is governed by - * {@link #CHUNK_OF_CHUNKS_CAPACITY}. - * - * @see #DEFAULT_FULLY_BUFFERED_READ_THRESHOLD - */ - String FULLY_BUFFERED_READ_THRESHOLD = PipelineOp.class.getName() - + ".fullyBufferedReadThreshold"; - - /** - * Default for {@link #FULLY_BUFFERED_READ_THRESHOLD}. - * - * @todo Experiment with this. It should probably be something close to - * the branching factor, e.g., 100. - */ - int DEFAULT_FULLY_BUFFERED_READ_THRESHOLD = 100; - - /** - * Flags for the iterator ({@link IRangeQuery#KEYS}, - * {@link IRangeQuery#VALS}, {@link IRangeQuery#PARALLEL}). - * <p> - * Note: The {@link IRangeQuery#PARALLEL} flag here is an indication - * that the iterator may run in parallel across the index partitions. - * This only effects scale-out and only for simple triple patterns since - * the pipeline join does something different (it runs inside the index - * partition using the local index, not the client's view of a - * distributed index). - * - * @see #DEFAULT_FLAGS - */ - String FLAGS = PipelineOp.class.getName() + ".flags"; - - /** - * The default flags will visit the keys and values of the non-deleted - * tuples and allows parallelism in the iterator (when supported). - */ - final int DEFAULT_FLAGS = IRangeQuery.KEYS | IRangeQuery.VALS - | IRangeQuery.PARALLEL; - } /** Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java 2010-09-24 19:37:50 UTC (rev 3629) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java 2010-09-25 00:31:32 UTC (rev 3630) @@ -42,14 +42,9 @@ import com.bigdata.bop.IVariable; import com.bigdata.bop.IVariableOrConstant; import com.bigdata.bop.NV; -import com.bigdata.btree.IRangeQuery; import com.bigdata.journal.ITx; -import com.bigdata.relation.IRelation; import com.bigdata.relation.accesspath.IElementFilter; import com.bigdata.relation.rule.ISolutionExpander; -import com.bigdata.relation.rule.eval.IJoinNexus; -import com.bigdata.service.IBigdataFederation; -import com.bigdata.striterator.IChunkedOrderedIterator; import com.bigdata.striterator.IKeyOrder; /** @@ -183,8 +178,9 @@ public int getPartitionId() { - return (Integer)annotations.get(Annotations.PARTITION_ID); - + return (Integer) getProperty(Annotations.PARTITION_ID, + Annotations.DEFAULT_PARTITION_ID); + } @SuppressWarnings("unchecked") @@ -256,6 +252,11 @@ } return nunbound; } + + final public boolean isRemoteAccessPath() { + return getProperty(Annotations.REMOTE_ACCESS_PATH, + Annotations.DEFAULT_REMOTE_ACCESS_PATH); + } public Predicate<E> asBound(final IBindingSet bindingSet) { @@ -493,27 +494,4 @@ */ private int hash = 0; - /** - * @todo This does not allow us to override the iterator behavior based on - * the annotations. It also provides expander logic for scaleup and - * handles reading on a shard. It ignores the {@link IKeyOrder} - * associated with the {@link IPredicate} and there is no way to - * specify the {@link IRangeQuery} flags. - */ - @SuppressWarnings("unchecked") - public IChunkedOrderedIterator<E> eval(final IBigdataFederation<?> fed, - final IJoinNexus joinNexus) { - - // Resolve the relation name to the IRelation object. - final IRelation<E> relation = (IRelation<E>) joinNexus - .getTailRelationView(this/* predicate */); - - if (relation == null) - throw new RuntimeException("Not found: " + getOnlyRelationName()); - - return joinNexus.getTailAccessPath(relation, this/* predicate */) - .iterator(); - - } - } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/StartOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/StartOp.java 2010-09-24 19:37:50 UTC (rev 3629) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/StartOp.java 2010-09-25 00:31:32 UTC (rev 3630) @@ -3,7 +3,6 @@ import java.util.Map; import com.bigdata.bop.BOp; -import com.bigdata.bop.BOpEvaluationContext; /** * A version of {@link CopyBindingSetOp} which is always evaluated on the query @@ -21,11 +20,18 @@ } public StartOp(BOp[] args, Map<String, Object> annotations) { - super(args, annotations); + + super(args, annotations); + + switch (getEvaluationContext()) { + case CONTROLLER: + break; + default: + throw new UnsupportedOperationException( + Annotations.EVALUATION_CONTEXT + "=" + + getEvaluationContext()); + } + } - final public BOpEvaluationContext getEvaluationContext() { - return BOpEvaluationContext.CONTROLLER; - } - } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/Rule2BOpUtility.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/Rule2BOpUtility.java 2010-09-24 19:37:50 UTC (rev 3629) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/Rule2BOpUtility.java 2010-09-25 00:31:32 UTC (rev 3630) @@ -38,6 +38,7 @@ import org.apache.log4j.Logger; import com.bigdata.bop.BOp; import com.bigdata.bop.BOpContextBase; +import com.bigdata.bop.BOpEvaluationContext; import com.bigdata.bop.BOpUtility; import com.bigdata.bop.BindingSetPipelineOp; import com.bigdata.bop.IConstraint; @@ -48,6 +49,7 @@ import com.bigdata.bop.ap.Predicate; import com.bigdata.bop.bset.StartOp; import com.bigdata.bop.join.PipelineJoin; +import com.bigdata.bop.solutions.SliceOp; import com.bigdata.rdf.sail.BigdataSail; import com.bigdata.relation.rule.IProgram; import com.bigdata.relation.rule.IRule; @@ -108,6 +110,8 @@ final BindingSetPipelineOp startOp = new StartOp(new BOp[] {}, NV.asMap(new NV[] {// new NV(Predicate.Annotations.BOP_ID, bopId++),// + new NV(SliceOp.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.CONTROLLER),// })); /* @@ -210,6 +214,9 @@ constraints.size() > 0 ? constraints.toArray(new IConstraint[constraints.size()]) : null),// new NV(PipelineJoin.Annotations.OPTIONAL, pred.isOptional()),// + // Note: shard-partitioned joins! + new NV( Predicate.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.SHARDED),// })); left = joinOp; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/notes.txt =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/notes.txt 2010-09-24 19:37:50 UTC (rev 3629) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/notes.txt 2010-09-25 00:31:32 UTC (rev 3630) @@ -1,15 +1,37 @@ RunningQuery: -- FIXME Raise this into an annotation that we can tweak from the unit - tests and then debug the problem. [Write unit tests at the - RunState level.] - -- FIXME Add an annotation or method to mark operators which must be - evaluated using operator-at-a-time evaluation. SORT is the - main example here (it must be operator at a time of necessity) - but other operators may implemented with operator at a time - assumptions. Add a unit tests for sort on the query engine. + - TestJiniFederatedQueryEngine. + - Join=ANY, Predicate=RMI. + + - Unit tests of the default and named graph access path patterns. + + - Cost model for the default graph and named graph access path + patterns so we can choose the right one for each query. + + - Subqueries {Union,Steps,Star}. Implement subquery support. (We + can't test Star until we have the mutation API in place.) + + - PipelineType {Vectored,OneShot}. + + A vectored operator processes its inputs in chunks, producing + output chunks each time it runs. + + An one shot operator runs exactly once for a given query and + must wait for all of its inputs to become available before it + can begin. For example, SORT is a one shot operator. + + - Mutation {Insert, Delete}. Concurrency control. Scale-out + mutation operators must go through the ConcurrencyManager in order + to respect the isolation levels imposed within AbstractTask. For + standalone, we can using either UnisolatedReadWriteIndex or the + ConcurrencyManager as appropriate (but how can we tell which is + appropriate!?!). + + - Mutation {Create, Destroy}. This gets into resource management, + so defer for the moment but tackle in the context of RDFS closure + using STAR. + - MemoryType {Chunked,Blocked}. Blocked operators need to inherit some interface which @@ -56,16 +78,6 @@ */ Blocked, - - - PipelineType {Vectored,OneShot}. - - A vectored operator processes its inputs in chunks, producing - output chunks each time it runs. - - An one shot operator runs exactly once for a given query and - must wait for all of its inputs to become available before it - can begin. For example, SORT is a one shot operator. - Note: Many of the maxParallel annotations related to thread consumption will go away with Java7 and async file IO. Other annotations, such as the #of 1M buffers to allocate to an operator, @@ -210,8 +222,12 @@ unbound and then applies an IN filter. This works better because we can handle the reads on the SPOC index with C unbound very efficiently in scale-out by using a multi-block iterator on the - index segments. + index segments. [However, we must still impose DISTINCT on the + access path.] + - For very high volume operations we could do distributed merge + sorts to impose distinct and do operator at a time processing. + - @todo Add annotation to Predicate to indicate the use of an RMI access path in scale-out. Modify PipelineJoin such that it can be used as an ANY operator -or- a SHARDED operator. For default graph @@ -318,9 +334,6 @@ ==== Features: - - (***) Fix termination problems in RunningQuery relating to binding - set chunk / chunk message multiplicity. - - operator-at-once evaluation. The operator is triggered once its possible triggers are done. This is just an application of the same utility method which we use to decide when a query is done. @@ -350,7 +363,6 @@ used for fast computation of the delta between two historical commit points. - * FIXME Unit tests for non-distinct {@link IElementFilter}s on an * {@link IPredicate}, unit tests for distinct element filter on an * {@link IPredicate} which is capable of distributed operations. Do not use Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/eval/JoinGraph.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/eval/JoinGraph.java 2010-09-24 19:37:50 UTC (rev 3629) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/eval/JoinGraph.java 2010-09-25 00:31:32 UTC (rev 3630) @@ -34,9 +34,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; -import com.bigdata.bop.BOp; import com.bigdata.bop.BOpContext; -import com.bigdata.bop.BOpEvaluationContext; import com.bigdata.bop.BindingSetPipelineOp; import com.bigdata.bop.IBindingSet; import com.bigdata.bop.IPredicate; @@ -189,6 +187,15 @@ if (sampleSize <= 0) throw new IllegalArgumentException(); + switch (getEvaluationContext()) { + case CONTROLLER: + break; + default: + throw new UnsupportedOperationException( + Annotations.EVALUATION_CONTEXT + "=" + + getEvaluationContext()); + } + V = new Vertex[v.length]; for (int i = 0; i < v.length; i++) { @@ -263,14 +270,4 @@ } - /** - * This operator must be evaluated on the query controller. - */ - @Override - public BOpEvaluationContext getEvaluationContext() { - - return BOpEvaluationContext.CONTROLLER; - - } - } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2010-09-24 19:37:50 UTC (rev 3629) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2010-09-25 00:31:32 UTC (rev 3630) @@ -87,6 +87,12 @@ * Note: In order to support pipelining, query plans need to be arranged in a * "left-deep" manner and there may not be intervening operators between the * pipeline join operator and the {@link IPredicate} on which it will read. + * <p> + * Note: In scale-out, the {@link PipelineJoin} is generally annotated as a + * {@link BOpEvaluationContext#SHARDED} or {@link BOpEvaluationContext#HASHED} + * operator and the {@link IPredicate} is annotated for local access paths. If + * you need to use remote access paths, then the {@link PipelineJoin} should be + * annotated as a {@link BOpEvaluationContext#ANY} operator. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ @@ -278,15 +284,15 @@ } - /** - * Returns {@link BOpEvaluationContext#SHARDED} - */ - @Override - final public BOpEvaluationContext getEvaluationContext() { - - return BOpEvaluationContext.SHARDED; - - } + // /** + // * Returns {@link BOpEvaluationContext#SHARDED} + // */ + // @Override + // final public BOpEvaluationContext getEvaluationContext() { + // + // return BOpEvaluationContext.SHARDED; + // + // } public IPredicate<E> getPredicate() { @@ -339,14 +345,14 @@ public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { - return new FutureTask<Void>(new JoinTask(this, context)); + return new FutureTask<Void>(new JoinTask<E>(this, context)); } /** * Pipeline join impl. */ - private static class JoinTask extends Haltable<Void> implements Callable<Void> { + private static class JoinTask<E> extends Haltable<Void> implements Callable<Void> { /** * The join that is being executed. @@ -394,12 +400,12 @@ /** * The source for the elements to be joined. */ - final private IPredicate<?> right; + final private IPredicate<E> right; /** * The relation associated with the {@link #right} operand. */ - final private IRelation<?> relation; + final private IRelation<E> relation; /** * The partition identifier -or- <code>-1</code> if we are not reading @@ -476,7 +482,7 @@ * @param context */ public JoinTask(// - final PipelineJoin<?> joinOp,// + final PipelineJoin<E> joinOp,// final BOpContext<IBindingSet> context ) { @@ -851,7 +857,7 @@ * Aggregate the source bindingSets that license the same * asBound predicate. */ - final Map<IPredicate<?>, Collection<IBindingSet>> map = combineBindingSets(chunk); + final Map<IPredicate<E>, Collection<IBindingSet>> map = combineBindingSets(chunk); /* * Generate an AccessPathTask from each distinct asBound @@ -898,7 +904,7 @@ final IBindingSet bindingSet = chunk[0]; // constrain the predicate to the given bindings. - IPredicate<?> predicate = right.asBound(bindingSet); + IPredicate<E> predicate = right.asBound(bindingSet); if (partitionId != -1) { @@ -917,7 +923,8 @@ } - new AccessPathTask(predicate,Arrays.asList(chunk)).call(); + new JoinTask.AccessPathTask(predicate, Arrays.asList(chunk)) + .call(); } @@ -937,13 +944,13 @@ * bindingSets in the chunk from which the predicate was * generated. */ - protected Map<IPredicate<?>, Collection<IBindingSet>> combineBindingSets( + protected Map<IPredicate<E>, Collection<IBindingSet>> combineBindingSets( final IBindingSet[] chunk) { if (log.isDebugEnabled()) log.debug("chunkSize=" + chunk.length); - final Map<IPredicate<?>, Collection<IBindingSet>> map = new LinkedHashMap<IPredicate<?>, Collection<IBindingSet>>( + final Map<IPredicate<E>, Collection<IBindingSet>> map = new LinkedHashMap<IPredicate<E>, Collection<IBindingSet>>( chunk.length); for (IBindingSet bindingSet : chunk) { @@ -951,7 +958,7 @@ halted(); // constrain the predicate to the given bindings. - IPredicate<?> predicate = right.asBound(bindingSet); + IPredicate<E> predicate = right.asBound(bindingSet); if (partitionId != -1) { @@ -1025,16 +1032,16 @@ * @throws Exception */ protected AccessPathTask[] getAccessPathTasks( - final Map<IPredicate<?>, Collection<IBindingSet>> map) { + final Map<IPredicate<E>, Collection<IBindingSet>> map) { final int n = map.size(); if (log.isDebugEnabled()) log.debug("#distinct predicates=" + n); - final AccessPathTask[] tasks = new AccessPathTask[n]; + final AccessPathTask[] tasks = new JoinTask.AccessPathTask[n]; - final Iterator<Map.Entry<IPredicate<?>, Collection<IBindingSet>>> itr = map + final Iterator<Map.Entry<IPredicate<E>, Collection<IBindingSet>>> itr = map .entrySet().iterator(); int i = 0; @@ -1043,7 +1050,7 @@ halted(); - final Map.Entry<IPredicate<?>, Collection<IBindingSet>> entry = itr + final Map.Entry<IPredicate<E>, Collection<IBindingSet>> entry = itr .next(); tasks[i++] = new AccessPathTask(entry.getKey(), entry @@ -1203,7 +1210,7 @@ * {@link IPredicate} for this join dimension. The asBound * {@link IPredicate} is {@link IAccessPath#getPredicate()}. */ - final private IAccessPath<?> accessPath; + final private IAccessPath<E> accessPath; /** * Return the <em>fromKey</em> for the {@link IAccessPath} generated @@ -1215,7 +1222,7 @@ */ protected byte[] getFromKey() { - return ((AccessPath<?>) accessPath).getFromKey(); + return ((AccessPath<E>) accessPath).getFromKey(); } @@ -1239,7 +1246,7 @@ if (this == o) return true; - if (!(o instanceof AccessPathTask)) + if (!(o instanceof JoinTask.AccessPathTask)) return false; return accessPath.getPredicate().equals( @@ -1262,7 +1269,7 @@ * join dimension that all result in the same asBound * {@link IPredicate}. */ - public AccessPathTask(final IPredicate<?> predicate, + public AccessPathTask(final IPredicate<E> predicate, final Collection<IBindingSet> bindingSets) { if (predicate == null) Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/mutation/InsertOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/mutation/InsertOp.java 2010-09-24 19:37:50 UTC (rev 3629) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/mutation/InsertOp.java 2010-09-25 00:31:32 UTC (rev 3630) @@ -33,7 +33,6 @@ import com.bigdata.bop.BOp; import com.bigdata.bop.BOpContext; -import com.bigdata.bop.BOpEvaluationContext; import com.bigdata.bop.BindingSetPipelineOp; import com.bigdata.bop.IBindingSet; import com.bigdata.bop.IPredicate; @@ -44,7 +43,6 @@ import com.bigdata.btree.UnisolatedReadWriteIndex; import com.bigdata.btree.keys.IKeyBuilder; import com.bigdata.journal.IIndexManager; -import com.bigdata.journal.ITx; import com.bigdata.relation.IRelation; import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.IBlockingBuffer; @@ -293,6 +291,9 @@ * * FIXME This must obtain the appropriate lock for the mutable * index in scale-out. + * + * FIXME Allow remote writes as well if a remote access path is + * marked on the {@link IPredicate}. */ public <T> ILocalBTreeView getMu... [truncated message content] |