From: <tho...@us...> - 2014-01-11 12:49:13
|
Revision: 7773 http://bigdata.svn.sourceforge.net/bigdata/?rev=7773&view=rev Author: thompsonbry Date: 2014-01-11 12:49:05 +0000 (Sat, 11 Jan 2014) Log Message: ----------- @Override, final. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/bset/ConditionalRoutingOp.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/bset/ConditionalRoutingOp.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/bset/ConditionalRoutingOp.java 2014-01-10 23:21:17 UTC (rev 7772) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/bset/ConditionalRoutingOp.java 2014-01-11 12:49:05 UTC (rev 7773) @@ -80,8 +80,10 @@ * * @param op */ - public ConditionalRoutingOp(ConditionalRoutingOp op) { + public ConditionalRoutingOp(final ConditionalRoutingOp op) { + super(op); + } /** @@ -90,12 +92,17 @@ * @param args * @param annotations */ - public ConditionalRoutingOp(BOp[] args, Map<String, Object> annotations) { + public ConditionalRoutingOp(final BOp[] args, + final Map<String, Object> annotations) { + super(args, annotations); + } - public ConditionalRoutingOp(BOp[] args, NV... anns) { + public ConditionalRoutingOp(final BOp[] args, final NV... anns) { + this(args, NV.asMap(anns)); + } /** @@ -107,6 +114,7 @@ } + @Override public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { return new FutureTask<Void>(new ConditionalRouteTask(this, context)); @@ -153,6 +161,7 @@ } + @Override public Void call() throws Exception { try { while (source.hasNext()) { @@ -167,20 +176,20 @@ int ndef = 0, nalt = 0; - for(int i=0; i<chunk.length; i++) { + for (int i = 0; i < chunk.length; i++) { final IBindingSet bset = chunk[i].clone(); if (condition.accept(bset)) { def[ndef++] = bset; - + } else { - + alt[nalt++] = bset; - + } - + } if (ndef > 0) { @@ -217,8 +226,8 @@ } - } + } // call() - } + } // ConditionalRoutingTask. } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2014-01-13 14:29:09
|
Revision: 7776 http://bigdata.svn.sourceforge.net/bigdata/?rev=7776&view=rev Author: thompsonbry Date: 2014-01-13 14:29:00 +0000 (Mon, 13 Jan 2014) Log Message: ----------- Optimized the memory use of the ConditionalRoutingOp by not allocating the alt[] unless the altSink is specified. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/bset/ConditionalRoutingOp.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/bset/ConditionalRoutingOp.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/bset/ConditionalRoutingOp.java 2014-01-11 18:55:29 UTC (rev 7775) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/bset/ConditionalRoutingOp.java 2014-01-13 14:29:00 UTC (rev 7776) @@ -46,15 +46,22 @@ /** * An operator for conditional routing of binding sets in a pipeline. The * operator will copy binding sets either to the default sink (if a condition is - * satisfied) and to the alternate sink otherwise. + * satisfied) and otherwise to the alternate sink (iff one is specified). If a + * solution fails the constraint and the alternate sink is not specified, then + * the solution is dropped. * <p> * Conditional routing can be useful where a different data flow is required * based on the type of an object (for example a term identifier versus an * inline term in the RDF database) or where there is a need to jump around a * join group based on some condition. + * <p> + * Conditional routing will cause reordering of solutions when the alternate + * sink is specified as some solutions will flow to the primary sink while + * others flow to the alterate sink. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ + * @version $Id: ConditionalRoutingOp.java 7773 2014-01-11 12:49:05Z thompsonbry + * $ */ public class ConditionalRoutingOp extends PipelineOp { @@ -151,7 +158,7 @@ this.sink = context.getSink(); - this.sink2 = context.getSink2(); + this.sink2 = context.getSink2(); // MAY be null. // if (sink2 == null) // throw new IllegalArgumentException(); @@ -172,8 +179,9 @@ stats.unitsIn.add(chunk.length); final IBindingSet[] def = new IBindingSet[chunk.length]; - final IBindingSet[] alt = new IBindingSet[chunk.length]; - + final IBindingSet[] alt = sink2 == null ? null + : new IBindingSet[chunk.length]; + int ndef = 0, nalt = 0; for (int i = 0; i < chunk.length; i++) { @@ -182,15 +190,17 @@ if (condition.accept(bset)) { + // solution passes condition. default sink. def[ndef++] = bset; - } else { + } else if (sink2 != null) { + // solution fails condition. alternative sink. alt[nalt++] = bset; } - } + } if (ndef > 0) { if (ndef == def.length) @@ -214,16 +224,16 @@ sink.flush(); if (sink2 != null) - sink2.flush(); - + sink2.flush(); + return null; - + } finally { source.close(); sink.close(); if (sink2 != null) - sink2.close(); - + sink2.close(); + } } // call() This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |