From: <mrp...@us...> - 2010-09-22 23:36:19
|
Revision: 3613 http://bigdata.svn.sourceforge.net/bigdata/?rev=3613&view=rev Author: mrpersonick Date: 2010-09-22 23:36:12 +0000 (Wed, 22 Sep 2010) Log Message: ----------- pulling out some rangeCount stuff into a superclass for use by the evaluation plan Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-09-22 21:39:51 UTC (rev 3612) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-09-22 23:36:12 UTC (rev 3613) @@ -60,7 +60,7 @@ * @param <E> * The generic type of the objects processed by the operator. */ -public class BOpContext<E> { +public class BOpContext<E> extends BOpContextBase { static private final Logger log = Logger.getLogger(BOpContext.class); @@ -102,6 +102,7 @@ * {@link IBigdataFederation}, this reference provides access to the * scale-out view of the indices and to other bigdata services. */ + @Override public IBigdataFederation<?> getFederation() { return runningQuery.getFederation(); } @@ -112,7 +113,8 @@ * wise and this {@link IIndexManager} MUST be able to read on the * {@link ILocalBTreeView}. 
*/ - public final IIndexManager getIndexManager() { + @Override + public IIndexManager getIndexManager() { return runningQuery.getIndexManager(); } @@ -266,6 +268,9 @@ public BOpContext(final IRunningQuery runningQuery,final int partitionId, final BOpStats stats, final IAsynchronousIterator<E[]> source, final IBlockingBuffer<E[]> sink, final IBlockingBuffer<E[]> sink2) { + + super(null); + this.runningQuery = runningQuery; // if (indexManager == null) // throw new IllegalArgumentException(); @@ -300,223 +305,6 @@ } /** - * Locate and return the view of the relation(s) identified by the - * {@link IPredicate}. - * <p> - * Note: This method is responsible for returning a fused view when more - * than one relation name was specified for the {@link IPredicate}. It - * SHOULD be used whenever the {@link IRelation} is selected based on a - * predicate in the tail of an {@link IRule} and could therefore be a fused - * view of more than one relation instance. (The head of the {@link IRule} - * must be a simple {@link IRelation} and not a view.) - * <p> - * Note: The implementation should choose the read timestamp for each - * relation in the view using {@link #getReadTimestamp(String)}. - * - * @param pred - * The {@link IPredicate}, which MUST be a tail from some - * {@link IRule}. - * - * @return The {@link IRelation}. - * - * @todo Replaces {@link IJoinNexus#getTailRelationView(IPredicate)}. In - * order to support mutation operator we will also have to pass in the - * {@link #writeTimestamp} or differentiate this in the method name. - */ - public IRelation getRelation(final IPredicate<?> pred) { - - /* - * Note: This uses the federation as the index manager when locating a - * resource for scale-out. However, s/o reads must use the local index - * manager when actually obtaining the index view for the relation. - */ - final IIndexManager tmp = getFederation() == null ? 
getIndexManager() - : getFederation(); - - final long timestamp = (Long) pred - .getRequiredProperty(BOp.Annotations.TIMESTAMP); - - return (IRelation<?>) tmp.getResourceLocator().locate( - pred.getOnlyRelationName(), timestamp); - - } - -// /** -// * Return a writable view of the relation. -// * -// * @param namespace -// * The namespace of the relation. -// * -// * @return A writable view of the relation. -// * -// * @deprecated by getRelation() -// */ -// public IRelation getWriteRelation(final String namespace) { -// -// /* -// * @todo Cache the resource locator? -// * -// * @todo This should be using the federation as the index manager when -// * locating a resource for scale-out, right? But s/o writes must use -// * the local index manager when actually obtaining the index view for -// * the relation. -// */ -// return (IRelation) getIndexManager().getResourceLocator().locate( -// namespace, getWriteTimestamp()); -// -// } - - /** - * Obtain an access path reading from relation for the specified predicate - * (from the tail of some rule). - * <p> - * Note that passing in the {@link IRelation} is important since it - * otherwise must be discovered using the {@link IResourceLocator}. By - * requiring the caller to resolve it before hand and pass it into this - * method the contention and demand on the {@link IResourceLocator} cache is - * reduced. - * - * @param relation - * The relation. - * @param pred - * The predicate. When {@link IPredicate#getPartitionId()} is - * set, the returned {@link IAccessPath} MUST read on the - * identified local index partition (directly, not via RMI). - * - * @return The access path. - * - * @todo replaces - * {@link IJoinNexus#getTailAccessPath(IRelation, IPredicate)}. 
- */ - @SuppressWarnings("unchecked") - public IAccessPath<?> getAccessPath(final IRelation<?> relation, - final IPredicate<?> predicate) { - - if (relation == null) - throw new IllegalArgumentException(); - - if (predicate == null) - throw new IllegalArgumentException(); - // FIXME This should be as assigned by the query planner so the query is fully declarative. - final IKeyOrder keyOrder = relation.getKeyOrder((IPredicate) predicate); - - if (keyOrder == null) - throw new RuntimeException("No access path: " + predicate); - - final int partitionId = predicate.getPartitionId(); - - final long timestamp = (Long) predicate - .getRequiredProperty(BOp.Annotations.TIMESTAMP); - - final int flags = predicate.getProperty( - PipelineOp.Annotations.FLAGS, - PipelineOp.Annotations.DEFAULT_FLAGS) - | (TimestampUtility.isReadOnly(timestamp) ? IRangeQuery.READONLY - : 0); - - final int chunkOfChunksCapacity = predicate.getProperty( - PipelineOp.Annotations.CHUNK_OF_CHUNKS_CAPACITY, - PipelineOp.Annotations.DEFAULT_CHUNK_OF_CHUNKS_CAPACITY); - - final int chunkCapacity = predicate.getProperty( - PipelineOp.Annotations.CHUNK_CAPACITY, - PipelineOp.Annotations.DEFAULT_CHUNK_CAPACITY); - - final int fullyBufferedReadThreshold = predicate.getProperty( - PipelineOp.Annotations.FULLY_BUFFERED_READ_THRESHOLD, - PipelineOp.Annotations.DEFAULT_FULLY_BUFFERED_READ_THRESHOLD); - - final IIndexManager indexManager = getIndexManager(); - - if (predicate.getPartitionId() != -1) { - - /* - * Note: This handles a read against a local index partition. For - * scale-out, the [indexManager] will be the data service's local - * index manager. - * - * Note: Expanders ARE NOT applied in this code path. Expanders - * require a total view of the relation, which is not available - * during scale-out pipeline joins. Likewise, the [backchain] - * property will be ignored since it is handled by an expander. 
- * - * @todo Replace this with IRelation#getAccessPathForIndexPartition() - */ -// return ((AbstractRelation<?>) relation) -// .getAccessPathForIndexPartition(indexManager, -// (IPredicate) predicate); - /* - * @todo This condition should probably be an error since the expander - * will be ignored. - */ -// if (predicate.getSolutionExpander() != null) -// throw new IllegalArgumentException(); - - final String namespace = relation.getNamespace();//predicate.getOnlyRelationName(); - - // The name of the desired index partition. - final String name = DataService.getIndexPartitionName(namespace - + "." + keyOrder.getIndexName(), partitionId); - - // MUST be a local index view. - final ILocalBTreeView ndx = (ILocalBTreeView) indexManager - .getIndex(name, timestamp); - - return new AccessPath(relation, indexManager, timestamp, - predicate, keyOrder, ndx, flags, chunkOfChunksCapacity, - chunkCapacity, fullyBufferedReadThreshold).init(); - - } - - /* - * Find the best access path for the predicate for that relation. - * - * @todo Replace this with IRelation#getAccessPath(IPredicate) once the - * bop conversion is done. It is the same logic. - */ - IAccessPath accessPath; - { - -// accessPath = relation.getAccessPath((IPredicate) predicate); - - final IIndex ndx = relation.getIndex(keyOrder); - - if (ndx == null) { - - throw new IllegalArgumentException("no index? relation=" - + relation.getNamespace() + ", timestamp=" - + timestamp + ", keyOrder=" + keyOrder + ", pred=" - + predicate + ", indexManager=" + getIndexManager()); - - } - - accessPath = new AccessPath((IRelation) relation, indexManager, - timestamp, (IPredicate) predicate, - (IKeyOrder) keyOrder, ndx, flags, chunkOfChunksCapacity, - chunkCapacity, fullyBufferedReadThreshold).init(); - - } - - /* - * @todo No expander's for bops, at least not right now. 
They could be - * added in easily enough, which would support additional features for - * standalone query evaluation (runtime materialization of some - * entailments). - */ - // final ISolutionExpander expander = predicate.getSolutionExpander(); - // - // if (expander != null) { - // - // // allow the predicate to wrap the access path - // accessPath = expander.getAccessPath(accessPath); - // - // } - - // return that access path. - return accessPath; - } - - /** * Binds variables from a visited element. * <p> * Note: The bindings are propagated before the constraints are verified so Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java 2010-09-22 23:36:12 UTC (rev 3613) @@ -0,0 +1,313 @@ +/* + +Copyright (C) SYSTAP, LLC 2006-2008. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +*/ +/* + * Created on Aug 26, 2010 + */ +package com.bigdata.bop; + +import org.apache.log4j.Logger; +import com.bigdata.bop.engine.QueryEngine; +import com.bigdata.btree.IIndex; +import com.bigdata.btree.ILocalBTreeView; +import com.bigdata.btree.IRangeQuery; +import com.bigdata.journal.IIndexManager; +import com.bigdata.journal.TimestampUtility; +import com.bigdata.relation.IRelation; +import com.bigdata.relation.accesspath.AccessPath; +import com.bigdata.relation.accesspath.IAccessPath; +import com.bigdata.relation.locator.IResourceLocator; +import com.bigdata.relation.rule.IRule; +import com.bigdata.relation.rule.ISolutionExpander; +import com.bigdata.relation.rule.eval.IJoinNexus; +import com.bigdata.service.DataService; +import com.bigdata.service.IBigdataFederation; +import com.bigdata.striterator.IKeyOrder; + +/** + * The evaluation context for the operator (NOT serializable). + * + * @param <E> + * The generic type of the objects processed by the operator. + */ +public class BOpContextBase { + + static private final Logger log = Logger.getLogger(BOpContextBase.class); + + private final QueryEngine queryEngine; + + /** + * The <strong>local</strong> {@link IIndexManager}. Query evaluation occurs + * against the local indices. In scale-out, query evaluation proceeds shard + * wise and this {@link IIndexManager} MUST be able to read on the + * {@link ILocalBTreeView}. + */ + public IIndexManager getIndexManager() { + return queryEngine.getIndexManager(); + } + + /** + * The {@link IBigdataFederation} IFF the operator is being evaluated on an + * {@link IBigdataFederation}. When evaluating operations against an + * {@link IBigdataFederation}, this reference provides access to the + * scale-out view of the indices and to other bigdata services. 
+ */ + public IBigdataFederation<?> getFederation() { + return queryEngine.getFederation(); + } + + /** + * + * @param indexManager + * The <strong>local</strong> {@link IIndexManager}. Query + * evaluation occurs against the local indices. In scale-out, + * query evaluation proceeds shard wise and this + * {@link IIndexManager} MUST be able to read on the + * {@link ILocalBTreeView}. + * + */ + public BOpContextBase(final QueryEngine queryEngine) { + this.queryEngine = queryEngine; + } + + /** + * Locate and return the view of the relation(s) identified by the + * {@link IPredicate}. + * <p> + * Note: This method is responsible for returning a fused view when more + * than one relation name was specified for the {@link IPredicate}. It + * SHOULD be used whenever the {@link IRelation} is selected based on a + * predicate in the tail of an {@link IRule} and could therefore be a fused + * view of more than one relation instance. (The head of the {@link IRule} + * must be a simple {@link IRelation} and not a view.) + * <p> + * Note: The implementation should choose the read timestamp for each + * relation in the view using {@link #getReadTimestamp(String)}. + * + * @param pred + * The {@link IPredicate}, which MUST be a tail from some + * {@link IRule}. + * + * @return The {@link IRelation}. + * + * @todo Replaces {@link IJoinNexus#getTailRelationView(IPredicate)}. In + * order to support mutation operator we will also have to pass in the + * {@link #writeTimestamp} or differentiate this in the method name. + */ + public IRelation getRelation(final IPredicate<?> pred) { + + /* + * Note: This uses the federation as the index manager when locating a + * resource for scale-out. However, s/o reads must use the local index + * manager when actually obtaining the index view for the relation. + */ + final IIndexManager tmp = getFederation() == null ? 
getIndexManager() + : getFederation(); + + final long timestamp = (Long) pred + .getRequiredProperty(BOp.Annotations.TIMESTAMP); + + return (IRelation<?>) tmp.getResourceLocator().locate( + pred.getOnlyRelationName(), timestamp); + + } + +// /** +// * Return a writable view of the relation. +// * +// * @param namespace +// * The namespace of the relation. +// * +// * @return A writable view of the relation. +// * +// * @deprecated by getRelation() +// */ +// public IRelation getWriteRelation(final String namespace) { +// +// /* +// * @todo Cache the resource locator? +// * +// * @todo This should be using the federation as the index manager when +// * locating a resource for scale-out, right? But s/o writes must use +// * the local index manager when actually obtaining the index view for +// * the relation. +// */ +// return (IRelation) getIndexManager().getResourceLocator().locate( +// namespace, getWriteTimestamp()); +// +// } + + /** + * Obtain an access path reading from relation for the specified predicate + * (from the tail of some rule). + * <p> + * Note that passing in the {@link IRelation} is important since it + * otherwise must be discovered using the {@link IResourceLocator}. By + * requiring the caller to resolve it before hand and pass it into this + * method the contention and demand on the {@link IResourceLocator} cache is + * reduced. + * + * @param relation + * The relation. + * @param pred + * The predicate. When {@link IPredicate#getPartitionId()} is + * set, the returned {@link IAccessPath} MUST read on the + * identified local index partition (directly, not via RMI). + * + * @return The access path. + * + * @todo replaces + * {@link IJoinNexus#getTailAccessPath(IRelation, IPredicate)}. 
+ */ + @SuppressWarnings("unchecked") + public IAccessPath<?> getAccessPath(final IRelation<?> relation, + final IPredicate<?> predicate) { + + if (relation == null) + throw new IllegalArgumentException(); + + if (predicate == null) + throw new IllegalArgumentException(); + // FIXME This should be as assigned by the query planner so the query is fully declarative. + final IKeyOrder keyOrder = relation.getKeyOrder((IPredicate) predicate); + + if (keyOrder == null) + throw new RuntimeException("No access path: " + predicate); + + final int partitionId = predicate.getPartitionId(); + + final long timestamp = (Long) predicate + .getRequiredProperty(BOp.Annotations.TIMESTAMP); + + final int flags = predicate.getProperty( + PipelineOp.Annotations.FLAGS, + PipelineOp.Annotations.DEFAULT_FLAGS) + | (TimestampUtility.isReadOnly(timestamp) ? IRangeQuery.READONLY + : 0); + + final int chunkOfChunksCapacity = predicate.getProperty( + PipelineOp.Annotations.CHUNK_OF_CHUNKS_CAPACITY, + PipelineOp.Annotations.DEFAULT_CHUNK_OF_CHUNKS_CAPACITY); + + final int chunkCapacity = predicate.getProperty( + PipelineOp.Annotations.CHUNK_CAPACITY, + PipelineOp.Annotations.DEFAULT_CHUNK_CAPACITY); + + final int fullyBufferedReadThreshold = predicate.getProperty( + PipelineOp.Annotations.FULLY_BUFFERED_READ_THRESHOLD, + PipelineOp.Annotations.DEFAULT_FULLY_BUFFERED_READ_THRESHOLD); + + final IIndexManager indexManager = getIndexManager(); + + if (predicate.getPartitionId() != -1) { + + /* + * Note: This handles a read against a local index partition. For + * scale-out, the [indexManager] will be the data service's local + * index manager. + * + * Note: Expanders ARE NOT applied in this code path. Expanders + * require a total view of the relation, which is not available + * during scale-out pipeline joins. Likewise, the [backchain] + * property will be ignored since it is handled by an expander. 
+ * + * @todo Replace this with IRelation#getAccessPathForIndexPartition() + */ +// return ((AbstractRelation<?>) relation) +// .getAccessPathForIndexPartition(indexManager, +// (IPredicate) predicate); + /* + * @todo This condition should probably be an error since the expander + * will be ignored. + */ +// if (predicate.getSolutionExpander() != null) +// throw new IllegalArgumentException(); + + final String namespace = relation.getNamespace();//predicate.getOnlyRelationName(); + + // The name of the desired index partition. + final String name = DataService.getIndexPartitionName(namespace + + "." + keyOrder.getIndexName(), partitionId); + + // MUST be a local index view. + final ILocalBTreeView ndx = (ILocalBTreeView) indexManager + .getIndex(name, timestamp); + + return new AccessPath(relation, indexManager, timestamp, + predicate, keyOrder, ndx, flags, chunkOfChunksCapacity, + chunkCapacity, fullyBufferedReadThreshold).init(); + + } + + /* + * Find the best access path for the predicate for that relation. + * + * @todo Replace this with IRelation#getAccessPath(IPredicate) once the + * bop conversion is done. It is the same logic. + */ + IAccessPath accessPath; + { + +// accessPath = relation.getAccessPath((IPredicate) predicate); + + final IIndex ndx = relation.getIndex(keyOrder); + + if (ndx == null) { + + throw new IllegalArgumentException("no index? relation=" + + relation.getNamespace() + ", timestamp=" + + timestamp + ", keyOrder=" + keyOrder + ", pred=" + + predicate + ", indexManager=" + getIndexManager()); + + } + + accessPath = new AccessPath((IRelation) relation, indexManager, + timestamp, (IPredicate) predicate, + (IKeyOrder) keyOrder, ndx, flags, chunkOfChunksCapacity, + chunkCapacity, fullyBufferedReadThreshold).init(); + + } + + /* + * @todo No expander's for bops, at least not right now. 
They could be + * added in easily enough, which would support additional features for + * standalone query evaluation (runtime materialization of some + * entailments). + * + * FIXME temporarily enabled expanders (mikep) + */ + final ISolutionExpander<?> expander = predicate.getSolutionExpander(); + + if (expander != null) { + + // allow the predicate to wrap the access path + accessPath = expander.getAccessPath(accessPath); + + } + + // return that access path. + return accessPath; + } + +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-09-23 20:20:30
|
Revision: 3618 http://bigdata.svn.sourceforge.net/bigdata/?rev=3618&view=rev Author: thompsonbry Date: 2010-09-23 20:20:24 +0000 (Thu, 23 Sep 2010) Log Message: ----------- Fixed a problem I introduced into the layering of the BOpContext. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-09-23 20:10:58 UTC (rev 3617) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-09-23 20:20:24 UTC (rev 3618) @@ -196,7 +196,7 @@ final BOpStats stats, final IAsynchronousIterator<E[]> source, final IBlockingBuffer<E[]> sink, final IBlockingBuffer<E[]> sink2) { - super(null); + super(runningQuery.getFederation(), runningQuery.getIndexManager()); this.runningQuery = runningQuery; // if (indexManager == null) Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java 2010-09-23 20:10:58 UTC (rev 3617) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java 2010-09-23 20:20:24 UTC (rev 3618) @@ -59,7 +59,10 @@ static private final transient Logger log = Logger.getLogger(BOpContextBase.class); - private final QueryEngine queryEngine; +// private final QueryEngine queryEngine; + + private final IBigdataFederation<?> fed; + private final IIndexManager indexManager; /** * The <strong>local</strong> {@link IIndexManager}. Query evaluation occurs @@ -68,17 +71,18 @@ * {@link ILocalBTreeView}. 
*/ final public IIndexManager getIndexManager() { - return queryEngine.getIndexManager(); + return indexManager; } - + /** * The {@link IBigdataFederation} IFF the operator is being evaluated on an - * {@link IBigdataFederation}. When evaluating operations against an - * {@link IBigdataFederation}, this reference provides access to the - * scale-out view of the indices and to other bigdata services. + * {@link IBigdataFederation} and otherwise <code>null</code>. When + * evaluating operations against an {@link IBigdataFederation}, this + * reference provides access to the scale-out view of the indices and to + * other bigdata services. */ final public IBigdataFederation<?> getFederation() { - return queryEngine.getFederation(); + return fed; } /** @@ -88,23 +92,37 @@ * <em>local</em> {@link #getIndexManager() index manager}. */ public final Executor getExecutorService() { - return getIndexManager().getExecutorService(); + return indexManager.getExecutorService(); } + public BOpContextBase(final QueryEngine queryEngine) { + + this(queryEngine.getFederation(), queryEngine.getIndexManager()); + + } + /** - * + * Core constructor. + * @param fed * @param indexManager - * The <strong>local</strong> {@link IIndexManager}. Query - * evaluation occurs against the local indices. In scale-out, - * query evaluation proceeds shard wise and this - * {@link IIndexManager} MUST be able to read on the - * {@link ILocalBTreeView}. - * */ - public BOpContextBase(final QueryEngine queryEngine) { - this.queryEngine = queryEngine; + public BOpContextBase(final IBigdataFederation<?> fed, + final IIndexManager indexManager) { + + /* + * @todo null is permitted here for the unit tests, but we should really + * mock the IIndexManager and pass in a non-null object here and then + * verify that the reference is non-null. 
+ */ +// if (indexManager == null) +// throw new IllegalArgumentException(); + + this.fed = fed; + + this.indexManager = indexManager; + } - + /** * Locate and return the view of the relation(s) identified by the * {@link IPredicate}. This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-09-29 19:55:36
|
Revision: 3685 http://bigdata.svn.sourceforge.net/bigdata/?rev=3685&view=rev Author: thompsonbry Date: 2010-09-29 19:55:30 +0000 (Wed, 29 Sep 2010) Log Message: ----------- Added NOARGS and NOANNS for BOPs that we create a lot of. These are a singleton empty array and empty map respectively. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Constant.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Var.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2010-09-29 19:54:27 UTC (rev 3684) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2010-09-29 19:55:30 UTC (rev 3685) @@ -71,6 +71,20 @@ private static final long serialVersionUID = 1L; /** + * An empty array. + */ + static protected final transient BOp[] NOARGS = new BOp[] {}; + + /** + * An empty immutable annotations map. + * + * @todo This is for things like {@link Constant} and {@link Var} which are + * never annotated. However, i'm not sure if this is a good idea or + * not. A "copy on write" map might be better. + */ + static protected final transient Map<String,Object> NOANNS = Collections.emptyMap(); + + /** * The argument values - <strong>direct access to this field is * discouraged</strong> - the field is protected to support * <em>mutation</em> APIs and should not be relied on for other purposes. @@ -288,6 +302,10 @@ /** deep copy the arguments. */ static protected BOp[] deepCopy(final BOp[] a) { + if (a == NOARGS) { + // fast path for zero arity operators. + return a; + } final BOp[] t = new BOp[a.length]; for (int i = 0; i < a.length; i++) { t[i] = a[i] == null ? 
null : a[i].clone(); @@ -311,7 +329,11 @@ * containing an ontology or some conditional assertions with a query * plan. */ - static protected Map<String,Object> deepCopy(final Map<String,Object> a) { + static protected Map<String, Object> deepCopy(final Map<String, Object> a) { + if (a == NOANNS) { + // Fast past for immutable, empty annotations. + return a; + } // allocate map. final Map<String, Object> t = new LinkedHashMap<String, Object>(a .size()); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Constant.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Constant.java 2010-09-29 19:54:27 UTC (rev 3684) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Constant.java 2010-09-29 19:55:30 UTC (rev 3685) @@ -66,13 +66,16 @@ * @param op */ public Constant(final Constant<E> op) { + super(op); + this.value = op.value; + } public Constant(final E value) { - super(new BOp[] {}, null/* annotations */); + super(NOARGS, NOANNS); if (value == null) throw new IllegalArgumentException(); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Var.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Var.java 2010-09-29 19:54:27 UTC (rev 3684) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Var.java 2010-09-29 19:55:30 UTC (rev 3685) @@ -58,7 +58,8 @@ */ private Var(final String name) { - super(new BOp[] {}, null/* annotations */); +// super(new BOp[] {}, null/* annotations */); + super(NOARGS, NOANNS); assert name != null; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-10-19 17:29:14
|
Revision: 3823 http://bigdata.svn.sourceforge.net/bigdata/?rev=3823&view=rev Author: thompsonbry Date: 2010-10-19 17:29:08 +0000 (Tue, 19 Oct 2010) Log Message: ----------- Added a Constant constructor variant which accept the name of the variable bound to that constant. BOpContext knows how to use this to insert the constant into the binding set in bind(...) [that will be in a separate commit]. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Constant.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java 2010-10-19 16:24:57 UTC (rev 3822) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java 2010-10-19 17:29:08 UTC (rev 3823) @@ -108,6 +108,18 @@ <T> T getProperty(final String name, final T defaultValue); /** + * Unconditionally sets the property. + * + * @param name + * The name. + * @param value + * The value. + * + * @return A copy of this {@link BOp} on which the property has been set. + */ + BOp setProperty(final String name, final Object value); + + /** * Return the value of the named annotation. * * @param name Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2010-10-19 16:24:57 UTC (rev 3822) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2010-10-19 17:29:08 UTC (rev 3823) @@ -438,16 +438,6 @@ } - /** - * Unconditionally sets the property. - * - * @param name - * The name. - * @param value - * The value. 
- * - * @return A copy of this {@link BOp} on which the property has been set. - */ public BOpBase setProperty(final String name, final Object value) { final BOpBase tmp = this.clone(); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Constant.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Constant.java 2010-10-19 16:24:57 UTC (rev 3822) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Constant.java 2010-10-19 17:29:08 UTC (rev 3823) @@ -37,6 +37,16 @@ private static final long serialVersionUID = -2967861242470442497L; final private E value; + + public interface Annotations extends ImmutableBOp.Annotations { + + /** + * The {@link IVariable} which is bound to that constant value + * (optional). + */ + String VAR = Constant.class.getName() + ".var"; + + } final public boolean isVar() { @@ -72,18 +82,32 @@ this.value = op.value; } - + + public Constant(final IVariable<E> var, final E value) { + + super(NOARGS, NV.asMap(new NV(Annotations.VAR, var))); + + if (var == null) + throw new IllegalArgumentException(); + + if (value == null) + throw new IllegalArgumentException(); + + this.value = value; + + } + public Constant(final E value) { - + super(NOARGS, NOANNS); - + if (value == null) throw new IllegalArgumentException(); - + this.value = value; - + } - + /** * Clone is overridden to reduce heap churn. */ This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-11-02 10:18:59
|
Revision: 3860 http://bigdata.svn.sourceforge.net/bigdata/?rev=3860&view=rev Author: thompsonbry Date: 2010-11-02 10:18:52 +0000 (Tue, 02 Nov 2010) Log Message: ----------- PipelineJoin - fixed a bug where the detection of duplicate access paths could cause an optional join to drop some solutions. See [1]. RunningQuery - removed unused import and added information to a log message. [1] https://sourceforge.net/apps/trac/bigdata/ticket/192 Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-11-01 19:06:48 UTC (rev 3859) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-11-02 10:18:52 UTC (rev 3860) @@ -27,7 +27,6 @@ */ package com.bigdata.bop.engine; -import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; @@ -1342,7 +1341,8 @@ } catch (Throwable ex1) { // Log an error. 
- log.error("queryId=" + queryId + ", bopId=" + t.bopId, ex1); + log.error("queryId=" + queryId + ", bopId=" + t.bopId + + ", bop=" + t.bop, ex1); /* * Mark the query as halted on this node regardless of whether Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2010-11-01 19:06:48 UTC (rev 3859) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2010-11-02 10:18:52 UTC (rev 3860) @@ -1206,6 +1206,17 @@ final private IBindingSet[] bindingSets; /** + * An array correlated with the {@link #bindingSets} whose values + * are the #of solutions generated for each of the source binding + * sets consumed by this {@link AccessPathTask}. This array is used + * to determine, whether or not any solutions were produced for a + * given {@link IBindingSet}. When the join is optional and no + * solutions were produced for a given {@link IBindingSet}, the + * {@link IBindingSet} is output anyway. + */ + final private int[] naccepted; + + /** * The {@link IAccessPath} corresponding to the asBound * {@link IPredicate} for this join dimension. The asBound * {@link IPredicate} is {@link IAccessPath#getPredicate()}. @@ -1301,6 +1312,8 @@ // convert to array for thread-safe traversal. this.bindingSets = bindingSets.toArray(new IBindingSet[n]); + + this.naccepted = new int[n]; } @@ -1352,7 +1365,7 @@ */ protected void handleJoin() { - boolean nothingAccepted = true; +// boolean nothingAccepted = true; // Obtain the iterator for the current join dimension. final IChunkedOrderedIterator<?> itr = accessPath.iterator(); @@ -1363,10 +1376,6 @@ final AbstractUnsynchronizedArrayBuffer<IBindingSet> unsyncBuffer = threadLocalBufferFactory .get(); - // Thread-local buffer iff optional sink is in use. 
- final AbstractUnsynchronizedArrayBuffer<IBindingSet> unsyncBuffer2 = threadLocalBufferFactory2 == null ? null - : threadLocalBufferFactory2.get(); - while (itr.hasNext()) { final Object[] chunk = itr.nextChunk(); @@ -1374,19 +1383,45 @@ stats.accessPathChunksIn.increment(); // process the chunk in the caller's thread. - final boolean somethingAccepted = new ChunkTask( - bindingSets, unsyncBuffer, chunk).call(); +// final boolean somethingAccepted = + new ChunkTask(bindingSets, naccepted, unsyncBuffer, + chunk).call(); - if (somethingAccepted) { +// if (somethingAccepted) { +// +// // something in the chunk was accepted. +// nothingAccepted = false; +// +// } - // something in the chunk was accepted. - nothingAccepted = false; - - } - } // next chunk. - if (nothingAccepted && optional) { +// if (nothingAccepted && optional) { +// +// /* +// * Note: when NO binding sets were accepted AND the +// * predicate is OPTIONAL then we output the _original_ +// * binding set(s) to the sink join task(s). +// */ +// +// // Thread-local buffer iff optional sink is in use. +// final AbstractUnsynchronizedArrayBuffer<IBindingSet> unsyncBuffer2 = threadLocalBufferFactory2 == null ? null +// : threadLocalBufferFactory2.get(); +// +// for (IBindingSet bs : this.bindingSets) { +// +// if (unsyncBuffer2 == null) { +// // use the default sink. +// unsyncBuffer.add(bs); +// } else { +// // use the alternative sink. +// unsyncBuffer2.add(bs); +// } +// +// } +// +// } + if (optional) { /* * Note: when NO binding sets were accepted AND the @@ -1394,8 +1429,22 @@ * binding set(s) to the sink join task(s). */ - for (IBindingSet bs : this.bindingSets) { + // Thread-local buffer iff optional sink is in use. + final AbstractUnsynchronizedArrayBuffer<IBindingSet> unsyncBuffer2 = threadLocalBufferFactory2 == null ? 
null + : threadLocalBufferFactory2.get(); + for (int bindex = 0; bindex < bindingSets.length; bindex++) { + + if (naccepted[bindex] > 0) + continue; + + final IBindingSet bs = bindingSets[bindex]; + + if (log.isTraceEnabled()) + log + .trace("Passing on solution which fails an optional join: " + + bs); + if (unsyncBuffer2 == null) { // use the default sink. unsyncBuffer.add(bs); @@ -1407,7 +1456,6 @@ } } - return; } catch (Throwable t) { @@ -1655,7 +1703,7 @@ * Thompson</a> * @version $Id$ */ - protected class ChunkTask implements Callable<Boolean> { + protected class ChunkTask implements Callable<Void> { /** * The {@link IBindingSet}s which the each element in the chunk will @@ -1665,6 +1713,11 @@ private final IBindingSet[] bindingSets; /** + * The #of solutions accepted for each of the {@link #bindingSets}. + */ + private final int[] naccepted; + + /** * A per-{@link Thread} buffer that is used to collect * {@link IBindingSet}s into chunks before handing them off to the * next join dimension. The hand-off occurs no later than when the @@ -1684,35 +1737,39 @@ * The bindings with which the each element in the chunk * will be paired to create the bindings for the * downstream join dimension. + * @param naccepted + * An array used to indicate as a side-effect the #of + * solutions accepted for each of the {@link IBindingSet} + * s. * @param unsyncBuffer * A per-{@link Thread} buffer used to accumulate chunks - * of generated {@link IBindingSet}s (optional). When the - * {@link ChunkTask} will be run in its own thread, pass - * <code>null</code> and the buffer will be obtained in - * {@link #call()}. + * of generated {@link IBindingSet}s. * @param chunk * A chunk of elements read from the {@link IAccessPath} * for the current join dimension. 
*/ public ChunkTask( final IBindingSet[] bindingSet, + final int[] naccepted, final AbstractUnsynchronizedArrayBuffer<IBindingSet> unsyncBuffer, final Object[] chunk) { if (bindingSet == null) throw new IllegalArgumentException(); - // Allow null! - // if (unsyncBuffer == null) - // throw new IllegalArgumentException(); + if (naccepted== null) + throw new IllegalArgumentException(); + + if (unsyncBuffer == null) + throw new IllegalArgumentException(); if (chunk == null) throw new IllegalArgumentException(); -// this.tailIndex = getTailIndex(orderIndex); - this.bindingSets = bindingSet; + this.naccepted = naccepted; + this.chunk = chunk; this.unsyncBuffer = unsyncBuffer; @@ -1720,10 +1777,6 @@ } /** - * @return <code>true</code> iff NO elements in the chunk (as read - * from the access path by the caller) were accepted when - * combined with the {@link #bindingSets} from the source - * {@link JoinTask}. * * @throws BufferClosedException * if there is an attempt to output a chunk of @@ -1733,29 +1786,31 @@ * true for query on the lastJoin) and that * {@link IBlockingBuffer} has been closed. */ - public Boolean call() throws Exception { + public Void call() throws Exception { try { // ChunkTrace.chunk(orderIndex, chunk); - boolean nothingAccepted = true; +// boolean nothingAccepted = true; - // Use caller's or obtain our own as necessary. - final AbstractUnsynchronizedArrayBuffer<IBindingSet> unsyncBuffer = (this.unsyncBuffer == null) ? threadLocalBufferFactory - .get() - : this.unsyncBuffer; +// // Use caller's or obtain our own as necessary. +// final AbstractUnsynchronizedArrayBuffer<IBindingSet> unsyncBuffer = (this.unsyncBuffer == null) ? threadLocalBufferFactory +// .get() +// : this.unsyncBuffer; for (Object e : chunk) { if (isDone()) - return nothingAccepted; + return null; +// return nothingAccepted; // naccepted for the current element (trace only). 
int naccepted = 0; stats.accessPathUnitsIn.increment(); + int bindex = 0; for (IBindingSet bset : bindingSets) { /* @@ -1774,12 +1829,19 @@ // Accept this binding set. unsyncBuffer.add(bset); + // #of binding sets accepted. naccepted++; + + // #of elements accepted for this binding set. + this.naccepted[bindex]++; - nothingAccepted = false; +// // something was accepted. +// nothingAccepted = false; } + bindex++; + } if (log.isDebugEnabled()) @@ -1793,9 +1855,12 @@ } } - // if something is accepted in the chunk return true. - return nothingAccepted ? Boolean.FALSE : Boolean.TRUE; +// // if something is accepted in the chunk return true. +// return nothingAccepted ? Boolean.FALSE : Boolean.TRUE; + // Done. + return null; + } catch (Throwable t) { halt(t); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-11-08 21:29:26
|
Revision: 3916 http://bigdata.svn.sourceforge.net/bigdata/?rev=3916&view=rev Author: thompsonbry Date: 2010-11-08 21:29:20 +0000 (Mon, 08 Nov 2010) Log Message: ----------- BOpBase : more information in exception. BOpContext : made method public, static. It should be moved to BOpUtility. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2010-11-08 21:28:39 UTC (rev 3915) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2010-11-08 21:29:20 UTC (rev 3916) @@ -394,7 +394,8 @@ final Object tmp = annotations.get(name); if (tmp == null) - throw new IllegalStateException("Required property: " + name); + throw new IllegalStateException("Required property: " + name + + " : " + this); return tmp; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-11-08 21:28:39 UTC (rev 3915) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-11-08 21:29:20 UTC (rev 3916) @@ -261,9 +261,11 @@ * The predicate. * @param bindingSet * The binding set, which is modified as a side-effect. + * + * @todo move to {@link BOpUtility}? 
*/ @SuppressWarnings("unchecked") - final private void copyValues(final IElement e, final IPredicate<?> pred, + static public void copyValues(final IElement e, final IPredicate<?> pred, final IBindingSet bindingSet) { for (int i = 0; i < pred.arity(); i++) { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-11-08 21:30:35
|
Revision: 3917 http://bigdata.svn.sourceforge.net/bigdata/?rev=3917&view=rev Author: thompsonbry Date: 2010-11-08 21:30:29 +0000 (Mon, 08 Nov 2010) Log Message: ----------- Modified to stop propagating chunks (and not throw new errors) when the query is already halted (not all cases in which this occurs have been addressed so far). Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkHandler.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkHandler.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkHandler.java 2010-11-08 21:29:20 UTC (rev 3916) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IChunkHandler.java 2010-11-08 21:30:29 UTC (rev 3917) @@ -44,32 +44,32 @@ */ public interface IChunkHandler { - /** - * Take an {@link IBindingSet}[] chunk generated by some pass over an - * operator and make it available to the target operator. How this is done - * depends on whether the query is running against a standalone database or - * the scale-out database. - * <p> - * Note: The return value is used as part of the termination criteria for - * the query which depends on (a) the #of running operator tasks and (b) the - * #of {@link IChunkMessage}s generated (available) and consumed. The return - * value of this method increases the #of {@link IChunkMessage} available to - * the query. - * - * @param query - * The query. - * @param bopId - * The operator which wrote on the sink. - * @param sinkId - * The identifier of the target operator. 
- * @param chunk - * The intermediate results to be passed to that target operator. - * - * @return The #of {@link IChunkMessage} sent. This will always be ONE (1) - * for scale-up. For scale-out, there will be at least one - * {@link IChunkMessage} per index partition over which the - * intermediate results were mapped. - */ + /** + * Take an {@link IBindingSet}[] chunk generated by some pass over an + * operator and make it available to the target operator. How this is done + * depends on whether the query is running against a standalone database or + * the scale-out database. + * <p> + * Note: The return value is used as part of the termination criteria for + * the query which depends on (a) the #of running operator tasks and (b) the + * #of {@link IChunkMessage}s generated (available) and consumed. The return + * value of this method increases the #of {@link IChunkMessage}s available + * to the query. + * + * @param query + * The query. + * @param bopId + * The operator which wrote on the sink. + * @param sinkId + * The identifier of the target operator. + * @param chunk + * The intermediate results to be passed to that target operator. + * + * @return The #of {@link IChunkMessage} sent. This will always be ONE (1) + * for scale-up. For scale-out, there will be at least one + * {@link IChunkMessage} per index partition over which the + * intermediate results were mapped. 
+ */ int handleChunk(RunningQuery query, int bopId, int sinkId, IBindingSet[] chunk); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-11-08 21:29:20 UTC (rev 3916) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-11-08 21:30:29 UTC (rev 3917) @@ -412,6 +412,12 @@ } + protected boolean isRunning() { + + return engineFuture.get() != null && !shutdown; + + } + protected void execute(final Runnable r) { localIndexManager.getExecutorService().execute(r); @@ -492,20 +498,24 @@ } } // QueryEngineTask - /** - * Add a chunk of intermediate results for consumption by some query. The - * chunk will be attached to the query and the query will be scheduled for - * execution. - * - * @param msg - * A chunk of intermediate results. - * - * @throws IllegalArgumentException - * if the chunk is <code>null</code>. - * @throws IllegalStateException - * if the chunk is not materialized. - */ - protected void acceptChunk(final IChunkMessage<IBindingSet> msg) { + /** + * Add a chunk of intermediate results for consumption by some query. The + * chunk will be attached to the query and the query will be scheduled for + * execution. + * + * @param msg + * A chunk of intermediate results. + * + * @return <code>true</code> if the chunk was accepted. This will return + * <code>false</code> if the query is done (including cancelled) or + * the query engine is shutdown. + * + * @throws IllegalArgumentException + * if the chunk is <code>null</code>. + * @throws IllegalStateException + * if the chunk is not materialized. 
+ */ + protected boolean acceptChunk(final IChunkMessage<IBindingSet> msg) { if (msg == null) throw new IllegalArgumentException(); @@ -515,17 +525,37 @@ final RunningQuery q = runningQueries.get(msg.getQueryId()); - if(q == null) + if(q == null) { + /* + * The query is not registered on this node. + * + * FIXME We should recognize the difference between a query which + * was never registered (and throw an error here) and a query which + * is done and has been removed from runningQueries. One way to do + * this is with an LRU of recently completed queries. + */ +// return false; throw new IllegalStateException(); + } - // add chunk to the query's input queue on this node. - q.acceptChunk(msg); + // add chunk to the query's input queue on this node. + if (!q.acceptChunk(msg)) { + // query is no longer running. + return false; + + } - assertRunning(); - - // add query to the engine's task queue. - priorityQueue.add(q); + if(!isRunning()) { + // query engine is no longer running. + return false; + + } + // add query to the engine's task queue. + priorityQueue.add(q); + + return true; + } /** Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-11-08 21:29:20 UTC (rev 3916) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-11-08 21:30:29 UTC (rev 3917) @@ -1076,15 +1076,17 @@ // } // } - /** - * Make a chunk of binding sets available for consumption by the query. - * <p> - * Note: this is invoked by {@link QueryEngine#acceptChunk(IChunkMessage)} - * - * @param msg - * The chunk. - */ - protected void acceptChunk(final IChunkMessage<IBindingSet> msg) { + /** + * Make a chunk of binding sets available for consumption by the query. 
+ * <p> + * Note: this is invoked by {@link QueryEngine#acceptChunk(IChunkMessage)} + * + * @param msg + * The chunk. + * + * @return <code>true</code> if the message was accepted. + */ + protected boolean acceptChunk(final IChunkMessage<IBindingSet> msg) { if (msg == null) throw new IllegalArgumentException(); @@ -1099,9 +1101,11 @@ try { - // verify still running. - if (future.isDone()) - throw new RuntimeException(ERR_QUERY_DONE, future.getCause()); + if (future.isDone()) { + // The query is no longer running. + return false; + //throw new RuntimeException(ERR_QUERY_DONE, future.getCause()); + } BlockingQueue<IChunkMessage<IBindingSet>> queue = operatorQueues .get(bundle); @@ -1116,6 +1120,8 @@ queue.add(msg); + return true; + } finally { lock.unlock(); @@ -2092,8 +2098,9 @@ try { - log.error(toString(), t); - + if (!InnerCause.isInnerCause(t, InterruptedException.class)) + log.error(toString(), t); + try { // signal error condition. Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java 2010-11-08 21:29:20 UTC (rev 3916) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java 2010-11-08 21:30:29 UTC (rev 3917) @@ -288,9 +288,9 @@ * {@inheritDoc} */ @Override - protected void acceptChunk(final IChunkMessage<IBindingSet> msg) { + protected boolean acceptChunk(final IChunkMessage<IBindingSet> msg) { - super.acceptChunk(msg); + return super.acceptChunk(msg); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-11-12 02:08:14
|
Revision: 3939 http://bigdata.svn.sourceforge.net/bigdata/?rev=3939&view=rev Author: thompsonbry Date: 2010-11-12 00:58:16 +0000 (Fri, 12 Nov 2010) Log Message: ----------- Missed an update to BOpUtility which broke the build. Added some comments on JoinGraph. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java 2010-11-12 00:18:05 UTC (rev 3938) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java 2010-11-12 00:58:16 UTC (rev 3939) @@ -912,4 +912,39 @@ } + /** + * Inject (or replace) an {@link Integer} "rowId" column. This does not have + * a side-effect on the source {@link IBindingSet}s. + * + * @param var + * The name of the column. + * @param start + * The starting value for the identifier. + * @param in + * The source {@link IBindingSet}s. + * + * @return The modified {@link IBindingSet}s. 
+ */ + public static IBindingSet[] injectRowIdColumn(final IVariable var, + final int start, final IBindingSet[] in) { + + if (in == null) + throw new IllegalArgumentException(); + + final IBindingSet[] out = new IBindingSet[in.length]; + + for (int i = 0; i < out.length; i++) { + + final IBindingSet bset = in[i].clone(); + + bset.set(var, new Constant<Integer>(Integer.valueOf(start + i))); + + out[i] = bset; + + } + + return out; + + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java 2010-11-12 00:18:05 UTC (rev 3938) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java 2010-11-12 00:58:16 UTC (rev 3939) @@ -880,6 +880,11 @@ * will be falsely high by whatever ratio the chosen vertex * cardinality exceeds the one having the minimum cardinality which * is connected via an edge to the target vertex). + * + * FIXME I am not convinced that this approach is quite right. I am + * also not convinced that this approach will correctly carry the + * additional metadata on the EdgeSample (exact, estimate overflow + * and underflow, etc). */ final VertexSample moreSelectiveVertexSample = vSource.sample.rangeCount < vTarget.sample.rangeCount ? vSource.sample : vTarget.sample; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-11-17 21:37:40
|
Revision: 3953 http://bigdata.svn.sourceforge.net/bigdata/?rev=3953&view=rev Author: thompsonbry Date: 2010-11-17 21:37:33 +0000 (Wed, 17 Nov 2010) Log Message: ----------- JoinGraph - javadoc identifying some issues from a call with MikeP. BOpStats - now tracks the #of tasks which have been executed for a given operator. QueryLog - added the opCount column and some code reorganization. QueryEngineTestAnnotations - javadoc. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngineTestAnnotations.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryLog.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java 2010-11-17 17:32:16 UTC (rev 3952) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java 2010-11-17 21:37:33 UTC (rev 3953) @@ -123,7 +123,38 @@ * Since the join graph is fed the vertices (APs), it does not have access * to the annotated joins so we need to generated appropriately annotated * joins when sampling an edge and when evaluation a subquery. + * <p> + * One solution would be to always use the unpartitioned views of the + * indices for the runtime query optimizer, which is how we are estimating + * the range counts of the access paths right now. [Note that the static + * query optimizer ignores named and default graphs, while the runtime + * query optimizer SHOULD pay attention to these things and exploit their + * conditional selectivity for the query plan.] 
* + * @todo When there are optional join graphs, are we going to handle that by + * materializing a sample (or all) of the joins feeding that join graph + * and then apply the runtime optimizer to the optional join graph, + * getting out a sample to feed onto any downstream join graph? + * + * @todo When we run into a cardinality estimation underflow (the expected + * cardinality goes to zero) we could double the sample size for just + * those join paths which hit a zero estimated cardinality and re-run them + * within the round. This would imply that we keep per join path limits. + * The vertex and edge samples are already aware of the limit at which + * they were last sampled so this should not cause any problems there. + * + * @todo When comparing choices among join paths having fully bound tails where + * the estimated cardinality has also gone to zero, we should prefer to + * evaluate vertices in the tail with better index locality first. For + * example, if one vertex had one variable in the original plan while + * another had two variables, then solutions which reach the 2-var vertex + * could be spread out over a much wider range of the selected index than + * those which reach the 1-var vertex. [In order to support this, we would + * need a means to indicate that a fully bound access path should use an + * index specified by the query optimizer rather than the primary index + * for the relation. In addition, this suggests that we should keep bloom + * filters for more than just the SPO(C) index in scale-out.] + * * @todo Examine behavior when we do not have perfect covering indices. 
This * will mean that some vertices can not be sampled using an index and that * estimation of their cardinality will have to await the estimation of Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java 2010-11-17 17:32:16 UTC (rev 3952) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java 2010-11-17 21:37:33 UTC (rev 3953) @@ -54,6 +54,13 @@ */ final public CAT elapsed = new CAT(); + /** + * The #of instances of a given operator which have been created for a given + * query. This provides interesting information about the #of task instances + * for each operator which were required to execute a query. + */ + final public CAT opCount = new CAT(); + /** * #of chunks in. */ @@ -83,7 +90,9 @@ * Constructor. */ public BOpStats() { - + + opCount.increment(); + } /** @@ -98,21 +107,18 @@ return; } elapsed.add(o.elapsed.get()); + opCount.add(o.opCount.get()); chunksIn.add(o.chunksIn.get()); unitsIn.add(o.unitsIn.get()); unitsOut.add(o.unitsOut.get()); chunksOut.add(o.chunksOut.get()); -// chunksIn.addAndGet(o.chunksIn.get()); -// unitsIn.addAndGet(o.unitsIn.get()); -// unitsOut.addAndGet(o.unitsOut.get()); -// chunksOut.addAndGet(o.chunksOut.get()); } - public String toString() { final StringBuilder sb = new StringBuilder(); sb.append(super.toString()); sb.append("{elapsed=" + elapsed.get()); + sb.append(",opCount=" + opCount.get()); sb.append(",chunksIn=" + chunksIn.get()); sb.append(",unitsIn=" + unitsIn.get()); sb.append(",chunksOut=" + chunksOut.get()); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngineTestAnnotations.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngineTestAnnotations.java 2010-11-17 
17:32:16 UTC (rev 3952) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngineTestAnnotations.java 2010-11-17 21:37:33 UTC (rev 3953) @@ -69,9 +69,20 @@ boolean DEFAULT_ONE_MESSAGE_PER_CHUNK = false; + /** + * This option may be used to place an optional limit on the #of concurrent + * tasks which may run for the same (bopId,shardId) for a given query. The + * query is guaranteed to make progress as long as this is some positive + * integer. Limiting this value can limit the concurrency with which certain + * operators are evaluated and that can have a negative effect on the + * throughput for a given query. + */ String MAX_CONCURRENT_TASKS_PER_OPERATOR_AND_SHARD = QueryEngineTestAnnotations.class.getName() + ".maxConcurrentTasksPerOperatorAndShard"; + /** + * The default is essentially unlimited. + */ int DEFAULT_MAX_CONCURRENT_TASKS_PER_OPERATOR_AND_SHARD = Integer.MAX_VALUE; } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryLog.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryLog.java 2010-11-17 17:32:16 UTC (rev 3952) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryLog.java 2010-11-17 21:37:33 UTC (rev 3953) @@ -56,6 +56,10 @@ .getLogger(QueryLog.class); static { + logTableHeader(); + } + + static public void logTableHeader() { if(log.isInfoEnabled()) log.info(QueryLog.getTableHeader()); } @@ -74,27 +78,10 @@ try { -// if (log.isDebugEnabled()) { + logDetailRows(q); - /* - * Detail row for each operator in the query. - */ - final Integer[] order = BOpUtility.getEvaluationOrder(q - .getQuery()); - - int orderIndex = 0; - for (Integer bopId : order) { - log - .info(getTableRow(q, orderIndex, bopId, false/* summary */)); - orderIndex++; - } - -// } - - // summary row. 
- log.info(getTableRow(q, -1/* orderIndex */, q.getQuery().getId(), - true/* summary */)); - + logSummaryRow(q); + } catch (RuntimeException t) { log.error(t,t); @@ -105,6 +92,34 @@ } + /** + * Log a detail row for each operator in the query. + */ + static private void logDetailRows(final IRunningQuery q) { + + final Integer[] order = BOpUtility.getEvaluationOrder(q.getQuery()); + + int orderIndex = 0; + + for (Integer bopId : order) { + + log.info(getTableRow(q, orderIndex, bopId, false/* summary */)); + + orderIndex++; + + } + + } + + /** + * Log a summary row for the query. + */ + static private void logSummaryRow(final IRunningQuery q) { + + log.info(getTableRow(q, -1/* orderIndex */, q.getQuery().getId(), true/* summary */)); + + } + static private String getTableHeader() { final StringBuilder sb = new StringBuilder(); @@ -135,6 +150,7 @@ // dynamics (aggregated for totals as well). sb.append("\tfanIO"); sb.append("\tsumMillis"); // cumulative milliseconds for eval of this operator. + sb.append("\topCount"); // cumulative #of invocations of tasks for this operator. sb.append("\tchunksIn"); sb.append("\tunitsIn"); sb.append("\tchunksOut"); @@ -305,6 +321,8 @@ sb.append('\t'); sb.append(stats.elapsed.get()); sb.append('\t'); + sb.append(stats.opCount.get()); + sb.append('\t'); sb.append(stats.chunksIn.get()); sb.append('\t'); sb.append(stats.unitsIn.get()); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2011-01-21 14:28:24
|
Revision: 4156 http://bigdata.svn.sourceforge.net/bigdata/?rev=4156&view=rev Author: thompsonbry Date: 2011-01-21 14:28:17 +0000 (Fri, 21 Jan 2011) Log Message: ----------- Modified Union (and its base class AbstractSubqueryOp) to consume the binding sets from the source and added a unit test for this. However, UNION still can not be used in any position other than the 1st operator in a pipeline because its operands are the subqueries. Right now, in order to use UNION in a non-primary position you have to use a SubqueryOp to wrap the Union. And Union itself issues subqueries, so this is perhaps more overhead than we would like. In order to change this we would have to move the subqueries to be executed by the union from operands to annotations, which might be just fine. Added some convenience methods to QueryEngine for evaluation against non-empty binding sets. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/AbstractSubqueryOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/Union.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/AbstractSubqueryOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/AbstractSubqueryOp.java 2011-01-21 13:17:01 UTC (rev 4155) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/AbstractSubqueryOp.java 2011-01-21 14:28:17 UTC (rev 4156) @@ -32,6 +32,7 @@ import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.FutureTask; @@ -45,24 +46,26 @@ import 
com.bigdata.bop.engine.IRunningQuery; import com.bigdata.bop.engine.QueryEngine; import com.bigdata.relation.accesspath.IAsynchronousIterator; +import com.bigdata.service.IBigdataFederation; import com.bigdata.util.concurrent.LatchedExecutor; /** * Executes each of the operands as a subquery. The operands are evaluated in * the order given and with the annotated parallelism. Each subquery is run as a * separate query but is linked to the parent query in the operator is being - * evaluated. The subqueries do not receive bindings from the parent and may be + * evaluated. The subqueries receive bindings from the pipeline and may be * executed independently. By default, the subqueries are run with unlimited - * parallelism. + * parallelism. Since the #of subqueries is generally small (2), this means that + * the subqueries run in parallel. * <p> * Note: This operator must execute on the query controller. * <p> * The {@link PipelineOp.Annotations#SINK_REF} of each child operand should be * overridden to specify the parent of the this operator. If you fail to do * this, then the intermediate results of the subqueries will be routed to this - * operator, which DOES NOT pass them on. This may cause unnecessary network - * traffic. It may also cause the query to block if the buffer capacity is - * limited. + * operator. This may cause unnecessary network traffic when running against the + * {@link IBigdataFederation}. It may also cause the query to block if the + * buffer capacity is limited. * <p> * If you want to route intermediate results from other computations into * subqueries, then consider a {@link Tee} pattern instead. @@ -76,9 +79,9 @@ * </pre> * * Will run the subqueries <i>a</i>, <i>b</i>, and <i>c</i> in parallel. Each - * subquery will be initialized with a single empty {@link IBindingSet}. The - * output of those subqueries MUST be explicitly routed to the SLICE operator - * using {@link PipelineOp.Annotations#SINK_REF} on each of the subqueries. 
+ * subquery will be run once for each source {@link IBindingSet}. The output of + * those subqueries is explicitly routed to the SLICE operator using + * {@link PipelineOp.Annotations#SINK_REF} for efficiency in scale-out. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ @@ -168,13 +171,12 @@ private final AbstractSubqueryOp controllerOp; private final BOpContext<IBindingSet> context; - private final List<FutureTask<IRunningQuery>> tasks = new LinkedList<FutureTask<IRunningQuery>>(); - private final CountDownLatch latch; private final int nparallel; private final Executor executor; - - public ControllerTask(final AbstractSubqueryOp controllerOp, final BOpContext<IBindingSet> context) { + public ControllerTask(final AbstractSubqueryOp controllerOp, + final BOpContext<IBindingSet> context) { + if (controllerOp == null) throw new IllegalArgumentException(); @@ -191,47 +193,90 @@ this.executor = new LatchedExecutor(context.getIndexManager() .getExecutorService(), nparallel); - this.latch = new CountDownLatch(controllerOp.arity()); + } - /* - * Create FutureTasks for each subquery. The futures are not - * submitted to the Executor yet. That happens in call(). By - * deferring the evaluation until call() we gain the ability to - * cancel all subqueries if any subquery fails. - */ - for (BOp op : controllerOp.args()) { + /** + * Evaluate the subqueries with limited parallelism. + */ + public Void call() throws Exception { - /* - * Task runs subquery and cancels all subqueries in [tasks] if - * it fails. - */ - tasks.add(new FutureTask<IRunningQuery>(new SubqueryTask(op, - context)) { - /* - * Hook future to count down the latch when the task is - * done. 
- */ - public void run() { - try { - super.run(); - } finally { - latch.countDown(); - } + final IAsynchronousIterator<IBindingSet[]> source = context + .getSource(); + + try { + + while (source.hasNext()) { + + final IBindingSet[] chunk = source.next(); + + for (IBindingSet bset : chunk) { + + consumeBindingSet(bset); + } - }); + + } + // Now that we know the subqueries ran Ok, flush the sink. + context.getSink().flush(); + + // Done. + return null; + + } finally { + + // Close the source. + source.close(); + + context.getSink().close(); + + if (context.getSink2() != null) + context.getSink2().close(); + } - + } - /** - * Evaluate the subqueries with limited parallelism. - */ - public Void call() throws Exception { + private void consumeBindingSet(final IBindingSet bset) + throws InterruptedException, ExecutionException { + final List<FutureTask<IRunningQuery>> tasks = new LinkedList<FutureTask<IRunningQuery>>(); + try { + final CountDownLatch latch = new CountDownLatch(controllerOp + .arity()); + /* + * Create FutureTasks for each subquery. The futures are not + * submitted to the Executor yet. That happens in call(). By + * deferring the evaluation until call() we gain the ability to + * cancel all subqueries if any subquery fails. + */ + for (BOp op : controllerOp.args()) { + + /* + * Task runs subquery and cancels all subqueries in [tasks] + * if it fails. + */ + tasks.add(new FutureTask<IRunningQuery>(new SubqueryTask( + op, context, bset)) { + /* + * Hook future to count down the latch when the task is + * done. + */ + public void run() { + try { + super.run(); + } finally { + latch.countDown(); + } + } + }); + + } + + /* * Run subqueries with limited parallelism. */ for (FutureTask<IRunningQuery> ft : tasks) { @@ -239,12 +284,6 @@ } /* - * Close the source. Controllers do not accept inputs from the - * pipeline. - */ - context.getSource().close(); - - /* * Wait for all subqueries to complete. 
*/ latch.await(); @@ -255,25 +294,14 @@ for (FutureTask<IRunningQuery> ft : tasks) ft.get(); - // Now that we know the subqueries ran Ok, flush the sink. - context.getSink().flush(); - - // Done. - return null; - } finally { // Cancel any tasks which are still running. for (FutureTask<IRunningQuery> ft : tasks) ft.cancel(true/* mayInterruptIfRunning */); - - context.getSink().close(); - - if (context.getSink2() != null) - context.getSink2().close(); } - + } /** @@ -294,12 +322,20 @@ */ private final BOp subQueryOp; + /** + * The input for this invocation of the subquery. + */ + private final IBindingSet bset; + public SubqueryTask(final BOp subQuery, - final BOpContext<IBindingSet> parentContext) { + final BOpContext<IBindingSet> parentContext, + final IBindingSet bset) { this.subQueryOp = subQuery; this.parentContext = parentContext; + + this.bset = bset; } @@ -312,7 +348,7 @@ final QueryEngine queryEngine = parentContext.getRunningQuery() .getQueryEngine(); - runningSubquery = queryEngine.eval(subQueryOp); + runningSubquery = queryEngine.eval(subQueryOp, bset); // Iterator visiting the subquery solutions. subquerySolutionItr = runningSubquery.iterator(); @@ -343,8 +379,10 @@ * Such exceptions are NOT propagated here and WILL NOT * cause the parent query to terminate. */ - throw new RuntimeException(ControllerTask.this.context - .getRunningQuery().halt(runningSubquery.getCause())); + throw new RuntimeException(ControllerTask.this.context + .getRunningQuery().halt( + runningSubquery == null ? 
t + : runningSubquery.getCause())); } return runningSubquery; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryOp.java 2011-01-21 13:17:01 UTC (rev 4155) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/SubqueryOp.java 2011-01-21 14:28:17 UTC (rev 4156) @@ -28,7 +28,6 @@ package com.bigdata.bop.controller; import java.util.Map; -import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.FutureTask; @@ -40,10 +39,8 @@ import com.bigdata.bop.NV; import com.bigdata.bop.PipelineOp; import com.bigdata.bop.engine.IRunningQuery; -import com.bigdata.bop.engine.LocalChunkMessage; import com.bigdata.bop.engine.QueryEngine; import com.bigdata.relation.accesspath.IAsynchronousIterator; -import com.bigdata.relation.accesspath.ThickAsynchronousIterator; /** * For each binding set presented, this operator executes a subquery. Any @@ -391,25 +388,28 @@ final QueryEngine queryEngine = parentContext.getRunningQuery() .getQueryEngine(); - final BOp startOp = BOpUtility.getPipelineStart(subQueryOp); - - final int startId = startOp.getId(); +// final BOp startOp = BOpUtility.getPipelineStart(subQueryOp); +// +// final int startId = startOp.getId(); +// +// final UUID queryId = UUID.randomUUID(); +// +// // execute the subquery, passing in the source binding set. 
+// runningSubquery = queryEngine +// .eval( +// queryId, +// (PipelineOp) subQueryOp, +// new LocalChunkMessage<IBindingSet>( +// queryEngine, +// queryId, +// startId, +// -1 /* partitionId */, +// new ThickAsynchronousIterator<IBindingSet[]>( +// new IBindingSet[][] { new IBindingSet[] { bset } }))); - final UUID queryId = UUID.randomUUID(); + runningSubquery = queryEngine.eval((PipelineOp) subQueryOp, + bset); - // execute the subquery, passing in the source binding set. - runningSubquery = queryEngine - .eval( - queryId, - (PipelineOp) subQueryOp, - new LocalChunkMessage<IBindingSet>( - queryEngine, - queryId, - startId, - -1 /* partitionId */, - new ThickAsynchronousIterator<IBindingSet[]>( - new IBindingSet[][] { new IBindingSet[] { bset } }))); - long ncopied = 0L; try { @@ -491,9 +491,11 @@ * Such exceptions are NOT propagated here and WILL NOT * cause the parent query to terminate. */ - throw new RuntimeException(ControllerTask.this.context - .getRunningQuery().halt(runningSubquery.getCause())); - } + throw new RuntimeException(ControllerTask.this.context + .getRunningQuery().halt( + runningSubquery == null ? t + : runningSubquery.getCause())); + } return runningSubquery; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/Union.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/Union.java 2011-01-21 13:17:01 UTC (rev 4155) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/Union.java 2011-01-21 14:28:17 UTC (rev 4156) @@ -35,15 +35,15 @@ /** * UNION(ops)[maxParallel(default all)] + * * <pre> * UNION([a,b,c],{}) * </pre> * - * Will run the subqueries <i>a</i>, <i>b</i>, and <i>c</i> in parallel. Each - * subquery will be initialized with a single empty {@link IBindingSet}. 
The - * output of those subqueries will be routed to the UNION operator (their - * parent) unless the subqueries explicitly override this behavior using - * {@link PipelineOp.Annotations#SINK_REF}. + * Will run the subqueries <i>a</i>, <i>b</i>, and <i>c</i> in parallel for each + * source {@link IBindingSet}. The output of those subqueries will be routed to + * the UNION operator (their parent) unless the subqueries explicitly override + * this behavior using {@link PipelineOp.Annotations#SINK_REF}. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2011-01-21 13:17:01 UTC (rev 4155) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2011-01-21 14:28:17 UTC (rev 4156) @@ -791,16 +791,57 @@ */ public AbstractRunningQuery eval(final BOp op) throws Exception { + return eval(op, new ListBindingSet()); + + } + + /** + * Evaluate a query. This node will serve as the controller for the query. + * + * @param query + * The query to evaluate. + * @param bset + * The initial binding set to present. + * + * @return The {@link IRunningQuery}. + * + * @throws IllegalStateException + * if the {@link QueryEngine} has been {@link #shutdown()}. + * @throws Exception + */ + public AbstractRunningQuery eval(final BOp op, final IBindingSet bset) + throws Exception { + + return eval(op, newBindingSetIterator(bset)); + + } + + /** + * Evaluate a query. This node will serve as the controller for the query. + * + * @param query + * The query to evaluate. + * @param bsets + * The binding sets to be consumed by the query. + * + * @return The {@link IRunningQuery}. + * + * @throws IllegalStateException + * if the {@link QueryEngine} has been {@link #shutdown()}. 
+ * @throws Exception + */ + public AbstractRunningQuery eval(final BOp op, + final IAsynchronousIterator<IBindingSet[]> bsets) throws Exception { + final BOp startOp = BOpUtility.getPipelineStart(op); final int startId = startOp.getId(); - + final UUID queryId = UUID.randomUUID(); return eval(queryId, (PipelineOp) op, new LocalChunkMessage<IBindingSet>(this/* queryEngine */, - queryId, startId, -1 /* partitionId */, - newBindingSetIterator(new ListBindingSet()))); + queryId, startId, -1 /* partitionId */, bsets)); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2011-02-27 21:11:09
|
Revision: 4254 http://bigdata.svn.sourceforge.net/bigdata/?rev=4254&view=rev Author: thompsonbry Date: 2011-02-27 21:11:03 +0000 (Sun, 27 Feb 2011) Log Message: ----------- Modified the SPARQL Constraint operator to handle nested type errors. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/constraint/Constraint.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/joinGraph/PartitionedJoinGroup.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/constraint/Constraint.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/constraint/Constraint.java 2011-02-27 15:09:56 UTC (rev 4253) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/constraint/Constraint.java 2011-02-27 21:11:03 UTC (rev 4254) @@ -81,19 +81,27 @@ public boolean accept(final IBindingSet bs) { - try { +// try { // evaluate the BVE operator return ((BooleanValueExpression) get(0)).get(bs); - - } catch (Exception ex) { - - // trap the type error and filter out the solution - if (log.isInfoEnabled()) - log.info("discarding solution due to error: " + bs); - return false; - - } + +// } catch (Throwable t) { +// +// if (InnerCause.isInnerCause(t, SparqlTypeErrorException.class)) { +// +// // trap the type error and filter out the solution +// if (log.isInfoEnabled()) +// log.info("discarding solution due to type error: " + bs +// + " : " + t); +// +// return false; +// +// } +// +// throw new RuntimeException(t); +// +// } } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/joinGraph/PartitionedJoinGroup.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/joinGraph/PartitionedJoinGroup.java 2011-02-27 15:09:56 UTC (rev 4253) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/joinGraph/PartitionedJoinGroup.java 
2011-02-27 21:11:03 UTC (rev 4254) @@ -1061,6 +1061,8 @@ }) // ); +// final PipelineOp queryOp = lastOp; + return queryOp; } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |