From: <tho...@us...> - 2011-01-16 18:59:49
|
Revision: 4110 http://bigdata.svn.sourceforge.net/bigdata/?rev=4110&view=rev Author: thompsonbry Date: 2011-01-16 18:59:43 +0000 (Sun, 16 Jan 2011) Log Message: ----------- More work on [1]. [1] https://sourceforge.net/apps/trac/bigdata/ticket/230 Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StandaloneChainedRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/concurrent/Haltable.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl3.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/RunningQueryCloseableIteration.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sop/UnsupportedOperatorException.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java 2011-01-16 16:58:40 UTC (rev 4109) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java 2011-01-16 18:59:43 UTC (rev 4110) @@ -35,6 +35,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingQueue; @@ -787,6 +788,20 @@ super.run(); + } catch(Throwable t) { + + // ensure query halts. + halt(t); + + if (getCause() != null) { + + // abnormal termination. wrap and rethrow. + throw new RuntimeException(t); + + } + + // otherwise ignore exception (normal termination). + } finally { /* @@ -859,11 +874,11 @@ final long begin = System.currentTimeMillis(); try { t.call(); - } catch(Throwable t) { - halt(t); + } catch(Throwable t2) { + halt(t2); // ensure query halts. if (getCause() != null) { - // Abnormal termination. - throw new RuntimeException(getCause()); + // Abnormal termination - wrap and rethrow. + throw new RuntimeException(t2); } // normal termination - swallow the exception. } finally { @@ -1251,8 +1266,17 @@ public Void call() throws Exception { if (log.isDebugEnabled()) log.debug("Running chunk: " + this); - ft.run(); // run - ft.get(); // verify success + try { + ft.run(); // run + ft.get(); // verify success + } catch (Throwable t) { + halt(t); // ensure query halts. + if (getCause() != null) { + // abnormal termination - wrap and rethrow. + throw new Exception(t); + } + // otherwise ignore exception (normal completion). + } // Done. return null; } // call() Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StandaloneChainedRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StandaloneChainedRunningQuery.java 2011-01-16 16:58:40 UTC (rev 4109) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StandaloneChainedRunningQuery.java 2011-01-16 18:59:43 UTC (rev 4110) @@ -444,8 +444,8 @@ } catch(Throwable t) { halt(t); if (getCause() != null) { - // Abnormal termination. - throw getCause(); + // Abnormal termination - wrap and rethrow. + throw new RuntimeException(t); } // normal termination - swallow the exception. } finally { @@ -466,10 +466,6 @@ } catch (Throwable ex1) { - // Log an error. - log.error("queryId=" + getQueryId() + ", bopId=" + t.bopId - + ", bop=" + t.bop, ex1); - /* * Mark the query as halted on this node regardless of whether * we are able to communicate with the query controller. @@ -481,7 +477,15 @@ // ensure halted. halt(ex1); + + if (getCause() != null) { + // Log an error. + log.error("queryId=" + getQueryId() + ", bopId=" + t.bopId + + ", bop=" + t.bop, ex1); + + } + final HaltOpMessage msg = new HaltOpMessage(getQueryId(), t.bopId, -1/*partitionId*/, serviceId, getCause()/*firstCauseIfError*/, t.sinkId, 0/*t.sinkMessagesOut.get()*/, t.altSinkId, Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/concurrent/Haltable.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/concurrent/Haltable.java 2011-01-16 16:58:40 UTC (rev 4109) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/concurrent/Haltable.java 2011-01-16 18:59:43 UTC (rev 4110) @@ -385,6 +385,15 @@ * @see #getCause() */ protected boolean isNormalTerminationCause(final Throwable cause) { + if(isTerminationByInterrupt(cause)) + return true; + if (InnerCause.isInnerCause(cause, RejectedExecutionException.class)) + return true; + return false; + } + + static public boolean isTerminationByInterrupt(final Throwable cause) { + if (InnerCause.isInnerCause(cause, InterruptedException.class)) return true; if (InnerCause.isInnerCause(cause, CancellationException.class)) @@ -393,11 +402,11 @@ return true; if (InnerCause.isInnerCause(cause, BufferClosedException.class)) return true; - if (InnerCause.isInnerCause(cause, RejectedExecutionException.class)) - return true; + return false; + } - + /** * This logs all unexpected causes @ WARN (anything not reported as normal * termination by {@link #isNormalTerminationCause(Throwable)}), not just Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl3.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl3.java 2011-01-16 16:58:40 UTC (rev 4109) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl3.java 2011-01-16 18:59:43 UTC (rev 4110) @@ -118,6 +118,7 @@ import com.bigdata.striterator.Dechunkerator; import com.bigdata.striterator.DistinctFilter; import com.bigdata.striterator.IChunkedOrderedIterator; +import com.bigdata.util.concurrent.Haltable; /** * Extended to rewrite Sesame {@link TupleExpr}s onto native {@link Rule}s and @@ -393,7 +394,7 @@ } else { // allow the query to fail - throw ex; + throw new UnsupportedOperatorException(ex); } @@ -450,7 +451,7 @@ } else { // allow the query to fail - throw ex; + throw new UnsupportedOperatorException(ex); } @@ -507,7 +508,7 @@ } else { // allow the query to fail - throw ex; + throw new UnsupportedOperatorException(ex); } @@ -525,6 +526,12 @@ log.info("unrecognized value in query: " + ex.getValue()); } return new EmptyIteration<BindingSet, QueryEvaluationException>(); + } catch(UnsupportedOperatorException ex) { + /* + * Note: Do not wrap as a different exception type. The caller is + * looking for this. + */ + throw new UnsupportedOperatorException(ex); } catch (Throwable ex) { // log.error("Remove log stmt:"+ex,ex);// FIXME remove this - I am just looking for the root cause of something in the SAIL. throw new QueryEvaluationException(ex); @@ -577,7 +584,7 @@ if (sop.isRightSideLeftJoin()) { groupsToPrune.add(sopTree.getGroup(sop.getGroup())); } else { - throw ex; + throw new UnrecognizedValueException(ex); } } } @@ -652,8 +659,12 @@ if (sop.getGroup() == SOpTreeBuilder.ROOT_GROUP_ID) { sopsToPrune.add(sop); sesameFilters.add(filter); - } else { - throw ex; + } else { + /* + * Note: DO NOT wrap with a different exception type - + * the caller is looking for this. + */ + throw new UnsupportedOperatorException(ex); } } } @@ -699,14 +710,43 @@ } - return _evaluateNatively(query, bs, queryEngine, sesameFilters); - + /* + * Begin native bigdata evaluation. + */ + CloseableIteration<BindingSet, QueryEvaluationException> result = _evaluateNatively( + query, bs, queryEngine);// , sesameFilters); + + /* + * Use the basic filter iterator for any remaining filters which will be + * evaluated by Sesame. + * + * Note: Some Sesame filters may pre-fetch one or more result(s). This + * could potentially cause the IRunningQuery to be asynchronously + * terminated by an interrupt. I have lifted the code to wrap the Sesame + * filters around the bigdata evaluation out of the code which starts + * the IRunningQuery evaluation in order to help clarify such + * circumstances as they might relate to [1]. + * + * [1] https://sourceforge.net/apps/trac/bigdata/ticket/230 + */ + if (sesameFilters != null) { + for (Filter f : sesameFilters) { + if (log.isDebugEnabled()) { + log.debug("attaching sesame filter: " + f); + } + result = new FilterIterator(f, result, this); + } + } + + return result; + } - protected CloseableIteration<BindingSet, QueryEvaluationException> + private CloseableIteration<BindingSet, QueryEvaluationException> _evaluateNatively(final PipelineOp query, final BindingSet bs, - final QueryEngine queryEngine, - final Collection<Filter> sesameFilters) + final QueryEngine queryEngine +// , final Collection<Filter> sesameFilters + ) throws QueryEvaluationException { IRunningQuery runningQuery = null; @@ -717,10 +757,20 @@ /* * Wrap up the native bigdata query solution iterator as Sesame - * compatible iteration w/ any filters to be interpreted by Sesame. + * compatible iteration with materialized RDF Values. */ - return wrapQuery(runningQuery, sesameFilters); + return wrapQuery(runningQuery);//, sesameFilters); + } catch (UnsupportedOperatorException t) { + if (runningQuery != null) { + // ensure query is halted. + runningQuery.cancel(true/* mayInterruptIfRunning */); + } + /* + * Note: Do not wrap as a different exception type. The caller is + * looking for this. + */ + throw new UnsupportedOperatorException(t); } catch (Throwable t) { if (runningQuery != null) { // ensure query is halted. @@ -734,20 +784,19 @@ /** * Wrap the {@link IRunningQuery#iterator()}, returning a Sesame compatible - * iteration which will visit the materialized binding sets. + * iteration which will visit Sesame binding sets having materialized RDF + * Values. * * @param runningQuery * The query. - * @param sesameFilters - * Any filters to be applied by Sesame. - * + * * @return The iterator. * - * @throws QueryEvaluationException + * @throws QueryEvaluationException */ private CloseableIteration<BindingSet, QueryEvaluationException> wrapQuery( - final IRunningQuery runningQuery, - final Collection<Filter> sesameFilters) throws QueryEvaluationException { + final IRunningQuery runningQuery + ) throws QueryEvaluationException { // The iterator draining the query solutions. final IAsynchronousIterator<IBindingSet[]> it1 = runningQuery @@ -759,7 +808,7 @@ new Dechunkerator<IBindingSet>(it1)); // Materialize IVs as RDF Values. - CloseableIteration<BindingSet, QueryEvaluationException> result = + final CloseableIteration<BindingSet, QueryEvaluationException> result = // Monitor IRunningQuery and cancel if Sesame iterator is closed. new RunningQueryCloseableIteration<BindingSet, QueryEvaluationException>(runningQuery, // Convert bigdata binding sets to Sesame binding sets. @@ -768,16 +817,6 @@ new BigdataBindingSetResolverator(database, it2).start( database.getExecutorService()))); - // use the basic filter iterator for remaining filters - if (sesameFilters != null) { - for (Filter f : sesameFilters) { - if (log.isDebugEnabled()) { - log.debug("attaching sesame filter: " + f); - } - result = new FilterIterator(f, result, this); - } - } - return result; } @@ -2286,6 +2325,18 @@ private Value value; + /** + * Wrap another instance of this exception class. + * @param cause + */ + public UnrecognizedValueException(final UnrecognizedValueException cause) { + + super(cause); + + this.value = cause.value; + + } + public UnrecognizedValueException(final Value value) { this.value = value; } Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/RunningQueryCloseableIteration.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/RunningQueryCloseableIteration.java 2011-01-16 16:58:40 UTC (rev 4109) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/RunningQueryCloseableIteration.java 2011-01-16 18:59:43 UTC (rev 4110) @@ -74,8 +74,8 @@ * Exception thrown by the runningQuery. */ if (runningQuery.getCause() != null) { - // abnormal termination. - throw (X) new QueryEvaluationException(runningQuery.getCause()); + // abnormal termination - wrap and rethrow. + throw (X) new QueryEvaluationException(e); } // otherwise this is normal termination. } Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sop/UnsupportedOperatorException.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sop/UnsupportedOperatorException.java 2011-01-16 16:58:40 UTC (rev 4109) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sop/UnsupportedOperatorException.java 2011-01-16 18:59:43 UTC (rev 4110) @@ -2,6 +2,13 @@ import org.openrdf.query.algebra.QueryModelNode; +/** + * An exception thrown when an operator can not be translated into native + * bigdata evaluation. This is used to detect such problems and then optionally + * delegate the operator to openrdf. + * + * @author mrpersonick + */ public class UnsupportedOperatorException extends RuntimeException { /** @@ -11,6 +18,16 @@ private QueryModelNode operator; + /** + * Wrap with another instance of this class. + * + * @param cause + */ + public UnsupportedOperatorException(final UnsupportedOperatorException cause) { + super(cause); + this.operator = cause.operator; + } + public UnsupportedOperatorException(final QueryModelNode operator) { this.operator = operator; } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |