From: <tho...@us...> - 2010-09-06 20:45:47
Revision: 3511
          http://bigdata.svn.sourceforge.net/bigdata/?rev=3511&view=rev
Author:   thompsonbry
Date:     2010-09-06 20:45:37 +0000 (Mon, 06 Sep 2010)

Log Message:
-----------
Modified the IBindingSet API to define the contract for hashCode(). This was necessary in order for DISTINCT on binding sets to work correctly. Reorganized the QueryEngine classes out of the test suite. Added the basic framework for the FederatedQueryEngine test suite. Added a "de-chunk" pattern to unchunk iterators. Reconciled the BufferService with the ResourceService, including their test suites.

Modified Paths:
--------------
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ArrayBindingSet.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/EmptyBindingSet.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/HashBindingSet.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IBindingSet.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IPipelineOp.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/AbstractStepTask.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/ProgramTask.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/resources/ResourceManager.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/resources/StoreManager.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/DataService.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/ResourceService.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/striterator/GenericStriterator.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/resources/logging/log4j.properties
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestBindingSet.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestAll.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestAll.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/service/TestAll.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/striterator/TestAll.java

Added Paths:
-----------
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BSBundle.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BindingSetChunk.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/HaltOpMessage.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryClient.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryPeer.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/PipelineUtility.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/Rule2BOpUtility.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StartOpMessage.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/DelegateIndexManager.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/MapBindingSetsOverShardsBuffer.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/ManagedResourceService.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/striterator/Dechunkerator.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/service/TestAll_DynamicSharding.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/service/TestAll_ResourceService.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/service/TestCase3.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/service/TestReceiveBuffer.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/service/TestReceiveFile.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/striterator/TestDechunkerator.java

Removed Paths:
-------------
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/BOpShard.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/BOpStats.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/BindingSetChunk.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/BufferService.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/FederatedQueryEngine.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/HaltOpMessage.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/IQueryClient.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/IQueryPeer.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/IRunningQuery.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/ManagedBufferService.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/PipelineUtility.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/QueryEngine.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/QueryEngine.properties
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/ReceiveBindingSets.java
    branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/Rule2BOpUtility.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/RunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/StartOpMessage.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestCase3.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestReceiveBuffer.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestReceiveFile.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/MapBindingSetsOverShardsBuffer.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestSendReceiveBuffers.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/service/TestResourceService.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ArrayBindingSet.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ArrayBindingSet.java 2010-09-05 20:17:44 UTC (rev 3510) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ArrayBindingSet.java 2010-09-06 20:45:37 UTC (rev 3511) @@ -225,6 +225,9 @@ } + // clear the hash code. + hash = 0; + assert nbound == 0; } @@ -261,6 +264,9 @@ } + // clear the hash code. + hash = 0; + nbound--; break; @@ -316,7 +322,10 @@ if (vars[i] == var) { vals[i] = val; - + + // clear the hash code. + hash = 0; + return; } @@ -327,6 +336,9 @@ vals[nbound] = val; + // clear the hash code. 
+ hash = 0; + nbound++; } @@ -408,11 +420,16 @@ } - public boolean equals(final IBindingSet o) { + public boolean equals(final Object t) { - if (o == this) + if (this == t) return true; + if(!(t instanceof IBindingSet)) + return false; + + final IBindingSet o = (IBindingSet)t; + if (nbound != o.size()) return false; @@ -430,4 +447,27 @@ } + public int hashCode() { + + if (hash == 0) { + + int result = 0; + + for (int i = 0; i < nbound; i++) { + + if (vals[i] == null) + continue; + + result ^= vals[i].hashCode(); + + } + + hash = result; + + } + return hash; + + } + private int hash; + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/EmptyBindingSet.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/EmptyBindingSet.java 2010-09-05 20:17:44 UTC (rev 3510) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/EmptyBindingSet.java 2010-09-06 20:45:37 UTC (rev 3511) @@ -96,18 +96,32 @@ return 0; } - public boolean equals(IBindingSet o) { + public boolean equals(final Object t) { - if (this == o) + if (this == t) return true; - + + if (!(t instanceof IBindingSet)) + return false; + + final IBindingSet o = (IBindingSet) t; + if (o.size() == 0) return true; - + return false; } + /** + * The hash code of an empty binding set is always zero. + */ + public int hashCode() { + + return 0; + + } + public IConstant get(IVariable var) { if (var == null) Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/HashBindingSet.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/HashBindingSet.java 2010-09-05 20:17:44 UTC (rev 3510) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/HashBindingSet.java 2010-09-06 20:45:37 UTC (rev 3511) @@ -146,6 +146,9 @@ throw new IllegalArgumentException(); map.put(var,val); + + // clear the hash code. 
+ hash = 0; } @@ -156,12 +159,18 @@ map.remove(var); + // clear the hash code. + hash = 0; + } public void clearAll() { map.clear(); - + + // clear the hash code. + hash = 0; + } public String toString() { @@ -233,11 +242,14 @@ final HashBindingSet bs = new HashBindingSet(); - for (IVariable var : variablesToKeep) { + for (IVariable<?> var : variablesToKeep) { - IConstant val = map.get(var); + final IConstant<?> val = map.get(var); + if (val != null) { + bs.map.put(var, val); + } } @@ -246,11 +258,16 @@ } - public boolean equals(final IBindingSet o) { + public boolean equals(final Object t) { - if (o == this) + if (this == t) return true; + if(!(t instanceof IBindingSet)) + return false; + + final IBindingSet o = (IBindingSet) t; + if (size() != o.size()) return false; @@ -260,9 +277,9 @@ final Map.Entry<IVariable,IConstant> entry = itr.next(); - final IVariable var = entry.getKey(); + final IVariable<?> var = entry.getKey(); - final IConstant val = entry.getValue(); + final IConstant<?> val = entry.getValue(); // if (!o.isBound(vars[i])) // return false; @@ -276,4 +293,27 @@ } + public int hashCode() { + + if (hash == 0) { + + int result = 0; + + for(IConstant<?> c : map.values()) { + + if (c == null) + continue; + + result ^= c.hashCode(); + + } + + hash = result; + + } + return hash; + + } + private int hash; + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IBindingSet.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IBindingSet.java 2010-09-05 20:17:44 UTC (rev 3510) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IBindingSet.java 2010-09-06 20:45:37 UTC (rev 3511) @@ -167,6 +167,20 @@ * @param o * Another binding set. 
      */
-    public boolean equals(IBindingSet o);
+    public boolean equals(Object o);
+
+    /**
+     * The hash code of a binding set is defined as the bit-wise XOR of the hash
+     * codes of the {@link IConstant}s for its bound variables. Unbound
+     * variables are ignored when computing the hash code. Binding sets are
+     * unordered collections, therefore the calculated hash code intentionally
+     * does not depend on the order in which the bindings are iterated over.
+     * The hash code reflects the current state of the bindings and must be
+     * recomputed if the bindings are changed.
+     *
+     * @todo the test suites should be enhanced to verify the contract for
+     *       {@link IBindingSet#hashCode()}
+     */
+    public int hashCode();

 }

Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IPipelineOp.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IPipelineOp.java	2010-09-05 20:17:44 UTC (rev 3510)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IPipelineOp.java	2010-09-06 20:45:37 UTC (rev 3511)
@@ -27,6 +27,7 @@

 package com.bigdata.bop;

+import java.util.concurrent.Callable;
 import java.util.concurrent.FutureTask;

 import com.bigdata.bop.engine.BOpStats;
@@ -69,6 +70,11 @@
      *
      * @return The {@link FutureTask} which will compute the operator's
      *         evaluation.
+     *
+     * @todo Modify to return a {@link Callable} for now since we must run each
+     *       task in its own thread until Java7 gives us fork/join pools and
+     *       asynchronous file I/O. For the fork/join model we will probably
+     *       return the ForkJoinTask.
*/ FutureTask<Void> eval(BOpContext<E> context); Copied: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java (from rev 3508, branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/BOpStats.java) =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java 2010-09-06 20:45:37 UTC (rev 3511) @@ -0,0 +1,134 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Aug 26, 2010 + */ + +package com.bigdata.bop.engine; + +import java.io.Serializable; + +import com.bigdata.bop.BOp; +import com.bigdata.counters.CAT; + +/** + * Statistics associated with the evaluation of a {@link BOp}. These statistics + * are per {@link BOp}. The top-level {@link BOp} will reflect the throughput + * for the entire pipeline. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + * + * @todo Add time per bop. This can not be directly aggregated into wall time + * since there are concurrent processes. 
However, this will be useful
+ *       since we tend to process materialized chunks with the new
+ *       {@link QueryEngine} such that the operator evaluation time now more or
+ *       less directly corresponds to the time it takes to act on local data,
+ *       producing local outputs. The {@link QueryEngine} itself now handles the
+ *       transportation of data between the nodes so that time can be factored
+ *       out of the local aspects of query execution.
+ */
+public class BOpStats implements Serializable {
+
+    /**
+     *
+     */
+    private static final long serialVersionUID = 1L;
+
+//    /**
+//     * The timestamp (milliseconds) associated with the start of execution for
+//     * the join dimension. This is not aggregated. It should only be used to
+//     * compute the elapsed time for the operator.
+//     */
+//    private final long startTime;
+
+//    /**
+//     * The index partition for which these statistics were collected or -1
+//     * if the statistics are aggregated across index partitions.
+//     */
+//    public final int partitionId;
+
+    /**
+     * #of chunks in.
+     */
+    final public CAT chunksIn = new CAT();
+
+    /**
+     * #of units in (tuples, elements, binding sets, etc).
+     */
+    final public CAT unitsIn = new CAT();
+
+    /**
+     * #of chunks out.
+     */
+    final public CAT chunksOut = new CAT();
+
+    /**
+     * #of units out (tuples, elements, binding sets, etc).
+     */
+    final public CAT unitsOut = new CAT();
+
+    /**
+     * Constructor.
+     */
+    public BOpStats() {
+
+    }
+
+    /**
+     * Combine the statistics (addition).
+     *
+     * @param o
+     *            Another statistics object.
+     */
+    public void add(final BOpStats o) {
+        chunksIn.add(o.chunksIn.get());
+        unitsIn.add(o.unitsIn.get());
+        unitsOut.add(o.unitsOut.get());
+        chunksOut.add(o.chunksOut.get());
+    }
+
+    public String toString() {
+        final StringBuilder sb = new StringBuilder();
+        sb.append(getClass().getName());
+        sb.append("{chunksIn=" + chunksIn.estimate_get());
+        sb.append(",unitsIn=" + unitsIn.estimate_get());
+        sb.append(",chunksOut=" + chunksOut.estimate_get());
+        sb.append(",unitsOut=" + unitsOut.estimate_get());
+        toString(sb);
+        sb.append("}");
+        return sb.toString();
+    }
+
+    /**
+     * Extension hook for {@link #toString()}.
+     *
+     * @param sb
+     *            Where to write the additional state.
+     */
+    protected void toString(final StringBuilder sb) {
+
+    }
+
+}

Copied: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BSBundle.java (from rev 3489, branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/BOpShard.java)
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BSBundle.java	(rev 0)
+++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BSBundle.java	2010-09-06 20:45:37 UTC (rev 3511)
@@ -0,0 +1,75 @@
+/**
+
+Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved.
+
+Contact:
+     SYSTAP, LLC
+     4501 Tower Road
+     Greensboro, NC 27410
+     lic...@bi...
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Sep 1, 2010 + */ + +package com.bigdata.bop.engine; + +/** + * An immutable class capturing the evaluation context of an operator against a + * shard. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class BSBundle { + + public final int bopId; + + public final int shardId; + + public BSBundle(final int bopId, final int shardId) { + + this.bopId = bopId; + + this.shardId = shardId; + + } + + /** + * {@inheritDoc} + * + * @todo verify that this is a decent hash function. + */ + public int hashCode() { + + return (bopId * 31) + shardId; + + } + + public boolean equals(final Object o) { + + if (this == o) + return true; + + if (!(o instanceof BSBundle)) + return false; + + return bopId == ((BSBundle) o).bopId + && shardId == ((BSBundle) o).shardId; + + } + +} Copied: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BindingSetChunk.java (from rev 3472, branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/BindingSetChunk.java) =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BindingSetChunk.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BindingSetChunk.java 2010-09-06 20:45:37 UTC (rev 3511) @@ -0,0 +1,51 @@ +package com.bigdata.bop.engine; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.IBindingSet; +import com.bigdata.relation.accesspath.IAsynchronousIterator; + +/** + * A chunk of intermediate results which are ready to be consumed by some + * {@link BOp} in a specific query. + */ +public class BindingSetChunk { + + /** + * The query identifier. + */ + final long queryId; + + /** + * The target {@link BOp}. 
+ */ + final int bopId; + + /** + * The index partition which is being targeted for that {@link BOp}. + */ + final int partitionId; + + /** + * The binding sets to be consumed by that {@link BOp}. + */ + final IAsynchronousIterator<IBindingSet[]> source; + + public BindingSetChunk(final long queryId, final int bopId, + final int partitionId, + final IAsynchronousIterator<IBindingSet[]> source) { + if (source == null) + throw new IllegalArgumentException(); + this.queryId = queryId; + this.bopId = bopId; + this.partitionId = partitionId; + this.source = source; + } + + public String toString() { + + return getClass().getName() + "{queryId=" + queryId + ",bopId=" + bopId + + ",partitionId=" + partitionId + "}"; + + } + +} Copied: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/HaltOpMessage.java (from rev 3489, branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/HaltOpMessage.java) =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/HaltOpMessage.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/HaltOpMessage.java 2010-09-06 20:45:37 UTC (rev 3511) @@ -0,0 +1,135 @@ +package com.bigdata.bop.engine; + +import java.io.Serializable; +import java.util.UUID; + +/** + * A message sent to the {@link IQueryClient} when an operator is done executing + * for some chunk of inputs. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class HaltOpMessage implements Serializable { + + /** + * + */ + private static final long serialVersionUID = 1L; + + /** The identifier of the query. */ + final long queryId; + + /** The identifier of the operator. */ + final int bopId; + + /** + * The index partition identifier against which the operator was + * executing. + */ + final int partitionId; + + /** + * The identifier of the service on which the operator was executing. 
+     */
+    final UUID serviceId;
+
+    /**
+     * The cause -or- <code>null</code> if the operator halted normally.
+     */
+    final Throwable cause;
+
+    /**
+     * The operator identifier for the primary sink -or- <code>null</code>
+     * if there is no primary sink (for example, if this is the last
+     * operator in the pipeline).
+     */
+    final Integer sinkId;
+
+    /**
+     * The number of {@link BindingSetChunk}s that were output for the
+     * primary sink. (This information is used for the atomic termination
+     * decision.)
+     * <p>
+     * For a given downstream operator this is ONE (1) for scale-up. For
+     * scale-out, this is one per index partition over which the
+     * intermediate results were mapped.
+     */
+    final int sinkChunksOut;
+
+    /**
+     * The operator identifier for the alternative sink -or-
+     * <code>null</code> if there is no alternative sink.
+     */
+    final Integer altSinkId;
+
+    /**
+     * The number of {@link BindingSetChunk}s that were output for the
+     * alternative sink. (This information is used for the atomic
+     * termination decision.)
+     * <p>
+     * For a given downstream operator this is ONE (1) for scale-up. For
+     * scale-out, this is one per index partition over which the
+     * intermediate results were mapped. It is zero if there was no
+     * alternative sink for the operator.
+     */
+    final int altSinkChunksOut;
+
+    /**
+     * The statistics for the execution of the bop against the partition on
+     * the service.
+     */
+    final BOpStats taskStats;
+
+    /**
+     * @param queryId
+     *            The query identifier.
+     * @param bopId
+     *            The operator whose execution phase has terminated for a
+     *            specific index partition and input chunk.
+     * @param partitionId
+     *            The index partition against which the operator was
+     *            executed.
+     * @param serviceId
+     *            The node which executed the operator.
+     * @param cause
+     *            <code>null</code> unless execution halted abnormally.
+ * @param chunksOut + * A map reporting the #of binding set chunks which were + * output for each downstream operator for which at least one + * chunk of output was produced. + * @param taskStats + * The statistics for the execution of that bop on that shard + * and service. + */ + public HaltOpMessage( + // + final long queryId, final int bopId, final int partitionId, + final UUID serviceId, Throwable cause, // + final Integer sinkId, final int sinkChunksOut,// + final Integer altSinkId, final int altSinkChunksOut,// + final BOpStats taskStats) { + + if (altSinkId != null && sinkId == null) { + // The primary sink must be defined if the altSink is defined. + throw new IllegalArgumentException(); + } + + if (sinkId != null && altSinkId != null + && sinkId.intValue() == altSinkId.intValue()) { + // The primary and alternative sink may not be the same operator. + throw new IllegalArgumentException(); + } + + this.queryId = queryId; + this.bopId = bopId; + this.partitionId = partitionId; + this.serviceId = serviceId; + this.cause = cause; + this.sinkId = sinkId; + this.sinkChunksOut = sinkChunksOut; + this.altSinkId = altSinkId; + this.altSinkChunksOut = altSinkChunksOut; + this.taskStats = taskStats; + } +} \ No newline at end of file Copied: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryClient.java (from rev 3489, branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/IQueryClient.java) =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryClient.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryClient.java 2010-09-06 20:45:37 UTC (rev 3511) @@ -0,0 +1,105 @@ +package com.bigdata.bop.engine; + +import java.rmi.Remote; +import java.rmi.RemoteException; + +import com.bigdata.bop.BOp; + +/** + * Interface for a client executing queries. 
+ */ +public interface IQueryClient extends IQueryPeer, Remote { + + /* + * @todo Could return a data structure which encapsulates the query results + * and could allow multiple results from a query, e.g., one per step in a + * program. + */ + +// /** +// * Evaluate a query which materializes elements, such as an +// * {@link IPredicate}. +// * +// * @param queryId +// * The unique identifier for the query. +// * @param timestamp +// * The timestamp or transaction against which the query will run. +// * @param query +// * The query to evaluate. +// * @param source +// * The initial binding sets to get the query going (this is +// * typically an iterator visiting a single empty binding set). +// * +// * @return An iterator visiting the elements materialized by the query. +// * +// * @throws Exception +// */ +// public IChunkedIterator<?> eval(long queryId, long timestamp, BOp query) +// throws Exception; + +// /** +// * Evaluate a query which visits {@link IBindingSet}s, such as a join. +// * +// * @param queryId +// * The unique identifier for the query. +// * @param timestamp +// * The timestamp or transaction against which the query will run. +// * @param query +// * The query to evaluate. +// * @param source +// * The initial binding sets to get the query going (this is +// * typically an iterator visiting a single empty binding set). +// * +// * @return An iterator visiting {@link IBindingSet}s which result from +// * evaluating the query. +// * +// * @throws Exception +// */ +// public IChunkedIterator<IBindingSet> eval(long queryId, long timestamp, +// BOp query, IAsynchronousIterator<IBindingSet[]> source) +// throws Exception; + + /** + * Return the query. + * + * @param queryId + * The query identifier. + * @return The query. + * + * @throws RemoteException + */ + public BOp getQuery(long queryId) throws RemoteException; + + /** + * Notify the client that execution has started for some query, operator, + * node, and index partition. 
+ */ + public void startOp(StartOpMessage msg) + throws RemoteException; + + /** + * Notify the client that execution has halted for some query, operator, + * node, shard, and source binding set chunk(s). If execution halted + * abnormally, then the cause is sent as well. + */ + public void haltOp(HaltOpMessage msg) throws RemoteException; + +// /** +// * Notify the query controller that a chunk of intermediate results is +// * available for the query. +// * +// * @param queryId +// * The query identifier. +// */ +// public void addChunk(long queryId) throws RemoteException; +// +// /** +// * Notify the query controller that a chunk of intermediate results was +// * taken for processing by the query. +// * +// * @param queryId +// * The query identifier. +// */ +// public void takeChunk(long queryId) throws RemoteException; + +} Copied: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryPeer.java (from rev 3466, branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/IQueryPeer.java) =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryPeer.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IQueryPeer.java 2010-09-06 20:45:37 UTC (rev 3511) @@ -0,0 +1,36 @@ +package com.bigdata.bop.engine; + +import java.net.InetSocketAddress; +import java.rmi.Remote; +import java.rmi.RemoteException; + +import com.bigdata.bop.BOp; + +/** + * Interface for a node participating in the exchange of NIO buffers to + * support query execution. + */ +public interface IQueryPeer extends Remote { + + /** + * Notify a service that a buffer having data for some {@link BOp} in some + * running query is available. The receiver may request the data when they + * are ready. If the query is cancelled, then the sender will drop the + * buffer. + * + * @param clientProxy + * proxy used to communicate with the client running the query. 
+ * @param serviceAddr + * address which may be used to demand the data. + * @param queryId + * the unique query identifier. + * @param bopId + * the identifier for the target {@link BOp}. + * + * @return <code>true</code> unless the receiver knows that the query has + * already been cancelled. + */ + void bufferReady(IQueryClient clientProxy, InetSocketAddress serviceAddr, + long queryId, int bopId) throws RemoteException; + +} Copied: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java (from rev 3510, branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/IRunningQuery.java) =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java 2010-09-06 20:45:37 UTC (rev 3511) @@ -0,0 +1,75 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ +/* + * Created on Sep 5, 2010 + */ + +package com.bigdata.bop.engine; + +import com.bigdata.btree.ILocalBTreeView; +import com.bigdata.journal.IIndexManager; +import com.bigdata.service.IBigdataFederation; + +/** + * Interface exposing a limited set of the state of an executing query. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public interface IRunningQuery { + + /** + * The {@link IBigdataFederation} IFF the operator is being evaluated on an + * {@link IBigdataFederation}. When evaluating operations against an + * {@link IBigdataFederation}, this reference provides access to the + * scale-out view of the indices and to other bigdata services. + */ + IBigdataFederation<?> getFederation(); + + /** + * The <strong>local</strong> {@link IIndexManager}. Query evaluation occurs + * against the local indices. In scale-out, query evaluation proceeds shard + * wise and this {@link IIndexManager} MUST be able to read on the + * {@link ILocalBTreeView}. + */ + IIndexManager getIndexManager(); + + /** + * The timestamp or transaction identifier against which the query is + * reading. + */ + long getReadTimestamp(); + + /** + * The timestamp or transaction identifier against which the query is + * writing. 
+ */ + long getWriteTimestamp(); + + /** + * Terminate query evaluation + */ + void halt(); + +} Copied: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/PipelineUtility.java (from rev 3489, branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/PipelineUtility.java) =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/PipelineUtility.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/PipelineUtility.java 2010-09-06 20:45:37 UTC (rev 3511) @@ -0,0 +1,156 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Sep 1, 2010 + */ + +package com.bigdata.bop.engine; + +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.log4j.Logger; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpUtility; +import com.bigdata.bop.NoSuchBOpException; + +/** + * Utility methods relevant to pipelined operator evaluation. 
+ * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class PipelineUtility { + + private static final Logger log = Logger.getLogger(PipelineUtility.class); + + /** + * Return <code>true</code> iff the <i>runningCountMap</i> AND + * <i>availableChunkCountMap</i> counts are ZERO (0) for both the given operator and + * for all operators which precede the given operator in the tree structure + * of its operands. + * <p> + * Note: The movement of the intermediate binding set chunks forms an + * acyclic directed graph. We can decide whether or not a {@link BOp} in the + * query plan can be triggered by the current activity pattern by inspecting + * the {@link BOp} and its operands recursively. If neither the {@link BOp} + * nor any of its operands (recursively) has non-zero activity then the + * {@link BOp} can not be triggered and this method will return + * <code>true</code>. + * + * @param bopId + * The identifier for an operator which appears in the query + * plan. + * @param queryPlan + * The query plan. + * @param queryIndex + * An index for the query plan as constructed by + * {@link BOpUtility#getIndex(BOp)}. + * @param runningCountMap + * A map reporting the #of instances of each operator which are + * currently being evaluated (distinct evaluations are performed + * for each chunk and shard). + * @param availableChunkCountMap + * A map reporting the #of chunks available for each operator in + * the pipeline (we only report chunks for pipeline operators). + * + * @return <code>true</code> iff the {@link BOp} can not be triggered given + * the query plan and the activity map. + * + * @throws IllegalArgumentException + * if any argument is <code>null</code>. + * @throws NoSuchBOpException + * if <i>bopId</i> is not found in the query index. 
+ */ + static public boolean isDone(final int bopId, final BOp queryPlan, + final Map<Integer, BOp> queryIndex, + final Map<Integer, AtomicLong> runningCountMap, + final Map<Integer, AtomicLong> availableChunkCountMap) { + + if (queryPlan == null) + throw new IllegalArgumentException(); + if (queryIndex == null) + throw new IllegalArgumentException(); + if (availableChunkCountMap == null) + throw new IllegalArgumentException(); + + final BOp op = queryIndex.get(bopId); + + if (op == null) + throw new NoSuchBOpException(bopId); + + final Iterator<BOp> itr = BOpUtility.preOrderIterator(op); + + while (itr.hasNext()) { + + final BOp t = itr.next(); + + final Integer id = (Integer) t.getProperty(BOp.Annotations.BOP_ID); + + if (id == null) + continue; + { + + final AtomicLong runningCount = runningCountMap.get(id); + + if (runningCount != null && runningCount.get() != 0) { + + if (log.isInfoEnabled()) + log.info("Operator can be triggered: op=" + op + + ", possible trigger=" + t + " is running."); + + return false; + + } + + } + + { + + final AtomicLong availableChunkCount = availableChunkCountMap + .get(id); + + if (availableChunkCount != null + && availableChunkCount.get() != 0) { + + if (log.isInfoEnabled()) + log.info("Operator can be triggered: op=" + op + + ", possible trigger=" + t + " has " + + availableChunkCount + " chunks available."); + + return false; + + } + + } + + } + + return true; + + } + +} Copied: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java (from rev 3510, branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/QueryEngine.java) =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-09-06 20:45:37 UTC (rev 3511) @@ -0,0 +1,800 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. 
+ +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Aug 21, 2010 + */ + +package com.bigdata.bop.engine; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.rmi.RemoteException; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.log4j.Logger; + +import alice.tuprolog.Prolog; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BindingSetPipelineOp; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IPredicate; +import com.bigdata.bop.bset.Union; +import com.bigdata.btree.BTree; +import com.bigdata.btree.IndexSegment; +import com.bigdata.btree.view.FusedView; +import com.bigdata.journal.IIndexManager; +import com.bigdata.journal.ITx; +import com.bigdata.journal.TimestampUtility; +import com.bigdata.rdf.internal.IV; +import com.bigdata.rdf.spo.SPORelation; +import com.bigdata.relation.IMutableRelation; +import com.bigdata.relation.IRelation; +import com.bigdata.relation.accesspath.IBlockingBuffer; +import com.bigdata.relation.accesspath.IElementFilter; +import 
com.bigdata.relation.rule.IRule; +import com.bigdata.relation.rule.Program; +import com.bigdata.relation.rule.eval.pipeline.DistributedJoinTask; +import com.bigdata.resources.IndexManager; +import com.bigdata.service.IBigdataFederation; +import com.bigdata.service.IDataService; +import com.bigdata.service.ndx.IAsynchronousWriteBufferFactory; +import com.bigdata.striterator.ChunkedArrayIterator; +import com.bigdata.striterator.IChunkedOrderedIterator; + +/** + * A class managing execution of concurrent queries against a local + * {@link IIndexManager}. + * <p> + * <h2>Design notes</h2> + * <p> + * Much of the complexity of the current approach owes itself to having to run a + * separate task for each join for each shard in order to have the appropriate + * lock when running against the unisolated shard view. This also means that the + * join task is running inside of the concurrency manager and hence has the + * local view of the shard. + * <p> + * The main, and perhaps the only, reason why we run unisolated rules is during + * closure, when we query against the unisolated indices and then write the + * entailments back on the unisolated indices. + * <p> + * Supporting closure has always been complicated. This complexity is mostly + * handled by ProgramTask#executeMutation() and + * AbstractTripleStore#newJoinNexusFactory() which play games with the + * timestamps used to read and write on the database, with commit points + * designed to create visibility for tuples written by a mutation rule, and with + * the automated advance of the read timestamp for the query in each closure + * pass in order to make newly committed tuples visible to subsequent rounds of + * closure. 
For scale-out, we do shard-wise auto commits so we always have a + * commit point which makes each write visible and the read timestamp is + * actually a read-only transaction which prevents the historical data we need + * during a closure round from being released as we are driving updates onto the + * federation. For the RWStore, we are having a similar problem (in the HA + * branch since that is where we are working on the RWStore) where historically + * allocated records were being released as writes drove updates on the indices. + * Again, we "solved" the problem for the RWStore using a commit point followed + * by a read-only transaction reading on that commit point to hold onto the view + * on which the next closure round needs to read (this uncovered a problem with + * the RWStore and transaction service interaction which Martyn is currently + * working to resolve through a combination of shadow allocators and deferred + * deletes which are processed once the release time is advanced by the + * transaction service). + * <p> + * The WORM does not have some of these problems with closure because we never + * delete history, so we do not need to create a commit point and a read-behind + * transaction. However, the WORM would have problems with concurrent access to + * the unisolated indices except that we hack that problem through the + * transparent use of the UnisolatedReadWriteIndex, which allows multiple + * threads to access the same unisolated index view using a read/write lock + * pattern (concurrent readers are allowed, but there is only one writer and it + * has exclusive access when it is running). This works out because we never run + * closure operations against the WORM through the concurrency manager. If we + * did, we would have to create a commit point after each mutation and use a + * read-behind transaction to prevent concurrent access to the unisolated index. 
+ * <p> + * The main advantage that I can see of the current complexity is that it allows + * us to do load+closure as a single operation on the WORM, resulting in a + * single commit point. This makes that operation ACID without having to use + * full read/write transactions. This is how we gain the ACID contract for the + * standalone Journal in the SAIL for the WORM. Of course, the SAIL does not + * have that contract for the RWStore because we have to do the commit and + * read-behind transaction in order to have visibility and avoid concurrent + * access to the unisolated index (by reading behind on the last commit point). + * <p> + * I think that the reality is even one step more complicated. When doing truth + * maintenance (incremental closure), we bring the temporary graph to a fixed + * point (the rules write on the temp store) and then apply the delta in a + * single write to the database. That suggests that incremental truth + * maintenance would continue to be ACID, but that database-at-once-closure + * would be round-wise ACID. + * <p> + * So, I would like to suggest that we break ACID for database-at-once-closure + * and always follow the pattern of (1) do a commit before each round of + * closure; and (2) create a read-behind transaction to prevent the release of + * that commit point as we drive writes onto the indices. If we follow this + * pattern then we can write on the unisolated indices without conflict and read + * on the historical views without conflict. Since there will be a commit point + * before each mutation rule runs (which corresponds to a closure round), + * database-at-once-closure will be atomic within a round, but will not be a + * single atomic operation. Per above, I think that we would retain the ACID + * property for incremental truth maintenance against a WORM or RW mode Journal. 
+ * + * <p> + * ---- + * </p> + * + * The advantage of this proposal (commit before each mutation rule and run + * query against a read-behind transaction) is that this could enormously + * simplify how we execute joins. + * <p> + * Right now, we use a factory pattern to create a join task on each node for + * each shard for which that node receives binding sets for a query. The main + * reason for doing this is to gain the appropriate lock for the unisolated + * index. If we never run a query against the unisolated index then we can go + * around the concurrency manager and run a single "query manager" task for all + * joins for all shards for all queries. This has some great benefits which I + * will go into below. + * <p> + * That "query manager" task would be responsible for accepting buffers + * containing elements or binding sets from other nodes and scheduling + * consumption of those data based on various criteria (order of arrival, + * priority, buffer resource requirements, timeout, etc.). This manager task + * could use a fork join pool to execute light weight operations (NIO, + * formulation of access paths from binding sets, mapping of binding sets onto + * shards, joining a chunk already read from an access path against a binding + * set, etc). Operations which touch the disk need to run in their own thread + * (until we get Java 7 async file IO, which is already available in a preview + * library). We could handle that by queuing those operations against a fixed + * size thread pool for reads. 
+ * <p> + * This is a radical change in how we handle distributed query execution, but I + * think that it could have a huge payoff by reducing the complexity of the join + * logic, making it significantly easier to execute different kinds of join + * operations, reducing the overhead for acquiring locks for the unisolated + * index views, reducing the #of threads consumed by joins (from one per shard + * per join per query to a fixed pool of N threads for reads), etc. It would + * centralize the management of resources on each node and make it possible for + * us to handle things like join termination by simply purging data from the + * query manager task for the terminated join. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + * + * + * FIXME Unit tests for non-distinct {@link IElementFilter}s on an + * {@link IPredicate}, unit tests for distinct element filter on an + * {@link IPredicate} which is capable of distributed operations. Do not use + * distinct where not required (SPOC, only one graph, etc). + * <p> + * It seems like the right way to approach this is by unifying the stackable CTC + * striterator pattern with the chunked iterator pattern and passing the query + * engine (or the bop context) into the iterator construction process (or simply + * requesting that the query engine construct the iterator stack). + * <p> + * In terms of harmonization, it is difficult to say which way would work + * better. In the short term we could simply allow both and mask the differences + * in how we construct the filters, but the conversion to/from striterators and + * chunked iterators seems to waste a bit of effort. + * <p> + * The trickiest part of all of this is to allow a distributed filter pattern + * where the filter gets created on a set of nodes identified by the operator + * and the elements move among those nodes using the query engine's buffers. 
+ * <p> + * To actually implement the distributed distinct filter we need to stack the + * following: + * + * <pre> + * - ITupleIterator + * - Resolve ITuple to Element (e.g., SPOC). + * - Layer on optional IElementFilter associated with the IPredicate. + * - Layer on SameVariableConstraint iff required (done by AccessPath) + * - Resolve SPOC to SPO, stripping off the context position. + * - Chunk SPOs (SPO[], IKeyOrder), where the key order is from the access path. + * - Filter SPO[] using DHT constructed on specified nodes of the cluster. + * The SPO[] chunks should be packaged into NIO buffers and shipped to those + * nodes. The results should be shipped back as bit vectors packaged into + * NIO buffers. + * - Dechunk SPO[] to SPO since that is the current expectation for the filter + * stack. + * - The result then gets wrapped as a {@link IChunkedOrderedIterator} by + * the AccessPath using a {@link ChunkedArrayIterator}. + * </pre> + * + * This stack is a bit complex(!). But it is certainly easy enough to generate + * the necessary bits programmatically. + * + * FIXME Handling the {@link Union} of binding sets. Consider whether the chunk + * combiner logic from the {@link DistributedJoinTask} could be reused. + * + * FIXME INSERT and DELETE which will construct elements using + * {@link IRelation#newElement(java.util.List, IBindingSet)} from a binding set + * and then use {@link IMutableRelation#insert(IChunkedOrderedIterator)} and + * {@link IMutableRelation#delete(IChunkedOrderedIterator)}. For s/o, we first + * need to move the bits into the right places so it makes sense to unpack the + * processing of the loop over the elements and move the data around, writing on + * each index as necessary. There could be eventually consistent approaches to + * this as well. 
For justifications we need to update some additional indices, + * in which case we are stuck going through {@link IRelation} rather than + * routing data directly or using the {@link IAsynchronousWriteBufferFactory}. + * For example, we could handle routing and writing in s/o as follows: + * + * <pre> + * INSERT(relation,bindingSets) + * + * expands to + * + * SEQUENCE( + * SELECT(s,p,o), // drop bindings that we do not need + * PARALLEL( + * INSERT_INDEX(spo), // construct (s,p,o) elements and insert + * INSERT_INDEX(pos), // construct (p,o,s) elements and insert + * INSERT_INDEX(osp), // construct (o,s,p) elements and insert + * )) + * + * </pre> + * + * The output of the SELECT operator would be automatically mapped against the + * shards on which the next operators need to write. Since there is a nested + * PARALLEL operator, the mapping will be against the shards of each of the + * given indices. (A simpler operator would invoke + * {@link SPORelation#insert(IChunkedOrderedIterator)}. Handling justifications + * requires that we also formulate the justification chain from the pattern of + * variable bindings in the rule). + * + * FIXME Handle {@link Program}s. There are three flavors, which should probably + * be broken into three operators: sequence(ops), set(ops), and closure(op). The + * 'set' version would be parallelized, or at least have an annotation for + * parallel evaluation. These things belong in the same broad category as the + * join graph since they are operators which control the evaluation of other + * operators (the current pipeline join also has that characteristic which it + * uses to do the nested index subqueries). + * + * FIXME SPARQL to BOP translation + * <p> + * The initial pass should translate from {@link IRule} to {@link BOp}s so we + * can immediately begin running SPARQL queries against the {@link QueryEngine}. 
+ * A second pass should explore a rule-based translation from the openrdf SPARQL + * operator tree into {@link BOp}s, perhaps using an embedded {@link Prolog} + * engine. What follows is a partial list of special considerations for that + * translation: + * <ul> + * <li>Distinct can be trivially enforced for default graph queries against the + * SPOC index.</li> + * <li>Local distinct should wait until there is more than one tuple from the + * index since a single tuple does not need to be made distinct using a hash + * map.</li> + * <li>Low volume distributed queries should use solution modifiers which + * evaluate on the query controller node rather than using distributed sort, + * distinct, slice, or aggregation operators.</li> + * <li></li> + * <li></li> + * <li></li> + * <li>High volume queries should use special operators (different + * implementations of joins, use an external merge sort, etc).</li> + * </ul> + * + * FIXME SPARQL Coverage: Add native support for all SPARQL operators. A lot of + * this can be picked up from Sesame. Some things, such as isIRI(), can be done + * natively against the {@link IV}. Likewise, there is already a set of + * comparison methods for {@link IV}s which are inlined values. Add support for + * <ul> + * <li></li> + * <li></li> + * <li></li> + * <li></li> + * <li></li> + * <li></li> + * </ul> + * + * @todo Expander patterns will continue to exist until we handle the standalone + * backchainers in a different manner for scale-out so add support for + * those for now. + * + * @todo There is going to be a relationship between recycling of intermediates + * (for individual {@link BOp}s or {@link BOp} tree fragments) and a + * distributed query cache which handles invalidation (for updates) and + * {@link BOp} aware reuse of result sets available in the cache. This + * sort of thing will have to be coordinated among the cache nodes. + */ +public class QueryEngine implements IQueryPeer, IQueryClient { + + pr... 
[truncated message content] |
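Note on the QueryEngine design notes: the Javadoc describes the UnisolatedReadWriteIndex as letting multiple threads share one unisolated index view through a read/write lock pattern (concurrent readers, a single writer with exclusive access). The sketch below only illustrates that locking pattern and is not the bigdata implementation. The class name ReadWriteIndexSketch, its methods, and the TreeMap standing in for the index are all hypothetical.

```java
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Hypothetical illustration of the read/write lock pattern: many concurrent
 * readers, one exclusive writer. The TreeMap is a stand-in for an index view
 * and is an assumption of this sketch, not part of the commit.
 */
final class ReadWriteIndexSketch {

    private final TreeMap<String, String> index = new TreeMap<String, String>();

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    /** Readers share the read lock, so lookups may run concurrently. */
    String lookup(final String key) {
        lock.readLock().lock();
        try {
            return index.get(key);
        } finally {
            lock.readLock().unlock();
        }
    }

    /** A writer holds the write lock exclusively while it mutates the index. */
    void insert(final String key, final String val) {
        lock.writeLock().lock();
        try {
            index.put(key, val);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Reading the size is also a read operation. */
    int size() {
        lock.readLock().lock();
        try {
            return index.size();
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

A wrapper of this kind serializes writers against readers on a single view. It does not provide the commit point and read-behind transaction that the notes propose for closure rounds.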