From: <tho...@us...> - 2013-12-30 16:25:38
|
Revision: 7702
          http://bigdata.svn.sourceforge.net/bigdata/?rev=7702&view=rev
Author:   thompsonbry
Date:     2013-12-30 16:25:31 +0000 (Mon, 30 Dec 2013)
Log Message:
-----------
Modified ASTRangeCountOptimizer to obtain the range counts in parallel threads for the statement pattern nodes.

Modified Paths:
--------------
    branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/optimizers/ASTRangeCountOptimizer.java

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/optimizers/ASTRangeCountOptimizer.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/optimizers/ASTRangeCountOptimizer.java	2013-12-30 16:02:18 UTC (rev 7701)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/optimizers/ASTRangeCountOptimizer.java	2013-12-30 16:25:31 UTC (rev 7702)
@@ -24,8 +24,14 @@
 
 package com.bigdata.rdf.sparql.ast.optimizers;
 
+import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
+import org.apache.log4j.Logger;
+
 import com.bigdata.bop.IBindingSet;
 import com.bigdata.bop.IConstant;
 import com.bigdata.bop.IVariable;
@@ -38,8 +44,10 @@
 import com.bigdata.rdf.sparql.ast.TermNode;
 import com.bigdata.rdf.sparql.ast.eval.AST2BOpContext;
 import com.bigdata.rdf.sparql.ast.optimizers.ASTStaticJoinOptimizer.Annotations;
+import com.bigdata.rdf.spo.SPORelation;
 import com.bigdata.rdf.store.AbstractTripleStore;
 import com.bigdata.relation.accesspath.IAccessPath;
+import com.bigdata.util.concurrent.ExecutionExceptions;
 
 /**
  * Attach range counts to all statement patterns in the query.
@@ -50,11 +58,13 @@
 public class ASTRangeCountOptimizer extends AbstractJoinGroupOptimizer
         implements IASTOptimizer {
 
-//    private static final transient Logger log = Logger.getLogger(ASTPropertyPathOptimizer.class);
+    private static final transient Logger log = Logger
+            .getLogger(ASTRangeCountOptimizer.class);
 
     /**
      * Optimize the join group.
      */
+    @Override
     protected void optimizeJoinGroup(final AST2BOpContext ctx,
             final StaticAnalysis sa, final IBindingSet[] bSets, final JoinGroupNode group) {
 
@@ -62,40 +72,110 @@
         final List<StatementPatternNode> spNodes = group.getStatementPatterns();
 
         if (!spNodes.isEmpty()) {
-
+
             // Always attach the range counts.
-            attachRangeCounts(ctx, spNodes, getExogenousBindings(bSets));
+            attachRangeCounts(ctx, spNodes, getExogenousBindings(bSets));
 
         }
 
     }
 
     /**
-     * Use the SPORelation from the database to grab the appropriate range
-     * counts for the {@link StatementPatternNode}s. Only tries to attach them
-     * if the annotation {@link Annotations#ESTIMATED_CARDINALITY} is not
-     * already attached to the node. This makes it possible to write unit
-     * tests without real data.
+     * Use the {@link SPORelation} from the database to grab the appropriate
+     * range counts for the {@link StatementPatternNode}s. Only tries to attach
+     * them if the annotation {@link Annotations#ESTIMATED_CARDINALITY} is not
+     * already attached to the node. This makes it possible to write unit tests
+     * without real data.
      */
     private final void attachRangeCounts(final AST2BOpContext ctx,
             final List<StatementPatternNode> spNodes,
             final IBindingSet exogenousBindings) {
 
         final AbstractTripleStore db = ctx.getAbstractTripleStore();
-
-        for (StatementPatternNode sp : spNodes) {
-
-            if (sp.getProperty(Annotations.ESTIMATED_CARDINALITY) == null) {
-
-                estimateCardinality(sp, db, exogenousBindings);
-            }
-
-        }
-
+        // Setup tasks to obtain estimated range counts.
+        final List<Callable<Void>> tasks = new LinkedList<Callable<Void>>();
+        for (StatementPatternNode sp : spNodes) {
+
+            if (sp.getProperty(Annotations.ESTIMATED_CARDINALITY) == null) {
+
+                tasks.add(new RangeCountTask(sp, db, exogenousBindings));
+
+            }
+
+        }
+
+        // Obtain range counts in parallel.
+        final List<Future<Void>> futures;
+        try {
+
+            futures = db.getExecutorService().invokeAll(tasks);
+
+        } catch (InterruptedException e) {
+            // propagate interrupt.
+            Thread.currentThread().interrupt();
+            return;
+        }
+
+        // Check futures for errors.
+        final List<Throwable> causes = new LinkedList<Throwable>();
+        for (Future<Void> f : futures) {
+            try {
+                f.get();
+            } catch (InterruptedException e) {
+                log.error(e);
+                causes.add(e);
+            } catch (ExecutionException e) {
+                log.error(e);
+                causes.add(e);
+            }
+        }
+
+        /*
+         * If there were any errors, then throw an exception listing them.
+         */
+        if (!causes.isEmpty()) {
+            // Throw exception back to the leader.
+            if (causes.size() == 1)
+                throw new RuntimeException(causes.get(0));
+            throw new RuntimeException("nerrors=" + causes.size(),
+                    new ExecutionExceptions(causes));
+        }
+
     }
 
     /**
+     * Task unconditionally obtains the range count for the
+     * {@link StatementPatternNode}.
+     *
+     * @author <a href="mailto:tho...@us...">Bryan
+     *         Thompson</a>
+     */
+    private class RangeCountTask implements Callable<Void> {
+
+        private final StatementPatternNode sp;
+        private final AbstractTripleStore db;
+        private final IBindingSet exogenousBindings;
+
+        public RangeCountTask(final StatementPatternNode sp,
+                final AbstractTripleStore db,
+                final IBindingSet exogenousBindings) {
+            this.sp = sp;
+            this.db = db;
+            this.exogenousBindings = exogenousBindings;
+        }
+
+        @Override
+        public Void call() throws Exception {
+
+            estimateCardinality(sp, db, exogenousBindings);
+
+            return null;
+        }
+
+    }
+
+    /**
      * For testing purposes we can override this method.
      * @param sp
      * @param db
@@ -222,6 +302,4 @@
 
     }
 
-
-
 }

This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |