From: <tho...@us...> - 2014-01-07 18:34:13
Revision: 7745
          http://bigdata.svn.sourceforge.net/bigdata/?rev=7745&view=rev
Author:   thompsonbry
Date:     2014-01-07 18:34:06 +0000 (Tue, 07 Jan 2014)

Log Message:
-----------
Modified the QueryEngine to support a listener interface that allows unit
tests to hook the IRunningQuery object in order to observe details about the
manner in which the query was actually executed (BOpStats, IQueryAttributes),
check for proper release of native memory, etc.

More work on the RTO integration.

See #64 (RTO)

Modified Paths:
--------------
    branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/AST2BOpRTO.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/AbstractDataAndSPARQLTestCase.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/AbstractDataDrivenSPARQLTestCase.java
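The intended usage pattern for the new listener interface is roughly the
following sketch. Only IRunningQueryListener, addListener() and
removeListener() are introduced by this revision; the test fixture and the
runQuery() helper are hypothetical stand-ins for whatever the test actually
executes.

    import java.util.concurrent.atomic.AtomicReference;

    // Capture the IRunningQuery for post-mortem inspection by a unit test.
    static IRunningQuery observeQuery(final QueryEngine queryEngine)
            throws Exception {
        final AtomicReference<IRunningQuery> observed =
                new AtomicReference<IRunningQuery>();
        final QueryEngine.IRunningQueryListener listener =
                new QueryEngine.IRunningQueryListener() {
            @Override
            public void notify(final IRunningQuery q) {
                // Invoked from within halt(); must return quickly.
                observed.set(q);
            }
        };
        queryEngine.addListener(listener);
        try {
            runQuery(queryEngine); // hypothetical: run the query under test.
        } finally {
            queryEngine.removeListener(listener); // always deregister.
        }
        // BOpStats, IQueryAttributes, native memory release, etc. can now be
        // checked on the returned object.
        return observed.get();
    }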
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java	2014-01-07 18:30:50 UTC (rev 7744)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java	2014-01-07 18:34:06 UTC (rev 7745)
@@ -38,6 +38,7 @@
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -74,6 +75,7 @@
 import com.bigdata.resources.IndexManager;
 import com.bigdata.service.IBigdataFederation;
 import com.bigdata.service.IDataService;
+import com.bigdata.util.InnerCause;
 import com.bigdata.util.concurrent.DaemonThreadFactory;
 import com.bigdata.util.concurrent.IHaltable;
@@ -1886,10 +1888,21 @@
      */
     protected void halt(final AbstractRunningQuery q) {
 
+        boolean interrupted = false;
+
         lock.lock();
 
         try {
 
+            // notify listener(s)
+            try {
+                fireEvent(q);
+            } catch (Throwable t) {
+                if (InnerCause.isInnerCause(t, InterruptedException.class)) {
+                    // Defer impact until outside of this critical section.
+                    interrupted = true;
+                }
+            }
+
             // insert/touch the LRU of recently finished queries.
             doneQueries.put(q.getQueryId(), q.getFuture());
@@ -1909,6 +1922,9 @@
 
         }
 
+        if (interrupted)
+            Thread.currentThread().interrupt();
+
     }
@@ -1923,7 +1939,8 @@
      * @throws RuntimeException
      *             if the query halted with an error.
      */
-    private void handleDoneQuery(final UUID queryId,final Future<Void> doneQueryFuture) {
+    private void handleDoneQuery(final UUID queryId,
+            final Future<Void> doneQueryFuture) {
         try {
             // Check the Future.
             doneQueryFuture.get();
@@ -1945,6 +1962,90 @@
 
         }
 
     }
 
+    /**
+     * Listener API for {@link IRunningQuery} life cycle events (start/halt).
+     * <p>
+     * Note: While this interface makes it possible to catch the start and halt
+     * of an {@link IRunningQuery}, it imposes an overhead on the query engine
+     * and the potential for significant latency and other problems depending
+     * on the behavior of the {@link IRunningQueryListener}. This interface was
+     * added to facilitate certain test suites which could not otherwise be
+     * written. It should not be used for production code.
+     */
+    public interface IRunningQueryListener {
+
+        void notify(IRunningQuery q);
+
+    }
+
+    /** Registered listeners. */
+    private final CopyOnWriteArraySet<IRunningQueryListener> listeners = new CopyOnWriteArraySet<IRunningQueryListener>();
+
+    /** Add a query listener. */
+    public void addListener(final IRunningQueryListener l) {
+
+        if (l == null)
+            throw new IllegalArgumentException();
+
+        listeners.add(l);
+
+    }
+
+    /** Remove a query listener. */
+    public void removeListener(final IRunningQueryListener l) {
+
+        if (l == null)
+            throw new IllegalArgumentException();
+
+        listeners.remove(l);
+
+    }
+
+    /**
+     * Send an event to all registered listeners.
+     */
+    private void fireEvent(final IRunningQuery q) {
+
+        if (q == null)
+            throw new IllegalArgumentException();
+
+        if (listeners.isEmpty()) {
+
+            // NOP
+            return;
+
+        }
+
+        final IRunningQueryListener[] a = listeners
+                .toArray(new IRunningQueryListener[0]);
+
+        for (IRunningQueryListener l : a) {
+
+            final IRunningQueryListener listener = l;
+
+            try {
+
+                // send event.
+                listener.notify(q);
+
+            } catch (Throwable t) {
+
+                if (InnerCause.isInnerCause(t, InterruptedException.class)) {
+
+                    // Propagate interrupt.
+                    throw new RuntimeException(t);
+
+                }
+
+                // Log and ignore.
+                log.error(t, t);
+
+            }
+
+        }
+
+    }
+
     /*
      * RunningQuery factory.
      */
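The halt() change uses a standard idiom worth calling out: a listener may be
interrupted (or throw something wrapping an InterruptedException), but acting
on the interrupt while the engine's internal lock is held could abort the
bookkeeping that must run inside the critical section, so a flag defers the
interrupt until the lock is released. A self-contained sketch of the idiom,
independent of the bigdata classes:

    import java.util.concurrent.locks.ReentrantLock;

    final class DeferredInterruptExample {

        private final ReentrantLock lock = new ReentrantLock();

        void criticalSection(final Runnable listenerCallback,
                final Runnable mustRunBookkeeping) {
            boolean interrupted = false;
            lock.lock();
            try {
                try {
                    listenerCallback.run();
                } catch (Throwable t) {
                    if (causedByInterrupt(t)) {
                        // Defer: do not act on it while holding the lock.
                        interrupted = true;
                    }
                }
                // Bookkeeping runs even if the listener misbehaved.
                mustRunBookkeeping.run();
            } finally {
                lock.unlock();
                if (interrupted) {
                    // Restore the interrupt status outside the lock.
                    Thread.currentThread().interrupt();
                }
            }
        }

        // Simplified stand-in for com.bigdata.util.InnerCause.isInnerCause().
        private static boolean causedByInterrupt(Throwable t) {
            for (; t != null; t = t.getCause()) {
                if (t instanceof InterruptedException)
                    return true;
            }
            return false;
        }

    }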
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/AST2BOpRTO.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/AST2BOpRTO.java	2014-01-07 18:30:50 UTC (rev 7744)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/AST2BOpRTO.java	2014-01-07 18:34:06 UTC (rev 7745)
@@ -28,6 +28,7 @@
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
@@ -39,11 +40,14 @@
 import com.bigdata.bop.BOp;
 import com.bigdata.bop.BOpEvaluationContext;
 import com.bigdata.bop.BOpIdFactory;
+import com.bigdata.bop.BadBOpIdTypeException;
+import com.bigdata.bop.DuplicateBOpIdException;
 import com.bigdata.bop.IConstraint;
 import com.bigdata.bop.IPredicate;
 import com.bigdata.bop.IValueExpression;
 import com.bigdata.bop.IVariable;
 import com.bigdata.bop.NV;
+import com.bigdata.bop.NoBOpIdException;
 import com.bigdata.bop.PipelineOp;
 import com.bigdata.bop.ap.Predicate;
 import com.bigdata.bop.ap.SampleIndex.SampleType;
@@ -97,6 +101,19 @@
  */
 public class AST2BOpRTO extends AST2BOpJoins {
 
+    public interface Annotations extends AST2BOpJoins.Annotations {
+
+        /**
+         * Annotation is used to tag the {@link StatementPatternNode}s in a
+         * {@link JoinGroupNode} with the identifier assigned to the
+         * corresponding {@link IPredicate}. This makes it possible to look up
+         * the SP from the {@link IPredicate} when the RTO hands us back an
+         * ordered join path.
+         */
+        String PREDICATE_ID = "PredicateId";
+
+    }
+
     /**
      * When <code>true</code>, the RTO will only accept simple joins into the
     * join graph. Simple joins includes triples-mode joins and filters that do
@@ -222,7 +239,7 @@
 //        final Set<StatementPatternNode> sps = new LinkedHashSet<StatementPatternNode>();
         // The predicates for the join graph.
         @SuppressWarnings("rawtypes")
-        final Set<Predicate> preds = new LinkedHashSet<Predicate>();
+        final LinkedList<Predicate> preds = new LinkedList<Predicate>();
         // The constraints for the join graph.
         final List<IConstraint> constraints = new LinkedList<IConstraint>();
         // The #of JOINs accepted into the RTO's join group.
@@ -296,7 +313,6 @@
 
                 // Something the RTO can handle.
                 sp = (StatementPatternNode) sp.clone();// TODO Use destructive move.
-//                sp.setId(ctx.nextId()); // assign id so we can reference back later.
                 rtoJoinGroup.addChild(sp); // add to group.
                 naccepted++;
                 /*
@@ -310,17 +326,24 @@
                  * when we take the selected join path from the RTO and
                  * compile it into a query plan to fully execute the join
                  * group.
+                 *
+                 * Note: This assigns ids to the predicates as a
+                 * side-effect. Those ids are assigned by the
+                 * AST2BOpContext's BOpIdFactory. You can not rely on
+                 * specific ID values being assigned, so you need to build a
+                 * map and track the correspondence between the SPs and the
+                 * predicates.
                  */
-                final Predicate<?> pred = AST2BOpUtility.toPredicate(sp, ctx);
-                // final int joinId = ctx.nextId();
-                //
-                // // annotations for this join.
-                // final List<NV> anns = new LinkedList<NV>();
-                //
-                // anns.add(new NV(BOp.Annotations.BOP_ID, joinId));
+                final Predicate<?> pred = AST2BOpUtility.toPredicate(sp,
+                        ctx);
                 preds.add(pred);
+                // tag the SP with the predicate's ID.
+                sp.setProperty(Annotations.PREDICATE_ID, pred.getId());
                 if (attachedConstraints != null) {
-                    // RTO will figure out where to attach these constraints.
+                    /*
+                     * The RTO will figure out where to attach these
+                     * constraints.
+                     */
                     constraints.addAll(attachedConstraints);
                 }
@@ -492,8 +515,8 @@
         final JoinGroupNode rtoJoinGroup = (JoinGroupNode) joinGraph
                 .getRequiredProperty(JoinGraph.Annotations.JOIN_GROUP);
 
-//        // Build an index over the bopIds in that JoinGroupNode.
-//        final Map<Integer, BOp> index = getIndex(rtoJoinGroup);
+        // Build an index over the bopIds in that JoinGroupNode.
+        final Map<Integer, StatementPatternNode> index = getIndex(rtoJoinGroup);
 
         // Factory avoids reuse of bopIds assigned to the predicates.
         final BOpIdFactory idFactory = new BOpIdFactory();
@@ -530,34 +553,16 @@
 
             final boolean optional = pred.isOptional();
 
-            /*
-             * Lookup the AST node for that predicate.
-             *
-             * Note: The predicates are assigned bopIds by the RTO starting with
-             * ONE (1). Therefore we substract out ONE from the predicate's id
-             * to find its index into the join group.
-             *
-             * TODO This assumes that the join group does not contain anything
-             * other than the SPs for the predicates that we are using.,
-             *
-             * TODO HINTS: The Predicate's query hints should the hints for that
-             * specific join (aks the SP or other type of IJoinNode), not the
-             * hints for the JoinGroupNode or the JoinGraph operator. We could
-             * just pass the AST nodes themselves from the JoinGroupNode. That
-             * might make things easier, even if it make the query serialization
-             * fatter on a cluster.
-             */
-//            final ASTBase astNode = (ASTBase) index.get(pred.getId());
+            // Lookup the AST node for that predicate.
+            final StatementPatternNode sp = index.get(pred.getId());
 
-            final ASTBase astNode = (ASTBase) rtoJoinGroup.get(pred.getId() - 1);
-
             left = join(left, //
                     pred, //
                     optional ? new LinkedHashSet<IVariable<?>>(doneSet)
                             : doneSet, //
                     attachedJoinConstraints == null ? null : Arrays
                             .asList(attachedJoinConstraints),//
-                    astNode.getQueryHints(),//
+                    sp.getQueryHints(),//
                     ctx);
 
         }
@@ -616,61 +621,60 @@
 
     }
 
-//    /**
-//     * Return an index from the {@link BOp.Annotations#BOP_ID} to the
-//     * {@link BOp}.
-//     * <p>
-//     * {@link BOp}s should form directed acyclic graphs, but this is not
-//     * strictly enforced. The recursive traversal iterators declared by this
-//     * class do not protect against loops in the operator tree. However,
-//     * {@link #getIndex(BOp)} detects and report loops based on duplicate
-//     * {@link Annotations#BOP_ID}s -or- duplicate {@link BOp} references.
-//     *
-//     * @param op
-//     *            A {@link BOp}.
-//     *
-//     * @return The index, which is immutable and thread-safe.
-//     *
-//     * @throws DuplicateBOpIdException
-//     *             if there are two or more {@link BOp}s having the same
-//     *             {@link Annotations#BOP_ID}.
-//     * @throws BadBOpIdTypeException
-//     *             if the {@link Annotations#BOP_ID} is not an {@link Integer}.
-//     * @throws NoBOpIdException
-//     *             if a {@link PipelineOp} does not have a
-//     *             {@link Annotations#BOP_ID}.
-//     */
-//    static private Map<Integer,BOp> getIndex(final JoinGroupNode op) {
-//        if(op == null)
-//            throw new IllegalArgumentException();
-//        final LinkedHashMap<Integer, BOp> map = new LinkedHashMap<Integer, BOp>();
-//        final Iterator<BOp> itr = op.argIterator();
-//        while (itr.hasNext()) {
-//            final BOp t = itr.next();
-////            if(!(t instanceof PipelineOp))
-////                throw new NotPipelineOpException(t.toString());
-//            final Object x = t.getProperty(BOp.Annotations.BOP_ID);
-//            if (x == null) {
-//                throw new NoBOpIdException(t.toString());
-//            }
-//            if (!(x instanceof Integer)) {
-//                throw new BadBOpIdTypeException("Must be Integer, not: "
-//                        + x.getClass() + ": " + BOp.Annotations.BOP_ID);
-//            }
-//            final Integer id = (Integer) t.getProperty(BOp.Annotations.BOP_ID);
-//            final BOp conflict = map.put(id, t);
-//            if (conflict != null) {
-//                /*
-//                 * BOp appears more than once. This is not allowed for
-//                 * pipeline operators. If you are getting this exception for
-//                 * a non-pipeline operator, you should remove the bopId.
-//                 */
-//                throw new DuplicateBOpIdException("duplicate id=" + id
-//                        + " for " + conflict + " and " + t);
-//            }
-//        }
-//        // wrap to ensure immutable and thread-safe.
-//        return Collections.unmodifiableMap(map);
-//    }
+    /**
+     * Return a map from the {@link Annotations#PREDICATE_ID} to the
+     * corresponding {@link StatementPatternNode}.
+     *
+     * @param op
+     *            The join group.
+     *
+     * @return The index, which is immutable and thread-safe.
+     *
+     * @throws DuplicateBOpIdException
+     *             if there are two or more {@link BOp}s having the same
+     *             {@link Annotations#PREDICATE_ID}.
+     * @throws BadBOpIdTypeException
+     *             if the {@link Annotations#PREDICATE_ID} is not an
+     *             {@link Integer}.
+     * @throws NoBOpIdException
+     *             if a {@link StatementPatternNode} does not have a
+     *             {@link Annotations#PREDICATE_ID}.
+     */
+    static private Map<Integer, StatementPatternNode> getIndex(
+            final JoinGroupNode op) {
+        if (op == null)
+            throw new IllegalArgumentException();
+        final LinkedHashMap<Integer, StatementPatternNode> map = new LinkedHashMap<Integer, StatementPatternNode>();
+        final Iterator<IGroupMemberNode> itr = op.iterator();
+        while (itr.hasNext()) {
+            final BOp t = itr.next();
+            if (!(t instanceof StatementPatternNode)) {
+                // Skip non-SP nodes.
+                continue;
+            }
+            final StatementPatternNode sp = (StatementPatternNode) t;
+            final Object x = t.getProperty(Annotations.PREDICATE_ID);
+            if (x == null) {
+                throw new NoBOpIdException(t.toString());
+            }
+            if (!(x instanceof Integer)) {
+                throw new BadBOpIdTypeException("Must be Integer, not: "
+                        + x.getClass() + ": " + Annotations.PREDICATE_ID);
+            }
+            final Integer id = (Integer) x;
+            final BOp conflict = map.put(id, sp);
+            if (conflict != null) {
+                /*
+                 * The SP appears more than once in the join group. This is
+                 * not allowed.
+                 */
+                throw new DuplicateBOpIdException("duplicate id=" + id
+                        + " for " + conflict + " and " + t);
+            }
+        }
+        // wrap to ensure immutable and thread-safe.
+        return Collections.unmodifiableMap(map);
+    }
 
 }
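Taken together, the PREDICATE_ID changes form a round trip: tag each
statement pattern with the bopId of the predicate compiled from it, let the
RTO reorder the predicates, then use getIndex() to recover each predicate's
originating AST node so that per-join query hints survive the reordering. In
outline (a sketch using names from this patch; the surrounding local
variables are illustrative):

    // 1. Compilation: tag each SP with the id of its predicate.
    for (StatementPatternNode sp : acceptedStatementPatterns) {
        final Predicate<?> pred = AST2BOpUtility.toPredicate(sp, ctx);
        sp.setProperty(AST2BOpRTO.Annotations.PREDICATE_ID, pred.getId());
        preds.add(pred);
    }

    // 2. The RTO runs and hands back an ordered join path of predicates.

    // 3. Compile the selected path: recover each SP so the join is built
    //    with that SP's query hints, not the hints of the enclosing group.
    final Map<Integer, StatementPatternNode> index = getIndex(rtoJoinGroup);
    for (IPredicate<?> pred : orderedJoinPath) {
        final StatementPatternNode sp = index.get(pred.getId());
        left = join(left, pred, doneSet, attachedConstraints,
                sp.getQueryHints(), ctx);
    }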
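The new isQuads() guard in loadData() reflects the difference between the
store modes: only a quads-mode store has a default statement context to
assign, so the context is presumably no longer forced onto triples-mode
stores; the StatementBuffer capacity bump (100 to 1000) reduces incremental
flushes when loading larger test files. A subclass might drive the revised
helper roughly like this (a sketch; the resource name and the helper handle
are hypothetical, and a JUnit context is assumed):

    // Load a classpath resource through AbsHelper.loadData(). In quads mode
    // the statements land in the context named by the base URI; in triples
    // mode they are loaded without a context.
    final InputStream is = getClass().getResourceAsStream("/data/example.ttl");
    try {
        final long n = helper.loadData(is, RDFFormat.TURTLE,
                "file:data/example.ttl");
        assertTrue("expected statements", n > 0);
    } finally {
        is.close();
    }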
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/AbstractDataAndSPARQLTestCase.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/AbstractDataAndSPARQLTestCase.java	2014-01-07 18:30:50 UTC (rev 7744)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/AbstractDataAndSPARQLTestCase.java	2014-01-07 18:34:06 UTC (rev 7745)
@@ -83,16 +83,26 @@
 public abstract class AbstractDataAndSPARQLTestCase extends AbstractASTEvaluationTestCase {
 
+    public AbstractDataAndSPARQLTestCase() {
+    }
+
+    public AbstractDataAndSPARQLTestCase(final String name) {
+        super(name);
+    }
+
     public class AbsHelper {
 
         protected final String queryStr;
+
         /**
          * This is the astContainer of the last query executed.
         */
        protected ASTContainer astContainer;
 
-        public AbsHelper(String queryStr) {
-            this.queryStr = queryStr;
+        public AbsHelper(final String queryStr) {
+
+            this.queryStr = queryStr;
+
         }
 
         protected AbstractTripleStore getTripleStore() {
@@ -101,17 +111,34 @@
 
         }
 
-        protected void compareTupleQueryResults(final TupleQueryResult queryResult, final TupleQueryResult expectedResult, final boolean checkOrder)
-                throws QueryEvaluationException {
-            AbstractQueryEngineTestCase.compareTupleQueryResults(getName(),
-                    "", store, astContainer, queryResult, expectedResult,
-                    false, checkOrder);
-        }
+        protected void compareTupleQueryResults(
+                final TupleQueryResult queryResult,
+                final TupleQueryResult expectedResult, final boolean checkOrder)
+                throws QueryEvaluationException {
+
+            AbstractQueryEngineTestCase.compareTupleQueryResults(getName(), "",
+                    store, astContainer, queryResult, expectedResult, false,
+                    checkOrder);
+
+        }
 
-        long loadData(final InputStream is, RDFFormat format, String uri) {
-            final RDFParser rdfParser = RDFParserRegistry.getInstance().get(format).getParser();
+        /**
+         * Load data from an input stream.
+         *
+         * @param is
+         *            The stream (required).
+         * @param format
+         *            The format (required).
+         * @param uri
+         *            The baseURL (required).
+         *
+         * @return The #of triples read from the stream.
+         */
+        long loadData(final InputStream is, final RDFFormat format,
+                final String uri) {
+
+            final RDFParser rdfParser = RDFParserRegistry.getInstance()
+                    .get(format).getParser();
+
             rdfParser.setValueFactory(store.getValueFactory());
 
             rdfParser.setVerifyData(true);
@@ -122,7 +149,12 @@
 
             final AddStatementHandler handler = new AddStatementHandler();
 
-            handler.setContext(new URIImpl(uri));
+            if (getTripleStore().isQuads()) {
+
+                // Set the default context.
+                handler.setContext(new URIImpl(uri));
+
+            }
 
             rdfParser.setRDFHandler(handler);
@@ -170,7 +202,7 @@
 
         public AddStatementHandler() {
 
-            buffer = new StatementBuffer<Statement>(store, 100/* capacity */);
+            buffer = new StatementBuffer<Statement>(store, 1000/* capacity */);
 
         }
@@ -180,6 +212,7 @@
 
         }
 
+        @Override
        public void handleStatement(final Statement stmt)
                throws RDFHandlerException {
@@ -214,11 +247,4 @@
 
     }
 
-    public AbstractDataAndSPARQLTestCase() {
-    }
-
-    public AbstractDataAndSPARQLTestCase(String name) {
-        super(name);
-    }
-
 }
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/AbstractDataDrivenSPARQLTestCase.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/AbstractDataDrivenSPARQLTestCase.java	2014-01-07 18:30:50 UTC (rev 7744)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/AbstractDataDrivenSPARQLTestCase.java	2014-01-07 18:34:06 UTC (rev 7745)
@@ -71,13 +71,11 @@
 import java.net.URL;
 import java.util.LinkedHashSet;
 import java.util.Set;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.ZipInputStream;
 
 import org.apache.log4j.Logger;
-import org.openrdf.model.Resource;
 import org.openrdf.model.Statement;
-import org.openrdf.model.URI;
-import org.openrdf.model.Value;
-import org.openrdf.model.impl.URIImpl;
 import org.openrdf.query.GraphQueryResult;
 import org.openrdf.query.QueryEvaluationException;
 import org.openrdf.query.TupleQueryResult;
@@ -90,23 +88,15 @@
 import org.openrdf.query.resultio.TupleQueryResultFormat;
 import org.openrdf.query.resultio.TupleQueryResultParser;
 import org.openrdf.rio.RDFFormat;
-import org.openrdf.rio.RDFHandlerException;
 import org.openrdf.rio.RDFParser;
 import org.openrdf.rio.RDFParser.DatatypeHandling;
-import org.openrdf.rio.RDFParserFactory;
-import org.openrdf.rio.RDFParserRegistry;
 import org.openrdf.rio.Rio;
-import org.openrdf.rio.helpers.RDFHandlerBase;
 import org.openrdf.rio.helpers.StatementCollector;
 
 import com.bigdata.bop.engine.AbstractQueryEngineTestCase;
-import com.bigdata.rdf.model.StatementEnum;
-import com.bigdata.rdf.rio.StatementBuffer;
 import com.bigdata.rdf.sail.sparql.Bigdata2ASTSPARQLParser;
 import com.bigdata.rdf.sparql.ast.ASTContainer;
-import com.bigdata.rdf.sparql.ast.AbstractASTEvaluationTestCase;
 import com.bigdata.rdf.sparql.ast.QueryRoot;
-import com.bigdata.rdf.sparql.ast.eval.AbstractDataAndSPARQLTestCase.AbsHelper;
 import com.bigdata.rdf.store.AbstractTripleStore;
@@ -231,8 +221,9 @@
             final String[] dataFileURLs, final String resultFileURL,
             final boolean checkOrder) throws Exception {
 
-            super(getResourceAsString(queryFileURL));
+            super(getResourceAsString(queryFileURL));
+
             if (log.isInfoEnabled())
                 log.info("\ntestURI:\n" + testURI);
@@ -527,12 +518,64 @@
      */
     protected long loadData(final String resource) {
 
-        return loadData(getResourceAsStream(resource), RDFFormat.forFileName(resource), new File(resource).toURI().toString());
+        if (log.isInfoEnabled())
+            log.info("Loading " + resource);
+
+        final String baseURL = new File(resource).toURI().toString();
+
+        InputStream is = null;
+
+        try {
+
+            is = getResourceAsStream(resource);
+
+            final RDFFormat rdfFormat = RDFFormat.forFileName(resource);
+
+            if (rdfFormat == null)
+                throw new RuntimeException("Unknown format: resource="
+                        + resource);
+
+            // final RDFFormat rdfFormat = guessFormat(new File(resource),
+            //         null/* default */);
+
+            return loadData(is, rdfFormat, baseURL);
+
+        } finally {
+
+            if (is != null) {
+                try {
+                    is.close();
+                } catch (IOException e) {
+                    log.error("Could not close: resource=" + resource, e);
+                }
+                is = null;
+            }
+
+        }
 
     }
 
+//    private static RDFFormat guessFormat(final File file,
+//            final RDFFormat defaultFormat) {
+//
+//        final String n = file.getName();
+//
+//        RDFFormat fmt = RDFFormat.forFileName(n);
+//
+//        if (fmt == null && n.endsWith(".zip")) {
+//            fmt = RDFFormat.forFileName(n.substring(0, n.length() - 4));
+//        }
+//
+//        if (fmt == null && n.endsWith(".gz")) {
+//            fmt = RDFFormat.forFileName(n.substring(0, n.length() - 3));
+//        }
+//
+//        if (fmt == null) // fallback
+//            fmt = defaultFormat;
+//
+//        return fmt;
+//
+//    }
+
     private static InputStream getResourceAsStream(final String resource) {
@@ -595,6 +638,20 @@
         if (is == null)
             throw new RuntimeException("Not found: " + resource);
 
+        if (resource.toLowerCase().endsWith(".gz")) {
+
+            try {
+                is = new GZIPInputStream(is);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+
+        } else if (resource.toLowerCase().endsWith(".zip")) {
+
+            is = new ZipInputStream(is);
+
+        }
+
         return is;
 
     }
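The decompression wrapping in getResourceAsStream() is the usual
extension-sniffing pattern; isolated, it looks like the sketch below. Two
caveats seem worth noting. First, a ZipInputStream returns end-of-stream
until getNextEntry() has positioned it on an entry, so the .zip branch as
committed likely needs a follow-up to select an entry before the parser
reads. Second, loadData() still calls RDFFormat.forFileName() on the full
resource name, so a compressed resource only resolves if the format registry
understands the compound extension, which is exactly what the commented-out
guessFormat() above was exploring. The helper name here is illustrative.

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.zip.GZIPInputStream;
    import java.util.zip.ZipInputStream;

    // Standalone form of the wrapping added above, with the zip-entry
    // caveat handled.
    static InputStream maybeDecompress(final String name, InputStream is)
            throws IOException {
        final String n = name.toLowerCase();
        if (n.endsWith(".gz")) {
            is = new GZIPInputStream(is);
        } else if (n.endsWith(".zip")) {
            final ZipInputStream zis = new ZipInputStream(is);
            zis.getNextEntry(); // position on the first entry before reading.
            is = zis;
        }
        return is;
    }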