From: <tho...@us...> - 2014-04-22 16:22:20
|
Revision: 8137 http://sourceforge.net/p/bigdata/code/8137 Author: thompsonbry Date: 2014-04-22 16:22:14 +0000 (Tue, 22 Apr 2014) Log Message: ----------- Added test for interrupted() for every 20 solutions processed by the ConditionalRoutingOp. Added test for interrupted() for each RDF Value tested by the RegexBOp. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/bset/ConditionalRoutingOp.java branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/internal/constraints/RegexBOp.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/bset/ConditionalRoutingOp.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/bset/ConditionalRoutingOp.java 2014-04-21 23:18:59 UTC (rev 8136) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/bset/ConditionalRoutingOp.java 2014-04-22 16:22:14 UTC (rev 8137) @@ -1,243 +1,251 @@ -/** - -Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -*/ -/* - * Created on Aug 25, 2010 - */ - -package com.bigdata.bop.bset; - -import java.util.Arrays; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.FutureTask; - -import com.bigdata.bop.BOp; -import com.bigdata.bop.BOpContext; -import com.bigdata.bop.IBindingSet; -import com.bigdata.bop.IConstraint; -import com.bigdata.bop.NV; -import com.bigdata.bop.PipelineOp; -import com.bigdata.bop.engine.BOpStats; -import com.bigdata.relation.accesspath.IBlockingBuffer; - -import cutthecrap.utils.striterators.ICloseableIterator; - -/** - * An operator for conditional routing of binding sets in a pipeline. The - * operator will copy binding sets either to the default sink (if a condition is - * satisfied) and otherwise to the alternate sink (iff one is specified). If a - * solution fails the constraint and the alternate sink is not specified, then - * the solution is dropped. - * <p> - * Conditional routing can be useful where a different data flow is required - * based on the type of an object (for example a term identifier versus an - * inline term in the RDF database) or where there is a need to jump around a - * join group based on some condition. - * <p> - * Conditional routing will cause reordering of solutions when the alternate - * sink is specified as some solutions will flow to the primary sink while - * others flow to the alterate sink. - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id: ConditionalRoutingOp.java 7773 2014-01-11 12:49:05Z thompsonbry - * $ - */ -public class ConditionalRoutingOp extends PipelineOp { - - /** - * - */ - private static final long serialVersionUID = 1L; - - public interface Annotations extends PipelineOp.Annotations { - - /** - * An {@link IConstraint} which specifies the condition. When the - * condition is satisfied the binding set is routed to the default sink. - * When the condition is not satisfied, the binding set is routed to the - * alternative sink. - */ - String CONDITION = ConditionalRoutingOp.class.getName() + ".condition"; - - } - - /** - * Deep copy constructor. - * - * @param op - */ - public ConditionalRoutingOp(final ConditionalRoutingOp op) { - - super(op); - - } - - /** - * Shallow copy constructor. - * - * @param args - * @param annotations - */ - public ConditionalRoutingOp(final BOp[] args, - final Map<String, Object> annotations) { - - super(args, annotations); - - } - - public ConditionalRoutingOp(final BOp[] args, final NV... anns) { - - this(args, NV.asMap(anns)); - - } - - /** - * @see Annotations#CONDITION - */ - public IConstraint getCondition() { - - return (IConstraint) getProperty(Annotations.CONDITION); - - } - - @Override - public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { - - return new FutureTask<Void>(new ConditionalRouteTask(this, context)); - - } - - /** - * Copy the source to the sink or the alternative sink depending on the - * condition. - */ - static private class ConditionalRouteTask implements Callable<Void> { - - private final BOpStats stats; - - private final IConstraint condition; - - private final ICloseableIterator<IBindingSet[]> source; - - private final IBlockingBuffer<IBindingSet[]> sink; - - private final IBlockingBuffer<IBindingSet[]> sink2; - - ConditionalRouteTask(final ConditionalRoutingOp op, - final BOpContext<IBindingSet> context) { - - this.stats = context.getStats(); - - this.condition = op.getCondition(); - - if (condition == null) - throw new IllegalArgumentException(); - - this.source = context.getSource(); - - this.sink = context.getSink(); - - this.sink2 = context.getSink2(); // MAY be null. - -// if (sink2 == null) -// throw new IllegalArgumentException(); - - if (sink == sink2) - throw new IllegalArgumentException(); - - } - - @Override - public Void call() throws Exception { - try { - while (source.hasNext()) { - - final IBindingSet[] chunk = source.next(); - - stats.chunksIn.increment(); - stats.unitsIn.add(chunk.length); - - final IBindingSet[] def = new IBindingSet[chunk.length]; - final IBindingSet[] alt = sink2 == null ? null - : new IBindingSet[chunk.length]; - - int ndef = 0, nalt = 0; - - for (int i = 0; i < chunk.length; i++) { - - final IBindingSet bset = chunk[i].clone(); - - if (condition.accept(bset)) { - - // solution passes condition. default sink. - def[ndef++] = bset; - - } else if (sink2 != null) { - - // solution fails condition. alternative sink. - alt[nalt++] = bset; - - } - - } - - if (ndef > 0) { - if (ndef == def.length) - sink.add(def); - else - sink.add(Arrays.copyOf(def, ndef)); -// stats.chunksOut.increment(); -// stats.unitsOut.add(ndef); - } - - if (nalt > 0 && sink2 != null) { - if (nalt == alt.length) - sink2.add(alt); - else - sink2.add(Arrays.copyOf(alt, nalt)); -// stats.chunksOut.increment(); -// stats.unitsOut.add(nalt); - } - - } - - sink.flush(); - if (sink2 != null) - sink2.flush(); - - return null; - - } finally { - source.close(); - sink.close(); - if (sink2 != null) - sink2.close(); - - } - - } // call() - - } // ConditionalRoutingTask. - -} +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Aug 25, 2010 + */ + +package com.bigdata.bop.bset; + +import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.FutureTask; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpContext; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IConstraint; +import com.bigdata.bop.NV; +import com.bigdata.bop.PipelineOp; +import com.bigdata.bop.engine.BOpStats; +import com.bigdata.relation.accesspath.IBlockingBuffer; + +import cutthecrap.utils.striterators.ICloseableIterator; + +/** + * An operator for conditional routing of binding sets in a pipeline. The + * operator will copy binding sets either to the default sink (if a condition is + * satisfied) and otherwise to the alternate sink (iff one is specified). If a + * solution fails the constraint and the alternate sink is not specified, then + * the solution is dropped. + * <p> + * Conditional routing can be useful where a different data flow is required + * based on the type of an object (for example a term identifier versus an + * inline term in the RDF database) or where there is a need to jump around a + * join group based on some condition. + * <p> + * Conditional routing will cause reordering of solutions when the alternate + * sink is specified as some solutions will flow to the primary sink while + * others flow to the alterate sink. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id: ConditionalRoutingOp.java 7773 2014-01-11 12:49:05Z thompsonbry + * $ + */ +public class ConditionalRoutingOp extends PipelineOp { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public interface Annotations extends PipelineOp.Annotations { + + /** + * An {@link IConstraint} which specifies the condition. When the + * condition is satisfied the binding set is routed to the default sink. + * When the condition is not satisfied, the binding set is routed to the + * alternative sink. + */ + String CONDITION = ConditionalRoutingOp.class.getName() + ".condition"; + + } + + /** + * Deep copy constructor. + * + * @param op + */ + public ConditionalRoutingOp(final ConditionalRoutingOp op) { + + super(op); + + } + + /** + * Shallow copy constructor. + * + * @param args + * @param annotations + */ + public ConditionalRoutingOp(final BOp[] args, + final Map<String, Object> annotations) { + + super(args, annotations); + + } + + public ConditionalRoutingOp(final BOp[] args, final NV... anns) { + + this(args, NV.asMap(anns)); + + } + + /** + * @see Annotations#CONDITION + */ + public IConstraint getCondition() { + + return (IConstraint) getProperty(Annotations.CONDITION); + + } + + @Override + public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { + + return new FutureTask<Void>(new ConditionalRouteTask(this, context)); + + } + + /** + * Copy the source to the sink or the alternative sink depending on the + * condition. + */ + static private class ConditionalRouteTask implements Callable<Void> { + + private final BOpStats stats; + + private final IConstraint condition; + + private final ICloseableIterator<IBindingSet[]> source; + + private final IBlockingBuffer<IBindingSet[]> sink; + + private final IBlockingBuffer<IBindingSet[]> sink2; + + ConditionalRouteTask(final ConditionalRoutingOp op, + final BOpContext<IBindingSet> context) { + + this.stats = context.getStats(); + + this.condition = op.getCondition(); + + if (condition == null) + throw new IllegalArgumentException(); + + this.source = context.getSource(); + + this.sink = context.getSink(); + + this.sink2 = context.getSink2(); // MAY be null. + +// if (sink2 == null) +// throw new IllegalArgumentException(); + + if (sink == sink2) + throw new IllegalArgumentException(); + + } + + @Override + public Void call() throws Exception { + try { + while (source.hasNext()) { + + final IBindingSet[] chunk = source.next(); + + stats.chunksIn.increment(); + stats.unitsIn.add(chunk.length); + + final IBindingSet[] def = new IBindingSet[chunk.length]; + final IBindingSet[] alt = sink2 == null ? null + : new IBindingSet[chunk.length]; + + int ndef = 0, nalt = 0; + + for (int i = 0; i < chunk.length; i++) { + + if (i % 20 == 0 && Thread.interrupted()) { + + // Eagerly notice if the operator is interrupted. + throw new RuntimeException( + new InterruptedException()); + + } + + final IBindingSet bset = chunk[i].clone(); + + if (condition.accept(bset)) { + + // solution passes condition. default sink. + def[ndef++] = bset; + + } else if (sink2 != null) { + + // solution fails condition. alternative sink. + alt[nalt++] = bset; + + } + + } + + if (ndef > 0) { + if (ndef == def.length) + sink.add(def); + else + sink.add(Arrays.copyOf(def, ndef)); +// stats.chunksOut.increment(); +// stats.unitsOut.add(ndef); + } + + if (nalt > 0 && sink2 != null) { + if (nalt == alt.length) + sink2.add(alt); + else + sink2.add(Arrays.copyOf(alt, nalt)); +// stats.chunksOut.increment(); +// stats.unitsOut.add(nalt); + } + + } + + sink.flush(); + if (sink2 != null) + sink2.flush(); + + return null; + + } finally { + source.close(); + sink.close(); + if (sink2 != null) + sink2.close(); + + } + + } // call() + + } // ConditionalRoutingTask. + +} Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/internal/constraints/RegexBOp.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/internal/constraints/RegexBOp.java 2014-04-21 23:18:59 UTC (rev 8136) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/internal/constraints/RegexBOp.java 2014-04-22 16:22:14 UTC (rev 8137) @@ -44,17 +44,17 @@ * SPARQL REGEX operator. */ public class RegexBOp extends XSDBooleanIVValueExpression - implements INeedsMaterialization { + implements INeedsMaterialization { /** - * - */ - private static final long serialVersionUID = 1357420268214930143L; - - private static final transient Logger log = Logger.getLogger(RegexBOp.class); + * + */ + private static final long serialVersionUID = 1357420268214930143L; + + private static final transient Logger log = Logger.getLogger(RegexBOp.class); public interface Annotations extends XSDBooleanIVValueExpression.Annotations { - + /** * The cached regex pattern. */ @@ -64,65 +64,65 @@ } private static Map<String,Object> anns( - final IValueExpression<? extends IV> pattern, - final IValueExpression<? extends IV> flags) { - - try { - - if (pattern instanceof IConstant && - (flags == null || flags instanceof IConstant)) { - - final IV parg = ((IConstant<IV>) pattern).get(); - - final IV farg = flags != null ? - ((IConstant<IV>) flags).get() : null; - - if (parg.hasValue() && (farg == null || farg.hasValue())) { - - final Value pargVal = parg.getValue(); - - final Value fargVal = farg != null ? farg.getValue() : null; - - return NV.asMap( - new NV(Annotations.PATTERN, - getPattern(pargVal, fargVal))); - - } - - } - - } catch (Exception ex) { - - if (log.isInfoEnabled()) { - log.info("could not create pattern for: " + pattern + ", " + flags); - } - - } - - return BOp.NOANNS; - + final IValueExpression<? extends IV> pattern, + final IValueExpression<? extends IV> flags) { + + try { + + if (pattern instanceof IConstant && + (flags == null || flags instanceof IConstant)) { + + final IV parg = ((IConstant<IV>) pattern).get(); + + final IV farg = flags != null ? + ((IConstant<IV>) flags).get() : null; + + if (parg.hasValue() && (farg == null || farg.hasValue())) { + + final Value pargVal = parg.getValue(); + + final Value fargVal = farg != null ? farg.getValue() : null; + + return NV.asMap( + new NV(Annotations.PATTERN, + getPattern(pargVal, fargVal))); + + } + + } + + } catch (Exception ex) { + + if (log.isInfoEnabled()) { + log.info("could not create pattern for: " + pattern + ", " + flags); + } + + } + + return BOp.NOANNS; + } - /** - * Construct a regex bop without flags. - */ + /** + * Construct a regex bop without flags. + */ @SuppressWarnings("rawtypes") - public RegexBOp( - final IValueExpression<? extends IV> var, - final IValueExpression<? extends IV> pattern) { + public RegexBOp( + final IValueExpression<? extends IV> var, + final IValueExpression<? extends IV> pattern) { this(new BOp[] { var, pattern }, anns(pattern, null)); } - /** - * Construct a regex bop with flags. - */ - @SuppressWarnings("rawtypes") + /** + * Construct a regex bop with flags. + */ + @SuppressWarnings("rawtypes") public RegexBOp( - final IValueExpression<? extends IV> var, - final IValueExpression<? extends IV> pattern, - final IValueExpression<? extends IV> flags) { + final IValueExpression<? extends IV> var, + final IValueExpression<? extends IV> pattern, + final IValueExpression<? extends IV> flags) { this(new BOp[] { var, pattern, flags }, anns(pattern, flags)); @@ -133,8 +133,8 @@ */ public RegexBOp(final BOp[] args, final Map<String, Object> anns) { - super(args, anns); - + super(args, anns); + if (args.length < 2 || args[0] == null || args[1] == null) throw new IllegalArgumentException(); @@ -146,33 +146,34 @@ public RegexBOp(final RegexBOp op) { super(op); } - + + @Override public Requirement getRequirement() { - - return INeedsMaterialization.Requirement.SOMETIMES; - + + return INeedsMaterialization.Requirement.SOMETIMES; + } - + + @Override public boolean accept(final IBindingSet bs) { - - @SuppressWarnings("rawtypes") + final Value var = asValue(getAndCheckBound(0, bs)); - + @SuppressWarnings("rawtypes") final IV pattern = getAndCheckBound(1, bs); @SuppressWarnings("rawtypes") final IV flags = arity() > 2 ? get(2).get(bs) : null; - + if (log.isDebugEnabled()) { - log.debug("regex var: " + var); - log.debug("regex pattern: " + pattern); - log.debug("regex flags: " + flags); + log.debug("regex var: " + var); + log.debug("regex pattern: " + pattern); + log.debug("regex flags: " + flags); } - - return accept(var, pattern.getValue(), - flags != null ? flags.getValue() : null); + return accept(var, pattern.getValue(), flags != null ? flags.getValue() + : null); + } /** @@ -185,67 +186,87 @@ * REGEXBOp should cache the Pattern when it is a constant </a> */ private boolean accept(final Value arg, final Value parg, final Value farg) { - + if (log.isDebugEnabled()) { - log.debug("regex var: " + arg); - log.debug("regex pattern: " + parg); - log.debug("regex flags: " + farg); + log.debug("regex var: " + arg); + log.debug("regex pattern: " + parg); + log.debug("regex flags: " + farg); } - + if (QueryEvaluationUtil.isSimpleLiteral(arg)) { - + final String text = ((Literal) arg).getLabel(); - + try { - - // first check for cached pattern - Pattern pattern = (Pattern) getProperty(Annotations.PATTERN); - if (pattern == null) { - pattern = getPattern(parg, farg); - } + + // first check for cached pattern + Pattern pattern = (Pattern) getProperty(Annotations.PATTERN); + + if (pattern == null) { + + // resolve the pattern. NB: NOT cached. + pattern = getPattern(parg, farg); + + } + + if (Thread.interrupted()) { + + /* + * Eagerly notice if the operator is interrupted. + * + * Note: Regex can be a high latency operation for a large + * RDF Literal. Therefore we want to check for an interrupt + * before each regex test. The Pattern code itself will not + * notice an interrupt.... + */ + throw new RuntimeException(new InterruptedException()); + + } + final boolean result = pattern.matcher(text).find(); + return result; - + } catch (IllegalArgumentException ex) { - - throw new SparqlTypeErrorException(); - + + throw new SparqlTypeErrorException(); + } - + } else { - - throw new SparqlTypeErrorException(); - + + throw new SparqlTypeErrorException(); + } - + } - private static Pattern getPattern(final Value parg, final Value farg) - throws IllegalArgumentException { - + private static Pattern getPattern(final Value parg, final Value farg) + throws IllegalArgumentException { + if (log.isDebugEnabled()) { - log.debug("regex pattern: " + parg); - log.debug("regex flags: " + farg); + log.debug("regex pattern: " + parg); + log.debug("regex flags: " + farg); } if (QueryEvaluationUtil.isSimpleLiteral(parg) && (farg == null || QueryEvaluationUtil.isSimpleLiteral(farg))) { final String ptn = ((Literal) parg).getLabel(); - String flags = ""; - if (farg != null) { - flags = ((Literal)farg).getLabel(); - } - int f = 0; - for (char c : flags.toCharArray()) { - switch (c) { - case 's': - f |= Pattern.DOTALL; - break; - case 'm': - f |= Pattern.MULTILINE; - break; - case 'i': { + String flags = ""; + if (farg != null) { + flags = ((Literal)farg).getLabel(); + } + int f = 0; + for (char c : flags.toCharArray()) { + switch (c) { + case 's': + f |= Pattern.DOTALL; + break; + case 'm': + f |= Pattern.MULTILINE; + break; + case 'i': { /* * The SPARQL REGEX operator is based on the XQuery REGEX * operator. That operator should be Unicode clean by @@ -257,29 +278,29 @@ * > SPARQL REGEX operator does not perform case-folding * correctly for Unicode data </a> */ - f |= Pattern.CASE_INSENSITIVE; + f |= Pattern.CASE_INSENSITIVE; f |= Pattern.UNICODE_CASE; - break; - } - case 'x': - f |= Pattern.COMMENTS; - break; - case 'd': - f |= Pattern.UNIX_LINES; - break; - case 'u': // Implicit with 'i' flag. -// f |= Pattern.UNICODE_CASE; - break; - default: - throw new IllegalArgumentException(); - } - } + break; + } + case 'x': + f |= Pattern.COMMENTS; + break; + case 'd': + f |= Pattern.UNIX_LINES; + break; + case 'u': // Implicit with 'i' flag. +// f |= Pattern.UNICODE_CASE; + break; + default: + throw new IllegalArgumentException(); + } + } final Pattern pattern = Pattern.compile(ptn, f); return pattern; } - - throw new IllegalArgumentException(); - + + throw new IllegalArgumentException(); + } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |