From: <tho...@us...> - 2010-10-25 15:41:03
|
Revision: 3843 http://bigdata.svn.sourceforge.net/bigdata/?rev=3843&view=rev Author: thompsonbry Date: 2010-10-25 15:40:55 +0000 (Mon, 25 Oct 2010) Log Message: ----------- accessPathDups were not being detected due to a change in the quads query branch such that hashCode() and equals() for predicates can no longer be used to test for duplicate patterns of variables and constants. This was fixed by adding the HashedPredicate class. That change cuts significant time from Q2 and Q9. This change did break two of the unit tests in TestQueryEngine and I have not yet diagnosed the problem there. However, the SAIL test suites are all good with this change. This change also reintroduces query-level logging with breakouts for each join in the pipeline evaluation. This logging level is controlled by QueryLog and corresponds closely to the older RuleStats logging. At this point, lexicon materialization (Q6,Q14) appears to be slightly slower in the branch, Q2 is faster, and Q9 is slightly slower. The remaining performance difference could be due to: - lexicon materialization changes. - chaining buffers in the trunk but not in the branch. - buffer configuration properties (explore this again for Q2 and Q9 now that dups are being eliminated). 
[java] ### Finished testing BIGDATA_SPARQL_ENDPOINT ### [java] BIGDATA_SPARQL_ENDPOINT #trials=10 #parallel=1 [java] query Time Result# [java] query1 56 4 [java] query3 37 6 [java] query4 68 34 [java] query5 108 719 [java] query7 34 61 [java] query8 379 6463 [java] query10 25 0 [java] query11 24 0 [java] query12 26 0 [java] query13 25 0 [java] query14 4046 393730 [java] query6 4056 430114 [java] query2 983 130 [java] query9 5280 8627 [java] Total 15147 procs -----------memory---------- ---swap-- -----io---- --system-- -----cpu------ r b swpd free buff cache si so bi bo in cs us sy id wa st 0 0 0 9886784 337552 4922340 0 0 7 5 42 38 2 0 98 0 0 3 0 0 7829228 337628 4922436 0 0 0 19 1117 186494 54 6 41 0 0 7 0 0 7718196 337700 4922428 0 0 0 9 1109 233047 73 6 20 0 0 0 0 0 8014492 337764 4922372 0 0 0 9 1114 8260 63 3 34 0 0 Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederationChunkHandler.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java branches/QUADS_QUERY_BRANCH/bigdata/src/resources/logging/log4j.properties branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestBOpUtility.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/MockRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java 
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java branches/QUADS_QUERY_BRANCH/bigdata-perf/bsbm/src/resources/logging/log4j.properties branches/QUADS_QUERY_BRANCH/bigdata-perf/lubm/src/resources/logging/log4j.properties branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/Rule2BOpUtility.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryLog.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java 2010-10-24 18:18:10 UTC (rev 3842) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java 2010-10-25 15:40:55 UTC (rev 3843) @@ -283,17 +283,17 @@ boolean DEFAULT_CONTROLLER = false; - /** - * For hash partitioned operators, this is the set of the member nodes - * for the operator. - * <p> - * This annotation is required for such operators since the set of known - * nodes of a given type (such as all data services) can otherwise - * change at runtime. - * - * @todo Move onto an interface parallel to {@link IShardwisePipelineOp} - */ - String MEMBER_SERVICES = "memberServices"; +// /** +// * For hash partitioned operators, this is the set of the member nodes +// * for the operator. +// * <p> +// * This annotation is required for such operators since the set of known +// * nodes of a given type (such as all data services) can otherwise +// * change at runtime. 
+// * +// * @todo Move onto an interface parallel to {@link IShardwisePipelineOp} +// */ +// String MEMBER_SERVICES = "memberServices"; } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java 2010-10-24 18:18:10 UTC (rev 3842) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java 2010-10-25 15:40:55 UTC (rev 3843) @@ -549,6 +549,49 @@ } + /** + * Return a list containing the evaluation order for the pipeline. Only the + * child operands are visited. Operators in subqueries are not visited since + * they will be assigned {@link BOpStats} objects when they are run as a + * subquery. The evaluation order is given by the depth-first left-deep + * traversal of the query. + * + * @todo unit tests. + */ + public static Integer[] getEvaluationOrder(final BOp op) { + + final List<Integer> order = new LinkedList<Integer>(); + + getEvaluationOrder(op, order, 0/*depth*/); + + return order.toArray(new Integer[order.size()]); + + } + + private static void getEvaluationOrder(final BOp op, final List<Integer> order, final int depth) { + + if(!(op instanceof PipelineOp)) + return; + + final int bopId = op.getId(); + + if (depth == 0 + || !op.getProperty(BOp.Annotations.CONTROLLER, + BOp.Annotations.DEFAULT_CONTROLLER)) { + + if (op.arity() > 0) { + + // left-deep recursion + getEvaluationOrder(op.get(0), order, depth + 1); + + } + + } + + order.add(bopId); + + } + /** * Combine chunks drawn from an iterator into a single chunk. This is useful * when materializing intermediate results for an all-at-once operator. 
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java 2010-10-24 18:18:10 UTC (rev 3842) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java 2010-10-25 15:40:55 UTC (rev 3843) @@ -543,70 +543,91 @@ } - /* - * Intentionally removed. See BOpBase. - * - * hashCode() and equals() for Predicate were once used to cache access - * paths, but that code was history long before we developed the bop model. - */ - -// public boolean equals(final Object other) { -// -// if (this == other) -// return true; -// -// if(!(other instanceof IPredicate<?>)) -// return false; -// -// final IPredicate<?> o = (IPredicate<?>)other; -// -// final int arity = arity(); -// -// if(arity != o.arity()) return false; -// -// for (int i = 0; i < arity; i++) { -// -// final IVariableOrConstant<?> x = get(i); -// -// final IVariableOrConstant<?> y = o.get(i); -// -// if (x != y && !(x.equals(y))) { -// -// return false; -// -// } -// -// } -// -// return true; -// -// } -// -// public int hashCode() { -// -// int h = hash; -// -// if (h == 0) { -// -// final int n = arity(); -// -// for (int i = 0; i < n; i++) { -// -// h = 31 * h + get(i).hashCode(); -// -// } -// -// hash = h; -// -// } -// -// return h; -// -// } -// -// /** -// * Caches the hash code. -// */ -// private int hash = 0; + /** + * This class may be used to insert instances of {@link IPredicate}s into a + * hash map where equals is decided based solely on the pattern of variables + * and constants found on the {@link IPredicate}. This may be used to create + * access path caches or to identify and eliminate duplicate requests for + * the same access path. + */ + public static class HashedPredicate<E> { + /** + * The predicate. + */ + public final IPredicate<E> pred; + + /** + * The cached hash code. 
+ */ + final private int hash; + + public HashedPredicate(final IPredicate<E> pred) { + + if (pred == null) + throw new IllegalArgumentException(); + + this.pred = pred; + + this.hash = computeHash(); + + } + + public boolean equals(final Object other) { + + if (this == other) + return true; + + if (!(other instanceof HashedPredicate<?>)) + return false; + + final IPredicate<?> o = ((HashedPredicate<?>) other).pred; + + final int arity = pred.arity(); + + if (arity != o.arity()) + return false; + + for (int i = 0; i < arity; i++) { + + final IVariableOrConstant<?> x = pred.get(i); + + final IVariableOrConstant<?> y = o.get(i); + + if (x != y && !(x.equals(y))) { + + return false; + + } + + } + + return true; + + } + + public int hashCode() { + + return hash; + + } + + private final int computeHash() { + + int h = 0; + + final int n = pred.arity(); + + for (int i = 0; i < n; i++) { + + h = 31 * h + pred.get(i).hashCode(); + + } + + return h; + + } + + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java 2010-10-24 18:18:10 UTC (rev 3842) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java 2010-10-25 15:40:55 UTC (rev 3843) @@ -47,13 +47,13 @@ */ private static final long serialVersionUID = 1L; -// /** -// * The timestamp (nanoseconds) assigned when this {@link BOpStats} object -// * was creatred. This can not be directly aggregated into wall time since -// * concurrent processes are nearly always used during query evaluation. -// */ -// private final long startTime = System.nanoTime(); - + /** + * The elapsed time (milliseconds) for the corresponding operation. When + * aggregated, this will generally exceed the wall time since concurrent + * processes are nearly always used during query evaluation. 
+ */ + final public CAT elapsed = new CAT(); + /** * #of chunks in. */ @@ -97,6 +97,7 @@ // Do not add to self! return; } + elapsed.add(o.elapsed.get()); chunksIn.add(o.chunksIn.get()); unitsIn.add(o.unitsIn.get()); unitsOut.add(o.unitsOut.get()); @@ -111,7 +112,8 @@ public String toString() { final StringBuilder sb = new StringBuilder(); sb.append(super.toString()); - sb.append("{chunksIn=" + chunksIn.get()); + sb.append("{elapsed=" + elapsed.get()); + sb.append(",chunksIn=" + chunksIn.get()); sb.append(",unitsIn=" + unitsIn.get()); sb.append(",chunksOut=" + chunksOut.get()); sb.append(",unitsOut=" + unitsOut.get()); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java 2010-10-24 18:18:10 UTC (rev 3842) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java 2010-10-25 15:40:55 UTC (rev 3843) @@ -27,18 +27,35 @@ package com.bigdata.bop.engine; +import java.util.Map; +import java.util.UUID; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.IPredicate; +import com.bigdata.bop.PipelineOp; import com.bigdata.btree.ILocalBTreeView; import com.bigdata.journal.IIndexManager; import com.bigdata.service.IBigdataFederation; /** - * Interface exposing a limited set of the state of an executing query. + * Non-Remote interface exposing a limited set of the state of an executing + * query. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ public interface IRunningQuery { + /** + * The query. + */ + BOp getQuery(); + + /** + * The unique identifier for this query. + */ + UUID getQueryId(); + /** * The {@link IBigdataFederation} IFF the operator is being evaluated on an * {@link IBigdataFederation}. When evaluating operations against an @@ -59,8 +76,51 @@ * The query engine. 
This may be used to submit subqueries for evaluation. */ QueryEngine getQueryEngine(); + + /** + * Return an unmodifiable index from {@link BOp.Annotations#BOP_ID} to + * {@link BOp}. This index may contain operators which are not part of the + * pipeline evaluation, such as {@link IPredicate}s. + */ + Map<Integer/*bopId*/,BOp> getBOpIndex(); + + /** + * Return an unmodifiable map exposing the statistics for the operators in + * the query and <code>null</code> unless this is the query controller. + * There will be a single entry in the map for each distinct + * {@link PipelineOp}. Entries might not appear until that operator has + * either begun or completed at least one evaluation phase. This index only + * contains operators which are actually part of the pipeline evaluation. + */ + Map<Integer/* bopId */, BOpStats> getStats(); /** + * Return the query deadline (the time at which it will terminate regardless + * of its run state). + * + * @return The query deadline (milliseconds since the epoch) and + * {@link Long#MAX_VALUE} if no explicit deadline was specified. + */ + public long getDeadline(); + + /** + * The timestamp (ms) when the query began execution. + */ + public long getStartTime(); + + /** + * The timestamp (ms) when the query was done and ZERO (0) if the query is + * not yet done. + */ + public long getDoneTime(); + + /** + * The elapsed time (ms) for the query. This will be updated for each call + * until the query is done executing. + */ + public long getElapsed(); + + /** * Cancel the running query (normal termination). * <p> * Note: This method provides a means for an operator to indicate that the @@ -84,5 +144,11 @@ * if the argument is <code>null</code>. */ Throwable halt(final Throwable t); + + /** + * Return the cause if the query was terminated by an exception. 
+ * @return + */ + Throwable getCause(); } Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryLog.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryLog.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryLog.java 2010-10-25 15:40:55 UTC (rev 3843) @@ -0,0 +1,304 @@ +/* + +Copyright (C) SYSTAP, LLC 2006-2008. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +*/ +/* + * Created on Jun 22, 2009 + */ + +package com.bigdata.bop.engine; + +import java.text.DateFormat; +import java.util.Date; +import java.util.Map; +import java.util.UUID; + +import org.apache.log4j.Logger; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpUtility; +import com.bigdata.bop.IPredicate; +import com.bigdata.bop.join.PipelineJoin; +import com.bigdata.bop.join.PipelineJoin.PipelineJoinStats; +import com.bigdata.rdf.sail.Rule2BOpUtility; +import com.bigdata.striterator.IKeyOrder; + +/** + * Class defines the log on which summary operator execution statistics are + * written.. 
+ * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id: RuleLog.java 3448 2010-08-18 20:55:58Z thompsonbry $ + */ +public class QueryLog { + + protected static final transient Logger log = Logger + .getLogger(QueryLog.class); + + static { + if(log.isInfoEnabled()) + log.info(QueryLog.getTableHeader()); + } + + /** + * Log rule execution statistics. + * + * @param stats + * The rule execution statistics. + * + * @todo need start and end time for the query. + */ + static public void log(final IRunningQuery q) { + + if (log.isInfoEnabled()) { + + final Integer[] order = BOpUtility.getEvaluationOrder(q.getQuery()); + + log.info(getTableRow(q, -1/* orderIndex */, q.getQuery().getId(), + true/* summary */)); + + int orderIndex = 0; + for (Integer bopId : order) { + log.info(getTableRow(q, orderIndex, bopId, false/* summary */)); + orderIndex++; + } + + } + + } + + static private String getTableHeader() { + + final StringBuilder sb = new StringBuilder(); + + /* + * Common columns for the overall query and for each pipeline operator. + */ + sb.append("queryId"); + sb.append("\tbeginTime"); + sb.append("\tdoneTime"); + sb.append("\tdeadline"); + sb.append("\telapsed"); + sb.append("\tserviceId"); + sb.append("\tcause"); + sb.append("\tbop"); + /* + * Columns for each pipeline operator. + */ + sb.append("\tevalOrder"); // [0..n-1] + sb.append("\tbopId"); + sb.append("\tevalContext"); + sb.append("\tcontroller"); + // metadata considered by the static optimizer. + sb.append("\tstaticBestKeyOrder"); // original key order assigned by static optimizer. + sb.append("\tnvars"); // #of variables in the predicate for a join. + sb.append("\tfastRangeCount"); // fast range count used by the static optimizer. + // dynamics (aggregated for totals as well). + sb.append("\tfanIO"); + sb.append("\tsumMillis"); // cumulative milliseconds for eval of this operator. 
+ sb.append("\tchunksIn"); + sb.append("\tunitsIn"); + sb.append("\tchunksOut"); + sb.append("\tunitsOut"); + sb.append("\tmultipler"); // expansion rate multipler in the solution count. + sb.append("\taccessPathDups"); + sb.append("\taccessPathCount"); + sb.append("\taccessPathChunksIn"); + sb.append("\taccessPathUnitsIn"); + // dynamics based on elapsed wall clock time. + sb.append("\tsolutions/ms"); + sb.append("\tmutations/ms"); + // + // cost model(s) + // + sb.append('\n'); + + return sb.toString(); + + } + + /** + * Return a tabular representation of the query {@link RunState}. + * + * @param q The {@link IRunningQuery}. + * @param evalOrder The evaluation order for the operator. + * @param bopId The identifier for the operator. + * @param summary <code>true</code> iff the summary for the query should be written. + * @return The row of the table. + */ + static private String getTableRow(final IRunningQuery q, final int evalOrder, final Integer bopId, final boolean summary) { + + final StringBuilder sb = new StringBuilder(); + + final DateFormat dateFormat = DateFormat.getDateTimeInstance( + DateFormat.FULL, DateFormat.FULL); + + // The elapsed time for the query (wall time in milliseconds). + final long elapsed = q.getElapsed(); + + // The serviceId on which the query is running : null unless scale-out. + final UUID serviceId = q.getQueryEngine().getServiceUUID(); + + // The thrown cause : null unless the query was terminated abnormally. + final Throwable cause = q.getCause(); + + sb.append(q.getQueryId()); + sb.append('\t'); + sb.append(dateFormat.format(new Date(q.getStartTime()))); + sb.append('\t'); + sb.append(dateFormat.format(new Date(q.getDoneTime()))); + sb.append('\t'); + if(q.getDeadline()!=Long.MAX_VALUE) + sb.append(dateFormat.format(new Date(q.getDeadline()))); + sb.append('\t'); + sb.append(elapsed); + sb.append('\t'); + sb.append(serviceId == null ? 
"N/A" : serviceId.toString()); + sb.append('\t'); + if (cause != null) + sb.append(cause.getLocalizedMessage()); + + final Map<Integer, BOp> bopIndex = q.getBOpIndex(); + final Map<Integer, BOpStats> statsMap = q.getStats(); + final BOp bop = bopIndex.get(bopId); + + // the operator. + sb.append('\t'); + if (summary) { + /* + * The entire query (recursively). New lines are translated out to + * keep this from breaking the table format. + */ + sb.append(BOpUtility.toString(q.getQuery()).replace('\n', ' ')); + } else { + // Otherwise how just this bop. + sb.append(bopIndex.get(bopId).toString()); + } + + sb.append('\t'); + sb.append(evalOrder); + sb.append('\t'); + sb.append(Integer.toString(bopId)); + sb.append('\t'); + sb.append(bop.getEvaluationContext()); + sb.append('\t'); + sb.append(bop.getProperty(BOp.Annotations.CONTROLLER, + BOp.Annotations.DEFAULT_CONTROLLER)); + + /* + * Static optimizer metadata. + * + * FIXME Should report [nvars] be the expected asBound #of variables + * given the assigned evaluation order and the expectation of propagated + * bindings (optionals may leave some unbound). + */ + { + + final IPredicate pred = (IPredicate<?>) bop + .getProperty(PipelineJoin.Annotations.PREDICATE); + + if (pred != null) { + + final IKeyOrder keyOrder = (IKeyOrder<?>) pred + .getProperty(Rule2BOpUtility.Annotations.ORIGINAL_INDEX); + + final Long rangeCount = (Long) pred + .getProperty(Rule2BOpUtility.Annotations.ESTIMATED_CARDINALITY); + + sb.append('\t'); // keyorder + if (keyOrder != null) + sb.append(keyOrder); + + sb.append('\t'); // nvars + if (keyOrder != null) + sb.append(pred.getVariableCount(keyOrder)); + + sb.append('\t'); // rangeCount + if (rangeCount!= null) + sb.append(rangeCount); + + } else { + sb.append('\t'); // keyorder + sb.append('\t'); // nvars + sb.append('\t'); // rangeCount + } + } + + /* + * Dynamics. + */ + + int fanIO = 0; // @todo aggregate from RunState. 
+ + final PipelineJoinStats stats = new PipelineJoinStats(); + if(summary) { + // Aggregate the statistics for all pipeline operators. + for (BOpStats t : statsMap.values()) { + stats.add(t); + } + } else { + // Just this operator. + stats.add(statsMap.get(bopId)); + } + final long unitsIn = stats.unitsIn.get(); + final long unitsOut = stats.unitsOut.get(); + sb.append('\t'); + sb.append(Integer.toString(fanIO)); + sb.append('\t'); + sb.append(stats.elapsed.get()); + sb.append('\t'); + sb.append(stats.chunksIn.get()); + sb.append('\t'); + sb.append(stats.unitsIn.get()); + sb.append('\t'); + sb.append(stats.chunksOut.get()); + sb.append('\t'); + sb.append(stats.unitsOut.get()); + sb.append('\t'); + sb.append(unitsIn == 0 ? "N/A" : unitsOut / (double) unitsIn); + sb.append('\t'); + sb.append(stats.accessPathDups.get()); + sb.append('\t'); + sb.append(stats.accessPathCount.get()); + sb.append('\t'); + sb.append(stats.accessPathChunksIn.get()); + sb.append('\t'); + sb.append(stats.accessPathUnitsIn.get()); + + /* + * Use the total elapsed time for the query (wall time). + */ + // solutions/ms + sb.append('\t'); + sb.append(elapsed == 0 ? 0 : stats.unitsOut.get() / elapsed); + // mutations/ms : @todo mutations/ms. 
+ sb.append('\t'); +// sb.append(elapsed==0?0:stats.unitsOut.get()/elapsed); + + sb.append('\n'); + + return sb.toString(); + + } + +} Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java 2010-10-24 18:18:10 UTC (rev 3842) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java 2010-10-25 15:40:55 UTC (rev 3843) @@ -45,7 +45,9 @@ import org.apache.log4j.Logger; import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpUtility; import com.bigdata.bop.PipelineOp; +import com.bigdata.bop.join.PipelineJoin.PipelineJoinStats; import com.bigdata.relation.accesspath.IBlockingBuffer; /** @@ -744,6 +746,8 @@ sb.append("\tlabel"); sb.append("\tbopId"); sb.append("\tserviceId"); + sb.append("\tevalContext"); + sb.append("\tcontroller"); sb.append("\tcause"); sb.append("\tbop"); sb.append("\tshardId"); @@ -767,7 +771,16 @@ } - sb.append("\tstats"); + sb.append("\telapsed"); + sb.append("\tchunksIn"); + sb.append("\tunitsIn"); + sb.append("\tchunksOut"); + sb.append("\tunitsOut"); + sb.append("\taccessPathDups"); + sb.append("\taccessPathCount"); + sb.append("\taccessPathChunksIn"); + sb.append("\taccessPathUnitsIn"); + //{chunksIn=1,unitsIn=100,chunksOut=4,unitsOut=313,accessPathDups=0,accessPathCount=100,chunkCount=100,elementCount=313} sb.append('\n'); @@ -830,14 +843,34 @@ sb.append('\t'); sb.append(serviceId == null ? "N/A" : serviceId.toString()); + { + final BOp bop = bopIndex.get(bopId); + sb.append('\t'); + sb.append(bop.getEvaluationContext()); + sb.append('\t'); + sb.append(bop.getProperty(BOp.Annotations.CONTROLLER, + BOp.Annotations.DEFAULT_CONTROLLER)); + } + // the thrown cause. sb.append('\t'); if (cause != null) sb.append(cause.getLocalizedMessage()); - // the operator. - sb.append('\t'); - sb.append(bopIndex.get(bopId)); + // the operator. 
+ sb.append('\t'); + if (nsteps.get() == 1) { + /* + * For the startQ row @ nsteps==1, show the entire query. This is + * the only way people will be able to see the detailed annotations + * on predicates used in joins. New line characters are translated + * out to keep things in the table format. + */ + sb.append(BOpUtility.toString(query).replace('\n', ' ')); + } else { + // Otherwise how just this bop. + sb.append(bopIndex.get(bopId).toString()); + } sb.append('\t'); sb.append(Integer.toString(shardId)); @@ -873,11 +906,33 @@ } - // the statistics : this is at the end to keep the table pretty. - sb.append('\t'); + /* + * The statistics. This is at the end to keep the table pretty. + * Different kinds of operators may have additional statistics. They + * have to be explicitly handled here to format them into a table. + */ if (stats != null) { - // @todo use a multi-column version of stats. - sb.append(stats.toString()); + sb.append('\t'); + sb.append(stats.elapsed.get()); + sb.append('\t'); + sb.append(stats.chunksIn.get()); + sb.append('\t'); + sb.append(stats.unitsIn.get()); + sb.append('\t'); + sb.append(stats.chunksOut.get()); + sb.append('\t'); + sb.append(stats.unitsOut.get()); + if (stats instanceof PipelineJoinStats) { + final PipelineJoinStats t = (PipelineJoinStats) stats; + sb.append('\t'); + sb.append(t.accessPathDups.get()); + sb.append('\t'); + sb.append(t.accessPathCount.get()); + sb.append('\t'); + sb.append(t.accessPathChunksIn.get()); + sb.append('\t'); + sb.append(t.accessPathUnitsIn.get()); + } } sb.append('\n'); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-10-24 18:18:10 UTC (rev 3842) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-10-25 15:40:55 UTC (rev 3843) @@ -27,6 +27,7 @@ */ 
package com.bigdata.bop.engine; +import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -109,6 +110,18 @@ */ final private AtomicLong deadline = new AtomicLong(Long.MAX_VALUE); + /** + * The timestamp(ms) when the query begins to execute. + */ + final private AtomicLong startTime = new AtomicLong(System + .currentTimeMillis()); + + /** + * The timestamp (ms) when the query is done executing and ZERO (0L) if the + * query is not done. + */ + final private AtomicLong doneTime = new AtomicLong(0L); + /** * <code>true</code> iff the outer {@link QueryEngine} is the controller for * this query. @@ -304,19 +317,25 @@ } - /** - * Return the query deadline (the time at which it will terminate regardless - * of its run state). - * - * @return The query deadline (milliseconds since the epoch) and - * {@link Long#MAX_VALUE} if no explicit deadline was specified. - */ public long getDeadline() { - return deadline.get(); + } + public long getStartTime() { + return startTime.get(); } + public long getDoneTime() { + return doneTime.get(); + } + + public long getElapsed() { + long mark = doneTime.get(); + if (mark == 0L) + mark = System.currentTimeMillis(); + return mark - startTime.get(); + } + /** * The class executing the query on this node. */ @@ -366,31 +385,15 @@ } - /** - * Return the current statistics for the query and <code>null</code> unless - * this is the query controller. There will be a single entry in the map for - * each distinct {@link PipelineOp}. The map entries are inserted when we - * first begin to run an instance of that operator on some - * {@link IChunkMessage}. - */ public Map<Integer/* bopId */, BOpStats> getStats() { - return statsMap; + return Collections.unmodifiableMap(statsMap); } - /** - * Lookup and return the {@link BOp} with that identifier using an index. - * - * @param bopId - * The identifier. 
- * - * @return The {@link BOp} -or- <code>null</code> if no {@link BOp} was - * found in the query with for that identifier. - */ - public BOp getBOp(final int bopId) { + public Map<Integer,BOp> getBOpIndex() { - return bopIndex.get(bopId); + return bopIndex; } @@ -1295,10 +1298,16 @@ clientProxy.startOp(new StartOpMessage(queryId, t.bopId, t.partitionId, serviceId, t.messagesIn)); - /* - * Run the operator task. - */ - t.call(); + /* + * Run the operator task. + */ + final long begin = System.currentTimeMillis(); + try { + t.call(); + } finally { + t.context.getStats().elapsed.add(System.currentTimeMillis() + - begin); + } /* * Queue task to notify the query controller that operator task @@ -1972,6 +1981,11 @@ } // life cycle hook for the end of the query. lifeCycleTearDownQuery(); + // mark done time. + doneTime.set(System.currentTimeMillis()); + // log summary statistics for the query. + if (isController()) + QueryLog.log(this); } // remove from the collection of running queries. queryEngine.halt(this); @@ -2066,6 +2080,12 @@ } + final public Throwable getCause() { + + return future.getCause(); + + } + public IBigdataFederation<?> getFederation() { return queryEngine.getFederation(); @@ -2097,5 +2117,5 @@ return StandaloneChunkHandler.INSTANCE; } - + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederationChunkHandler.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederationChunkHandler.java 2010-10-24 18:18:10 UTC (rev 3842) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederationChunkHandler.java 2010-10-25 15:40:55 UTC (rev 3843) @@ -108,7 +108,7 @@ final FederatedRunningQuery q = (FederatedRunningQuery) query; - final BOp targetOp = q.getBOp(sinkId); + final BOp targetOp = q.getBOpIndex().get(sinkId); if (targetOp == null) throw new IllegalStateException("Not found: " + sinkId); Modified: 
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2010-10-24 18:18:10 UTC (rev 3842) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2010-10-25 15:40:55 UTC (rev 3843) @@ -46,13 +46,14 @@ import com.bigdata.bop.BOp; import com.bigdata.bop.BOpContext; import com.bigdata.bop.BOpEvaluationContext; -import com.bigdata.bop.NV; -import com.bigdata.bop.PipelineOp; import com.bigdata.bop.IBindingSet; import com.bigdata.bop.IConstraint; import com.bigdata.bop.IPredicate; import com.bigdata.bop.IShardwisePipelineOp; import com.bigdata.bop.IVariable; +import com.bigdata.bop.NV; +import com.bigdata.bop.PipelineOp; +import com.bigdata.bop.ap.Predicate.HashedPredicate; import com.bigdata.bop.engine.BOpStats; import com.bigdata.btree.BytesUtil; import com.bigdata.btree.keys.IKeyBuilder; @@ -179,36 +180,36 @@ /** * The #of chunks read from an {@link IAccessPath}. */ - public final CAT chunkCount = new CAT(); + public final CAT accessPathChunksIn = new CAT(); /** * The #of elements read from an {@link IAccessPath}. */ - public final CAT elementCount = new CAT(); + public final CAT accessPathUnitsIn = new CAT(); - /** - * The maximum observed fan in for this join dimension (maximum #of - * sources observed writing on any join task for this join dimension). - * Since join tasks may be closed and new join tasks re-opened for the - * same query, join dimension and index partition, and since each join - * task for the same join dimension could, in principle, have a - * different fan in based on the actual binding sets propagated this is - * not necessarily the "actual" fan in for the join dimension. You would - * have to track the #of distinct partitionId values to track that. 
- */ - public int fanIn; +// /** +// * The maximum observed fan in for this join dimension (maximum #of +// * sources observed writing on any join task for this join dimension). +// * Since join tasks may be closed and new join tasks re-opened for the +// * same query, join dimension and index partition, and since each join +// * task for the same join dimension could, in principle, have a +// * different fan in based on the actual binding sets propagated this is +// * not necessarily the "actual" fan in for the join dimension. You would +// * have to track the #of distinct partitionId values to track that. +// */ +// public int fanIn; +// +// /** +// * The maximum observed fan out for this join dimension (maximum #of +// * sinks on which any join task is writing for this join dimension). +// * Since join tasks may be closed and new join tasks re-opened for the +// * same query, join dimension and index partition, and since each join +// * task for the same join dimension could, in principle, have a +// * different fan out based on the actual binding sets propagated this is +// * not necessarily the "actual" fan out for the join dimension. +// */ +// public int fanOut; - /** - * The maximum observed fan out for this join dimension (maximum #of - * sinks on which any join task is writing for this join dimension). - * Since join tasks may be closed and new join tasks re-opened for the - * same query, join dimension and index partition, and since each join - * task for the same join dimension could, in principle, have a - * different fan out based on the actual binding sets propagated this is - * not necessarily the "actual" fan out for the join dimension. 
- */ - public int fanOut; - public void add(final BOpStats o) { super.add(o); @@ -221,18 +222,18 @@ accessPathCount.add(t.accessPathCount.get()); - chunkCount.add(t.chunkCount.get()); + accessPathChunksIn.add(t.accessPathChunksIn.get()); - elementCount.add(t.elementCount.get()); + accessPathUnitsIn.add(t.accessPathUnitsIn.get()); - if (t.fanIn > this.fanIn) { - // maximum reported fanIn for this join dimension. - this.fanIn = t.fanIn; - } - if (t.fanOut > this.fanOut) { - // maximum reported fanOut for this join dimension. - this.fanOut += t.fanOut; - } +// if (t.fanIn > this.fanIn) { +// // maximum reported fanIn for this join dimension. +// this.fanIn = t.fanIn; +// } +// if (t.fanOut > this.fanOut) { +// // maximum reported fanOut for this join dimension. +// this.fanOut += t.fanOut; +// } } @@ -242,8 +243,8 @@ protected void toString(final StringBuilder sb) { sb.append(",accessPathDups=" + accessPathDups.estimate_get()); sb.append(",accessPathCount=" + accessPathCount.estimate_get()); - sb.append(",chunkCount=" + chunkCount.estimate_get()); - sb.append(",elementCount=" + elementCount.estimate_get()); + sb.append(",accessPathChunksIn=" + accessPathChunksIn.estimate_get()); + sb.append(",accessPathUnitsIn=" + accessPathUnitsIn.estimate_get()); } } @@ -530,6 +531,8 @@ */ public Void call() throws Exception { +// final long begin = System.currentTimeMillis(); + if (log.isDebugEnabled()) log.debug("joinOp=" + joinOp); @@ -597,6 +600,10 @@ throw new RuntimeException(t); +// } finally { +// +// stats.elapsed.add(System.currentTimeMillis() - begin); + } } @@ -849,7 +856,7 @@ * Aggregate the source bindingSets that license the same * asBound predicate. 
*/ - final Map<IPredicate<E>, Collection<IBindingSet>> map = combineBindingSets(chunk); + final Map<HashedPredicate<E>, Collection<IBindingSet>> map = combineBindingSets(chunk); /* * Generate an AccessPathTask from each distinct asBound @@ -936,13 +943,13 @@ * bindingSets in the chunk from which the predicate was * generated. */ - protected Map<IPredicate<E>, Collection<IBindingSet>> combineBindingSets( + protected Map<HashedPredicate<E>, Collection<IBindingSet>> combineBindingSets( final IBindingSet[] chunk) { if (log.isDebugEnabled()) log.debug("chunkSize=" + chunk.length); - final Map<IPredicate<E>, Collection<IBindingSet>> map = new LinkedHashMap<IPredicate<E>, Collection<IBindingSet>>( + final Map<HashedPredicate<E>, Collection<IBindingSet>> map = new LinkedHashMap<HashedPredicate<E>, Collection<IBindingSet>>( chunk.length); for (IBindingSet bindingSet : chunk) { @@ -970,7 +977,8 @@ } // lookup the asBound predicate in the map. - Collection<IBindingSet> values = map.get(asBound); + final HashedPredicate<E> hashedPred = new HashedPredicate<E>(asBound); + Collection<IBindingSet> values = map.get(hashedPred); if (values == null) { @@ -983,7 +991,7 @@ values = new LinkedList<IBindingSet>(); - map.put(asBound, values); + map.put(hashedPred, values); } else { @@ -1024,7 +1032,7 @@ * @throws Exception */ protected AccessPathTask[] getAccessPathTasks( - final Map<IPredicate<E>, Collection<IBindingSet>> map) { + final Map<HashedPredicate<E>, Collection<IBindingSet>> map) { final int n = map.size(); @@ -1033,7 +1041,7 @@ final AccessPathTask[] tasks = new JoinTask.AccessPathTask[n]; - final Iterator<Map.Entry<IPredicate<E>, Collection<IBindingSet>>> itr = map + final Iterator<Map.Entry<HashedPredicate<E>, Collection<IBindingSet>>> itr = map .entrySet().iterator(); int i = 0; @@ -1042,10 +1050,10 @@ halted(); - final Map.Entry<IPredicate<E>, Collection<IBindingSet>> entry = itr + final Map.Entry<HashedPredicate<E>, Collection<IBindingSet>> entry = itr .next(); - 
tasks[i++] = new AccessPathTask(entry.getKey(), entry + tasks[i++] = new AccessPathTask(entry.getKey().pred, entry .getValue()); } @@ -1363,7 +1371,7 @@ final Object[] chunk = itr.nextChunk(); - stats.chunkCount.increment(); + stats.accessPathChunksIn.increment(); // process the chunk in the caller's thread. final boolean somethingAccepted = new ChunkTask( @@ -1460,7 +1468,7 @@ numElements += chunk.length; - stats.chunkCount.increment(); + stats.accessPathChunksIn.increment(); nchunks++; @@ -1493,7 +1501,7 @@ } } } - stats.elementCount.add(numElements); + stats.accessPathUnitsIn.add(numElements); } @@ -1746,7 +1754,7 @@ // naccepted for the current element (trace only). int naccepted = 0; - stats.elementCount.increment(); + stats.accessPathUnitsIn.increment(); for (IBindingSet bset : bindingSets) { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/resources/logging/log4j.properties =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/resources/logging/log4j.properties 2010-10-24 18:18:10 UTC (rev 3842) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/resources/logging/log4j.properties 2010-10-25 15:40:55 UTC (rev 3843) @@ -225,6 +225,21 @@ #log4j.appender.destPlain.layout.ConversionPattern= ## +# Summary query evaluation log (tab delimited file). +#log4j.logger.com.bigdata.bop.engine.QueryLog=INFO,queryLog +log4j.additivity.com.bigdata.bop.engine.QueryLog=false +log4j.appender.queryLog=org.apache.log4j.FileAppender +log4j.appender.queryLog.Threshold=ALL +log4j.appender.queryLog.File=queryLog.csv +log4j.appender.queryLog.Append=true +# I find that it is nicer to have this unbuffered since you can see what +# is going on and to make sure that I have complete rule evaluation logs +# on shutdown. +log4j.appender.queryLog.BufferedIO=false +log4j.appender.queryLog.layout=org.apache.log4j.PatternLayout +log4j.appender.queryLog.layout.ConversionPattern=%m + +## # BOp run state trace (tab delimited file). 
Uncomment the next line to enable. #log4j.logger.com.bigdata.bop.engine.RunState$TableLog=INFO,queryRunStateLog log4j.additivity.com.bigdata.bop.engine.RunState$TableLog=false Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestBOpUtility.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestBOpUtility.java 2010-10-24 18:18:10 UTC (rev 3842) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestBOpUtility.java 2010-10-25 15:40:55 UTC (rev 3843) @@ -29,6 +29,7 @@ import java.util.Iterator; import java.util.Map; +import java.util.concurrent.FutureTask; import junit.framework.TestCase2; @@ -662,4 +663,55 @@ } + /** + * Unit tests for extracting the left-deep evaluation order for the query + * pipeline. + * <p> + * - test when the 1st operator is a control operator. + * <p> + * - test when there is an embedded control operator (subquery). + * <p> + * Note: this is not testing with left/right branches in the query plan. + * That sort of plan is not currently supported by pipeline evaluation. + */ + public void test_getEvaluationOrder() { + + final BOp op2 = new MyPipelineOp(new BOp[]{},NV.asMap(// + new NV(BOp.Annotations.BOP_ID,1)// +// new NV(BOp.Annotations.CONTROLLER,false)// + )); + final BOp op1 = new MyPipelineOp(new BOp[]{op2},NV.asMap(// + new NV(BOp.Annotations.BOP_ID,2)// +// new NV(BOp.Annotations.CONTROLLER,false)// + )); + final BOp op3 = new MyPipelineOp(new BOp[]{op1},NV.asMap(// + new NV(BOp.Annotations.BOP_ID,3),// + new NV(BOp.Annotations.CONTROLLER,true)// + )); + + assertEquals(new Integer[]{1,2,3},BOpUtility.getEvaluationOrder(op3)); + + } + + private static class MyPipelineOp extends PipelineOp { + + private static final long serialVersionUID = 1L; + + /** Deep copy constructor. */ + protected MyPipelineOp(MyPipelineOp op) { + super(op); + } + + /** Shallow copy constructor. 
*/ + protected MyPipelineOp(BOp[] args, Map<String, Object> annotations) { + super(args, annotations); + } + + @Override + public FutureTask<Void> eval(BOpContext<IBindingSet> context) { + return null; + } + + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/MockRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/MockRunningQuery.java 2010-10-24 18:18:10 UTC (rev 3842) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/MockRunningQuery.java 2010-10-25 15:40:55 UTC (rev 3843) @@ -27,8 +27,12 @@ package com.bigdata.bop.engine; +import java.util.Map; +import java.util.UUID; + import org.apache.log4j.Logger; +import com.bigdata.bop.BOp; import com.bigdata.journal.IIndexManager; import com.bigdata.service.IBigdataFederation; @@ -87,4 +91,56 @@ throw new UnsupportedOperationException(); } + @Override + public Map<Integer, BOp> getBOpIndex() { + return null; + } + + @Override + public Map<Integer, BOpStats> getStats() { + return null; + } + + @Override + public long getDeadline() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public long getDoneTime() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public long getElapsed() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public long getStartTime() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public Throwable getCause() { + // TODO Auto-generated method stub + return null; + } + + @Override + public BOp getQuery() { + // TODO Auto-generated method stub + return null; + } + + @Override + public UUID getQueryId() { + // TODO Auto-generated method stub + return null; + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java =================================================================== --- 
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java 2010-10-24 18:18:10 UTC (rev 3842) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java 2010-10-25 15:40:55 UTC (rev 3843) @@ -1866,7 +1866,6 @@ } /** ->>>>>>> .r3835 * Verify the expected solutions. * * @param expected Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java 2010-10-24 18:18:10 UTC (rev 3842) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java 2010-10-25 15:40:55 UTC (rev 3843) @@ -62,6 +62,7 @@ import com.bigdata.relation.accesspath.IBlockingBuffer; import com.bigdata.relation.accesspath.ThickAsynchronousIterator; import com.bigdata.striterator.ChunkedArrayIterator; +import com.bigdata.striterator.Dechunkerator; /** * Unit tests for the {@link PipelineJoin} operator. @@ -231,8 +232,8 @@ // access path assertEquals(0L, stats.accessPathDups.get()); assertEquals(1L, stats.accessPathCount.get()); - assertEquals(1L, stats.chunkCount.get()); - assertEquals(2L, stats.elementCount.get()); + assertEquals(1L, stats.accessPathChunksIn.get()); + assertEquals(2L, stats.accessPathUnitsIn.get()); assertTrue(ft.isDone()); assertFalse(ft.isCancelled()); @@ -241,6 +242,104 @@ } /** + * Unit test for a pipeline join in which we expect duplicate access paths to + * be eliminated. 
+ * + * @throws ExecutionException + * @throws InterruptedException + */ + public void test_join_duplicateElimination() throws InterruptedException, ExecutionException { + + final int startId = 1; + final int joinId = 2; + final int predId = 3; + + final BOp startOp = new CopyOp(new BOp[] {}, NV.asMap(new NV[] {// + new NV(Predicate.Annotations.BOP_ID, startId),// + })); + + final Predicate<E> predOp = new Predicate<E>(new IVariableOrConstant[] { + new Constant<String>("Mary"), Var.var("x") }, NV + .asMap(new NV[] {// + new NV(Predicate.Annotations.RELATION_NAME, + new String[] { namespace }),// + new NV(Predicate.Annotations.BOP_ID, predId),// + new NV(Predicate.Annotations.TIMESTAMP, + ITx.READ_COMMITTED),// + })); + + final PipelineJoin<E> query = new PipelineJoin<E>( + new BOp[] { startOp },// + new NV(Predicate.Annotations.BOP_ID, joinId),// + new NV(PipelineJoin.Annotations.PREDICATE, predOp)); + + // the expected solutions (each solution appears twice since we feed two empty binding sets in). + final IBindingSet[] expected = new IBindingSet[] {// + new ArrayBindingSet(// + new IVariable[] { Var.var("x") },// + new IConstant[] { new Constant<String>("John") }// + ),// + new ArrayBindingSet(// + new IVariable[] { Var.var("x") },// + new IConstant[] { new Constant<String>("Paul") }// + ),// + new ArrayBindingSet(// + new IVariable[] { Var.var("x") },// + new IConstant[] { new Constant<String>("John") }// + ),// + new ArrayBindingSet(// + new IVariable[] { Var.var("x") },// + new IConstant[] { new Constant<String>("Paul") }// + ),// + }; + + final PipelineJoinStats stats = query.newStats(); + + // submit TWO (2) empty binding sets in ONE (1) chunk. 
+ final IAsynchronousIterator<IBindingSet[]> source = new ThickAsynchronousIterator<IBindingSet[]>( + new IBindingSet[][] { new IBindingSet[] { new HashBindingSet(), new HashBindingSet()} }); + + final IBlockingBuffer<IBindingSet[]> sink = new BlockingBufferWithStats<IBindingSet[]>(query, stats); + + final BOpContext<IBindingSet> context = new BOpContext<IBindingSet>( + new MockRunningQuery(null/* fed */, jnl/* indexManager */ + ), -1/* partitionId */, stats, + source, sink, null/* sink2 */); + + // get task. + final FutureTask<Void> ft = query.eval(context); + + // execute task. + jnl.getExecutorService().execute(ft); + + ft.get();// wait for completion (before showing stats), then look for errors. + + // show stats. + System.err.println("stats: "+stats); + + // verify solutions. + TestQueryEngine.assertSameSolutionsAnyOrder(expected, new Dechunkerator<IBindingSet>(sink.iterator())); + + // verify stats. + + // join task + assertEquals(1L, stats.chunksIn.get()); + assertEquals(2L, stats.unitsIn.get()); + assertEquals(4L, stats.unitsOut.get()); + assertEquals(1L, stats.chunksOut.get()); + // access path + assertEquals(1L, stats.accessPathDups.get()); + assertEquals(1L, stats.accessPathCount.get()); + assertEquals(1L, stats.accessPathChunksIn.get()); + assertEquals(2L, stats.accessPathUnitsIn.get()); + + assertTrue(ft.isDone()); + assertFalse(ft.isCancelled()); + ft.get(); // verify nothing thrown. + + } + + /** * Unit test for a join with an {@link IConstraint}. The constraint is used * to filter out one of the solutions where "Mary" is present in the * first column of the relation. 
@@ -316,8 +415,8 @@ // access path assertEquals(0L, stats.accessPathDups.get()); assertEquals(1L, stats.accessPathCount.get()); - assertEquals(1L, stats.chunkCount.get()); - assertEquals(2L, stats.elementCount.get()); + assertEquals(1L, stats.accessPathChunksIn.get()); + assertEquals(2L, stats.accessPathUnitsIn.get()); assertTrue(ft.isDone()); assertFalse(ft.isCancelled()); @@ -426,8 +525,8 @@ // access path assertEquals(0L, stats.accessPathDups.get()); assertEquals(1L, stats.accessPathCount.get()); - assertEquals(1L, stats.chunkCount.get()); - assertEquals(5L, stats.elementCount.get()); + assertEquals(1L, stats.accessPathChunksIn.get()); + assertEquals(5L, stats.accessPathUnitsIn.get()); assertTrue(ft.isDone()); assertFalse(ft.isCancelled()); @@ -531,8 +630,8 @@ // access path assertEquals(0L, stats.accessPathDups.get()); assertEquals(2L, stats.accessPathCount.get()); - assertEquals(1L, stats.chunkCount.get()); - assertEquals(2L, stats.elementCount.get()); + assertEquals(1L, stats.accessPathChunksIn.get()); + assertEquals(2L, stats.accessPathUnitsIn.get()); assertTrue(ft.isDone()); assertFalse(ft.isCancelled()); @@ -641,8 +740,8 @@ // access path assertEquals(0L, stats.accessPathDups.get()); assertEquals(2L, stats.accessPathCount.get()); - assertEquals(1L, stats.chunkCount.get()); - assertEquals(2L, stats.elementCount.get()); + assertEquals(1L, stats.accessPathChunksIn.get()); + assertEquals(2L, stats.accessPathUnitsIn.get()); assertTrue(ft.isDone()); assertFalse(ft.isCancelled()); Modified: branches/QUADS_QUERY_BRANCH/bigdata-perf/bsbm/src/resources/logging/log4j.properties =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-perf/bsbm/src/resources/logging/log4j.properties 2010-10-24 18:18:10 UTC (rev 3842) +++ branches/QUADS_QUERY_BRANCH/bigdata-perf/bsbm/src/resources/logging/log4j.properties 2010-10-25 15:40:55 UTC (rev 3843) @@ -222,3 +222,33 @@ log4j.appender.ruleLog.BufferedIO=false 
log4j.appender.ruleLog.layout=org.apache.log4j.PatternLayout log4j.appender.ruleLog.layout.ConversionPattern=%m + +## +# Summary query evaluation log (tab delimited file). +#log4j.logger.com.bigdata.bop.engine.QueryLog=INFO,queryLog +log4j.additivity.com.bigdata.bop.engine.QueryLog=false +log4j.appender.queryLog=org.apache.log4j.FileAppender +log4j.appender.queryLog.Threshold=ALL +log4j.appender.queryLog.File=queryLog.csv +log4j.appender.queryLog.Append=true +# I find that it is nicer to have this unbuffered since you can see what +# is going on and to make sure that I have complete rule evaluation logs +# on shutdown. +log4j.appender.queryLog.BufferedIO=false +log4j.appender.queryLog.layout=org.apache.log4j.PatternLayout +log4j.appender.queryLog.layout.ConversionPattern=%m + +## +# BOp run state trace (tab delimited file). Uncomment the next line to enable. +#log4j.logger.com.bigdata.bop.engine.RunState$TableLog=INFO,queryRunStateLog +log4j.additivity.com.bigdata.bop.engine.RunState$TableLog=false +log4j.appender.queryRunStateLog=org.apache.log4j.FileAppender +log4j.appender.queryRunStateLog.Threshold=ALL +log4j.appender.queryRunStateLog.File=queryRunState.log +log4j.appender.queryRunStateLog.Append=true +# I find that it is nicer to have this unbuffered since you can see what +# is going on and to make sure that I have complete rule evaluation logs +# on shutdown. +log4j.appender.queryRunStateLog.BufferedIO=false +log4j.appender.queryRunStateLog.layout=org.apache.log4j.PatternLayout +log4j.appender.queryRunStateLog.layout.ConversionPattern=%m Modified: branches/QUADS_QUERY_BRANCH/bigdata-perf/lubm/src/resources/logging... [truncated message content] |