This list is closed; nobody may subscribe to it.
| Year | Jan | Feb | Mar | Apr | May | Jun | Jul | Aug | Sep | Oct | Nov | Dec |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 2010 | | | | | | | (139) | (94) | (232) | (143) | (138) | (55) |
| 2011 | (127) | (90) | (101) | (74) | (148) | (241) | (169) | (121) | (157) | (199) | (281) | (75) |
| 2012 | (107) | (122) | (184) | (73) | (14) | (49) | (26) | (103) | (133) | (61) | (51) | (55) |
| 2013 | (59) | (72) | (99) | (62) | (92) | (19) | (31) | (138) | (47) | (83) | (95) | (111) |
| 2014 | (125) | (60) | (119) | (136) | (270) | (83) | (88) | (30) | (47) | (27) | (23) | |
| 2015 | | | | | | | | | (3) | | | |
| 2016 | | | (4) | (1) | | | | | | | | |
From: <mrp...@us...> - 2011-02-15 16:42:38
|
Revision: 4200 http://bigdata.svn.sourceforge.net/bigdata/?rev=4200&view=rev Author: mrpersonick Date: 2011-02-15 16:42:32 +0000 (Tue, 15 Feb 2011) Log Message: ----------- pruning variable bindings before they are materialized into BigdataValues Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/BigdataBindingSetResolverator.java Modified: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/BigdataBindingSetResolverator.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/BigdataBindingSetResolverator.java 2011-02-15 16:35:10 UTC (rev 4199) +++ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/BigdataBindingSetResolverator.java 2011-02-15 16:42:32 UTC (rev 4200) @@ -1,5 +1,6 @@ package com.bigdata.rdf.store; +import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.Iterator; @@ -64,6 +65,8 @@ this.required = required; +// System.err.println("required: " + (required != null ? Arrays.toString(required) : "null")); + } /** @@ -97,6 +100,8 @@ final IBindingSet bindingSet = solution; +// System.err.println(solution); + assert bindingSet != null; if (required == null) { @@ -124,8 +129,14 @@ } else { for (IVariable v : required) { + + final IConstant c = bindingSet.get(v); - final IV iv = (IV) bindingSet.get(v).get(); + if (c == null) { + continue; + } + + final IV iv = (IV) c.get(); if (iv == null) { @@ -142,6 +153,8 @@ } +// System.err.println("resolving: " + Arrays.toString(ids.toArray())); + if (log.isInfoEnabled()) log.info("Resolving " + ids.size() + " term identifiers"); @@ -253,6 +266,8 @@ } +// System.err.println(bindingSet); + return bindingSet; } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <mrp...@us...> - 2011-02-15 16:35:16
|
Revision: 4199 http://bigdata.svn.sourceforge.net/bigdata/?rev=4199&view=rev Author: mrpersonick Date: 2011-02-15 16:35:10 +0000 (Tue, 15 Feb 2011) Log Message: ----------- pruning variable bindings before they are materialized into BigdataValues Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/BigdataBindingSetResolverator.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl3.java Modified: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/BigdataBindingSetResolverator.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/BigdataBindingSetResolverator.java 2011-02-15 16:21:09 UTC (rev 4198) +++ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/BigdataBindingSetResolverator.java 2011-02-15 16:35:10 UTC (rev 4199) @@ -30,6 +30,8 @@ public class BigdataBindingSetResolverator extends AbstractChunkedResolverator<IBindingSet, IBindingSet, AbstractTripleStore> { + + private final IVariable[] required; /** * @@ -45,13 +47,23 @@ */ public BigdataBindingSetResolverator(final AbstractTripleStore db, final IChunkedOrderedIterator<IBindingSet> src) { + + this(db, src, null); + + } + + public BigdataBindingSetResolverator(final AbstractTripleStore db, + final IChunkedOrderedIterator<IBindingSet> src, + final IVariable[] required) { super(db, src, new BlockingBuffer<IBindingSet[]>( db.getChunkOfChunksCapacity(), db.getChunkCapacity(), db.getChunkTimeout(), TimeUnit.MILLISECONDS)); - + + this.required = required; + } /** @@ -87,28 +99,49 @@ assert bindingSet != null; - final Iterator<Map.Entry<IVariable, IConstant>> itr = bindingSet - .iterator(); + if (required == null) { + + final Iterator<Map.Entry<IVariable, IConstant>> itr = + bindingSet.iterator(); - while (itr.hasNext()) { - - final Map.Entry<IVariable, IConstant> entry = itr.next(); 
- - final IV iv = (IV) entry.getValue().get(); - - if (iv == null) { - - throw new RuntimeException("NULL? : var=" + entry.getKey() - + ", " + bindingSet); - - } - - ids.add(iv); - + while (itr.hasNext()) { + + final Map.Entry<IVariable, IConstant> entry = itr.next(); + + final IV iv = (IV) entry.getValue().get(); + + if (iv == null) { + + throw new RuntimeException("NULL? : var=" + entry.getKey() + + ", " + bindingSet); + + } + + ids.add(iv); + + } + + } else { + + for (IVariable v : required) { + + final IV iv = (IV) bindingSet.get(v).get(); + + if (iv == null) { + + throw new RuntimeException("NULL? : var=" + v + + ", " + bindingSet); + + } + + ids.add(iv); + + } + } } - + if (log.isInfoEnabled()) log.info("Resolving " + ids.size() + " term identifiers"); @@ -171,14 +204,19 @@ if (terms == null) throw new IllegalArgumentException(); - final IBindingSet bindingSet = solution; - - if(bindingSet == null) { + if(solution == null) { throw new IllegalStateException("BindingSet was not materialized"); } + final IBindingSet bindingSet; + if (required == null) { + bindingSet = solution; + } else { + bindingSet = solution.copy(required); + } + final Iterator<Map.Entry<IVariable, IConstant>> itr = bindingSet .iterator(); @@ -214,7 +252,7 @@ value)); } - + return bindingSet; } Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl3.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl3.java 2011-02-15 16:21:09 UTC (rev 4198) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl3.java 2011-02-15 16:35:10 UTC (rev 4199) @@ -833,7 +833,7 @@ * Begin native bigdata evaluation. 
*/ CloseableIteration<BindingSet, QueryEvaluationException> result = doEvaluateNatively( - query, bs, queryEngine);// , sesameFilters); + query, bs, queryEngine, required);// , sesameFilters); /* * Use the basic filter iterator for any remaining filters which will be @@ -868,7 +868,7 @@ CloseableIteration<BindingSet, QueryEvaluationException> doEvaluateNatively(final PipelineOp query, final BindingSet bs, - final QueryEngine queryEngine + final QueryEngine queryEngine, final IVariable[] required // , final Collection<Filter> sesameFilters ) throws QueryEvaluationException { @@ -883,7 +883,7 @@ * Wrap up the native bigdata query solution iterator as Sesame * compatible iteration with materialized RDF Values. */ - return wrapQuery(runningQuery);//, sesameFilters); + return wrapQuery(runningQuery, required);//, sesameFilters); } catch (UnsupportedOperatorException t) { if (runningQuery != null) { @@ -919,7 +919,7 @@ * @throws QueryEvaluationException */ CloseableIteration<BindingSet, QueryEvaluationException> wrapQuery( - final IRunningQuery runningQuery + final IRunningQuery runningQuery, final IVariable[] required ) throws QueryEvaluationException { // The iterator draining the query solutions. @@ -938,7 +938,7 @@ // Convert bigdata binding sets to Sesame binding sets. new Bigdata2Sesame2BindingSetIterator<QueryEvaluationException>( // Materialize IVs as RDF Values. - new BigdataBindingSetResolverator(database, it2).start( + new BigdataBindingSetResolverator(database, it2, required).start( database.getExecutorService()))); return result; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <mar...@us...> - 2011-02-15 16:21:15
|
Revision: 4198 http://bigdata.svn.sourceforge.net/bigdata/?rev=4198&view=rev Author: martyncutcher Date: 2011-02-15 16:21:09 +0000 (Tue, 15 Feb 2011) Log Message: ----------- Fix to check for downstream send to avoid IllegalArgumentException Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2011-02-15 13:18:09 UTC (rev 4197) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2011-02-15 16:21:09 UTC (rev 4198) @@ -730,7 +730,7 @@ if (rdlen > 0) updateChk(rdlen); - if (rdlen == -1 || rdlen == 0) + if (rdlen == -1) break; rem -= rdlen; @@ -749,8 +749,11 @@ * changed, then the ReadTask is interrupted using its * Future and the WriteCacheService will handle the error by * retransmitting the current cache block. + * + * The rdlen is checked for non zero to avoid an + * IllegalArgumentException. */ - if (addrNext != null) { + if (rdlen != 0 && addrNext != null) { if (log.isTraceEnabled()) log .trace("Incremental send of " + rdlen This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <mar...@us...> - 2011-02-15 13:18:17
|
Revision: 4197 http://bigdata.svn.sourceforge.net/bigdata/?rev=4197&view=rev Author: martyncutcher Date: 2011-02-15 13:18:09 +0000 (Tue, 15 Feb 2011) Log Message: ----------- Check for zero length buffer before forwarding downstream to avoid IllegalArgumentException Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2011-02-12 16:23:42 UTC (rev 4196) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2011-02-15 13:18:09 UTC (rev 4197) @@ -730,7 +730,7 @@ if (rdlen > 0) updateChk(rdlen); - if (rdlen == -1) + if (rdlen == -1 || rdlen == 0) break; rem -= rdlen; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2011-02-12 16:23:49
|
Revision: 4196 http://bigdata.svn.sourceforge.net/bigdata/?rev=4196&view=rev Author: thompsonbry Date: 2011-02-12 16:23:42 +0000 (Sat, 12 Feb 2011) Log Message: ----------- Modified the DataLoader to report the total #of statements processed so far in its periodic log messages. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/DataLoader.java Modified: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/DataLoader.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/DataLoader.java 2011-02-11 21:53:40 UTC (rev 4195) +++ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/DataLoader.java 2011-02-12 16:23:42 UTC (rev 4196) @@ -1005,16 +1005,21 @@ loader.addRioLoaderListener( new RioLoaderListener() { public void processingNotification( final RioLoaderEvent e ) { - /* This reports as statements are parsed. Depending on how things are buffered, the parser can run ahead of the index writes. */ - if (log.isInfoEnabled()) { - log.info - ( e.getStatementsProcessed() + - " stmts buffered in " + - (e.getTimeElapsed() / 1000d) + - " secs, rate= " + - e.getInsertRate() + - (baseURL != null ? ", baseURL=" + baseURL : "") ); - } + /* + * This reports as statements are parsed. Depending on how + * things are buffered, the parser can run ahead of the index + * writes. + */ + if (log.isInfoEnabled()) { + log.info(e.getStatementsProcessed() + " stmts buffered in " + + (e.getTimeElapsed() / 1000d) + " secs, rate= " + + e.getInsertRate() + + (baseURL != null ? ", baseURL=" + baseURL : "") + // + (", totalStatementsSoFar="// + + (e.getStatementsProcessed()// + + totals.toldTriples.get()))// + ); + } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2011-02-11 21:53:46
|
Revision: 4195 http://bigdata.svn.sourceforge.net/bigdata/?rev=4195&view=rev Author: thompsonbry Date: 2011-02-11 21:53:40 +0000 (Fri, 11 Feb 2011) Log Message: ----------- Modified the data loader to provide better reporting on the file being loaded. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/DataLoader.java Modified: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/DataLoader.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/DataLoader.java 2011-02-11 21:43:55 UTC (rev 4194) +++ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/DataLoader.java 2011-02-11 21:53:40 UTC (rev 4195) @@ -1080,7 +1080,7 @@ if (log.isInfoEnabled()) { log.info("file:: " + stats + "; totals:: " + totals - + (baseURL != null ? "; current baseURL=" + baseURL : "")); + + (baseURL != null ? "; baseURL=" + baseURL : "")); if (buffer != null && buffer.getDatabase() instanceof AbstractLocalTripleStore) { if(log.isDebugEnabled()) This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2011-02-11 21:44:01
|
Revision: 4194 http://bigdata.svn.sourceforge.net/bigdata/?rev=4194&view=rev Author: thompsonbry Date: 2011-02-11 21:43:55 +0000 (Fri, 11 Feb 2011) Log Message: ----------- Modified the DataLoader to report information on which file it is currently loaded via the baseURL. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/DataLoader.java Modified: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/DataLoader.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/DataLoader.java 2011-02-11 18:59:41 UTC (rev 4193) +++ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/DataLoader.java 2011-02-11 21:43:55 UTC (rev 4194) @@ -1005,15 +1005,15 @@ loader.addRioLoaderListener( new RioLoaderListener() { public void processingNotification( final RioLoaderEvent e ) { - + /* This reports as statements are parsed. Depending on how things are buffered, the parser can run ahead of the index writes. */ if (log.isInfoEnabled()) { log.info ( e.getStatementsProcessed() + - " stmts added in " + + " stmts buffered in " + (e.getTimeElapsed() / 1000d) + " secs, rate= " + - e.getInsertRate() - ); + e.getInsertRate() + + (baseURL != null ? ", baseURL=" + baseURL : "") ); } } @@ -1079,7 +1079,8 @@ totals.add(stats); if (log.isInfoEnabled()) { - log.info("file:: " + stats + "; totals:: " + totals); + log.info("file:: " + stats + "; totals:: " + totals + + (baseURL != null ? "; current baseURL=" + baseURL : "")); if (buffer != null && buffer.getDatabase() instanceof AbstractLocalTripleStore) { if(log.isDebugEnabled()) This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2011-02-11 18:59:48
|
Revision: 4193 http://bigdata.svn.sourceforge.net/bigdata/?rev=4193&view=rev Author: thompsonbry Date: 2011-02-11 18:59:41 +0000 (Fri, 11 Feb 2011) Log Message: ----------- Added various classes to support reporting of static, dynamic, and type errors per SPARQL, XPath, and XQuery specs. Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/error/ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/error/SparqlDynamicErrorException.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/error/SparqlStaticErrorException.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/error/SparqlTypeErrorException.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/error/W3CQueryLanguageException.java Added: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/error/SparqlDynamicErrorException.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/error/SparqlDynamicErrorException.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/error/SparqlDynamicErrorException.java 2011-02-11 18:59:41 UTC (rev 4193) @@ -0,0 +1,61 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2011. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Feb 11, 2011 + */ + +package com.bigdata.rdf.error; + + +/** + * A SPARQL static analysis error. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class SparqlDynamicErrorException extends W3CQueryLanguageException { + + /** + * + */ + private static final long serialVersionUID = 1L; + + /** + * @param errorCode + * The four digit error code. + */ + public SparqlDynamicErrorException(int errorCode) { + + super(LanguageFamily.SP, ErrorCategory.DY, errorCode, null/* msg */); + + } + + static protected String toURI(int errorCode) { + + return W3CQueryLanguageException.toURI(LanguageFamily.SP, + ErrorCategory.DY, errorCode); + + } + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/error/SparqlDynamicErrorException.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Added: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/error/SparqlStaticErrorException.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/error/SparqlStaticErrorException.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/error/SparqlStaticErrorException.java 2011-02-11 18:59:41 UTC (rev 4193) @@ -0,0 +1,59 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2011. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. 
+ +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Feb 11, 2011 + */ + +package com.bigdata.rdf.error; + +/** + * A SPARQL dynamic (runtime) error. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class SparqlStaticErrorException extends W3CQueryLanguageException { + + /** + * + */ + private static final long serialVersionUID = 1L; + + /** + * @param errorCode + * The four digit error code. + */ + public SparqlStaticErrorException(int errorCode) { + + super(LanguageFamily.SP, ErrorCategory.ST, errorCode, null/* msg */); + + } + + static protected String toURI(int errorCode) { + + return W3CQueryLanguageException.toURI(LanguageFamily.SP, + ErrorCategory.ST, errorCode); + + } +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/error/SparqlStaticErrorException.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Added: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/error/SparqlTypeErrorException.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/error/SparqlTypeErrorException.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/error/SparqlTypeErrorException.java 2011-02-11 18:59:41 UTC (rev 4193) @@ -0,0 +1,77 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2011. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... 
+ +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Feb 11, 2011 + */ + +package com.bigdata.rdf.error; + +/** + * A SPARQL type error. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class SparqlTypeErrorException extends W3CQueryLanguageException { + + /** + * + */ + private static final long serialVersionUID = 1L; + + /** + * Generic type error. + */ + static public String SPARQL_TYPE_ERROR_0000 = toURI(0); + + /** + * Generic SPARQL type error. + * + * @see #SPARQL_TYPE_ERROR_0000 + */ + public SparqlTypeErrorException() { + + super(LanguageFamily.SP, ErrorCategory.TY, 0/* errorCode */, + SPARQL_TYPE_ERROR_0000); + + } + + /** + * @param errorCode + * The four digit error code. 
+ */ + public SparqlTypeErrorException(int errorCode) { + + super(LanguageFamily.SP, ErrorCategory.TY, errorCode, null/* msg */); + + } + + static protected String toURI(int errorCode) { + + return W3CQueryLanguageException.toURI(LanguageFamily.SP, + ErrorCategory.TY, errorCode); + + } + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/error/SparqlTypeErrorException.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Added: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/error/W3CQueryLanguageException.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/error/W3CQueryLanguageException.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/error/W3CQueryLanguageException.java 2011-02-11 18:59:41 UTC (rev 4193) @@ -0,0 +1,159 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2011. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Feb 11, 2011 + */ + +package com.bigdata.rdf.error; + +import java.util.Formatter; + +/** + * Exception Base class for errors defined by the W3C for XQuery, XPath, and + * SPARQL. 
+ * + * @see http://www.w3.org/TR/xquery/#errors + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class W3CQueryLanguageException extends RuntimeException { + + /** + * Namespace for the error URIs. + */ + protected static final transient String err = "http://www.w3.org/2005/xqt-errors"; + + /** + * + */ + private static final long serialVersionUID = 1L; + + /** Language family for errors. */ + public static enum LanguageFamily { + /** + * XQuery + */ + XQ, + /** + * XPath + */ + XP, + /** + * SPARQL + */ + SP + } + + /** Error category. */ + public static enum ErrorCategory { + /** Static analysis error. */ + ST, + /** Dynamic (runtime) error. */ + DY, + /** Type error. */ + TY + }; + + /** + * The {@link LanguageFamily}. + */ + public final LanguageFamily languageFamily; + + /** + * The {@link ErrorCategory}. + */ + public final ErrorCategory errorCategory; + + /** + * The four digit error code. + */ + public final int errorCode; + + /** + * + * @param languageFamily + * The {@link LanguageFamily}. + * @param errorCategory + * The {@link ErrorCategory}. + * @param errorCode + * The four digit error code. + * @param msg + * The <em>URI</em> corresponding to the error. Frequently used + * errors should use + * {@link #toURI(LanguageFamily, ErrorCategory, int)} to define + * the URI statically to avoid heap churn. + */ + public W3CQueryLanguageException(LanguageFamily languageFamily, + ErrorCategory errorCategory, int errorCode, String msg) { + + super(msg == null ? toURI(languageFamily, errorCategory, errorCode) + : msg); + + this.languageFamily = languageFamily; + + this.errorCategory = errorCategory; + + this.errorCode = errorCode; + + } + + /** + * Return the URI for the given error. This is used to avoid the runtime + * creation of strings for frequently thrown errors, such as type errors. 
+ * Various subclasses use this method to declare concrete URIs which are + * then passed into their constructors for specific kinds of errors. + * + * @param languageFamily + * The {@link LanguageFamily}. + * @param errorCategory + * The {@link ErrorCategory}. + * @param errorCode + * The four digit error code. + * + * @return The URI. + */ + static protected String toURI(LanguageFamily languageFamily, + ErrorCategory errorCategory, int errorCode) { + + if (errorCode >= 10000 || errorCode < 0) + throw new IllegalArgumentException(); + + final StringBuffer sb = new StringBuffer(4); + + final Formatter f = new Formatter(sb); + + f.format("%04d", errorCode); + + return err + languageFamily + errorCategory + sb.toString(); + + } + + // public static void main(String[] x) { + // + // System.err.println(toURI(LanguageFamily.SP,ErrorCategory.TY,120)); + // + // } + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/error/W3CQueryLanguageException.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <mar...@us...> - 2011-02-11 10:58:58
|
Revision: 4192 http://bigdata.svn.sourceforge.net/bigdata/?rev=4192&view=rev Author: martyncutcher Date: 2011-02-11 10:58:52 +0000 (Fri, 11 Feb 2011) Log Message: ----------- Ensure hasNext is not called a second time for same iterator Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/ctc-striterators/src/java/cutthecrap/utils/striterators/Appenderator.java Modified: branches/QUADS_QUERY_BRANCH/ctc-striterators/src/java/cutthecrap/utils/striterators/Appenderator.java =================================================================== --- branches/QUADS_QUERY_BRANCH/ctc-striterators/src/java/cutthecrap/utils/striterators/Appenderator.java 2011-02-10 22:08:22 UTC (rev 4191) +++ branches/QUADS_QUERY_BRANCH/ctc-striterators/src/java/cutthecrap/utils/striterators/Appenderator.java 2011-02-11 10:58:52 UTC (rev 4192) @@ -52,6 +52,8 @@ public boolean hasNext() { if (m_current.hasNext()) { return true; + } else if (m_current == m_xtra) { // don't call twice + return false; } m_current = m_xtra; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2011-02-10 22:08:30
|
Revision: 4191 http://bigdata.svn.sourceforge.net/bigdata/?rev=4191&view=rev Author: thompsonbry Date: 2011-02-10 22:08:22 +0000 (Thu, 10 Feb 2011) Log Message: ----------- More work on GROUP_BY Working w/ MikeP on a problem with BOpUtility#getSpannedVariables(), which has an underlying problem in preOrderTraversal() which can take time apparently exponential in the depth of the operator tree! (This is a redo of a failed commit). Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/filter/BOpFilterBase.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/GroupByOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/ISortOrder.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/MemoryGroupByOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/MemorySortOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestBOpUtility.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestMemoryGroupByOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestMemorySortOp.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/COUNT.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/GROUP_CONCAT.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/MAX.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/MIN.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/SAMPLE.java 
branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/SUM.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/TestBOpUtility.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregate/ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregate/AggregateBase.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregate/IAggregate.java Removed Paths: ------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IAggregate.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java 2011-02-10 19:46:37 UTC (rev 4190) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java 2011-02-10 22:08:22 UTC (rev 4191) @@ -28,6 +28,7 @@ package com.bigdata.bop; import java.io.Serializable; +import java.util.Iterator; import java.util.List; import java.util.Map; @@ -71,11 +72,20 @@ * @return The argument. */ BOp get(int index); + + /** + * The operator's arguments as an unmodified list. + * + * @todo Consider deprecating since this is much less efficient than + * {@link #argIterator()}. + */ + List<BOp> args(); /** - * The operator's arguments. + * An iterator visiting the operator's arguments. The iterator does + * not support removal. (This is more efficient than #args()). */ - List<BOp> args(); + Iterator<BOp> argIterator(); /** A shallow copy of the operator's arguments. 
*/ BOp[] toArray(); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2011-02-10 19:46:37 UTC (rev 4190) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2011-02-10 22:08:22 UTC (rev 4191) @@ -35,6 +35,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; import com.bigdata.bop.constraint.EQ; import com.bigdata.btree.Tuple; @@ -278,9 +279,40 @@ final public List<BOp> args() { return Collections.unmodifiableList(Arrays.asList(args)); +// return Arrays.asList(args); } + // @todo unit tests. + final public Iterator<BOp> argIterator() { + + return new ArgIterator(); + + } + + /** + * An iterator visiting the arguments which does not support removal. + */ + private class ArgIterator implements Iterator<BOp> { + + private int i = 0; + + public boolean hasNext() { + return i < args.length; + } + + public BOp next() { + if (!hasNext()) + throw new NoSuchElementException(); + return args[i++]; + } + + public void remove() { + throw new UnsupportedOperationException(); + } + + } + // shallow copy public BOp[] toArray() { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java 2011-02-10 19:46:37 UTC (rev 4190) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java 2011-02-10 22:08:22 UTC (rev 4191) @@ -44,6 +44,7 @@ import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.IBlockingBuffer; +import cutthecrap.utils.striterators.EmptyIterator; import cutthecrap.utils.striterators.Expander; import cutthecrap.utils.striterators.Filter; import 
cutthecrap.utils.striterators.SingleValueIterator; @@ -67,7 +68,7 @@ public static Iterator<BOp> preOrderIterator(final BOp op) { return new Striterator(new SingleValueIterator(op)) - .append(preOrderIterator2(op)); + .append(preOrderIterator2(0,op)); } @@ -76,14 +77,18 @@ * NOT visit this node. */ @SuppressWarnings("unchecked") - static private Iterator<AbstractNode> preOrderIterator2(final BOp op) { + static private Iterator<AbstractNode> preOrderIterator2(final int depth, final BOp op) { /* * Iterator visits the direct children, expanding them in turn with a * recursive application of the pre-order iterator. */ + + // mild optimization when no children are present. + if (op.arity() == 0) + return EmptyIterator.DEFAULT; - return new Striterator(op.args().iterator()).addFilter(new Expander() { + return new Striterator(op.argIterator()).addFilter(new Expander() { private static final long serialVersionUID = 1L; @@ -106,11 +111,13 @@ * Visit the children (recursive pre-order traversal). */ +// System.err.println("Node["+depth+"]: "+op.getClass().getName()); + final Striterator itr = new Striterator( new SingleValueIterator(child)); - // append this node in post-order position. - itr.append(preOrderIterator2(child)); + // append this node in post-order position. + itr.append(preOrderIterator2(depth+1,child)); return itr; @@ -120,10 +127,13 @@ * The child is a leaf. */ +// System.err.println("Leaf["+depth+"]: "+op.getClass().getName()); + // Visit the leaf itself. return new SingleValueIterator(child); } + } }); @@ -153,7 +163,7 @@ * recursive application of the post-order iterator. */ - return new Striterator(op.args().iterator()).addFilter(new Expander() { + return new Striterator(op.argIterator()).addFilter(new Expander() { private static final long serialVersionUID = 1L; @@ -297,7 +307,7 @@ } }); - // append the pre-order traveral of each annotation. + // append the pre-order traversal of each annotation. 
itr.append(itr2); return itr; @@ -307,10 +317,11 @@ } - /** - * Return all variables recursively using a pre-order traversal present - * whether in the operator tree or on annotations attached to operators. - */ + /** + * Return the distinct variables recursively using a pre-order traversal + * present whether in the operator tree or on annotations attached to + * operators. + */ @SuppressWarnings("unchecked") public static Iterator<IVariable<?>> getSpannedVariables(final BOp op) { @@ -322,7 +333,7 @@ public boolean isValid(Object arg0) { return arg0 instanceof IVariable<?>; } - }); + }).makeUnique(); } @@ -337,7 +348,7 @@ @SuppressWarnings("unchecked") static public Iterator<IVariable<?>> getArgumentVariables(final BOp op) { - return new Striterator(op.args().iterator()) + return new Striterator(op.argIterator()) .addFilter(new Filter() { private static final long serialVersionUID = 1L; @@ -357,7 +368,7 @@ */ static public int getArgumentVariableCount(final BOp op) { int nvars = 0; - final Iterator<BOp> itr = op.args().iterator(); + final Iterator<BOp> itr = op.argIterator(); while(itr.hasNext()) { final BOp arg = itr.next(); if (arg instanceof IVariable<?>) @@ -504,7 +515,7 @@ if (op == null) throw new IllegalArgumentException(); - final Iterator<BOp> itr = root.args().iterator(); + final Iterator<BOp> itr = root.argIterator(); while (itr.hasNext()) { @@ -770,8 +781,12 @@ if (bop == null) return; - for (BOp arg : bop.args()) { + final Iterator<BOp> itr = bop.argIterator(); + while(itr.hasNext()) { + + final BOp arg = itr.next(); + if (!(arg instanceof IVariableOrConstant<?>)) { toString(arg, sb, indent+1); @@ -805,7 +820,7 @@ } - private static final transient String ws = " "; + private static final transient String ws = " "; // /** // * Verify that all bops from the identified <i>startId</i> to the root are Deleted: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IAggregate.java =================================================================== --- 
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IAggregate.java 2011-02-10 19:46:37 UTC (rev 4190) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IAggregate.java 2011-02-10 22:08:22 UTC (rev 4191) @@ -1,16 +0,0 @@ -package com.bigdata.bop; - -/** - * An aggregate operator, such as SUM, COUNT, MIN, MAX, etc. - * - * @author thompsonbry - */ -public interface IAggregate<E> extends IValueExpression<E>{ - - /** - * Return the current value of the aggregate (this has a side-effect on the - * internal state of the {@link IAggregate} operator). - */ - E get(IBindingSet bset); - -} Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregate/AggregateBase.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregate/AggregateBase.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregate/AggregateBase.java 2011-02-10 22:08:22 UTC (rev 4191) @@ -0,0 +1,101 @@ +package com.bigdata.bop.aggregate; + +import java.util.Map; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpBase; +import com.bigdata.bop.IValueExpression; +import com.bigdata.bop.ImmutableBOp; +import com.bigdata.bop.NV; +import com.bigdata.bop.Var; +import com.bigdata.bop.BOp.Annotations; + +/** + * Abstract base class for aggregate functions. + * + * @author thompsonbry + * + * @param <E> + */ +abstract public class AggregateBase<E> extends ImmutableBOp implements IAggregate<E> { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public interface Annotations extends ImmutableBOp.Annotations { + + /** + * Optional boolean property indicates whether the aggregate applies to + * the distinct within group solutions (default + * {@value #DEFAULT_DISTINCT}). 
+ */ + String DISTINCT = AggregateBase.class.getName()+".distinct"; + + boolean DEFAULT_DISTINCT = false; + + } + + public AggregateBase(BOpBase op) { + super(op); + } + + public AggregateBase(BOp[] args, Map<String, Object> annotations) { + + super(args, annotations); + + if (!isWildcardAllowed() && getExpression() == Var.var("*")) { + + /* + * Only COUNT may use the wildcard '*' variable. + */ + + throw new UnsupportedOperationException("'*' not permitted."); + + } + + } + + /** + * + * @param distinct + * <code>true</code> iff the keyword DISTINCT was used, for + * example <code>COUNT(DISTINCT y)</code> + * @param expr + * The value expression to be computed, for example + * <code>x</code> in <code>COUNT(DISTINCT x)</code> or + * <code>y+x</code> in <code>MIN(x+y)</code>. + */ + public AggregateBase(final boolean distinct, final IValueExpression<E> expr) { + + this(new BOp[] { expr }, distinct ? NV.asMap(new NV( + Annotations.DISTINCT, true)) : null); + + } + + final public boolean isDistinct() { + + return getProperty(Annotations.DISTINCT, Annotations.DEFAULT_DISTINCT); + + } + + @SuppressWarnings("unchecked") + final public IValueExpression<E> getExpression() { + + return (IValueExpression<E>) get(0); + + } + + /** + * Return <code>true</code> iff the {@link IValueExpression} may be the + * special variable <code>*</code>. The default implementation always + * returns <code>false</code>. 
+ */ + public boolean isWildcardAllowed() { + + return false; + + } + +} Copied: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregate/IAggregate.java (from rev 4185, branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IAggregate.java) =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregate/IAggregate.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/aggregate/IAggregate.java 2011-02-10 22:08:22 UTC (rev 4191) @@ -0,0 +1,46 @@ +package com.bigdata.bop.aggregate; + +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IValueExpression; + +/** + * An aggregate operator, such as SUM, COUNT, MIN, MAX, etc. + * + * @author thompsonbry + */ +public interface IAggregate<E> extends IValueExpression<E>{ + + /** + * <code>true</code> if the aggregate is to be applied to the distinct + * solutions within the group. E.g., + * + * <pre> + * COUNT(DISTINCT x) + * </pre> + * + * <pre> + * COUNT(DISTINCT *) + * </pre> + * + * or + * + * <pre> + * SUM(DISTINCT x) + * </pre> + */ + boolean isDistinct(); + + /** + * Return the {@link IValueExpression} to be computed by the aggregate. For + * <code>COUNT</code> this may be the special variable <code>*</code>, which + * is interpreted to mean all variables declared in the source solutions. + */ + IValueExpression<E> getExpression(); + + /** + * Return the current value of the aggregate (this has a side-effect on the + * internal state of the {@link IAggregate} operator). 
+ */ + E get(IBindingSet bset); + +} Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/filter/BOpFilterBase.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/filter/BOpFilterBase.java 2011-02-10 19:46:37 UTC (rev 4190) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/filter/BOpFilterBase.java 2011-02-10 22:08:22 UTC (rev 4191) @@ -81,8 +81,12 @@ final public Iterator filter(Iterator src, final Object context) { // wrap source with each additional filter from the filter chain. - for (BOp arg : args()) { + final Iterator<BOp> itr = argIterator(); + + while(itr.hasNext()) { + final BOp arg = itr.next(); + src = ((BOpFilterBase) arg).filter(src, context); } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java 2011-02-10 19:46:37 UTC (rev 4190) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java 2011-02-10 22:08:22 UTC (rev 4191) @@ -535,10 +535,17 @@ * Visit children, but not if this is a CONTROLLER operator since * its children belong to a subquery. 
*/ - for (BOp t : op.args()) { + final Iterator<BOp> itr = op.argIterator(); + + while(itr.hasNext()) { + + final BOp t = itr.next(); + // visit children (recursion) populateStatsMap(t); + } + } } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/GroupByOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/GroupByOp.java 2011-02-10 19:46:37 UTC (rev 4190) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/GroupByOp.java 2011-02-10 22:08:22 UTC (rev 4191) @@ -30,8 +30,10 @@ import java.util.Map; import com.bigdata.bop.BOp; +import com.bigdata.bop.Bind; import com.bigdata.bop.IConstraint; import com.bigdata.bop.IValueExpression; +import com.bigdata.bop.IVariable; import com.bigdata.bop.PipelineOp; /** @@ -51,14 +53,25 @@ public interface Annotations extends PipelineOp.Annotations { /** - * The ordered set of variables declared by {@link #COMPUTE} which are - * projected out of the group by operator. + * The ordered set of {@link IVariable}s which are projected out of the + * group by operator. + * <p> + * The variables named in {@link #SELECT} must either: (a) appear the + * {@link #GROUP_BY} declaration as simple {@link IVariable} s; or (b) + * be declared by {@link #COMPUTE}. */ String SELECT = GroupByOp.class.getName() + ".select"; /** * The ordered set of {@link IValueExpression}s which are to be * computed. + * <p> + * The top-level for each element of {@link #COMPUTE} must be either an + * {@link IVariable} or a {@link Bind}. When present, the {@link Bind} + * has the effect of assigning the result of an {@link IValueExpression} + * to an {@link IVariable}. Only {@link IVariable}s declared in the + * input solutions may be referenced in a {@link #COMPUTE} + * {@link IValueExpression}. * * TODO This really needs to be VAR := EXPR. 
EXPR can only reference the * source variables or variables declared earlier in the ordered Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/ISortOrder.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/ISortOrder.java 2011-02-10 19:46:37 UTC (rev 4190) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/ISortOrder.java 2011-02-10 22:08:22 UTC (rev 4191) @@ -30,6 +30,7 @@ import java.io.Serializable; +import com.bigdata.bop.IValueExpression; import com.bigdata.bop.IVariable; /** @@ -40,9 +41,19 @@ */ public interface ISortOrder<E> extends Serializable { - /** - * The variable whose values will be sorted. - */ + /** + * The variable whose values will be sorted. + * + * FIXME ORDER_BY is defined in terms of Expressions, not just Vars. Either + * this will need to be an {@link IValueExpression} which is evaluated + * during the ordering or we will have to pre-compute a hidden variable + * which can be ordered directly. Presumably BrackettedExpression provides a + * computed RDF Value while Constraint orders based on the BEV. 
+ * + * <pre> + * [23] OrderCondition ::= ( ( 'ASC' | 'DESC' ) BrackettedExpression ) | ( Constraint | Var ) + * </pre> + */ IVariable<E> getVariable(); /** Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/MemoryGroupByOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/MemoryGroupByOp.java 2011-02-10 19:46:37 UTC (rev 4190) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/MemoryGroupByOp.java 2011-02-10 22:08:22 UTC (rev 4191) @@ -14,12 +14,13 @@ import com.bigdata.bop.BOpContext; import com.bigdata.bop.BOpUtility; import com.bigdata.bop.ConcurrentHashMapAnnotations; -import com.bigdata.bop.IAggregate; +import com.bigdata.bop.Constant; import com.bigdata.bop.IBindingSet; import com.bigdata.bop.IConstant; import com.bigdata.bop.IConstraint; import com.bigdata.bop.IValueExpression; import com.bigdata.bop.IVariable; +import com.bigdata.bop.aggregate.IAggregate; import com.bigdata.bop.bindingSet.ListBindingSet; import com.bigdata.bop.engine.BOpStats; import com.bigdata.relation.accesspath.IAsynchronousIterator; @@ -62,20 +63,70 @@ * GROUP_CONCAT function is specified such that it combines a large #of * input solution bindings into a big string. * - * FIXME How should we handle DISTINCT semantics for GROUP_BY? (I think - * that we just insert a {@link DistinctBindingSetOp} before the - * GROUP_BY). + * FIXME How should we handle nulls (unbound variables) and type errors + * during aggregation? (LeeF suggests that they cause type errors which + * are propagated such that the aggregated value winds up unbound but I + * can not reconcile this with the language in the W3C draft which would + * appear to suggest that detail records are ignored if they result in + * type errors when computing the aggregate). * - * FIXME How should we handle nulls (missing values) during aggregation? 
- * (It appears that nulls and type errors are generally handled by the - * aggregate operator ignoring the detail record). - * * FIXME All of the {@link IAggregate} operators have a side-effect. In * order for them to have isolated side-effects for distinct groups, they * would have to either internalize a value map for the group or each * group would have to use a distinct instance. If the latter, then * provide for this on the operator, e.g., newInstance(), and document * why. + * + * FIXME Review all syntax/semantic: + * + * <pre> + * [17] SolutionModifier ::= GroupClause? HavingClause? OrderClause? LimitOffsetClauses? + * [18] GroupClause ::= 'GROUP' 'BY' GroupCondition+ + * [19] GroupCondition ::= ( BuiltInCall | FunctionCall | '(' Expression ( 'AS' Var )? ')' | Var ) + * [20] HavingClause ::= 'HAVING' HavingCondition+ + * [21] HavingCondition ::= Constraint + * [61] FunctionCall ::= IRIref ArgList + * [62] ArgList ::= ( NIL | '(' 'DISTINCT'? Expression ( ',' Expression )* ')' ) + * [106] BuiltInCall ::= 'STR' '(' Expression ')' .... + * </pre> + * + * FIXME The aggregate functions can have the optional keyword DISTINCT + * which forces the application to the distinct solutions within each + * group. [COUNT(DISTINCT *) appears to have some special semantics as + * well, but I can't figure out what the difference is from the use of + * DISTINCT with other aggregate operators unless it applies to the set of + * variables which are used to impose DISTINCT on the solutions within the + * group.] + * + * I've proven to my satisfaction that MySQL is behaving as per your description of the SPARQL semantics even when there are multiple columns in the aggregated solutions. It examines the distinct values within each group for the computed value of the expression within the aggregate function. 
+ +mysql> select * from test; ++------+------+------+ +| s | i | j | ++------+------+------+ +| A | 1 | 1 | +| A | 1 | 2 | +| A | 1 | 3 | +| A | 1 | 4 | ++------+------+------+ +4 rows in set (0.00 sec) + +mysql> select sum(i), sum(j), sum(distinct i), sum(distinct j), sum(i+j), sum(distinct i+j) from test; ++--------+--------+-----------------+-----------------+----------+-------------------+ +| sum(i) | sum(j) | sum(distinct i) | sum(distinct j) | sum(i+j) | sum(distinct i+j) | ++--------+--------+-----------------+-----------------+----------+-------------------+ +| 4 | 10 | 1 | 10 | 14 | 14 | ++--------+--------+-----------------+-----------------+----------+-------------------+ +1 row in set (0.00 sec) + +mysql> select sum(i), sum(j), sum(distinct i), sum(distinct j), sum(i+j), sum(distinct i+j) from test group by s; ++--------+--------+-----------------+-----------------+----------+-------------------+ +| sum(i) | sum(j) | sum(distinct i) | sum(distinct j) | sum(i+j) | sum(distinct i+j) | ++--------+--------+-----------------+-----------------+----------+-------------------+ +| 4 | 10 | 1 | 10 | 14 | 14 | ++--------+--------+-----------------+-----------------+----------+-------------------+ +1 row in set (0.00 sec) + */ public class MemoryGroupByOp extends GroupByOp { @@ -251,17 +302,17 @@ final IValueExpression<?>[] compute) { /* - * @todo The aggregated variables are all undefined the first time a - * source binding set is presented and need to be initialized to an - * appropriate value. + * FIXME The aggregate functions have side-effects so we need to use + * a distinct instance of each function for each group. */ // synchronize for visibility. synchronized(this) { + for(IValueExpression<?> expr : compute) { + System.err.println(expr.get(bset)); + } } - throw new UnsupportedOperationException(); - } } // SolutionGroup @@ -318,7 +369,7 @@ * The ordered array of variables which define the distinct groups to * be aggregated. 
*/ - private final IVariable<?>[] groupBy; + private final IValueExpression<?>[] groupBy; /** * The {@link IValueExpression}s used to compute each of the variables @@ -344,7 +395,7 @@ this.context = context; // must be non-null, and non-empty array w/o dups. - this.groupBy = (IVariable[]) op + this.groupBy = (IValueExpression<?>[]) op .getRequiredProperty(GroupByOp.Annotations.GROUP_BY); if (groupBy == null) @@ -378,7 +429,7 @@ // may be null or empty[]. this.having = (IConstraint[]) op - .getRequiredProperty(GroupByOp.Annotations.HAVING); + .getProperty(GroupByOp.Annotations.HAVING); /* * The variables to project out of the GROUP_BY operator. This may @@ -418,7 +469,8 @@ * * @todo write a unit test when some variables are not bound. */ - r[i] = bset.get(groupBy[i]); +// r[i] = bset.get(groupBy[i]); + r[i] = new Constant(groupBy[i].get(bset)); } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/MemorySortOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/MemorySortOp.java 2011-02-10 19:46:37 UTC (rev 4190) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/MemorySortOp.java 2011-02-10 22:08:22 UTC (rev 4191) @@ -10,6 +10,7 @@ import com.bigdata.bop.BOpContext; import com.bigdata.bop.BOpUtility; import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IValueExpression; import com.bigdata.bop.engine.BOpStats; import com.bigdata.relation.accesspath.IBlockingBuffer; @@ -20,6 +21,22 @@ * @version $Id: DistinctElementFilter.java 3466 2010-08-27 14:28:04Z * thompsonbry $ * + * FIXME ORDER_BY is defined in terms of Expressions, not just Vars. + * Either this will need to be an {@link IValueExpression} which is + * evaluated during the ordering or we will have to pre-compute a + * hidden variable which can be ordered directly. 
Presumably + * BrackettedExpression provides a computed RDF Value while Constraint + * orders based on the BEV. Write unit tests for those computed + * expressions. + * + * <pre> + * [22] OrderClause ::= 'ORDER' 'BY' OrderCondition+ + * [23] OrderCondition ::= ( ( 'ASC' | 'DESC' ) BrackettedExpression ) | ( Constraint | Var ) + * </pre> + * + * FIXME ORDER_BY should be written out of a CONSTRUCT or DESCRIBE + * query since it will not have any affect on the solutions. + * * @todo do an external merge sort operator. * @todo do a wordsort operator w/ ties broken by the {@link ComparatorOp} after * the main sort. Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestBOpUtility.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestBOpUtility.java 2011-02-10 19:46:37 UTC (rev 4190) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestBOpUtility.java 2011-02-10 22:08:22 UTC (rev 4191) @@ -33,8 +33,7 @@ import junit.framework.TestCase2; -import com.bigdata.bop.constraint.BOpConstraint; -import com.bigdata.bop.constraint.OR; +import junit.framework.TestCase2; /** * Unit tests for {@link BOpUtility}. 
@@ -76,10 +75,9 @@ { final BOp op1 = new BOpBase(new BOp[] { Var.var("y") }, null/* annotations */); - assertEquals(1, op1.arity()); + assertEquals(1, op1.arity()); - assertSameIterator(new Object[] { Var.var("y") }, op1.args() - .iterator()); + assertSameIterator(new Object[] { Var.var("y") }, op1.argIterator()); assertSameIterator(new Object[] { Var.var("y") }, BOpUtility .getArgumentVariables(op1)); @@ -94,7 +92,7 @@ assertEquals(2,op2.arity()); assertSameIterator(new Object[] { Var.var("x"), Var.var("y") }, op2 - .args().iterator()); + .argIterator()); assertSameIterator(new Object[] { Var.var("x"), Var.var("y") }, BOpUtility.getArgumentVariables(op2)); @@ -107,7 +105,7 @@ Var.var("y") }, null/* annotations */); assertSameIterator(new Object[] { new Constant<String>("x"), - Var.var("y") }, op3.args().iterator()); + Var.var("y") }, op3.argIterator()); assertSameIterator(new Object[] { Var.var("y") }, BOpUtility .getArgumentVariables(op3)); @@ -420,73 +418,6 @@ } /** - * Unit test for {@link BOpUtility#getSpannedVariables(BOp)}. 
- */ - public void test_getSpannedVariables2() { - - final IValueExpression<?> a = Var.var("a"); - - IConstraint bop = null; - - final int count = 100; - - for (int i = 0; i < count; i++) { - - final IConstraint c = new DummyConstraint( - new BOp[] { a, new Constant<Integer>(i) }, - null/*annotations*/); - - if (bop == null) { - bop = c; - } else { - bop = new OR(c, bop); - } - - } - - final Object[] expected = new Object[]{// - a,// - }; - - int i = 0; - final Iterator<IVariable<?>> itr = BOpUtility - .getSpannedVariables(bop); - while (itr.hasNext()) { - final BOp t = itr.next(); - System.out.println(i + " : " + t); -// assertTrue("index=" + i + ", expected=" + expected[i] + ", actual=" -// + t, expected[i].equals(t)); - i++; - } - - assertEquals(i, expected.length); - - assertSameIterator(expected, BOpUtility - .getSpannedVariables(bop)); - - } - - private static class DummyConstraint extends BOpConstraint { - - /** - * - */ - private static final long serialVersionUID = 1942393209821562541L; - - public DummyConstraint(BOp[] args, Map<String, Object> annotations) { - super(args, annotations); - } - - public DummyConstraint(BOpBase op) { - super(op); - } - - public boolean accept(IBindingSet bindingSet) { - throw new RuntimeException(); - } - } - - /** * Unit test for {@link BOpUtility#getIndex(BOp)} using valid inputs. 
*/ public void test_getIndex() { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestMemoryGroupByOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestMemoryGroupByOp.java 2011-02-10 19:46:37 UTC (rev 4190) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestMemoryGroupByOp.java 2011-02-10 22:08:22 UTC (rev 4191) @@ -23,12 +23,74 @@ */ package com.bigdata.bop.solutions; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.FutureTask; + import junit.framework.TestCase2; +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpBase; +import com.bigdata.bop.BOpContext; +import com.bigdata.bop.BOpEvaluationContext; +import com.bigdata.bop.Constant; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IConstant; +import com.bigdata.bop.IValueExpression; +import com.bigdata.bop.IVariable; +import com.bigdata.bop.NV; +import com.bigdata.bop.PipelineOp; +import com.bigdata.bop.Var; +import com.bigdata.bop.aggregate.AggregateBase; +import com.bigdata.bop.aggregate.IAggregate; +import com.bigdata.bop.bindingSet.ArrayBindingSet; +import com.bigdata.bop.engine.BOpStats; +import com.bigdata.bop.engine.BlockingBufferWithStats; +import com.bigdata.bop.engine.IRunningQuery; +import com.bigdata.bop.engine.MockRunningQuery; +import com.bigdata.bop.engine.TestQueryEngine; +import com.bigdata.relation.accesspath.IAsynchronousIterator; +import com.bigdata.relation.accesspath.IBlockingBuffer; +import com.bigdata.relation.accesspath.ThickAsynchronousIterator; + /** * Unit tests for {@link MemoryGroupByOp}. * * @author thompsonbry + * + * @todo correct rejection tests for various kinds of illegal expressions, such + * as having forward references to variables which have not been computed. 
+ * There are actually several different ways in which this rule can be + * violated, including having forward references within the SELECT (or our + * COMPUTE). + * + * @todo correct rejection tests when the SELECT or HAVING clause references a + * variable not defined in the aggregated solution groups and not wrapped + * by an aggregate function. + * + * @todo test to verify that the evaluation of the aggregate functions within + * each group are independent (they have internal state to track the + * running value of the aggregate, but that state can not be shared across + * groups). + * + * @todo test with various kinds of type errors. + * + * @todo test with DISTINCT used within aggregate functions (this forces us to + * consider the distinct solutions within groups for those aggregate + * functions which make use of the DISTINCT keyword). + * + * @todo test COUNT(*) and COUNT(DISTINCT *) semantics. + * + * @todo test when some aggregate functions in a GROUP_BY use the DISTINCT + * keyword while others in the same GROUP_BY do not. + * + * @todo test with HAVING constraints. + * + * @todo test with multiple invocations of the operator (or do this in the + * integration stress test). + * + * @todo Is it possible to test these aggregation operators without testing at + * the SPARQL level? */ public class TestMemoryGroupByOp extends TestCase2 { @@ -43,4 +105,227 @@ fail("write tests"); } + /** + * Based on an example in the SPARQL 1.1 Working Draft. + * + * <pre> + * @prefix : <http://books.example/> . + * + * :org1 :affiliates :auth1, :auth2 . + * :auth1 :writesBook :book1, :book2 . + * :book1 :price 9 . + * :book2 :price 5 . + * :auth2 :writesBook :book3 . + * :book3 :price 7 . + * :org2 :affiliates :auth3 . + * :auth3 :writesBook :book4 . + * :book4 :price 7 . + * </pre> + * + * <pre> + * PREFIX : <http://books.example/> + * SELECT ?org, (SUM(?lprice) AS ?totalPrice) + * WHERE { + * ?org :affiliates ?auth . + * ?auth :writesBook ?book . 
+ * ?book :price ?lprice . + * } + * GROUP BY ?org + * </pre> + * + * The solutions input to the GROUP_BY are: + * + * <pre> + * ?org ?auth ?book ?lprice + * org1 auth1 book1 9 + * org1 auth1 book3 5 + * org1 auth2 book3 7 + * org2 auth3 book4 7 + * </pre> + * + * The aggregated solutions groups are: + * + * <pre> + * ?org ?totalPrice + * org1 21 + * org2 7 + * </pre> + * + * @todo Do variant with <code>HAVING (SUM(?lprice) > 10)</code>. The + * solutions are: + * <pre> + * ?org ?totalPrice + * org1 21 + * </pre> + * + * @throws ExecutionException + * @throws InterruptedException + */ + public void test_simpleGroupBy() { + + final IVariable<?> org = Var.var("org"); + final IVariable<?> auth = Var.var("auth"); + final IVariable<?> book = Var.var("book"); + final IVariable<?> lprice = Var.var("lprice"); + final IVariable<?> totalPrice = Var.var("totalPrice"); + + final IConstant<String> org1 = new Constant<String>("org1"); + final IConstant<String> org2 = new Constant<String>("org2"); + final IConstant<String> auth1 = new Constant<String>("auth1"); + final IConstant<String> auth2 = new Constant<String>("auth2"); + final IConstant<String> auth3 = new Constant<String>("auth3"); + final IConstant<String> book1 = new Constant<String>("book1"); + final IConstant<String> book2 = new Constant<String>("book2"); + final IConstant<String> book3 = new Constant<String>("book3"); + final IConstant<String> book4 = new Constant<String>("book4"); + final IConstant<Integer> price5 = new Constant<Integer>(5); + final IConstant<Integer> price7 = new Constant<Integer>(7); + final IConstant<Integer> price9 = new Constant<Integer>(9); + final IConstant<Integer> price21 = new Constant<Integer>(21); + + final int groupById = 1; + + final GroupByOp query = new MemoryGroupByOp(new BOp[] {}, NV + .asMap(new NV[] {// + new NV(BOp.Annotations.BOP_ID, groupById),// + new NV(BOp.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.CONTROLLER),// + new NV(PipelineOp.Annotations.PIPELINED, 
true),// + new NV(PipelineOp.Annotations.THREAD_SAFE, false),// + new NV(GroupByOp.Annotations.SELECT, // + new IVariable[] { org, lprice }), // + new NV(GroupByOp.Annotations.COMPUTE,// + new IValueExpression[] { new SUMInt( + false/* distinct */, + (IValueExpression) lprice) }), // + new NV(GroupByOp.Annotations.GROUP_BY,// + new IValueExpression[] { org }) // + })); + + /* the test data: + * + * ?org ?auth ?book ?lprice + * org1 auth1 book1 9 + * org1 auth1 book3 5 + * org1 auth2 book3 7 + * org2 auth3 book4 7 + */ + final IBindingSet data [] = new IBindingSet [] + { + new ArrayBindingSet ( new IVariable<?> [] { org, auth, book, lprice }, new IConstant [] { org1, auth1, book1, price9 } ) + , new ArrayBindingSet ( new IVariable<?> [] { org, auth, book, lprice }, new IConstant [] { org1, auth1, book2, price5 } ) + , new ArrayBindingSet ( new IVariable<?> [] { org, auth, book, lprice }, new IConstant [] { org1, auth2, book3, price7 } ) + , new ArrayBindingSet ( new IVariable<?> [] { org, auth, book, lprice }, new IConstant [] { org2, auth3, book4, price7 } ) + }; + + /* the expected solutions: + * + * ?org ?totalPrice + * org1 21 + * org2 7 + */ + final IBindingSet expected [] = new IBindingSet [] + { + new ArrayBindingSet ( new IVariable<?> [] { org, totalPrice }, new IConstant [] { org1, price21 } ) + , new ArrayBindingSet ( new IVariable<?> [] { org, totalPrice }, new IConstant [] { org1, price7 } ) + } ; + + final BOpStats stats = query.newStats () ; + + final IAsynchronousIterator<IBindingSet[]> source = new ThickAsynchronousIterator<IBindingSet[]> ( new IBindingSet [][] { data } ) ; + + final IBlockingBuffer<IBindingSet[]> sink = new BlockingBufferWithStats<IBindingSet[]>(query, stats); + + final IRunningQuery runningQuery = new MockRunningQuery(null/* fed */ + , null/* indexManager */ + ); + final BOpContext<IBindingSet> context = new BOpContext<IBindingSet>( + runningQuery, -1/* partitionId */ + , stats, source, sink, null/* sink2 */ + ); + // Force the 
solutions to be emitted. + context.setLastInvocation(); + + final FutureTask<Void> ft = query.eval(context); + // Run the query. + { + final Thread t = new Thread() { + public void run() { + ft.run(); + } + }; + t.setDaemon(true); + t.start(); + } + + try { + // Check the solutions. + TestQueryEngine.assertSameSolutions(expected, sink.iterator()); + } finally { + /* Always wait for the future afterwards and test it for errors. */ + try { + ft.get(); + } catch (Throwable ex) { + log.error("Evaluation failed: " + ex, ex); + } + } + + assertEquals(1, stats.chunksIn.get()); + assertEquals(4, stats.unitsIn.get()); + assertEquals(2, stats.unitsOut.get()); + assertEquals(1, stats.chunksOut.get()); + + } + + private static class SUMInt extends AggregateBase<Integer> implements IAggregate<Integer> { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public SUMInt(BOpBase op) { + super(op); + } + + public SUMInt(BOp[] args, Map<String, Object> annotations) { + super(args, annotations); + } + + public SUMInt(boolean distinct, IValueExpression<Integer> expr) { + super(distinct, expr); + } + + /** + * The running aggregate value. + * <p> + * Note: SUM() returns ZERO if there are no non-error solutions + * presented. + * <p> + * Note: This field is guarded by the monitor on the {@link SUMInt} + * instance. + */ + private transient int aggregated = 0; + + @Override + synchronized + public Integer get(final IBindingSet bindingSet) { + + final IValueExpression<Integer> var = (IValueExpression<Integer>) get(0); + + final Integer val = (Integer) var.get(bindingSet); + + if (val != null) { + + // aggregate non-null values. 
+ aggregated += val; + + } + + return Integer.valueOf(aggregated); + + } + + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestMemorySortOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestMemorySortOp.java 2011-02-10 19:46:37 UTC (rev 4190) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestMemorySortOp.java 2011-02-10 22:08:22 UTC (rev 4191) @@ -27,6 +27,8 @@ package com.bigdata.bop.solutions; +import java.util.concurrent.FutureTask; + import junit.framework.TestCase2; import com.bigdata.bop.BOp; @@ -53,6 +55,8 @@ * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ + * + * FIXME This needs to test for computed expressions as well. */ public class TestMemorySortOp extends TestCase2 { @@ -69,7 +73,7 @@ super ( name ) ; } - public void testEval () + public void testEval () { final IVariable<?> x = Var.var ( "x" ) ; final IVariable<?> y = Var.var ( "y" ) ; @@ -154,9 +158,29 @@ , stats, source, sink, null/* sink2 */ ); - query.eval ( context ).run () ; + final FutureTask<Void> ft = query.eval(context); + // Run the query. + { + final Thread t = new Thread() { + public void run() { + ft.run(); + } + }; + t.setDaemon(true); + t.start(); + } - TestQueryEngine.assertSameSolutions ( expected, sink.iterator () ) ; + try { + // Check the solutions. + TestQueryEngine.assertSameSolutions(expected, sink.iterator()); + } finally { + /* Always wait for the future afterwards and test it for errors. 
*/ + try { + ft.get(); + } catch (Throwable ex) { + log.error("Evaluation failed: " + ex, ex); + } + } assertEquals ( 1, stats.chunksIn.get () ) ; assertEquals ( 10, stats.unitsIn.get () ) ; Modified: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/COUNT.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/COUNT.java 2011-02-10 19:46:37 UTC (rev 4190) +++ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/COUNT.java 2011-02-10 22:08:22 UTC (rev 4191) @@ -27,10 +27,11 @@ import com.bigdata.bop.BOp; import com.bigdata.bop.BOpBase; -import com.bigdata.bop.IAggregate; import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IValueExpression; import com.bigdata.bop.IVariable; -import com.bigdata.bop.ImmutableBOp; +import com.bigdata.bop.aggregate.AggregateBase; +import com.bigdata.bop.aggregate.IAggregate; import com.bigdata.rdf.internal.IV; import com.bigdata.rdf.internal.XSDLongIV; @@ -39,8 +40,11 @@ * sets for the given variable. * * @author thompsonbry + * + * FIXME The application of COUNT(*) must explicitly recognize the + * special variable <code>*</code> */ -public class COUNT extends ImmutableBOp implements IAggregate<IV> { +public class COUNT extends AggregateBase<IV> implements IAggregate<IV> { /** * @@ -51,21 +55,14 @@ super(op); } - /** - * FIXME This must also accept '*' in lieu of a variable. When given a '*' - * (which could be modeled as a special variable name), we count all detail - * records without regard to their bound values. - * - * @param var - */ - public COUNT(IVariable<IV> var) { - this(new BOp[] { var }, null/* annotations */); - } - public COUNT(BOp[] args, Map<String, Object> annotations) { super(args, annotations); } + public COUNT(final boolean distinct, IValueExpression<IV> expr) { + super(distinct, expr); + } + /** * The running aggregate value. 
* <p> @@ -98,4 +95,14 @@ } + /** + * Overridden to allow <code>COUNT(*)</code>. + */ + @Override + final public boolean isWildcardAllowed() { + + return true; + + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/GROUP_CONCAT.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/GROUP_CONCAT.java 2011-02-10 19:46:37 UTC (rev 4190) +++ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/GROUP_CONCAT.java 2011-02-10 22:08:22 UTC (rev 4191) @@ -30,11 +30,13 @@ import com.bigdata.bop.BOp; import com.bigdata.bop.BOpBase; -import com.bigdata.bop.IAggregate; import com.bigdata.bop.IBindingSet; import com.bigdata.bop.IConstant; +import com.bigdata.bop.IValueExpression; import com.bigdata.bop.IVariable; -import com.bigdata.bop.ImmutableBOp; +import com.bigdata.bop.NV; +import com.bigdata.bop.aggregate.AggregateBase; +import com.bigdata.bop.aggregate.IAggregate; /** * Operator combines the string values over the presented binding sets for the @@ -46,17 +48,32 @@ * FIXME This must only operate on variables which are known to be * materialized RDF Values. */ -public class GROUP_CONCAT extends ImmutableBOp implements IAggregate<Literal> { +public class GROUP_CONCAT extends AggregateBase<Literal> implements IAggregate<Literal> { /** * */ private static final long serialVersionUID = 1L; + public interface Annotations extends AggregateBase.Annotations { + + /** + * Required string property provides the separator used when combining + * the {@link IValueExpression} computed for each solution within the + * group. + */ + String SEPARATOR = AggregateBase.class.getName()+".separator"; + + } + public GROUP_CONCAT(BOpBase op) { super(op); } + public GROUP_CONCAT(BOp[] args, Map<String, Object> annotations) { + super(args, annotations); + } + /** * * @param var @@ -64,14 +81,14 @@ * @param sep * The separator string. 
*/ - public GROUP_CONCAT(IVariable<Literal> var, IConstant<String> sep) { - this(new BOp[] { var, sep }, null/* annotations */); + public GROUP_CONCAT(final boolean distinct, + final IValueExpression<Literal> expr, final IConstant<String> sep) { + this(new BOp[] { expr }, NV.asMap(// + new NV(Annotations.DISTINCT, distinct),// + new NV(Annotations.SEPARATOR, sep)// + )); } - public GROUP_CONCAT(BOp[] args, Map<String, Object> annotations) { - super(args, annotations); - } - /** * The running concatenation of observed bound values. * <p> Modified: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/MAX.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/MAX.java 2011-02-10 19:46:37 UTC (rev 4190) +++ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/MAX.java 2011-02-10 22:08:22 UTC (rev 4191) @@ -29,10 +29,11 @@ import com.bigdata.bop.BOp; import com.bigdata.bop.BOpBase; -import com.bigdata.bop.IAggregate; import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IValueExpression; import com.bigdata.bop.IVariable; -import com.bigdata.bop.ImmutableBOp; +import com.bigdata.bop.aggregate.AggregateBase; +import com.bigdata.bop.aggregate.IAggregate; import com.bigdata.rdf.internal.IV; import com.bigdata.rdf.internal.IVUtility; @@ -45,10 +46,11 @@ * * @todo What is reported if there are no non-null observations? * - * FIXME This must handle comparisons when the value is not an IV, e.g., - * using {@link ValueComparator}. + * FIXME MIN (and MAX) are defined in terms of the ORDER_BY semantics for + * SPARQL. Therefore, this must handle comparisons when the value is not + * an IV, e.g., using {@link ValueComparator}. 
*/ -public class MAX extends ImmutableBOp implements IAggregate<IV> { +public class MAX extends AggregateBase<IV> implements IAggregate<IV> { /** * @@ -59,14 +61,14 @@ super(op); } - public MAX(IVariable<IV> var) { - this(new BOp[] { var }, null/* annotations */); - } - public MAX(BOp[] args, Map<String, Object> annotations) { super(args, annotations); } + public MAX(boolean distinct, IValueExpression<IV> expr) { + super(distinct, expr); + } + /** * The maximum observed value and initially <code>null</code>. * <p> Modified: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/MIN.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/MIN.java 2011-02-10 19:46:37 UTC (rev 4190) +++ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/MIN.java 2011-02-10 22:08:22 UTC (rev 4191) @@ -29,10 +29,11 @@ import com.bigdata.bop.BOp; import com.bigdata.bop.BOpBase; -import com.bigdata.bop.IAggregate; import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IValueExpression; import com.bigdata.bop.IVariable; -import com.bigdata.bop.ImmutableBOp; +import com.bigdata.bop.aggregate.AggregateBase; +import com.bigdata.bop.aggregate.IAggregate; import com.bigdata.rdf.internal.IV; import com.bigdata.rdf.internal.IVUtility; @@ -45,10 +46,11 @@ * * @todo What is reported if there are no non-null observations? * - * FIXME This must handle comparisons when the value is not an IV, e.g., - * using {@link ValueComparator}. + * FIXME MIN (and MAX) are defined in terms of the ORDER_BY semantics for + * SPARQL. Therefore, this must handle comparisons when the value is not + * an IV, e.g., using {@link ValueComparator}. 
*/ -public class MIN extends ImmutableBOp implements IAggregate<IV> { +public class MIN extends AggregateBase<IV> implements IAggregate<IV> { /** * @@ -59,14 +61,14 @@ super(op); } - public MIN(IVariable<IV> var) { - this(new BOp[] { var }, null/* annotations */); - } - public MIN(BOp[] args, Map<String, Object> annotations) { super(args, annotations); } + public MIN(boolean distinct, IValueExpression<IV> expr) { + super(distinct, expr); + } + /** * The minimum observed value and initially <code>null</code>. * <p> Modified: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/SAMPLE.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/SAMPLE.java 2011-02-10 19:46:37 UTC (rev 4190) +++ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/SAMPLE.java 2011-02-10 22:08:22 UTC (rev 4191) @@ -27,10 +27,11 @@ import com.bigdata.bop.BOp; import com.bigdata.bop.BOpBase; -import com.bigdata.bop.IAggregate; import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IValueExpression; import com.bigdata.bop.IVariable; -import com.bigdata.bop.ImmutableBOp; +import com.bigdata.bop.aggregate.AggregateBase; +import com.bigdata.bop.aggregate.IAggregate; import com.bigdata.rdf.internal.IV; /** @@ -40,7 +41,7 @@ * * @author thompsonbry */ -public class SAMPLE extends ImmutableBOp implements IAggregate<IV> { +public class SAMPLE extends AggregateBase<IV> implements IAggregate<IV> { /** * @@ -51,14 +52,14 @@ super(op); } - public SAMPLE(IVariable<IV> var) { - this(new BOp[] { var }, null/* annotations */); - } - public SAMPLE(BOp[] args, Map<String, Object> annotations) { super(args, annotations); } + public SAMPLE(boolean distinct, IValueExpression<IV> expr) { + super(distinct, expr); + } + /** * The sampled value and initially <code>null</code>. 
* <p> Modified: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/SUM.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/SUM.java 2011-02-10 19:46:37 UTC (rev 4190) +++ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/SUM.java 2011-02-10 22:08:22 UTC (rev 4191) @@ -29,10 +29,11 @@ import com.bigdata.bop.BOp; import com.bigdata.bop.BOpBase; -import com.bigdata.bop.IAggregate; import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IValueExpression; import com.bigdata.bop.IVariable; -import com.bigdata.bop.ImmutableBOp; +import com.bigdata.bop.aggregate.AggregateBase; +import com.bigdata.bop.aggregate.IAggregate; import com.bigdata.rdf.internal.IV; import com.bigdata.rdf.internal.IVUtility; import com.bigdata.rdf.internal.XSDLongIV; @@ -49,7 +50,7 @@ * the numeric values - perhaps we should just get rid of the option to * not inline and require people to export/import for an upgrade). */ -public class SUM extends ImmutableBOp implements IAggregate<IV> { +public class SUM extends AggregateBase<IV> implements IAggregate<IV> { /** * @@ -60,14 +61,14 @@ super(op); } - public SUM(IVariable<IV> var) { - this(new BOp[] { var }, null/* annotations */); - } - public SUM(BOp[] args, Map<String, Object> annotations) { super(args, annotations); } + public SUM(boolean distinct, IValueExpression<IV> expr) { + super(distinct, expr); + } + /** * The running aggregate value. 
* <p> Modified: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/TestBOpUtility.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/TestBOpUtility.java 2011-02-10 19:46:37 UTC (rev 4190) +++ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/TestBOpUtility.java 2011-02-10 22:08:22 UTC (rev 4191) @@ -39,12 +39,9 @@ import com.bigdata.bop.IBindingSet; import com.bigdata.bop.IConstraint; import com.bigdata.bop.IValueExpression; -import com.bigdata.bop.IVariable; -import com.bigdata.bop.NV; import com.bigdata.bop.Var; import com.bigdata.bop.constraint.BOpConstraint; import com.bigdata.bop.constraint.OR; -import com.bigdata.rdf.internal.constraints.SameTermBOp; /** * Unit tests for {@link BOpUtility}. @@ -64,6 +61,19 @@ super(name); } + private void eatData(/*final int expectedLength, */final Iterator<?> itr) { + int i = 1; + while (itr.hasNext()) { + final Object t = itr.next(); +// System.err.print(i+" ");// + " : " + t); +// assertTrue("index=" + i + ", expected=" + expected[i] + ", actual=" +// + t, expected[i].equals(t)); + i++; + } +// System.err.println(""); +// assertEquals("#visited", expectedLength, i); + } + /** * Unit test for {@link BOpUtility#getSpannedVariables(BOp)}. 
*/ @@ -73,7 +83,7 @@ IConstraint bop = null; - final int count = 100; + final int count = 99; for (int i = 0; i < count; i++) { @@ -93,22 +103,20 @@ a,// }; - int i = 0; - final Iterator<IVariable<?>> itr = BOpUtility - .getSpannedVariables(bop); - while (itr.hasNext()) { - final BOp t = itr.next(); - System.out.println(i + " : " + t); -// assertTrue("index=" + i + ", expected=" + expected[i] + ", actual=" -// + t, expected[i].equals(t)); - i++; - } + System.err.println("preOrderIterator"); + eatData(BOpUtility.preOrderIterator(bop)); - assertEquals(i, expected.length); + System.err.println("preOrderIteratorWithAnnotations"); + eatData(BOpUtility.preOrderIteratorWithAnnotations(bop)); - assertSameIterator(expected, BOpUtility - .getSpannedVariables(bop)); + System.err.println("getSpannedVariables"); + eatData(BOpUtility.getSpannedVariables(bop)); + + // @todo make the returned set distinct? + // @todo verify the actual data visited. + assertSameIterator(expected, BOpUtility.getSpannedVariables(bop)); + } private static class DummyConstraint extends BOpConstraint { This was sent by the SourceForge.net col... [truncated message content] |
From: <mrp...@us...> - 2011-02-10 19:46:43
|
Revision: 4190 http://bigdata.svn.sourceforge.net/bigdata/?rev=4190&view=rev Author: mrpersonick Date: 2011-02-10 19:46:37 +0000 (Thu, 10 Feb 2011) Log Message: ----------- unit test for spanned variables Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestBOpUtility.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestBOpUtility.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestBOpUtility.java 2011-02-10 19:43:29 UTC (rev 4189) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestBOpUtility.java 2011-02-10 19:46:37 UTC (rev 4190) @@ -31,16 +31,11 @@ import java.util.Map; import java.util.concurrent.FutureTask; -import com.bigdata.bop.IPredicate.Annotations; -import com.bigdata.bop.ap.E; -import com.bigdata.bop.ap.Predicate; -import com.bigdata.bop.bset.StartOp; -import com.bigdata.bop.join.PipelineJoin; -import com.bigdata.bop.solutions.SliceOp; -import com.bigdata.journal.ITx; - import junit.framework.TestCase2; +import com.bigdata.bop.constraint.BOpConstraint; +import com.bigdata.bop.constraint.OR; + /** * Unit tests for {@link BOpUtility}. * @@ -425,6 +420,73 @@ } /** + * Unit test for {@link BOpUtility#getSpannedVariables(BOp)}. 
+ */ + public void test_getSpannedVariables2() { + + final IValueExpression<?> a = Var.var("a"); + + IConstraint bop = null; + + final int count = 100; + + for (int i = 0; i < count; i++) { + + final IConstraint c = new DummyConstraint( + new BOp[] { a, new Constant<Integer>(i) }, + null/*annotations*/); + + if (bop == null) { + bop = c; + } else { + bop = new OR(c, bop); + } + + } + + final Object[] expected = new Object[]{// + a,// + }; + + int i = 0; + final Iterator<IVariable<?>> itr = BOpUtility + .getSpannedVariables(bop); + while (itr.hasNext()) { + final BOp t = itr.next(); + System.out.println(i + " : " + t); +// assertTrue("index=" + i + ", expected=" + expected[i] + ", actual=" +// + t, expected[i].equals(t)); + i++; + } + + assertEquals(i, expected.length); + + assertSameIterator(expected, BOpUtility + .getSpannedVariables(bop)); + + } + + private static class DummyConstraint extends BOpConstraint { + + /** + * + */ + private static final long serialVersionUID = 1942393209821562541L; + + public DummyConstraint(BOp[] args, Map<String, Object> annotations) { + super(args, annotations); + } + + public DummyConstraint(BOpBase op) { + super(op); + } + + public boolean accept(IBindingSet bindingSet) { + throw new RuntimeException(); + } + } + + /** * Unit test for {@link BOpUtility#getIndex(BOp)} using valid inputs. */ public void test_getIndex() { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <mrp...@us...> - 2011-02-10 19:43:35
|
Revision: 4189 http://bigdata.svn.sourceforge.net/bigdata/?rev=4189&view=rev Author: mrpersonick Date: 2011-02-10 19:43:29 +0000 (Thu, 10 Feb 2011) Log Message: ----------- unit test for spanned variables Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/TestBOpUtility.java Added: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/TestBOpUtility.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/TestBOpUtility.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/TestBOpUtility.java 2011-02-10 19:43:29 UTC (rev 4189) @@ -0,0 +1,134 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Aug 27, 2010 + */ + +package com.bigdata.bop.rdf; + +import java.util.Iterator; +import java.util.Map; + +import junit.framework.TestCase2; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpBase; +import com.bigdata.bop.BOpUtility; +import com.bigdata.bop.Constant; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IConstraint; +import com.bigdata.bop.IValueExpression; +import com.bigdata.bop.IVariable; +import com.bigdata.bop.NV; +import com.bigdata.bop.Var; +import com.bigdata.bop.constraint.BOpConstraint; +import com.bigdata.bop.constraint.OR; +import com.bigdata.rdf.internal.constraints.SameTermBOp; + +/** + * Unit tests for {@link BOpUtility}. + */ +public class TestBOpUtility extends TestCase2 { + + /** + * + */ + public TestBOpUtility() { + } + + /** + * @param name + */ + public TestBOpUtility(String name) { + super(name); + } + + /** + * Unit test for {@link BOpUtility#getSpannedVariables(BOp)}. 
+ */ + public void test_getSpannedVariables() { + + final IValueExpression<?> a = Var.var("a"); + + IConstraint bop = null; + + final int count = 100; + + for (int i = 0; i < count; i++) { + + final IConstraint c = new DummyConstraint( + new BOp[] { a, new Constant<Integer>(i) }, + null/*annotations*/); + + if (bop == null) { + bop = c; + } else { + bop = new OR(c, bop); + } + + } + + final Object[] expected = new Object[]{// + a,// + }; + + int i = 0; + final Iterator<IVariable<?>> itr = BOpUtility + .getSpannedVariables(bop); + while (itr.hasNext()) { + final BOp t = itr.next(); + System.out.println(i + " : " + t); +// assertTrue("index=" + i + ", expected=" + expected[i] + ", actual=" +// + t, expected[i].equals(t)); + i++; + } + + assertEquals(i, expected.length); + + assertSameIterator(expected, BOpUtility + .getSpannedVariables(bop)); + + } + + private static class DummyConstraint extends BOpConstraint { + + /** + * + */ + private static final long serialVersionUID = 1942393209821562541L; + + public DummyConstraint(BOp[] args, Map<String, Object> annotations) { + super(args, annotations); + } + + public DummyConstraint(BOpBase op) { + super(op); + } + + public boolean accept(IBindingSet bindingSet) { + throw new RuntimeException(); + } + } + +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <mrp...@us...> - 2011-02-10 16:39:31
|
Revision: 4188 http://bigdata.svn.sourceforge.net/bigdata/?rev=4188&view=rev Author: mrpersonick Date: 2011-02-10 16:39:25 +0000 (Thu, 10 Feb 2011) Log Message: ----------- disabled select variable computation for now Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/Rule2BOpUtility.java Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/Rule2BOpUtility.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/Rule2BOpUtility.java 2011-02-09 17:18:18 UTC (rev 4187) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/Rule2BOpUtility.java 2011-02-10 16:39:25 UTC (rev 4188) @@ -513,8 +513,8 @@ } // the variables to be retained for each join. - final IVariable<?>[][] selectVars = RuleState - .computeRequiredVarsForEachTail(rule, order); +// final IVariable<?>[][] selectVars = RuleState +// .computeRequiredVarsForEachTail(rule, order); /* * Map the constraints from the variables they use. This way, we can This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2011-02-09 17:18:24
|
Revision: 4187 http://bigdata.svn.sourceforge.net/bigdata/?rev=4187&view=rev Author: thompsonbry Date: 2011-02-09 17:18:18 +0000 (Wed, 09 Feb 2011) Log Message: ----------- Fixed problem in the federated query engine unit test for optional joins which goes back to a bug fix to the pipeline join in how it applied CONSTRAINT[]s. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java 2011-02-09 17:12:31 UTC (rev 4186) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java 2011-02-09 17:18:18 UTC (rev 4187) @@ -57,7 +57,6 @@ import com.bigdata.bop.engine.IRunningQuery; import com.bigdata.bop.engine.LocalChunkMessage; import com.bigdata.bop.engine.QueryEngine; -import com.bigdata.bop.engine.ChunkedRunningQuery; import com.bigdata.bop.engine.TestQueryEngine; import com.bigdata.bop.join.PipelineJoin; import com.bigdata.bop.solutions.SliceOp; @@ -1298,14 +1297,20 @@ new Constant<String>("Paul") }// ), /* - * Plus anything we read from the first access path which - * did not pass the 2nd join. + * No. The CONSTRAINT on the 2nd join [x == y] filters all + * solutions. For solutions where the optional join fails, [y] is + * not bound. Since [y] is part of the constraint on that join we DO + * NOT observe those solutions which only join on the first access + * path. + * +// * Plus anything we read from the first access path which +// * did not pass the 2nd join. 
*/ - new ArrayBindingSet(// - new IVariable[] { Var.var("x"), Var.var("y") },// - new IConstant[] { new Constant<String>("Mary"), - new Constant<String>("Paul") }// - ), +// new ArrayBindingSet(// +// new IVariable[] { Var.var("x"), Var.var("y") },// +// new IConstant[] { new Constant<String>("Mary"), +// new Constant<String>("Paul") }// +// ), }; TestQueryEngine.assertSameSolutionsAnyOrder(expected, @@ -1369,7 +1374,7 @@ // verify query solution stats details. // assertEquals(1L, stats.chunksIn.get()); assertEquals(5L, stats.unitsIn.get()); - assertEquals(5L, stats.unitsOut.get()); + assertEquals(4L, stats.unitsOut.get()); // assertEquals(1L, stats.chunksOut.get()); } @@ -1382,8 +1387,8 @@ // verify query solution stats details. // assertEquals(2L, stats.chunksIn.get()); - assertEquals(5L, stats.unitsIn.get()); - assertEquals(5L, stats.unitsOut.get()); + assertEquals(4L, stats.unitsIn.get()); + assertEquals(4L, stats.unitsOut.get()); // assertEquals(1L, stats.chunksOut.get()); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2011-02-09 17:12:37
|
Revision: 4186 http://bigdata.svn.sourceforge.net/bigdata/?rev=4186&view=rev Author: thompsonbry Date: 2011-02-09 17:12:31 +0000 (Wed, 09 Feb 2011) Log Message: ----------- Fixed the deep copy constructor signature for Bind(). Fixed the set of unit tests run by build.xml (CI) (I reconciled them with com.bigdata.TestAll). Fixed unit test for DistinctBindingSets to specify the necessary annotations. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Bind.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestDistinctBindingSets.java branches/QUADS_QUERY_BRANCH/build.xml Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Bind.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Bind.java 2011-02-09 17:00:01 UTC (rev 4185) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Bind.java 2011-02-09 17:12:31 UTC (rev 4186) @@ -18,7 +18,7 @@ /** * Required deep copy constructor. 
*/ - public Bind(BOpBase op) { + public Bind(Bind<E> op) { super(op); } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestDistinctBindingSets.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestDistinctBindingSets.java 2011-02-09 17:00:01 UTC (rev 4185) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestDistinctBindingSets.java 2011-02-09 17:12:31 UTC (rev 4186) @@ -36,6 +36,7 @@ import com.bigdata.bop.BOp; import com.bigdata.bop.BOpContext; +import com.bigdata.bop.BOpEvaluationContext; import com.bigdata.bop.Constant; import com.bigdata.bop.IBindingSet; import com.bigdata.bop.IConstant; @@ -178,6 +179,10 @@ NV.asMap(new NV[]{// new NV(DistinctBindingSetOp.Annotations.BOP_ID,distinctId),// new NV(DistinctBindingSetOp.Annotations.VARIABLES,new IVariable[]{x}),// + new NV(MemorySortOp.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.CONTROLLER),// +// new NV(MemorySortOp.Annotations.SHARED_STATE, +// true),// })); // the expected solutions Modified: branches/QUADS_QUERY_BRANCH/build.xml =================================================================== --- branches/QUADS_QUERY_BRANCH/build.xml 2011-02-09 17:00:01 UTC (rev 4185) +++ branches/QUADS_QUERY_BRANCH/build.xml 2011-02-09 17:12:31 UTC (rev 4186) @@ -1706,29 +1706,27 @@ <test name="com.bigdata.striterator.TestAll" todir="${test.results.dir}" unless="testName" /> <test name="com.bigdata.counters.TestAll" todir="${test.results.dir}" unless="testName" /> <test name="com.bigdata.rawstore.TestAll" todir="${test.results.dir}" unless="testName" /> - <test name="com.bigdata.btree.TestAll" todir="${test.results.dir}" unless="testName" /> <test name="com.bigdata.concurrent.TestAll" todir="${test.results.dir}" unless="testName" /> - <test name="com.bigdata.quorum.TestAll" todir="${test.results.dir}" unless="testName" /> <test 
name="com.bigdata.ha.TestAll" todir="${test.results.dir}" unless="testName" /> <test name="com.bigdata.io.writecache.TestAll" todir="${test.results.dir}" unless="testName" /> <test name="com.bigdata.journal.TestAll" todir="${test.results.dir}" unless="testName" /> -<!-- To run only specific test suites. - <test name="com.bigdata.journal.TestWORMStrategy" todir="${test.results.dir}" unless="testName" /> - <test name="com.bigdata.rwstore.TestRWJournal" todir="${test.results.dir}" unless="testName" /> - --> <!-- Performance of this test suite has regressed and needs to be investigated. - <test name="com.bigdata.journal.ha.TestAll" todir="${test.results.dir}" unless="testName" /> + <test name="com.bigdata.journal.ha.TestAll" todir="${test.results.dir}" unless="testName" /> --> -<!-- end of specific journal test suite runs. --> - <test name="com.bigdata.resources.TestAll" todir="${test.results.dir}" unless="testName" /> + <test name="com.bigdata.relation.TestAll" todir="${test.results.dir}" unless="testName" /> + <test name="com.bigdata.bop.TestAll" todir="${test.results.dir}" unless="testName" /> + <test name="com.bigdata.relation.rule.eval.TestAll" todir="${test.results.dir}" unless="testName" /> <test name="com.bigdata.mdi.TestAll" todir="${test.results.dir}" unless="testName" /> <test name="com.bigdata.service.TestAll" todir="${test.results.dir}" unless="testName" /> + <test name="com.bigdata.bop.fed.TestAll" todir="${test.results.dir}" unless="testName" /> <test name="com.bigdata.sparse.TestAll" todir="${test.results.dir}" unless="testName" /> <test name="com.bigdata.search.TestAll" todir="${test.results.dir}" unless="testName" /> - <test name="com.bigdata.relation.TestAll" todir="${test.results.dir}" unless="testName" /> + <!-- not suppported yet. 
+ <test name="com.bigdata.bfs.TestAll" todir="${test.results.dir}" unless="testName" /> + --> <!-- See https://sourceforge.net/apps/trac/bigdata/ticket/53 --> <test name="com.bigdata.jini.TestAll" todir="${test.results.dir}" unless="testName" /> This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2011-02-09 17:00:10
|
Revision: 4185 http://bigdata.svn.sourceforge.net/bigdata/?rev=4185&view=rev Author: thompsonbry Date: 2011-02-09 17:00:01 +0000 (Wed, 09 Feb 2011) Log Message: ----------- More work on the GROUP_BY operator. Defined various aggregate operators (MIN, MAX, SUM, COUNT, etc.). They all need unit tests. The semantics of many of these operators need to be reviewed. Defined the BIND(var,expr) operator, which binds the variable to the result of evaluating the value expression as a side-effect. Modified the ORDER_BY stress test to verify the ordering imposed. Modified DistinctBindingSetOp to pass the hash map as shared state (previously, solutions were only distinct within each invocation). Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IValueExpression.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/DistinctBindingSetOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestDeepCopy.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine_SortOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestMemorySortOp.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/TestAll.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Bind.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IAggregate.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/GroupByOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/MemoryGroupByOp.java
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestBind.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine_DistinctOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine_GroupByOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestMemoryGroupByOp.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/COUNT.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/GROUP_CONCAT.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/MAX.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/MIN.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/SAMPLE.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/SUM.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/aggregate/ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/aggregate/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/aggregate/TestCOUNT.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/aggregate/TestGROUP_CONCAT.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/aggregate/TestMAX.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/aggregate/TestMIN.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/aggregate/TestSAMPLE.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/aggregate/TestSUM.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2011-02-08 17:50:40 UTC (rev 
4184) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2011-02-09 17:00:01 UTC (rev 4185) @@ -27,11 +27,15 @@ */ package com.bigdata.bop; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.log4j.Logger; import com.bigdata.bop.engine.BOpStats; import com.bigdata.bop.engine.IChunkMessage; +import com.bigdata.bop.engine.IQueryClient; import com.bigdata.bop.engine.IRunningQuery; +import com.bigdata.bop.engine.QueryEngine; import com.bigdata.relation.accesspath.IAccessPath; import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.IBlockingBuffer; @@ -59,6 +63,39 @@ private final IBlockingBuffer<E[]> sink2; + private final AtomicBoolean lastInvocation = new AtomicBoolean(false); + + /** + * Set by the {@link QueryEngine} when the criteria specified by + * {@link #isLastInvocation()} are satisfied. + */ + public void setLastInvocation() { + lastInvocation.set(true); + } + + /** + * <code>true</code> iff this is the last invocation of the operator. The + * property is only set to <code>true</code> for operators which: + * <ol> + * <li>{@link BOp.Annotations#EVALUATION_CONTEXT} is + * {@link BOpEvaluationContext#CONTROLLER}</li> + * <li>{@link PipelineOp.Annotations#THREAD_SAFE} is <code>false</code></li> + * </ol> + * Under these circumstances, it is possible for the {@link IQueryClient} to + * atomically decide that a specific invocation of the operator task for the + * query will be the last invocation for that task. This is not possible if + * the operator allows concurrent evaluation tasks. Sharded operators are + * intrinsically concurrent since they can evaluate at each shard in + * parallel. This is why the evaluation context is locked to the query + * controller. In addition, the operator must declare that it is NOT thread + * safe in order for the query engine to serialize its evaluation tasks. 
+ * + * @return + */ + public boolean isLastInvocation() { + return lastInvocation.get(); + } + /** * The interface for a running query. * <p> Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Bind.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Bind.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Bind.java 2011-02-09 17:00:01 UTC (rev 4185) @@ -0,0 +1,66 @@ +package com.bigdata.bop; + +import java.util.Map; + +/** + * Operator causes a variable to be bound to the result of its evaluation as a + * side-effect. + * + * @author thompsonbry + */ +public class Bind<E> extends ImmutableBOp implements IValueExpression<E> { + + /** + * + */ + private static final long serialVersionUID = 1L; + + /** + * Required deep copy constructor. + */ + public Bind(BOpBase op) { + super(op); + } + + /** + * @param var + * The {@link IVariable} which will be bound to result of + * evaluating the associated value expression. + * @param expr + * The {@link IValueExpression} to be evaluated. + */ + public Bind(IVariable<E> var, IValueExpression<E> expr) { + + this(new BOp[] { var, expr }, null/* annotations */); + + } + + /** + * Required shallow copy constructor. + * @param args + * @param annotations + */ + public Bind(BOp[] args, Map<String, Object> annotations) { + super(args, annotations); + } + + @SuppressWarnings("unchecked") + @Override + public E get(final IBindingSet bindingSet) { + + final IVariable<E> var = (IVariable<E>) get(0); + + final IValueExpression<E> expr = (IValueExpression<E>) get(1); + + // evaluate the value expression. + E val = expr.get(bindingSet); + + // bind the variable as a side-effect. 
+ bindingSet.set(var, new Constant<E>(val)); + + // return the evaluated value + return val; + + } + +} Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IAggregate.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IAggregate.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IAggregate.java 2011-02-09 17:00:01 UTC (rev 4185) @@ -0,0 +1,16 @@ +package com.bigdata.bop; + +/** + * An aggregate operator, such as SUM, COUNT, MIN, MAX, etc. + * + * @author thompsonbry + */ +public interface IAggregate<E> extends IValueExpression<E>{ + + /** + * Return the current value of the aggregate (this has a side-effect on the + * internal state of the {@link IAggregate} operator). + */ + E get(IBindingSet bset); + +} Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IValueExpression.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IValueExpression.java 2011-02-08 17:50:40 UTC (rev 4184) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IValueExpression.java 2011-02-09 17:00:01 UTC (rev 4185) @@ -2,6 +2,11 @@ import java.io.Serializable; +/** + * An expression which may be evaluated to a value. 
+ * + * @author mrpersonick + */ public interface IValueExpression<E> extends BOp, Serializable { /** Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2011-02-08 17:50:40 UTC (rev 4184) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2011-02-09 17:00:01 UTC (rev 4185) @@ -109,6 +109,10 @@ * @todo Unit tests for {@link ChunkedRunningQuery} to verify that it * eventually schedules operator tasks which were deferred to * prevent concurrent evaluation. + * + * @todo This is currently not used. However, it could simplify the + * logic for operators, such as SLICE, which otherwise depend on + * {@link #SHARED_STATE} to provide their own synchronization. */ String THREAD_SAFE = PipelineOp.class.getName() + ".threadSafe"; @@ -334,7 +338,27 @@ return getProperty(PipelineOp.Annotations.PIPELINED, PipelineOp.Annotations.DEFAULT_PIPELINED); } - + + /** + * Return <code>true</code> iff concurrent invocations of the operator are + * permitted. + * <p> + * Note: Operators which are not thread-safe still permit concurrent + * evaluation for <em>distinct</em> partitions. In order to ensure that all + * invocations of the operator within a query are serialized (no more than + * one concurrent invocation) you must also specify + * {@link BOpEvaluationContext#CONTROLLER}. 
+ * + * @see Annotations#THREAD_SAFE + * @see BOp.Annotations#EVALUATION_CONTEXT + */ + public boolean isThreadSafe() { + + return getProperty(Annotations.THREAD_SAFE, + Annotations.DEFAULT_THREAD_SAFE); + + } + /** * Return <code>true</code> iff {@link #newStats()} must be shared across * all invocations of {@link #eval(BOpContext)} for this operator for a Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/DistinctBindingSetOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/DistinctBindingSetOp.java 2011-02-08 17:50:40 UTC (rev 4184) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/DistinctBindingSetOp.java 2011-02-09 17:00:01 UTC (rev 4185) @@ -14,13 +14,17 @@ import com.bigdata.bop.IConstant; import com.bigdata.bop.IVariable; import com.bigdata.bop.PipelineOp; -import com.bigdata.bop.bindingSet.HashBindingSet; +import com.bigdata.bop.bindingSet.ListBindingSet; import com.bigdata.bop.engine.BOpStats; import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.IBlockingBuffer; /** * A pipelined DISTINCT operator based on a hash table. + * <p> + * Note: This implementation is a pipelined operator which inspects each chunk + * of solutions as they arrive and those solutions which are distinct for each + * chunk processed. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id: DistinctElementFilter.java 3466 2010-08-27 14:28:04Z @@ -58,8 +62,23 @@ public DistinctBindingSetOp(final BOp[] args, final Map<String, Object> annotations) { - super(args, annotations); + super(args, annotations); + switch (getEvaluationContext()) { + case CONTROLLER: + break; + default: + throw new UnsupportedOperationException( + Annotations.EVALUATION_CONTEXT + "=" + + getEvaluationContext()); + } + + // shared state is used to share the hash table. 
+ if (isSharedState()) { + throw new UnsupportedOperationException(Annotations.SHARED_STATE + + "=" + isSharedState()); + } + } /** @@ -101,6 +120,12 @@ } + public BOpStats newStats() { + + return new DistinctStats(this); + + } + public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { return new FutureTask<Void>(new DistinctTask(this, context)); @@ -145,6 +170,37 @@ return true; } } + + /** + * Extends {@link BOpStats} to provide the shared state for the distinct + * solution groups across multiple invocations of the DISTINCT operator. + */ + private static class DistinctStats extends BOpStats { + + /** + * + */ + private static final long serialVersionUID = 1L; + + /** + * A concurrent map whose keys are the bindings on the specified + * variables (the keys and the values are the same since the map + * implementation does not allow <code>null</code> values). + * <p> + * Note: The map is shared state and can not be discarded or cleared + * until the last invocation!!! + */ + private final ConcurrentHashMap<Solution, Solution> map; + + public DistinctStats(final DistinctBindingSetOp op) { + + this.map = new ConcurrentHashMap<Solution, Solution>( + op.getInitialCapacity(), op.getLoadFactor(), + op.getConcurrencyLevel()); + + } + + } /** * Task executing on the node. @@ -153,12 +209,12 @@ private final BOpContext<IBindingSet> context; - /** - * A concurrent map whose keys are the bindings on the specified - * variables (the keys and the values are the same since the map - * implementation does not allow <code>null</code> values). - */ - private /*final*/ ConcurrentHashMap<Solution, Solution> map; + /** + * A concurrent map whose keys are the bindings on the specified + * variables (the keys and the values are the same since the map + * implementation does not allow <code>null</code> values). + */ + private final ConcurrentHashMap<Solution, Solution> map; /** * The variables used to impose a distinct constraint. 
@@ -178,9 +234,8 @@ if (vars.length == 0) throw new IllegalArgumentException(); - this.map = new ConcurrentHashMap<Solution, Solution>( - op.getInitialCapacity(), op.getLoadFactor(), - op.getConcurrencyLevel()); + // The map is shared state across invocations of this operator task. + this.map = ((DistinctStats) context.getStats()).map; } @@ -235,6 +290,7 @@ stats.chunksIn.increment(); stats.unitsIn.add(a.length); + // The distinct solutions accepted from this chunk. final List<IBindingSet> accepted = new LinkedList<IBindingSet>(); int naccepted = 0; @@ -243,14 +299,26 @@ // System.err.println("considering: " + bset); + /* + * Test to see if this solution is distinct from those + * already seen. + */ final IConstant<?>[] vals = accept(bset); if (vals != null) { + /* + * This is a distinct solution. Copy only the + * variables used to select distinct solutions into + * a new binding set and add that to the set of + * [accepted] binding sets which will be emitted by + * this operator. + */ + // System.err.println("accepted: " // + Arrays.toString(vals)); - final HashBindingSet tmp = new HashBindingSet(); + final ListBindingSet tmp = new ListBindingSet(); for (int i = 0; i < vars.length; i++) { @@ -268,12 +336,19 @@ if (naccepted > 0) { + /* + * At least one solution was accepted as distinct, so + * copy the selected solutions to the output of the + * operator. + */ + final IBindingSet[] b = accepted .toArray(new IBindingSet[naccepted]); // System.err.println("output: " // + Arrays.toString(b)); + // copy the distinct solutions to the output. sink.add(b); // stats.unitsOut.add(naccepted); @@ -285,6 +360,23 @@ sink.flush(); + if(context.isLastInvocation()) { + + /* + * Discard the map. + * + * Note: The map can not be discarded (or cleared) until the + * last invocation. However, we only get the benefit of the + * lastInvocation signal if the operator is single threaded + * and running on the query controller. 
That is not a + * requirement for this DISTINCT implementation, so the map + * is not going to be cleared until the query goes out of + * scope and is swept by GC. + */ + map.clear(); + + } + // done. return null; @@ -292,9 +384,6 @@ sink.close(); - // discard the map. - map = null; - } } Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/GroupByOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/GroupByOp.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/GroupByOp.java 2011-02-09 17:00:01 UTC (rev 4185) @@ -0,0 +1,111 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Sep 4, 2010 + */ + +package com.bigdata.bop.solutions; + +import java.util.Map; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.IConstraint; +import com.bigdata.bop.IValueExpression; +import com.bigdata.bop.PipelineOp; + +/** + * Base class for operators which perform aggregation operations on binding + * sets. 
+ * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id: SortOp.java 3665 2010-09-28 16:53:22Z thompsonbry $ + */ +abstract public class GroupByOp extends PipelineOp { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public interface Annotations extends PipelineOp.Annotations { + + /** + * The ordered set of variables declared by {@link #COMPUTE} which are + * projected out of the group by operator. + */ + String SELECT = GroupByOp.class.getName() + ".select"; + + /** + * The ordered set of {@link IValueExpression}s which are to be + * computed. + * + * TODO This really needs to be VAR := EXPR. EXPR can only reference the + * source variables or variables declared earlier in the ordered + * collection. If an EXPR references a source variable, then it must + * wrap that source variable with an aggregation operator (SUM, COUNT, + * MIN, MAX, AVG, GROUP_CONCAT, or SAMPLE). Only source variables and + * constants may appear as operands of aggregation operators. [We need a + * BIND() operator for this, which might wind up being the same as a + * LET.] + * + * TODO Decide how we will handle AVG. + */ + String COMPUTE = GroupByOp.class.getName() + ".compute"; + + /** + * The ordered set of or one or more variables defining the aggregation + * groups (required). The variables named in this collection MUST be + * variables declared for the incoming solutions. + */ + String GROUP_BY = GroupByOp.class.getName() + ".groupBy"; + + /** + * An {@link IConstraint}[] applied to the aggregated solutions + * (optional). The {@link IConstraint}s MAY NOT include aggregation + * operators and may only reference variables declared by + * {@link #COMPUTE}. + * + * TODO Should be the BEV of an {@link IValueExpression}, which might or + * might not be an {@link IConstraint}. 
+ */ + String HAVING = GroupByOp.class.getName() + ".having"; + + } + + /** + * @param op + */ + public GroupByOp(final GroupByOp op) { + super(op); + } + + /** + * @param args + * @param annotations + */ + public GroupByOp(final BOp[] args, final Map<String, Object> annotations) { + super(args, annotations); + } + +} Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/MemoryGroupByOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/MemoryGroupByOp.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/MemoryGroupByOp.java 2011-02-09 17:00:01 UTC (rev 4185) @@ -0,0 +1,551 @@ +package com.bigdata.bop.solutions; + +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.FutureTask; + +import org.apache.log4j.Logger; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpContext; +import com.bigdata.bop.BOpUtility; +import com.bigdata.bop.ConcurrentHashMapAnnotations; +import com.bigdata.bop.IAggregate; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IConstant; +import com.bigdata.bop.IConstraint; +import com.bigdata.bop.IValueExpression; +import com.bigdata.bop.IVariable; +import com.bigdata.bop.bindingSet.ListBindingSet; +import com.bigdata.bop.engine.BOpStats; +import com.bigdata.relation.accesspath.IAsynchronousIterator; +import com.bigdata.relation.accesspath.IBlockingBuffer; + +/** + * An in-memory GROUP_BY for binding sets. + * <p> + * Note: This implementation is a pipelined operator which aggregates each chunk + * of solutions as they arrive and outputs empty messages (containing no + * solutions) until the last chunk is consumed. 
This operator relies on + * {@link BOpContext#isLastInvocation()} in order to decide when to write its + * output solutions, which requires the operator to (a) be evaluated on the + * controller and (b) declare itself as NOT thread-safe. In addition, the + * operator must be marked as SHARED_STATE := true such that the hash table + * associated with the {@link BOpStats} is shared across multiple invocations of + * this operator for a given query. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id: DistinctElementFilter.java 3466 2010-08-27 14:28:04Z + * thompsonbry $ + * + * @todo GROUP_BY implementation which depends on an ORDER_BY operator to setup + * the correct order and then performs the aggregations in a single pass + * over the ordered data. + * + * @todo GROUP_BY implementation using an HTree suitable for use when the #of + * groups is very large. The HTree would be associated with the allocation + * context for the (queryId,bopId(,shardId))). (The shardId would be used + * iff the GROUP_BY operator was hash partitioned across the nodes.) + * + * @todo In scale-out, we can hash partition the GROUP_BY operator over the + * nodes as long as all of the aggregation functions can be combined from + * the partitions. If AVG is used, then it needs to be replaced by SUM and + * COUNT in the GROUP_BY operator and the use of the AVG in the SELECT + * needs to be rewritten as (SUM(v)/COUNT(v)). + * + * @todo As a special twist, there can also be memory burdens, even with a small + * #of groups, when the aggregated solution data is very large and a + * GROUP_CONCAT function is specified such that it combines a large #of + * input solution bindings into a big string. + * + * FIXME How should we handle DISTINCT semantics for GROUP_BY? (I think + * that we just insert a {@link DistinctBindingSetOp} before the + * GROUP_BY). + * + * FIXME How should we handle nulls (missing values) during aggregation? 
+ * (It appears that nulls and type errors are generally handled by the + * aggregate operator ignoring the detail record). + * + * FIXME All of the {@link IAggregate} operators have a side-effect. In + * order for them to have isolated side-effects for distinct groups, they + * would have to either internalize a value map for the group or each + * group would have to use a distinct instance. If the latter, then + * provide for this on the operator, e.g., newInstance(), and document + * why. + */ +public class MemoryGroupByOp extends GroupByOp { + + /** + * + */ + private static final long serialVersionUID = 1L; + + private static final transient Logger log = Logger + .getLogger(MemoryGroupByOp.class); + + public interface Annotations extends GroupByOp.Annotations, + ConcurrentHashMapAnnotations { + + } + + /** + * Required deep copy constructor. + */ + public MemoryGroupByOp(final MemoryGroupByOp op) { + super(op); + } + + /** + * Required shallow copy constructor. + */ + public MemoryGroupByOp(final BOp[] args, + final Map<String, Object> annotations) { + + super(args, annotations); + + switch (getEvaluationContext()) { + case CONTROLLER: + break; + default: + throw new UnsupportedOperationException( + Annotations.EVALUATION_CONTEXT + "=" + + getEvaluationContext()); + } + + // shared state is used to share the hash table. + if (isSharedState()) { + throw new UnsupportedOperationException(Annotations.SHARED_STATE + + "=" + isSharedState()); + } + + // single threaded required for pipelining w/ isLastInvocation() hook. + if (isThreadSafe()) { + throw new UnsupportedOperationException(Annotations.THREAD_SAFE + + "=" + isThreadSafe()); + } + + // operator is pipelined, but relies on isLastEvaluation() hook. 
+ if (!isPipelined()) { + throw new UnsupportedOperationException(Annotations.PIPELINED + "=" + + isPipelined()); + } + + } + + /** + * @see Annotations#INITIAL_CAPACITY + */ + public int getInitialCapacity() { + + return getProperty(Annotations.INITIAL_CAPACITY, + Annotations.DEFAULT_INITIAL_CAPACITY); + + } + + /** + * @see Annotations#LOAD_FACTOR + */ + public float getLoadFactor() { + + return getProperty(Annotations.LOAD_FACTOR, + Annotations.DEFAULT_LOAD_FACTOR); + + } + + /** + * @see Annotations#CONCURRENCY_LEVEL + */ + public int getConcurrencyLevel() { + + return getProperty(Annotations.CONCURRENCY_LEVEL, + Annotations.DEFAULT_CONCURRENCY_LEVEL); + + } + + public BOpStats newStats() { + + return new GroupByStats(this); + + } + + public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { + + return new FutureTask<Void>(new GroupByTask(this, context)); + + } + + /** + * Wrapper used for the solution groups in the {@link ConcurrentHashMap}. + */ + private static class SolutionGroup { + + /** The precomputed hash code for {@link #vals}. */ + private final int hash; + + /** The values for the groupBy variables which define a distinct group. */ + private final IConstant<?>[] vals; + + /** + * The values for the variables which are being computed by the + * aggregation. The binding set is when the {@link SolutionGroup} is + * first constructed. + * <p> + * Note: Updates to this binding set MUST be protected by synchronizing + * on {@link SolutionGroup}. 
+ */ + private final IBindingSet aggregatedBSet; + + public String toString() { + return super.toString() + // + "{group=" + Arrays.toString(vals) + // + ",solution=" + aggregatedBSet + // + "}"; + } + + public SolutionGroup(final IConstant<?>[] vals) { + this.vals = vals; + this.hash = java.util.Arrays.hashCode(vals); + this.aggregatedBSet = new ListBindingSet(); + } + + public int hashCode() { + return hash; + } + + public boolean equals(final Object o) { + if (this == o) + return true; + if (!(o instanceof SolutionGroup)) { + return false; + } + final SolutionGroup t = (SolutionGroup) o; + if (vals.length != t.vals.length) + return false; + for (int i = 0; i < vals.length; i++) { + // @todo verify that this allows for nulls with a unit test. + if (vals[i] == t.vals[i]) + continue; + if (vals[i] == null) + return false; + if (!vals[i].equals(t.vals[i])) + return false; + } + return true; + } + + /** + * Apply the {@link IValueExpression}s to compute the updated variable + * bindings in the {@link SolutionGroup}. + * + * @param bset + * An input solution. + * @param compute + * The ordered array of {@link IValueExpression}s which + * define the aggregated variables. + */ + public void aggregate(final IBindingSet bset, + final IValueExpression<?>[] compute) { + + /* + * @todo The aggregated variables are all undefined the first time a + * source binding set is presented and need to be initialized to an + * appropriate value. + */ + + // synchronize for visibility. + synchronized(this) { + } + + throw new UnsupportedOperationException(); + + } + + } // SolutionGroup + + /** + * Extends {@link BOpStats} to provide the shared state for the solution + * groups across multiple invocations of the GROUP_BY operator. 
+ */ + private static class GroupByStats extends BOpStats { + + /** + * + */ + private static final long serialVersionUID = 1L; + + /** + * A concurrent map whose keys are the bindings on the specified + * variables (the keys and the values are the same since the map + * implementation does not allow <code>null</code> values). + * <p> + * Note: The map is shared state and can not be discarded or cleared + * until the last invocation!!! + */ + private /*final*/ ConcurrentHashMap<SolutionGroup, SolutionGroup> map; + + public GroupByStats(final MemoryGroupByOp op) { + + this.map = new ConcurrentHashMap<SolutionGroup, SolutionGroup>( + op.getInitialCapacity(), op.getLoadFactor(), + op.getConcurrencyLevel()); + + } + + } + + /** + * Task executing on the node. + */ + static private class GroupByTask implements Callable<Void> { + + private final BOpContext<IBindingSet> context; + + /** + * A concurrent map whose keys are the bindings on the specified + * variables (the keys and the values are the same since the map + * implementation does not allow <code>null</code> values). + * <p> + * Note: The map is shared state and can not be discarded or cleared + * until the last invocation!!! + */ + private final ConcurrentHashMap<SolutionGroup, SolutionGroup> map; + + /** + * The ordered array of variables which define the distinct groups to + * be aggregated. + */ + private final IVariable<?>[] groupBy; + + /** + * The {@link IValueExpression}s used to compute each of the variables + * in the aggregated solutions. + */ + private final IValueExpression<?>[] compute; + + /** + * Optional constraints applied to the aggregated solutions. + */ + private final IConstraint[] having; + + /** + * Optional set of variables to be projected out of the GROUP_BY + * operator. When <code>null</code>, all variables will be projected + * out. 
+ */ + private final IVariable<?>[] select; + + GroupByTask(final MemoryGroupByOp op, + final BOpContext<IBindingSet> context) { + + this.context = context; + + // must be non-null, and non-empty array w/o dups. + this.groupBy = (IVariable[]) op + .getRequiredProperty(GroupByOp.Annotations.GROUP_BY); + + if (groupBy == null) + throw new IllegalArgumentException(); + + if (groupBy.length == 0) + throw new IllegalArgumentException(); + + /* + * Must be non-null, and non-empty array. Any variables in the + * source solutions may only appear within aggregation operators + * such as SUM, COUNT, etc. Variables declared in [compute] may be + * referenced inside the value expressions as long as they do not + * appear within an aggregation function, but they they must be + * defined earlier in the ordered compute[]. The value expressions + * must include an assignment to the appropriate aggregate variable. + * + * FIXME This must include a LET or BIND to assign the computed + * value to the appropriate variable. + * + * FIXME verify references to unaggregated and aggregated variables. + */ + this.compute = (IValueExpression<?>[]) op + .getRequiredProperty(GroupByOp.Annotations.COMPUTE); + + if (compute == null) + throw new IllegalArgumentException(); + + if (compute.length == 0) + throw new IllegalArgumentException(); + + // may be null or empty[]. + this.having = (IConstraint[]) op + .getRequiredProperty(GroupByOp.Annotations.HAVING); + + /* + * The variables to project out of the GROUP_BY operator. This may + * be null, but not empty[]. + * + * TODO Variables may only appear once and must be distinct from the + * source variables. + */ + this.select = (IVariable[]) op + .getRequiredProperty(GroupByOp.Annotations.SELECT); + + if (select != null && select.length == 0) + throw new IllegalArgumentException(); + + // The map is shared state across invocations of this operator task. 
+ this.map = ((GroupByStats) context.getStats()).map; + + } + + /** + * Return the "row" for the groupBy variables. + * + * @param bset + * The binding set to be filtered. + * + * @return The distinct as bound values -or- <code>null</code> if the + * binding set duplicates a solution which was already accepted. + */ + private SolutionGroup accept(final IBindingSet bset) { + + final IConstant<?>[] r = new IConstant<?>[groupBy.length]; + + for (int i = 0; i < groupBy.length; i++) { + + /* + * Note: This allows null's. + * + * @todo write a unit test when some variables are not bound. + */ + r[i] = bset.get(groupBy[i]); + + } + + final SolutionGroup s = new SolutionGroup(r); + + map.putIfAbsent(s, s); + + return s; + + } + + public Void call() throws Exception { + + final BOpStats stats = context.getStats(); + + final boolean isLastInvocation = context.isLastInvocation(); + + final IAsynchronousIterator<IBindingSet[]> itr = context + .getSource(); + + final IBlockingBuffer<IBindingSet[]> sink = context.getSink(); + + try { + + /* + * Present each source solution in turn, identifying the group + * into which it falls and then applying the value expressions + * to update the aggregated variable bindings for that group. + */ + while (itr.hasNext()) { + + final IBindingSet[] a = itr.next(); + + stats.chunksIn.increment(); + stats.unitsIn.add(a.length); + + for (IBindingSet bset : a) { + + // identify the solution group. + final SolutionGroup solutionGroup = accept(bset); + + // aggregate the bindings + solutionGroup.aggregate(bset, compute); + + } + + } + + if (isLastInvocation) { + + /* + * Write aggregated solutions on the sink, applying the + * [having] filter to remove any solutions which do not + * satisfy its constraints. 
+ */ + + final List<IBindingSet> accepted = new LinkedList<IBindingSet>(); + + int naccepted = 0; + + for(SolutionGroup solutionGroup: map.values()) { + + synchronized(solutionGroup) { + + IBindingSet bset = solutionGroup.aggregatedBSet; + + // verify optional constraint(s) + if (having != null + && !BOpUtility.isConsistent(having, bset)) { + + // skip this group. + continue; + + } + + /* + * We will accept this solution group, so filter out + * any variables which are not being projected out + * of this operator. + */ + if (log.isDebugEnabled()) + log.debug("accepted: " + solutionGroup); + + // optionally strip off unnecessary variables. + bset = select == null ? bset : bset + .copy(select); + + accepted.add(bset); + + naccepted++; + + } + + } + + /* + * Output the aggregated bindings for the accepted + * solutions. + */ + if (naccepted > 0) { + + final IBindingSet[] b = accepted + .toArray(new IBindingSet[naccepted]); + + sink.add(b); + + // flush the output. + sink.flush(); + + // discard the map. + map.clear(); + + } + + } + + // done. + return null; + + } finally { + + sink.close(); + + } + + } // call() + + } // GroupByTask + +} Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestAll.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestAll.java 2011-02-08 17:50:40 UTC (rev 4184) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestAll.java 2011-02-09 17:00:01 UTC (rev 4185) @@ -79,6 +79,9 @@ // pure binding set operators. suite.addTest(com.bigdata.bop.bset.TestAll.suite()); + // bind(var,expr) + suite.addTestSuite(TestBind.class); + // index operators. 
suite.addTest(com.bigdata.bop.ndx.TestAll.suite()); Added: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestBind.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestBind.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestBind.java 2011-02-09 17:00:01 UTC (rev 4185) @@ -0,0 +1,72 @@ +/** + * + */ +package com.bigdata.bop; + +import com.bigdata.bop.bindingSet.ListBindingSet; + +import junit.framework.TestCase2; + +/** + * Unit tests for {@link Bind}. + * + * @author thompsonbry + * + * @todo Write a test where the {@link IValueExpression} given to bind is more + * complex than an {@link IVariable} or an {@link IConstant}. + */ +public class TestBind extends TestCase2 { + + /** + * + */ + public TestBind() { + } + + /** + * @param name + */ + public TestBind(String name) { + super(name); + } + + /** + * Unit test of bind(var,constant). + */ + public void test_bind_constant() { + + final IBindingSet bset = new ListBindingSet(); + + final IVariable<?> y = Var.var("y"); + + // verify bind() returns the value of the constant. + assertEquals(Integer.valueOf(12), new Bind(y, new Constant<Integer>( + Integer.valueOf(12))).get(bset)); + + // verify side-effect on the binding set. + assertEquals(new Constant<Integer>(Integer.valueOf(12)), bset.get(y)); + + } + + /** + * Unit test of bind(var,otherVar). + */ + public void test_bind_var() { + + final IBindingSet bset = new ListBindingSet(); + + final IVariable<?> x = Var.var("x"); + + final IVariable<?> y = Var.var("y"); + + bset.set(x, new Constant<Integer>(12)); + + // verify bind() returns the value of the other variable. + assertEquals(Integer.valueOf(12), new Bind(y, x).get(bset)); + + // verify side-effect on the binding set. 
+ assertEquals(new Constant<Integer>(Integer.valueOf(12)), bset.get(y)); + + } + +} Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestDeepCopy.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestDeepCopy.java 2011-02-08 17:50:40 UTC (rev 4184) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestDeepCopy.java 2011-02-09 17:00:01 UTC (rev 4185) @@ -85,6 +85,7 @@ Constant.class,// Var.class,// QuoteOp.class,// + Bind.class,// // com.bigdata.bop.constraint EQ.class,// NE.class,// Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestAll.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestAll.java 2011-02-08 17:50:40 UTC (rev 4184) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestAll.java 2011-02-09 17:00:01 UTC (rev 4185) @@ -110,17 +110,17 @@ * multiple chunks of solutions. */ - // stress test for SliceOp. + // stress test for SLICE suite.addTestSuite(TestQueryEngine_Slice.class); - // ORDER BY implementations. + // stress test for ORDER_BY suite.addTestSuite(TestQueryEngine_SortOp.class); - // @todo DISTINCT implementations. -// suite.addTestSuite(TestQueryEngine_SortOp.class); + // stress test for DISTINCT. + suite.addTestSuite(TestQueryEngine_DistinctOp.class); - // @todo GROUP BY implementations. -// suite.addTestSuite(TestQueryEngine_SortOp.class); + // stress test for GROUP_BY. 
+ suite.addTestSuite(TestQueryEngine_GroupByOp.class); return suite; Added: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine_DistinctOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine_DistinctOp.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine_DistinctOp.java 2011-02-09 17:00:01 UTC (rev 4185) @@ -0,0 +1,306 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Sep 1, 2010 + */ + +package com.bigdata.bop.engine; + +import java.util.Properties; +import java.util.Random; +import java.util.UUID; + +import junit.framework.TestCase2; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpEvaluationContext; +import com.bigdata.bop.Constant; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IConstant; +import com.bigdata.bop.IVariable; +import com.bigdata.bop.NV; +import com.bigdata.bop.PipelineOp; +import com.bigdata.bop.Var; +import com.bigdata.bop.bindingSet.ListBindingSet; +import com.bigdata.bop.bset.StartOp; +import com.bigdata.bop.solutions.ComparatorOp; +import com.bigdata.bop.solutions.ISortOrder; +import com.bigdata.bop.solutions.MemorySortOp; +import com.bigdata.bop.solutions.SliceOp; +import com.bigdata.bop.solutions.SortOrder; +import com.bigdata.journal.BufferMode; +import com.bigdata.journal.Journal; +import com.bigdata.relation.accesspath.IAsynchronousIterator; +import com.bigdata.relation.accesspath.ThickAsynchronousIterator; + +/** + * Test suite for DISTINCT solution operators when integrated with the query + * engine. This test suite is designed to examine cases where the DISTINCT + * operator will have to buffer multiple chunks of solutions before finally + * reporting the aggregated solutions. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id: TestQueryEngine2.java 3489 2010-09-01 18:27:35Z thompsonbry $ + * + * @todo Test each DISTINCT implementation here. 
+ */ +public class TestQueryEngine_DistinctOp extends TestCase2 { + + public Properties getProperties() { + + final Properties p = new Properties(super.getProperties()); + + p.setProperty(Journal.Options.BUFFER_MODE, BufferMode.Transient + .toString()); + + return p; + + } + + Journal jnl; + QueryEngine queryEngine; + + public void setUp() throws Exception { + + jnl = new Journal(getProperties()); + + queryEngine = new QueryEngine(jnl); + + queryEngine.init(); + + } + + public void tearDown() throws Exception { + + if (queryEngine != null) { + queryEngine.shutdownNow(); + queryEngine = null; + } + + if (jnl != null) { + jnl.destroy(); + jnl = null; + } + + } + + /** + * + */ + public TestQueryEngine_DistinctOp() { + } + + /** + * @param name + */ + public TestQueryEngine_DistinctOp(String name) { + super(name); + } + + public void testStressThreadSafe() throws Exception { + + for (int i = 0; i < 100; i++) { + + try { + + test_distinct_threadSafe(); + + } catch (Throwable t) { + + fail("Failed after " + i + " trials", t); + + } + + } + + } + + /** + * @todo Unit test for DISTINCT. How to judge correctness? + */ + public void test_distinct_threadSafe() throws Exception { + + final long timeout = 10000; // ms + + final int ntrials = 10000; + + final int poolSize = 10; + + doDistinctTest(10000/* maxInt */, timeout, ntrials, poolSize); + + } + + /** + * Return an {@link IAsynchronousIterator} that will read a single, chunk + * containing all of the specified {@link IBindingSet}s. + * + * @param bindingSetChunks + * the chunks of binding sets. + */ + protected ThickAsynchronousIterator<IBindingSet[]> newBindingSetIterator( + final IBindingSet[][] bindingSetChunks) { + + return new ThickAsynchronousIterator<IBindingSet[]>(bindingSetChunks); + + } + + /** + * + * @param timeout + * @param ntrials + * @param poolSize + * + * @return The #of successful trials. 
+ * + * @throws Exception + */ + protected void doDistinctTest(final int maxInt, + final long timeout, final int ntrials, final int poolSize) + throws Exception { + + fail("write test helper"); + + int ngiven = 0; + final IVariable<?> a = Var.var("a"); + final IBindingSet[][] chunks = new IBindingSet[ntrials][]; + { + final Random r = new Random(); + for (int i = 0; i < chunks.length; i++) { + // random non-zero chunk size + chunks[i] = new IBindingSet[r.nextInt(10) + 1]; + for (int j = 0; j < chunks[i].length; j++) { + final IBindingSet bset = new ListBindingSet(); + bset.set(a, new Constant<Integer>(r.nextInt(maxInt))); + chunks[i][j] = bset; + ngiven++; + } + } + } + + final int startId = 1; + final int sortId = 2; + + /* + * Note: The StartOp breaks up the initial set of chunks into multiple + * IChunkMessages, which results in multiple invocations of the SortOp. + */ + final PipelineOp startOp = new StartOp(new BOp[]{}, NV.asMap(new NV[]{// + new NV(SliceOp.Annotations.BOP_ID, startId),// + new NV(MemorySortOp.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.CONTROLLER),// + })); + + final PipelineOp query = new MemorySortOp(new BOp[] {startOp}, NV.asMap(new NV[] {// + new NV(SliceOp.Annotations.BOP_ID, sortId),// + new NV(MemorySortOp.Annotations.COMPARATOR, + new IntegerComparatorOp( + new ISortOrder[] { new SortOrder(a, + true) })),// + new NV(MemorySortOp.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.CONTROLLER),// + new NV(MemorySortOp.Annotations.PIPELINED, false),// + })); + + final UUID queryId = UUID.randomUUID(); + final IRunningQuery q = queryEngine.eval(queryId, query, + new LocalChunkMessage<IBindingSet>(queryEngine, queryId, + startId, -1/* partitionId */, + newBindingSetIterator(chunks))); + + // consume solutions. + int nsolutions = 0; + final IAsynchronousIterator<IBindingSet[]> itr = q.iterator(); + while (itr.hasNext()) { + nsolutions += itr.next().length; + } + + // wait for the query to terminate. 
+ q.get(); + + // Verify stats. + final BOpStats stats = (BOpStats) q.getStats().get(sortId); + if (log.isInfoEnabled()) + log.info(getClass().getName() + "." + getName() + " : " + stats); + assertNotNull(stats); + assertEquals(ngiven, nsolutions); + assertEquals(ngiven, stats.unitsIn.get()); + assertEquals(ngiven, stats.unitsOut.get()); + + } + + /** + * Helper class for comparing solution sets having variables which evaluate + * to {@link Integer} values. + */ + static private class IntegerComparatorOp extends ComparatorOp + { + + /** + * + */ + private static final long serialVersionUID = 1L; + + /** The sort order. */ + final private ISortOrder<?> [] _sors; + + public IntegerComparatorOp ( final ISortOrder<?> sors [] ) + { + super ( new BOp [] {}, NV.asMap ( new NV [] { new NV ( ComparatorOp.Annotations.ORDER, sors ) } ) ) ; + _sors = sors ; + } + + public int compare ( IBindingSet o1, IBindingSet o2 ) + { + for ( ISortOrder<?> sor : _sors ) + { + int ret = compare ( sor, o1, o2 ) ; + if ( 0 != ret ) + return ret ; + } + return 0 ; + } + + private int compare ( ISortOrder<?> sor, IBindingSet lhs, IBindingSet rhs ) + { + int compare = 0 ; + + IConstant<?> lhsv = lhs.get ( sor.getVariable () ) ; + IConstant<?> rhsv = rhs.get ( sor.getVariable () ) ; + + if ( null == lhsv && null == rhsv ) + return 0 ; + else if ( null == lhsv ) + compare = -1 ; + else if ( null == rhsv ) + compare = 1 ; + else + compare = ((Integer) lhsv.get()).compareTo(((Integer) rhsv + .get())) ; + + return compare * ( sor.isAscending () ? 
1 : -1 ) ; + } + + } + +} Added: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine_GroupByOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine_GroupByOp.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine_GroupByOp.java 2011-02-09 17:00:01 UTC (rev 4185) @@ -0,0 +1,306 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Sep 1, 2010 + */ + +package com.bigdata.bop.engine; + +import java.util.Properties; +import java.util.Random; +import java.util.UUID; + +import junit.framework.TestCase2; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpEvaluationContext; +import com.bigdata.bop.Constant; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IConstant; +import com.bigdata.bop.IVariable; +import com.bigdata.bop.NV; +import com.bigdata.bop.PipelineOp; +import com.bigdata.bop.Var; +import com.bigdata.bop.bindingSet.ListBindingSet; +import com.bigdata.bop.bset.StartOp; +import com.bigdata.bop.solutions.ComparatorOp; +import com.bigdata.bop.solutions.ISortOrder; +import com.bigdata.bop.solutions.MemorySortOp; +import com.bigdata.bop.solutions.SliceOp; +import com.bigdata.bop.solutions.SortOrder; +import com.bigdata.journal.BufferMode; +import com.bigdata.journal.Journal; +import com.bigdata.relation.accesspath.IAsynchronousIterator; +import com.bigdata.relation.accesspath.ThickAsynchronousIterator; + +/** + * Test suite for GROUP_BY operators when integrated with the query engine. This + * test suite is designed to examine cases where the GROUP_BY operator will have + * to buffer multiple chunks of solutions before finally reporting the aggregated + * solutions. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id: TestQueryEngine2.java 3489 2010-09-01 18:27:35Z thompsonbry $ + * + * @todo Test each GROUP_BY implementation here. 
+ */ +public class TestQueryEngine_GroupByOp extends TestCase2 { + + public Properties getProperties() { + + final Properties p = new Properties(super.getProperties()); + + p.setProperty(Journal.Options.BUFFER_MODE, BufferMode.Transient + .toString()); + + return p; + + } + + Journal jnl; + QueryEngine queryEngine; + + public void setUp() throws Exception { + + jnl = new Journal(getProperties()); + + queryEngine = new QueryEngine(jnl); + + queryEngine.init(); + + } + + public void tearDown() throws Exception { + + if (queryEngine != null) { + queryEngine.shutdownNow(); + queryEngine = null; + } + + if (jnl != null) { + jnl.destroy(); + jnl = null; + } + + } + + /** + * + */ + public TestQueryEngine_GroupByOp() { + } + + /** + * @param name + */ + public TestQueryEngine_GroupByOp(String name) { + super(name); + } + + public void testStressThreadSafe() throws Exception { + + for (int i = 0; i < 100; i++) { + + try { + + test_groupBy_threadSafe(); + + } catch (Throwable t) { + + fail("Failed after " + i + " trials", t); + + } + + } + + } + + /** + * @todo Unit test for GROUP BY. How to judge correctness? + */ + public void test_groupBy_threadSafe() throws Exception { + + final long timeout = 10000; // ms + + final int ntrials = 10000; + + final int poolSize = 10; + + doGroupByTest(10000/* maxInt */, timeout, ntrials, poolSize); + + } + + /** + * Return an {@link IAsynchronousIterator} that will read a single, chunk + * containing all of the specified {@link IBindingSet}s. + * + * @param bindingSetChunks + * the chunks of binding sets. + */ + protected ThickAsynchronousIterator<IBindingSet[]> newBindingSetIterator( + final IBindingSet[][] bindingSetChunks) { + + return new ThickAsynchronousIterator<IBindingSet[]>(bindingSetChunks); + + } + + /** + * + * @param timeout + * @param ntrials + * @param poolSize + * + * @return The #of successful trials. 
+ * + * @throws Exception + */ + protected void doGroupByTest(final int maxInt, + final long timeout, final int ntrials, final int poolSize) + throws Exception { + + fail("write test helper"); + + int ngiven = 0; + final IVariable<?> a = Var.var("a"); + final IBindingSet[][] chunks = new IBindingSet[ntrials][]; + { + final Random r = new Random(); + for (int i = 0; i < chunks.length; i++) { + // random non-zero chunk size + chunks[i] = new IBindingSet[r.nextInt(10) + 1]; + for (int j = 0; j < chunks[i].length; j++) { + final IBindingSet bset = new ListBindingSet(); + bset.set(a, new Constant<Integer>(r.nextInt(maxInt))); + chunks[i][j] = bset; + ngiven++; + } + } + } + + final int startId = 1; + final int sortId = 2; + + /* + * Note: The StartOp breaks up the initial set of chunks into multiple + * IChunkMessages, which results in multiple invocations of the SortOp. + */ + final PipelineOp startOp = new StartOp(new BOp[]{}, NV.asMap(new NV[]{// + new NV(SliceOp.Annotations.BOP_ID, startId),// + new NV(MemorySortOp.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.CONTROLLER),// + })); + + final PipelineOp query = new MemorySortOp(new BOp[] {startOp}, NV.asMap(new NV[] {// + new NV(SliceOp.Annotations.BOP_ID, sortId),// + new NV(MemorySortOp.Annotations.COMPARATOR, + new IntegerComparatorOp( + new ISortOrder[... [truncated message content] |
From: <tho...@us...> - 2011-02-08 17:50:51
|
Revision: 4184 http://bigdata.svn.sourceforge.net/bigdata/?rev=4184&view=rev Author: thompsonbry Date: 2011-02-08 17:50:40 +0000 (Tue, 08 Feb 2011) Log Message: ----------- Added support for "at-once" operator evaluation to the QueryEngine. This is only supported for ChunkedRunningQuery (and its subclass, FederatedRunningQuery), but that is the more efficient evaluation strategy. This is not yet integrated with the NIO buffers used in scale-out, so all data is materialized on the Java heap for the moment, and "blocked" evaluation of operators based on a memory threshold is not yet supported. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/QuoteOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/PipelineUtility.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StandaloneChainedRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StartOpMessage.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/AllocationContextKey.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/MemorySortOp.java
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/solutions/SparqlBindingSetComparatorOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/MockRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine_Slice.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestRunState.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/solutions/TestMemorySortOp.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine_SortOp.java Removed Paths: ------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineType.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine2.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2011-02-08 17:42:19 UTC (rev 4183) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2011-02-08 17:50:40 UTC (rev 4184) @@ -301,7 +301,15 @@ return a; } - /** deep copy the arguments. */ + /** + * Deep copy the arguments. + * + * @todo As long as we stick to the immutable semantics for bops, we can + * just make a shallow copy of the arguments in the "copy" constructor + * and then modify them within the specific operator constructor + * before returning control to the caller. This would result in less + * heap churn. + */ static protected BOp[] deepCopy(final BOp[] a) { if (a == NOARGS) { // fast path for zero arity operators. 
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2011-02-08 17:42:19 UTC (rev 4183) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2011-02-08 17:50:40 UTC (rev 4184) @@ -32,12 +32,9 @@ import com.bigdata.bop.engine.BOpStats; import com.bigdata.bop.engine.IChunkMessage; import com.bigdata.bop.engine.IRunningQuery; -import com.bigdata.btree.ILocalBTreeView; -import com.bigdata.journal.IIndexManager; import com.bigdata.relation.accesspath.IAccessPath; import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.IBlockingBuffer; -import com.bigdata.service.IBigdataFederation; /** * The evaluation context for the operator (NOT serializable). @@ -137,56 +134,40 @@ return sink2; } - /** - * - * @param fed - * The {@link IBigdataFederation} IFF the operator is being - * evaluated on an {@link IBigdataFederation}. When evaluating - * operations against an {@link IBigdataFederation}, this - * reference provides access to the scale-out view of the indices - * and to other bigdata services. - * @param indexManager - * The <strong>local</strong> {@link IIndexManager}. Query - * evaluation occurs against the local indices. In scale-out, - * query evaluation proceeds shard wise and this - * {@link IIndexManager} MUST be able to read on the - * {@link ILocalBTreeView}. - * @param readTimestamp - * The timestamp or transaction identifier against which the - * query is reading. - * @param writeTimestamp - * The timestamp or transaction identifier against which the - * query is writing. - * @param partitionId - * The index partition identifier -or- <code>-1</code> if the - * index is not sharded. - * @param stats - * The object used to collect statistics about the evaluation of - * this operator. 
- * @param source - * Where to read the data to be consumed by the operator. - * @param sink - * Where to write the output of the operator. - * @param sink2 - * Alternative sink for the output of the operator (optional). - * This is used by things like SPARQL optional joins to route - * failed joins outside of the join group. - * - * @throws IllegalArgumentException - * if the <i>stats</i> is <code>null</code> - * @throws IllegalArgumentException - * if the <i>source</i> is <code>null</code> (use an empty - * source if the source will be ignored). - * @throws IllegalArgumentException - * if the <i>sink</i> is <code>null</code> - * - * @todo modify to accept {@link IChunkMessage} or an interface available - * from getChunk() on {@link IChunkMessage} which provides us with - * flexible mechanisms for accessing the chunk data. - * <p> - * When doing that, modify to automatically track the {@link BOpStats} - * as the <i>source</i> is consumed. - */ + /** + * + * @param runningQuery + * The {@link IRunningQuery}. + * @param partitionId + * The index partition identifier -or- <code>-1</code> if the + * index is not sharded. + * @param stats + * The object used to collect statistics about the evaluation of + * this operator. + * @param source + * Where to read the data to be consumed by the operator. + * @param sink + * Where to write the output of the operator. + * @param sink2 + * Alternative sink for the output of the operator (optional). + * This is used by things like SPARQL optional joins to route + * failed joins outside of the join group. + * + * @throws IllegalArgumentException + * if the <i>stats</i> is <code>null</code> + * @throws IllegalArgumentException + * if the <i>source</i> is <code>null</code> (use an empty + * source if the source will be ignored). 
+ * @throws IllegalArgumentException + * if the <i>sink</i> is <code>null</code> + * + * @todo modify to accept {@link IChunkMessage} or an interface available + * from getChunk() on {@link IChunkMessage} which provides us with + * flexible mechanisms for accessing the chunk data. + * <p> + * When doing that, modify to automatically track the {@link BOpStats} + * as the <i>source</i> is consumed. + */ public BOpContext(final IRunningQuery runningQuery,final int partitionId, final BOpStats stats, final IAsynchronousIterator<E[]> source, final IBlockingBuffer<E[]> sink, final IBlockingBuffer<E[]> sink2) { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java 2011-02-08 17:42:19 UTC (rev 4183) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java 2011-02-08 17:50:40 UTC (rev 4184) @@ -42,9 +42,6 @@ /** * Base class for the bigdata operation evaluation context (NOT serializable). - * - * @param <E> - * The generic type of the objects processed by the operator. 
*/ public class BOpContextBase { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java 2011-02-08 17:42:19 UTC (rev 4183) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java 2011-02-08 17:50:40 UTC (rev 4184) @@ -661,7 +661,7 @@ static public IBindingSet[] toArray(final Iterator<IBindingSet[]> itr, final BOpStats stats) { - final List<IBindingSet[]> list = new LinkedList<IBindingSet[]>(); + final List<IBindingSet[]> list = new LinkedList<IBindingSet[]>(); int nchunks = 0, nelements = 0; { @@ -676,8 +676,6 @@ nelements += a.length; - list.add(a); - } stats.chunksIn.add(nchunks); @@ -699,19 +697,27 @@ final IBindingSet[] a = new IBindingSet[nelements]; - final Iterator<IBindingSet[]> itr2 = list.iterator(); - - while (itr2.hasNext()) { - - final IBindingSet[] t = itr2.next(); - - System.arraycopy(t/* src */, 0/* srcPos */, a/* dest */, - n/* destPos */, t.length/* length */); - - n += t.length; - - } + final Iterator<IBindingSet[]> itr2 = list.iterator(); + while (itr2.hasNext()) { + + final IBindingSet[] t = itr2.next(); + try { + System.arraycopy(t/* src */, 0/* srcPos */, a/* dest */, + n/* destPos */, t.length/* length */); + } catch (IndexOutOfBoundsException ex) { + // Provide some more detail in the stack trace. 
+ final IndexOutOfBoundsException ex2 = new IndexOutOfBoundsException( + "t.length=" + t.length + ", a.length=" + a.length + + ", n=" + n); + ex2.initCause(ex); + throw ex2; + } + + n += t.length; + + } + return a; } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2011-02-08 17:42:19 UTC (rev 4183) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2011-02-08 17:50:40 UTC (rev 4184) @@ -27,12 +27,18 @@ package com.bigdata.bop; +import java.nio.ByteBuffer; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; +import org.apache.log4j.Logger; + import com.bigdata.bop.engine.BOpStats; +import com.bigdata.bop.engine.ChunkedRunningQuery; import com.bigdata.bop.engine.QueryEngine; +import com.bigdata.bop.solutions.SliceOp; +import com.bigdata.relation.accesspath.IAsynchronousIterator; /** * Abstract base class for pipeline operators where the data moving along the @@ -55,6 +61,9 @@ */ private static final long serialVersionUID = 1L; + private final static transient Logger log = Logger + .getLogger(PipelineOp.class); + public interface Annotations extends BOp.Annotations, BufferAnnotations { /** @@ -91,6 +100,63 @@ boolean DEFAULT_SHARED_STATE = false; + /** + * Annotation may be used to indicate operators which are not thread + * safe (default {@value #DEFAULT_THREAD_SAFE}). Concurrent invocations + * of the evaluation task will not be scheduled for a given shard for an + * operator which is not thread safe. + * + * @todo Unit tests for {@link ChunkedRunningQuery} to verify that it + * eventually schedules operator tasks which were deferred to + * prevent concurrent evaluation. 
+ */ + String THREAD_SAFE = PipelineOp.class.getName() + ".threadSafe"; + + boolean DEFAULT_THREAD_SAFE = true; + + /** + * Annotation used to mark pipelined (aka vectored) operators. When + * <code>false</code> the operator will use either "at-once" or + * "blocked" evaluation depending on how it buffers its data for + * evaluation. + */ + String PIPELINED = PipelineOp.class.getName() + ".pipelined"; + + boolean DEFAULT_PIPELINED = true; + + /** + * For non-{@link #PIPELINED} operators, this non-negative value + * specifies the maximum #of bytes which the operator may buffer on the + * native heap before evaluation of the operator is triggered -or- ZERO + * (0) if the operator buffers the data on the Java heap (default + * {@value #DEFAULT_MAX_MEMORY}). When non-zero, the #of bytes specified + * should be a multiple of 4k. For a shared operation, the value is the + * maximum #of bytes which may be buffered per shard. + * <p> + * Operator "at-once" evaluation will be used if either (a) the operator + * is buffering data on the Java heap; or (b) the operator is buffering + * data on the native heap and the amount of buffered data does not + * exceed the specified value for {@link #MAX_MEMORY}. For convenience, + * the value {@link Integer#MAX_VALUE} may be specified to indicate that + * "at-once" evaluation is required. + * <p> + * When data are buffered on the Java heap, "at-once" evaluation is + * implied and the data will be made available to the operator as a + * single {@link IAsynchronousIterator} when the operator is invoked. + * <p> + * When {@link #MAX_MEMORY} is positive, data are marshaled in + * {@link ByteBuffer}s and the operator will be invoked once either (a) + * its memory threshold for the buffered data has been exceeded; or (b) + * no predecessor of the operator is running (or can be triggered) -and- + * all inputs for the operator have been materialized on this node. 
Note + * that some operators DO NOT support multiple pass evaluation + * semantics. Such operators MUST throw an exception if the value of + * this annotation could result in multiple evaluation passes. + */ + String MAX_MEMORY = PipelineOp.class.getName() + ".maxMemory"; + + int DEFAULT_MAX_MEMORY = 0; + // /** // * Annotation used to mark a set of (non-optional) joins which may be // * freely reordered by the query optimizer in order to minimize the @@ -224,17 +290,52 @@ } + /** + * Return <code>true</code> if the operator is pipelined (versus using + * "at-once" or blocked evaluation as discussed below). + * <dl> + * <dt>Pipelined</dt> + * <dd>Pipelined operators stream chunks of intermediate results from one + * operator to the next using producer / consumer pattern. Each time a set + * of intermediate results is available for a pipelined operator, it is + * evaluated against those inputs producing another set of intermediate + * results for its target operator(s). Pipelined operators may be evaluated + * many times during a given query and often have excellent parallelism due + * to the concurrent evaluation of the different operators on different sets + * of intermediate results.</dd> + * <dt>At-Once</dt> + * <dd> + * An "at-once" operator will run exactly once and must wait for all of its + * inputs to be assembled before it runs. There are some operations for + * which "at-once" evaluation is always required, such as ORDER_BY. Other + * operations MAY use operator-at-once evaluation in order to benefit from a + * combination of more efficient IO patterns and simpler design. 
At-once + * operators may either buffer their data on the Java heap (which is not + * scalable due to the heap pressure exerted on the garbage collector) or + * buffer their data on the native heap (which does scale).</dd> + * <dt>Blocked</dt> + * <dd>Blocked operators buffer large amounts of data on the native heap and + * run each time they exceed some threshold #of bytes of buffered data. A + * blocked operator is basically an "at-once" operator which buffers its + * data on the native heap and which can be evaluated in multiple passes. + * For example, a hash join could use a blocked operator design while an + * ORDER_BY operator can not. By deferring their evaluation until some + * threshold amount of data has been materialized, they may be evaluated + * once or more than once, depending on the data scale, but still retain + * many of the benefits of "at-once" evaluation in terms of IO patterns. + * Whether or not an operator can be used as a "blocked" operator is a + * matter of the underlying operator implementation.</dd> + * </dl> + * + * @see Annotations#PIPELINED + * @see Annotations#MAX_MEMORY + */ + public boolean isPipelined() { + return getProperty(PipelineOp.Annotations.PIPELINED, + PipelineOp.Annotations.DEFAULT_PIPELINED); + } + /** - * Return the {@link PipelineType} of the operator (default - * {@link PipelineType#Vectored}). - */ - public PipelineType getPipelineType() { - - return PipelineType.Vectored; - - } - - /** * Return <code>true</code> iff {@link #newStats()} must be shared across * all invocations of {@link #eval(BOpContext)} for this operator for a * given query. @@ -247,12 +348,21 @@ Annotations.DEFAULT_SHARED_STATE); } - - /** - * Return a new object which can be used to collect statistics on the - * operator evaluation (this may be overridden to return a more specific - * class depending on the operator). - */ + + /** + * Return a new object which can be used to collect statistics on the + * operator evaluation. 
This may be overridden to return a more specific + * class depending on the operator. + * <p> + * Some operators may use this to share state across multiple invocations of + * the operator within a given query (e.g., {@link SliceOp}). Another + * mechanism for sharing state is to use the same named allocation context + * for the memory manager across the operator invocation instances. + * <p> + * Operator life cycle events support pre-/post-operator behaviors. Such + * events can be used to processed buffered solutions accumulated within + * some shared state across multiple operator invocations. + */ public BOpStats newStats() { return new BOpStats(); @@ -283,7 +393,7 @@ // getChunkTimeout(), Annotations.chunkTimeoutUnit, stats); // // } - + /** * Return a {@link FutureTask} which computes the operator against the * evaluation context. The caller is responsible for executing the @@ -302,5 +412,31 @@ * return the ForkJoinTask. */ abstract public FutureTask<Void> eval(BOpContext<IBindingSet> context); - + + /** + * Hook to setup any resources associated with the operator (temporary + * files, memory manager allocation contexts, etc.). This hook is invoked + * exactly once and before any instance task for the operator is evaluated. + */ + public void setUp() throws Exception { + + if (log.isTraceEnabled()) + log.trace("bopId=" + getId()); + + } + + /** + * Hook to tear down any resources associated with the operator (temporary + * files, memory manager allocation contexts, etc.). This hook is invoked + * exactly once no later than when the query is cancelled. If the operator + * is known to be done executing, then this hook will be invoked at that + * time. 
+ */ + public void tearDown() throws Exception { + + if (log.isTraceEnabled()) + log.trace("bopId=" + getId()); + + } + } Deleted: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineType.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineType.java 2011-02-08 17:42:19 UTC (rev 4183) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineType.java 2011-02-08 17:50:40 UTC (rev 4184) @@ -1,68 +0,0 @@ -/** - -Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -*/ -/* - * Created on Sep 21, 2010 - */ - -package com.bigdata.bop; - -/** - * Return the type of pipelining supported by an operator. - * <p> - * Note: bigdata does not support tuple-at-a-time processing. Only vectored and - * operator-at-a-time processing. Tuple at a time processing is generally very - * inefficient. - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ - */ -public enum PipelineType { - - /** - * Vectored operators stream chunks of intermediate results from one - * operator to the next using producer / consumer pattern. 
Each time a set - * of intermediate results is available for a vectored operator, it is - * evaluated against those inputs producing another set of intermediate - * results for its target operator(s). Vectored operators may be evaluated - * many times during a given query and often have excellent parallelism due - * to the concurrent evaluation of the different operators on different sets - * of intermediate results. - */ - Vectored, - - /** - * The operator will run exactly once and must wait for all of its inputs to - * be assembled before it runs. - * <p> - * There are some operations for which this is always true, such as SORT. - * Other operations MAY use operator-at-once evaluation in order to benefit - * from a combination of more efficient IO patterns and simpler design. - * However, pipelined operators using large memory blocks have many of the - * benefits of operator-at-once evaluation. By deferring their evaluation - * until some minimum number of source data blocks are available, they may - * be evaluated once or more than once, depending on the data scale. - */ - OneShot; - -} Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/QuoteOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/QuoteOp.java 2011-02-08 17:42:19 UTC (rev 4183) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/QuoteOp.java 2011-02-08 17:50:40 UTC (rev 4184) @@ -38,10 +38,10 @@ * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ * - * @todo I think that we can avoid quoting operators by using annotations (for - * some cases) and through explicit interaction between operators for - * others (such as between a join and a predicate). If that proves to be - * true then this class will be dropped. 
+ * @deprecated I think that we can avoid quoting operators by using annotations + * (for some cases) and through explicit interaction between + * operators for others (such as between a join and a predicate). If + * that proves to be true then this class will be dropped. */ public class QuoteOp extends BOpBase { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java 2011-02-08 17:42:19 UTC (rev 4183) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/AbstractRunningQuery.java 2011-02-08 17:50:40 UTC (rev 4184) @@ -29,6 +29,8 @@ import java.nio.ByteBuffer; import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -235,6 +237,16 @@ */ private final AtomicBoolean didQueryTearDown = new AtomicBoolean(false); + /** + * A collection reporting on whether or not a given operator has been torn + * down. This collection is used to provide the guarantee that an operator + * is torn down exactly once, regardless of the #of invocations of the + * operator or the #of errors which might occur during query processing. + * + * @see PipelineOp#tearDown() + */ + private final Map<Integer/* bopId */, AtomicBoolean> tornDown = new LinkedHashMap<Integer, AtomicBoolean>(); + /** * Set the query deadline. The query will be cancelled when the deadline is * passed. If the deadline is passed, the query is immediately cancelled. @@ -601,8 +613,23 @@ try { - if (runState.startOp(msg)) + if (runState.startOp(msg)) { + + /* + * Set a flag in this collection so we will know that this + * operator needs to be torn down (we do not bother to tear down + * operators which have never been setup). 
+ */ + tornDown.put(msg.bopId, new AtomicBoolean(false)); + + /* + * TODO It is a bit dangerous to hold the lock while we do this + * but this needs to be executed before any other thread can + * start an evaluation task for that operator. + */ lifeCycleSetUpOperator(msg.bopId); + + } } catch (TimeoutException ex) { @@ -616,19 +643,19 @@ } - /** - * Message provides notice that the operator has ended execution. The - * termination conditions for the query are checked. (For scale-out, the - * node node controlling the query needs to be involved for each operator - * start/stop in order to make the termination decision atomic). - * - * @param msg - * The {@link HaltOpMessage} - * - * @throws UnsupportedOperationException - * If this node is not the query coordinator. - */ - final protected void haltOp(final HaltOpMessage msg) { + /** + * Message provides notice that the operator has ended execution. The + * termination conditions for the query are checked. (For scale-out, the + * node controlling the query needs to be involved for each operator + * start/stop in order to make the termination decision atomic). + * + * @param msg + * The {@link HaltOpMessage} + * + * @throws UnsupportedOperationException + * If this node is not the query coordinator. + */ + /*final*/ protected void haltOp(final HaltOpMessage msg) { if (!controller) throw new UnsupportedOperationException(ERR_NOT_CONTROLLER); @@ -653,13 +680,20 @@ if (runState.haltOp(msg)) { - /* - * No more chunks can appear for this operator so invoke its end - * of life cycle hook. - */ + /* + * No more chunks can appear for this operator so invoke its end + * of life cycle hook IFF it has not yet been invoked. + */ - lifeCycleTearDownOperator(msg.bopId); + final AtomicBoolean tornDown = AbstractRunningQuery.this.tornDown + .get(msg.bopId); + if (tornDown.compareAndSet(false/* expect */, true/* update */)) { + + lifeCycleTearDownOperator(msg.bopId); + + } + if (runState.isAllDone()) { // Normal termination. 
@@ -681,6 +715,69 @@ } + /** + * Return <code>true</code> iff the preconditions have been satisfied for + * the "at-once" invocation of the specified operator (no predecessors are + * running or could be triggered and the operator has not been evaluated). + * + * @param bopId + * Some operator identifier. + * + * @return <code>true</code> iff the "at-once" evaluation of the operator + * may proceed. + */ + protected boolean isAtOnceReady(final int bopId) { + + lock.lock(); + + try { + +// if (isDone()) { +// // The query has already halted. +// throw new InterruptedException(); +// } + + return runState.isAtOnceReady(bopId); + + } finally { + + lock.unlock(); + + } + + } + +// /** +// * Return <code>true</code> iff there is already an instance of the operator +// * running. +// * +// * @param bopId +// * The bopId of the operator. +// * +// * @return True iff there is at least one instance of the operator running +// * (globally for this query). +// */ +// public boolean isOperatorRunning(final int bopId) { +// +// lock.lock(); +// +// try { +// +// final AtomicLong nrunning = runState.runningMap.get(bopId); +// +// if (nrunning == null) +// return false; +// +// return nrunning.get() > 0; +// +// } finally { +// +// lock.unlock(); +// +// } +// +// } + /** * Hook invoked the first time the given operator is evaluated for the * query. This may be used to set up life cycle resources for the operator, @@ -690,27 +787,53 @@ * @param bopId * The operator identifier. */ - protected void lifeCycleSetUpOperator(final int bopId) { + protected void lifeCycleSetUpOperator(final int bopId) { - if (log.isTraceEnabled()) - log.trace("queryId=" + queryId + ", bopId=" + bopId); + final BOp op = getBOpIndex().get(bopId); - } + if (op instanceof PipelineOp) { - /** - * Hook invoked the after the given operator has been evaluated for the - * query for what is known to be the last time. 
This may be used to tear - * down life cycle resources for the operator, such as a distributed hash - * table on a set of nodes identified by annotations of the operator. - * - * @param bopId - * The operator identifier. - */ - protected void lifeCycleTearDownOperator(final int bopId) { + try { - if (log.isTraceEnabled()) - log.trace("queryId=" + queryId + ", bopId=" + bopId); + ((PipelineOp) op).setUp(); + + } catch (Exception ex) { + + throw new RuntimeException(ex); + + } + } + + } + + /** + * Hook invoked the after the given operator has been evaluated for the + * query for what is known to be the last time. This may be used to tear + * down life cycle resources for the operator, such as a distributed hash + * table on a set of nodes identified by annotations of the operator. + * + * @param bopId + * The operator identifier. + */ + protected void lifeCycleTearDownOperator(final int bopId) { + + final BOp op = getBOpIndex().get(bopId); + + if (op instanceof PipelineOp) { + + try { + + ((PipelineOp) op).tearDown(); + + } catch (Exception ex) { + + throw new RuntimeException(ex); + + } + + } + } /** @@ -730,6 +853,27 @@ */ protected void lifeCycleTearDownQuery() { + final Iterator<Map.Entry<Integer/* bopId */, AtomicBoolean/* tornDown */>> itr = tornDown + .entrySet().iterator(); + + while(itr.hasNext()) { + + final Map.Entry<Integer/* bopId */, AtomicBoolean/* tornDown */> entry = itr + .next(); + + final AtomicBoolean tornDown = entry.getValue(); + + if (tornDown.compareAndSet(false/* expect */, true/* update */)) { + + /* + * Guaranteed one time tear down for this operator. 
+ */ + lifeCycleTearDownOperator(entry.getKey()/* bopId */); + + } + + } + if (log.isTraceEnabled()) log.trace("queryId=" + queryId); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java 2011-02-08 17:42:19 UTC (rev 4183) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/BOpStats.java 2011-02-08 17:50:40 UTC (rev 4184) @@ -30,6 +30,7 @@ import java.io.Serializable; import com.bigdata.bop.BOp; +import com.bigdata.bop.PipelineOp; import com.bigdata.counters.CAT; /** @@ -58,6 +59,13 @@ * The #of instances of a given operator which have been created for a given * query. This provides interesting information about the #of task instances * for each operator which were required to execute a query. + * + * TODO Due to the way this is incremented, this is always ONE (1) if + * {@link PipelineOp.Annotations#SHARED_STATE} is <code>true</code> (it + * reflects the #of times {@link #add(BOpStats)} was invoked plus one for + * the ctor rather than the #of times the operator task was invoked). This + * should be changed to reflect the #of operator task instances created + * instead. 
*/ final public CAT opCount = new CAT(); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java 2011-02-08 17:42:19 UTC (rev 4183) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/ChunkedRunningQuery.java 2011-02-08 17:50:40 UTC (rev 4184) @@ -35,7 +35,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingQueue; @@ -50,6 +49,7 @@ import com.bigdata.bop.IBindingSet; import com.bigdata.bop.NoSuchBOpException; import com.bigdata.bop.PipelineOp; +import com.bigdata.bop.fed.FederatedRunningQuery; import com.bigdata.journal.ITx; import com.bigdata.journal.Journal; import com.bigdata.relation.accesspath.BufferClosedException; @@ -642,19 +642,42 @@ } } - /** - * Examine the input queue for the (bopId,partitionId). If there is work - * available and no task is currently running, then drain the work queue and - * submit a task to consume that work. - * - * @param bundle - * The (bopId,partitionId). - * - * @return <code>true</code> if a new task was started. - */ + /** + * Overridden to attempt to consume another chunk each time an operator + * reports that it has halted evaluation. This is necessary because the + * haltOp() message can arrive asynchronously, so we need to test the work + * queues in case there are "at-once" operators awaiting the termination of + * their predecessor(s) in the pipeline. + */ + @Override + protected void haltOp(final HaltOpMessage msg) { + super.haltOp(msg); + consumeChunk(); + } + + /** + * Examine the input queue for the (bopId,partitionId). 
If there is work + * available, then drain the work queue and submit a task to consume that + * work. This handles {@link PipelineOp.Annotations#THREAD_SAFE}, + * {@link PipelineOp.Annotations#PIPELINED} as special cases. + * + * + * @param bundle + * The (bopId,partitionId). + * + * @return <code>true</code> if a new task was started. + * + * @todo Also handle {@link PipelineOp.Annotations#MAX_MEMORY} here by + * handshaking with the {@link FederatedRunningQuery}. + */ private boolean scheduleNext(final BSBundle bundle) { if (bundle == null) throw new IllegalArgumentException(); + final BOp bop = getBOpIndex().get(bundle.bopId); + final boolean threadSafe = bop.getProperty( + PipelineOp.Annotations.THREAD_SAFE, + PipelineOp.Annotations.DEFAULT_THREAD_SAFE); + final boolean pipelined = ((PipelineOp)bop).isPipelined();; lock.lock(); try { // Make sure the query is still running. @@ -664,14 +687,31 @@ ConcurrentHashMap<ChunkFutureTask, ChunkFutureTask> map = operatorFutures .get(bundle); if (map != null) { - int nrunning = 0; +// // #of instances of the operator already running. +// int nrunning = 0; for (ChunkFutureTask cft : map.keySet()) { - if (cft.isDone()) + if (cft.isDone()) { + // Remove tasks which have already terminated. map.remove(cft); - nrunning++; + } +// nrunning++; } - if (map.isEmpty()) + if (map.isEmpty()) { + // No tasks running for this operator. operatorFutures.remove(bundle); + } else { + // At least one task is running for this operator. + if (!threadSafe) { + /* + * This operator is not thread-safe, so reject + * concurrent execution for the same (bopId,shardId). + */ + if (log.isDebugEnabled()) + log.debug("Rejecting concurrent execution: " + + bundle + ", #running=" + map.size()); + return false; + } + } /* * FIXME If we allow a limit on the concurrency then we need to * manage things in order to guarantee that deadlock can not @@ -709,26 +749,53 @@ // } // // } - // Remove the work queue for that (bopId,partitionId). 
+ // Get the work queue for that (bopId,partitionId). final BlockingQueue<IChunkMessage<IBindingSet>> queue = operatorQueues - .remove(bundle); - if (queue == null || queue.isEmpty()) { + .get(bundle); + if (queue == null) { // no work return false; } + if (!pipelined && !queue.isEmpty() && !isAtOnceReady(bundle.bopId)) { + /* + * This operator is not pipelined, so we need to wait until all + * of its input solutions have been materialized (no prior + * operator in the pipeline is running or has inputs available + * which could cause it to run). + * + * TODO This is where we should examine MAX_MEMORY and the + * buffered data to see whether or not to trigger an evaluation + * pass for the operator based on the data already materialized + * for that operator. + */ + if (log.isDebugEnabled()) + log.debug("Waiting on producer(s): bopId=" + bundle.bopId); + return false; + } + // Remove the work queue for that (bopId,partitionId). + operatorQueues.remove(bundle); + if (queue.isEmpty()) { + // no work + return false; + } // Drain the work queue for that (bopId,partitionId). final List<IChunkMessage<IBindingSet>> messages = new LinkedList<IChunkMessage<IBindingSet>>(); queue.drainTo(messages); final int nmessages = messages.size(); - /* - * Combine the messages into a single source to be consumed by a - * task. - */ - int nchunks = 1; - final IMultiSourceAsynchronousIterator<IBindingSet[]> source = new MultiSourceSequentialAsynchronousIterator<IBindingSet[]>(messages.remove(0).getChunkAccessor().iterator()); + /* + * Combine the messages into a single source to be consumed by a + * task. + * + * @todo We could limit the #of chunks combined here by leaving the + * rest on the work queue. 
+ */ +// int nchunks = 1; + final IMultiSourceAsynchronousIterator<IBindingSet[]> source = new MultiSourceSequentialAsynchronousIterator<IBindingSet[]>(// + messages.remove(0).getChunkAccessor().iterator()// + ); for (IChunkMessage<IBindingSet> msg : messages) { source.add(msg.getChunkAccessor().iterator()); - nchunks++; +// nchunks++; } /* * Create task to consume that source. @@ -748,6 +815,9 @@ /* * Submit task for execution (asynchronous). */ + if (log.isDebugEnabled()) + log.debug("Running task: bop=" + bundle.bopId + ", nmessages=" + + nmessages); getQueryEngine().execute(cft); return true; } finally { @@ -1409,9 +1479,6 @@ */ synchronized (this) { - if (smallChunks == null) - smallChunks = new LinkedList<IBindingSet[]>(); - if (chunkSize + e.length > chunkCapacity) { // flush the buffer first. @@ -1419,6 +1486,9 @@ } + if (smallChunks == null) + smallChunks = new LinkedList<IBindingSet[]>(); + smallChunks.add(e); chunkSize += e.length; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java 2011-02-08 17:42:19 UTC (rev 4183) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java 2011-02-08 17:50:40 UTC (rev 4184) @@ -9,15 +9,15 @@ lic...@bi... This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by +it under the terms of the GNU General License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. +GNU General License for more details. 
-You should have received a copy of the GNU General Public License +You should have received a copy of the GNU General License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ @@ -114,26 +114,45 @@ * @return The query deadline (milliseconds since the epoch) and * {@link Long#MAX_VALUE} if no explicit deadline was specified. */ - public long getDeadline(); + long getDeadline(); /** * The timestamp (ms) when the query began execution. */ - public long getStartTime(); + long getStartTime(); /** * The timestamp (ms) when the query was done and ZERO (0) if the query is * not yet done. */ - public long getDoneTime(); + long getDoneTime(); /** * The elapsed time (ms) for the query. This will be updated for each call * until the query is done executing. */ - public long getElapsed(); - - /** + long getElapsed(); + +// /** +// * Return <code>true</code> if there are no operators which could +// * (re-)trigger the specified operator. +// * <p> +// * Note: This is intended to be invoked synchronously from within the +// * evaluation of the operator in order to determine whether or not the +// * operator can be invoked again for this running query. +// * +// * @param bopId +// * The specified operator. +// * @param nconsumed +// * The #of {@link IChunkMessage} consumed by the operator during +// * its current invocation. +// * +// * @return <code>true</code> iff it is not possible for the specified +// * operator to be retriggered. +// */ +// boolean isLastInvocation(final int bopId,final int nconsumed); + +// /** // * Cancel the running query (normal termination). 
// * <p> // * Note: This method provides a means for an operator to indicate that the Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/PipelineUtility.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/PipelineUtility.java 2011-02-08 17:42:19 UTC (rev 4183) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/PipelineUtility.java 2011-02-08 17:50:40 UTC (rev 4184) @@ -47,42 +47,45 @@ private static final Logger log = Logger.getLogger(PipelineUtility.class); - /** - * Return <code>true</code> iff <i>availableChunkMap</i> map is ZERO (0) for - * the given operator and its descendants AND the <i>runningCountMap</i> is - * ZERO (0) for the operator and all descendants of the operator. For the - * purposes of this method, only {@link BOp#args() operands} are considered - * as descendants. - * <p> - * Note: The movement of the intermediate binding set chunks during query - * processing forms an acyclic directed graph. We can decide whether or not - * a {@link BOp} in the query plan can be triggered by the current activity - * pattern by inspecting the {@link BOp} and its operands recursively. - * - * @param bopId - * The identifier for an operator which appears in the query - * plan. - * @param queryPlan - * The query plan. - * @param queryIndex - * An index for the query plan as constructed by - * {@link BOpUtility#getIndex(BOp)}. - * @param runningCountMap - * A map reporting the #of instances of each operator which are - * currently being evaluated (distinct evaluations are performed - * for each chunk and shard). - * @param availableChunkCountMap - * A map reporting the #of chunks available for each operator in - * the pipeline (we only report chunks for pipeline operators). - * - * @return <code>true</code> iff the {@link BOp} can not be triggered given - * the query plan and the activity map. 
- * - * @throws IllegalArgumentException - * if any argument is <code>null</code>. - * @throws NoSuchBOpException - * if <i>bopId</i> is not found in the query index. - */ + /** + * Return <code>true</code> iff the running query state is such that it is + * no longer possible for an operator to run which could cause solutions to + * be propagated to the operator identified by the <i>bopId</i>. + * Specifically, this returns true iff <i>availableChunkMap</i> map is ZERO + * (0) for the given operator and its descendants AND the + * <i>runningCountMap</i> is ZERO (0) for the operator and all descendants + * of the operator. For the purposes of this method, only {@link BOp#args() + * operands} are considered as descendants. + * <p> + * Note: The movement of the intermediate binding set chunks during query + * processing forms an acyclic directed graph. We can decide whether or not + * a {@link BOp} in the query plan can be triggered by the current activity + * pattern by inspecting the {@link BOp} and its operands recursively. + * + * @param bopId + * The identifier for an operator which appears in the query + * plan. + * @param queryPlan + * The query plan. + * @param queryIndex + * An index for the query plan as constructed by + * {@link BOpUtility#getIndex(BOp)}. + * @param runningCountMap + * A map reporting the #of instances of each operator which are + * currently being evaluated (distinct evaluations are performed + * for each chunk and shard). + * @param availableChunkCountMap + * A map reporting the #of chunks available for each operator in + * the pipeline (we only report chunks for pipeline operators). + * + * @return <code>true</code> iff the {@link BOp} can not be triggered given + * the query plan and the activity map. + * + * @throws IllegalArgumentException + * if any argument is <code>null</code>. + * @throws NoSuchBOpException + * if <i>bopId</i> is not found in the query index. 
+ */ static public boolean isDone(final int bopId, final BOp queryPlan, final Map<Integer, BOp> queryIndex, final Map<Integer, AtomicLong> runningCountMap, @@ -127,8 +130,8 @@ if (runningCount != null && runningCount.get() != 0) { - if (log.isInfoEnabled()) - log.info("Operator can be triggered: op=" + op + if (log.isDebugEnabled()) + log.debug("Operator can be triggered: op=" + op + ", possible trigger=" + t + " is running."); return false; @@ -150,8 +153,8 @@ if (availableChunkCount != null && availableChunkCount.get() != 0) { - if (log.isInfoEnabled()) - log.info("Operator can be triggered: op=" + op + if (log.isDebugEnabled()) + log.debug("Operator can be triggered: op=" + op + ", possible trigger=" + t + " has " + availableChunkCount + " chunks available."); @@ -170,4 +173,145 @@ } + /** + * Return <code>true</code> iff the running query state is such that the + * "at-once" evaluation of the specified operator may proceed. The specific + * requirements are: (a) the operator is not running and has not been + * started; (b) no predecessor in the pipeline is running; and (c) no + * predecessor in the pipeline can be triggered. + * + * @param bopId + * The identifier for an operator which appears in the query + * plan. + * @param queryPlan + * The query plan. + * @param queryIndex + * An index for the query plan as constructed by + * {@link BOpUtility#getIndex(BOp)}. + * @param runningCountMap + * A map reporting the #of instances of each operator which are + * currently being evaluated (distinct evaluations are performed + * for each chunk and shard). + * @param availableChunkCountMap + * A map reporting the #of chunks available for each operator in + * the pipeline (we only report chunks for pipeline operators). + * + * @return <code>true</code> iff the "at-once" evaluation of the operator + * may proceed. + * + * @throws IllegalArgumentException + * if any argument is <code>null</code>. 
+ * @throws NoSuchBOpException + * if <i>bopId</i> is not found in the query index. + * + * TODO Unit tests. + */ + static public boolean isAtOnceReady(final int bopId, final BOp queryPlan, + final Map<Integer, BOp> queryIndex, + final Map<Integer, AtomicLong> runningCountMap, + final Map<Integer, AtomicLong> availableChunkCountMap) { + + if (queryPlan == null) + throw new IllegalArgumentException(); + + if (queryIndex == null) + throw new IllegalArgumentException(); + + if (availableChunkCountMap == null) + throw new IllegalArgumentException(); + + final BOp op = queryIndex.get(bopId); + + if (op == null) + throw new NoSuchBOpException(bopId); + + final boolean didStart = runningCountMap.get(bopId) != null; + + if(didStart) { + + // Evaluation has already run (or begun) for this operator. + if (log.isInfoEnabled()) + log.info("Already ran/running: " + bopId); + + return false; + + } + + final Iterator<BOp> itr = BOpUtility.preOrderIterator(op); + + while (itr.hasNext()) { + + final BOp t = itr.next(); + + final Integer id = (Integer) t.getProperty(BOp.Annotations.BOP_ID); + + if (id == null) // TODO Why allow ops w/o bopId here? + continue; + + if(bopId == id.intValue()) { + + // Ignore self. + continue; + + } + + { + + /* + * If any descendants (aka predecessors) of the operator are + * running, then they could cause produce additional solutions + * so the operator is not ready for "at-once" evaluation. + */ + + final AtomicLong runningCount = runningCountMap.get(id); + + if (runningCount != null && runningCount.get() != 0) { + + if (log.isDebugEnabled()) + log.debug("Predecessor running: predecessorId=" + id + + ", predecessorRunningCount=" + runningCount); + + return false; + + } + + } + + { + + /* + * Any chunks available for a descendant (aka predecessor) of + * the operator could produce additional solutions as inputs to + * the operator so it is not ready for "at-once" evaluation. 
+ */ + + final AtomicLong availableChunkCount = availableChunkCountMap + .get(id); + + if (availableChunkCount != null + && availableChunkCount.get() != 0) { + /* + * We are looking at some other predecessor of the specified + * operator. + */ + if (log.isDebugEnabled()) + log.debug("Predecessor can be triggered: predecessorId=" + + id + " has " + availableChunkCount + + " chunks available."); + + return false; + } + + } + + } + + // Success. + if (log.isInfoEnabled()) + log.info("Ready for 'at-once' evaluation: " + bopId); + + return true; + + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java 2011-02-08 17:42:19 UTC (rev 4183) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java 2011-02-08 17:50:40 UTC (rev 4184) @@ -541,7 +541,7 @@ * Since it is defacto done when [isAllDone] is satisfied, this tests * for that condition first and then for isOperatorDone(). */ - final boolean isOpDone = isAllDone||isOperatorDone(msg.bopId); + final boolean isOpDone = isAllDone || isOperatorDone(msg.bopId); // if (isAllDone && !isOpDone) // throw new RuntimeException("Whoops!: "+this); @@ -575,23 +575,24 @@ } - /** - * Return <code>true</code> the specified operator can no longer be - * triggered by the query. The specific criteria are that no operators which - * are descendants of the specified operator are running or have chunks - * available against which they could run. Under those conditions it is not - * possible for a chunk to show up which would cause the operator to be - * executed. - * - * @param bopId - * Some operator identifier. - * - * @return <code>true</code> if the operator can not be triggered given the - * current query activity. - * - * @throws IllegalMonitorStateException - * unless the {@link #runStateLock} is held by the caller. 
- */ + /** + * Return <code>true</code> the specified operator can no longer be + * triggered by the query. The specific criteria are that no operators which + * are descendants of the specified operator are running or have chunks + * available against which they could run. Under those conditions it is not + * possible for a chunk to show up which would cause the operator to be + * executed. + * <p> + * Note: The caller MUST hold a lock across this operation in order for it + * to be atomic with respect to the concurrent evaluation of other operators + * for the same query. + * + * @param bopId + * Some operator identifier. + * + * @return <code>true</code> if the operator can not be triggered given the + * current query activity. + */ private boolean isOperatorDone(final int bopId) { return PipelineUtility.isDone(bopId, query, bopIndex, runningMap, @@ -599,6 +600,24 @@ } + /** + * Return <code>true</code> iff the preconditions have been satisfied for + * the "at-once" invocation of the specified operator (no predecessors are + * running or could be triggered and the operator has not been evaluated). + * + * @param bopId + * Some operator identifier. + * + * @return <code>true</code> iff the "at-once" evaluation of the operator + * may proceed. + */ + boolean isAtOnceReady(final int bopId) { + + return PipelineUtility.isAtOnceReady(bopId, query, bopIndex, + runningMap, availableMap); + + } + /** * Update the {@link RunState} to reflect that fact that a new evaluation * phase has begun for an operator. 
Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StandaloneChainedRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StandaloneChainedRunningQuery.java 2011-02-08 17:42:19 UTC (rev 4183) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StandaloneChainedRunningQuery.java 2011-02-08 17:50:40 UTC (rev 4184) @@ -27,7 +27,6 @@ package com.bigdata.bop.engine; -import java.nio.channels.ClosedByInterruptException; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; @@ -46,11 +45,9 @@ import com.bigdata.bop.NoSuchBOpException; import com.bigdata.bop.PipelineOp; import com.bigdata.relation.accesspath.BlockingBuffer; -import com.bigdata.relation.accesspath.BufferClosedException; import com.bigdata.relation.accesspath.IAsynchronousIterator; import com.bigdata.relation.accesspath.IBlockingBuffer; import com.bigdata.relation.accesspath.MultiplexBlockingBuffer; -import com.bigdata.util.InnerCause; /** * An {@link IRunningQuery} implementation for a standalone database in which a @@ -68,7 +65,9 @@ * sources for the sink have been closed. * <p> * This implementation does not use {@link IChunkMessage}s, can not be used with - * scale-out, and does not support sharded indices. + * scale-out, and does not support sharded indices. This implementation ONLY + * supports pipelined operators. If "at-once" evaluation semantics are required, + * then use {@link ChunkedRunningQuery}. * * @todo Since each operator task runs exactly once there is less potential * parallelism in the operator task execution when compared to @@ -81,8 +80,16 @@ * @todo Run all unit tests of the query engine against the appropriate * strategies. * + * @todo Since this class does not support "at-once" evaluation, it can not be + * used with ORDER BY operator implementations. That really restricts its + * ... 
[truncated message content] |
From: <mrp...@us...> - 2011-02-08 17:42:25
|
Revision: 4183 http://bigdata.svn.sourceforge.net/bigdata/?rev=4183&view=rev Author: mrpersonick Date: 2011-02-08 17:42:19 +0000 (Tue, 08 Feb 2011) Log Message: ----------- fixed some free text search tests Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestBigdataSailEvaluationStrategyImpl.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestNamedGraphs.java Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestBigdataSailEvaluationStrategyImpl.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestBigdataSailEvaluationStrategyImpl.java 2011-02-08 14:57:32 UTC (rev 4182) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestBigdataSailEvaluationStrategyImpl.java 2011-02-08 17:42:19 UTC (rev 4183) @@ -711,7 +711,7 @@ "where { " + " ?s <"+RDF.TYPE+"> <"+person+"> . " + // [160, 8, 164], [156, 8, 164] " ?s <"+RDFS.LABEL+"> ?label . " + // [160, 148, 174], [156, 148, 170] - " ?label <"+search+"> \"Mi\" . " + // [174, 0, 0] + " ?label <"+search+"> \"Mi*\" . " + // [174, 0, 0] "}"; { // evalute it once so i can see it Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestNamedGraphs.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestNamedGraphs.java 2011-02-08 14:57:32 UTC (rev 4182) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestNamedGraphs.java 2011-02-08 17:42:19 UTC (rev 4183) @@ -1636,7 +1636,7 @@ final String query = "select ?x ?y " + "where { " + - " ?y <"+ BD.SEARCH+"> \"Chris\" . " + + " ?y <"+ BD.SEARCH+"> \"Chris*\" . " + " ?x <"+ RDFS.LABEL.stringValue() + "> ?y . 
" + "}"; @@ -1669,7 +1669,7 @@ "select ?x ?y " + "where { " + " graph <http://example.org> { " + - " ?y <"+ BD.SEARCH+"> \"Chris\" . " + + " ?y <"+ BD.SEARCH+"> \"Chris*\" . " + " ?x <"+ RDFS.LABEL.stringValue() + "> ?y ." + " } . " + "}"; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2011-02-08 14:57:39
|
Revision: 4182 http://bigdata.svn.sourceforge.net/bigdata/?rev=4182&view=rev Author: thompsonbry Date: 2011-02-08 14:57:32 +0000 (Tue, 08 Feb 2011) Log Message: ----------- BT/MC lint dialog Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/sector/AllocationContext.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/sector/IMemoryManager.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/sector/ISectorManager.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/sector/MemoryManager.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/sector/SectorAllocator.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/rwstore/sector/TestMemoryManager.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/sector/MemoryManagerResourceError.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/sector/AllocationContext.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/sector/AllocationContext.java 2011-02-07 20:30:43 UTC (rev 4181) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/sector/AllocationContext.java 2011-02-08 14:57:32 UTC (rev 4182) @@ -28,40 +28,49 @@ import java.util.HashSet; /** - * The AllocationContext is used to maintain a handle on allocations made - * within some specific environment (context). + * The {@link AllocationContext} is used to maintain a handle on allocations + * made within some specific environment (context). * * In this way, clearing a context will return all allocations to the more * general pool. 
* - * There are two obvious implementaiton strategies: - * 1) Retaining set of addresses allocated - * 2) Retaining a copy of the allocated bits + * There are two obvious implementation strategies: + * <ol> + * <li>Retaining set of addresses allocated</li> + * <li>Retaining a copy of the allocated bits</li> + * </ol> * - * If it was not for the BLOB implementations which require length - * data to manage the freeing of an allocation, it would be efficient to - * maintain copies of the allocation bits. This remoains an option for the - * future but requires a relatively complex callback protocol. + * If it was not for the BLOB implementations which require length data to + * manage the freeing of an allocation, it would be efficient to maintain copies + * of the allocation bits. This remains an option for the future but requires a + * relatively complex callback protocol. * * For this reason, the initial implementation maintains a set of allocated * addresses. * * @author Martyn Cutcher - * */ public class AllocationContext implements IMemoryManager { - final IMemoryManager m_parent; + private final IMemoryManager m_parent; - SectorAllocation m_head = null; + private HashSet<Long> m_addresses = new HashSet<Long>(); - HashSet<Long> m_addresses = new HashSet<Long>(); - - public AllocationContext(IMemoryManager parent) { + public AllocationContext(final IMemoryManager parent) { + + if(parent == null) + throw new IllegalArgumentException(); + m_parent = parent; + } + synchronized public long allocate(final ByteBuffer data) { + + if (data == null) + throw new IllegalArgumentException(); + final long addr = m_parent.allocate(data); // getSectorAllocation(addr).allocate(addr); @@ -70,10 +79,7 @@ return addr; } - /** - * The main reason for the AllocationContext is to be - * able to atomically release the associated allocations - */ + synchronized public void clear() { for (Long addr : m_addresses) { m_parent.free(addr); @@ -82,6 +88,7 @@ m_addresses.clear(); } + 
synchronized public void free(final long addr) { // getSectorAllocation(addr).free(addr); m_addresses.remove(Long.valueOf(addr)); @@ -89,62 +96,80 @@ m_parent.free(addr); } - public ByteBuffer[] get(long addr) { + synchronized + public ByteBuffer[] get(final long addr) { return m_parent.get(addr); } - public AllocationContext createAllocationContext() { + public IMemoryManager createAllocationContext() { return new AllocationContext(this); } - - private int segmentID(final long addr) { - final int rwaddr = MemoryManager.getAllocationAddress(addr); - - return SectorAllocator.getSectorIndex(rwaddr); - } - - private int segmentOffset(final long addr) { - final int rwaddr = MemoryManager.getAllocationAddress(addr); - - return SectorAllocator.getSectorOffset(rwaddr); - } - - SectorAllocation getSectorAllocation(final long addr) { - final int index = segmentID(addr); - if (m_head == null) { - m_head = new SectorAllocation(index); - } - SectorAllocation sa = m_head; - while (sa.m_index != index) { - if (sa.m_next == null) { - sa.m_next = new SectorAllocation(index); - } - sa = sa.m_next; - } - - return sa; - } - - class SectorAllocation { - final int m_index; - final int[] m_bits = new int[SectorAllocator.NUM_ENTRIES]; - SectorAllocation m_next = null; - - SectorAllocation(final int index) { - m_index = index; - } - public void allocate(long addr) { - assert !SectorAllocator.tstBit(m_bits, segmentOffset(addr)); - - SectorAllocator.setBit(m_bits, segmentOffset(addr)); - } +// private SectorAllocation m_head = null; +// +// /** +// * Return the index of {@link SectorAllocator} for the given address. +// * +// * @param addr +// * The given address. +// * @return The index of the {@link SectorAllocator} containing that address. 
+// */ +// private int segmentID(final long addr) { +// final int rwaddr = MemoryManager.getAllocationAddress(addr); +// +// return SectorAllocator.getSectorIndex(rwaddr); +// } +// +// /** +// * Return the bit offset into the bit map of {@link SectorAllocator} for the +// * given address. +// * +// * @param addr +// * The given address. +// * @return +// */ +// private int segmentOffset(final long addr) { +// final int rwaddr = MemoryManager.getAllocationAddress(addr); +// +// return SectorAllocator.getSectorOffset(rwaddr); +// } +// +// SectorAllocation getSectorAllocation(final long addr) { +// final int index = segmentID(addr); +// if (m_head == null) { +// m_head = new SectorAllocation(index); +// } +// SectorAllocation sa = m_head; +// while (sa.m_index != index) { +// if (sa.m_next == null) { +// sa.m_next = new SectorAllocation(index); +// } +// sa = sa.m_next; +// } +// +// return sa; +// } +// +// class SectorAllocation { +// final int m_index; +// final int[] m_bits = new int[SectorAllocator.NUM_ENTRIES]; +// SectorAllocation m_next = null; +// +// SectorAllocation(final int index) { +// m_index = index; +// } +// +// public void allocate(long addr) { +// assert !SectorAllocator.tstBit(m_bits, segmentOffset(addr)); +// +// SectorAllocator.setBit(m_bits, segmentOffset(addr)); +// } +// +// public void free(long addr) { +// assert SectorAllocator.tstBit(m_bits, segmentOffset(addr)); +// +// SectorAllocator.clrBit(m_bits, segmentOffset(addr)); +// } +// } - public void free(long addr) { - assert SectorAllocator.tstBit(m_bits, segmentOffset(addr)); - - SectorAllocator.clrBit(m_bits, segmentOffset(addr)); - } - } - } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/sector/IMemoryManager.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/sector/IMemoryManager.java 2011-02-07 20:30:43 UTC (rev 4181) +++ 
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/sector/IMemoryManager.java 2011-02-08 14:57:32 UTC (rev 4182) @@ -26,18 +26,33 @@ import java.nio.ByteBuffer; +/** + * Abstraction for managing data in {@link ByteBuffer}s. Typically those buffers + * will be allocated on the native process heap. + * + * @author martyncutcher + */ public interface IMemoryManager { + /** * Allocates space on the backing resource and copies the provided data. * - * @param data - will be copied to the backing resource + * @param data + * The data will be copied to the backing resource + * * @return the address to be passed to the get method to retrieve the data + * + * @throws IllegalArgumentException + * if the argument is <code>null</code>. + * + * FIXME Replace with allocate(int nbytes):ByteBuffer[] so the + * caller can do zero copy NIO. */ public long allocate(ByteBuffer data); - + /** - * The ByteBuffer[] return enables the handling of blobs that span more - * than a single slot, without the need to create an intermediate ByteBuffer. + * The ByteBuffer[] return enables the handling of blobs that span more than + * a single slot, without the need to create an intermediate ByteBuffer. * * This will support transfers directly to other direct ByteBuffers, for * example for network IO. @@ -45,13 +60,18 @@ * Using ByteBuffer:put the returned array can be efficiently copied to * another ByteBuffer: * + * <pre> * ByteBuffer mybb; * ByteBuffer[] bufs = get(addr); * for (ByteBuffer b : bufs) { - * mybb.put(b); + * mybb.put(b); * } + * </pre> * - * @param addr previouslt returned by allocate + * @param addr + * An address previously returned by + * {@link #allocate(ByteBuffer)}. + * * @return array of ByteBuffers */ public ByteBuffer[] get(long addr); @@ -67,9 +87,11 @@ * Clears all current allocations */ public void clear(); - + /** - * Clears all current allocations + * Create a child allocation context within which the caller may make and + * release allocations. 
*/ - public AllocationContext createAllocationContext(); + public IMemoryManager createAllocationContext(); + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/sector/ISectorManager.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/sector/ISectorManager.java 2011-02-07 20:30:43 UTC (rev 4181) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/sector/ISectorManager.java 2011-02-08 14:57:32 UTC (rev 4182) @@ -25,55 +25,60 @@ package com.bigdata.rwstore.sector; /** - * The SectorManager defines the contract required to manage a set of - * SectorAllocators. + * The {@link ISectorManager} defines the contract required to manage a set of + * {@link SectorAllocator}s. * - * The SectorManager is passed to the SectorAllocator constructors and they - * will callback to manage their free list availability, and to trim the - * allocated storage if required. + * The {@link ISectorManager} is passed to the {@link SectorAllocator} + * constructors and they will callback to manage their free list availability, + * and to trim the allocated storage if required. * * @author Martyn Cutcher - * */ public interface ISectorManager { /** - * This request is made when the sectorAllocator no longer has a full set - * of block allocations available. + * This request is made when the sectorAllocator no longer has a full set of + * block allocations available. * * The allocator will issue this callback to help the SectorManager manage * an effective freelist of available allocators. 
* - * @param sectorAllocator to be removed + * @param sectorAllocator + * to be removed */ void removeFromFreeList(SectorAllocator sectorAllocator); /** - * When suficient alocations have been freed for recycling that a threshold - * of availability of reached for all block sizes, then the allocator - * calls back to the SectorManager to signal it is available to be returned - * to the free list. + * When sufficient allocations have been freed for recycling that a + * threshold of availability of reached for all block sizes, then the + * allocator calls back to the SectorManager to signal it is available to be + * returned to the free list. * - * @param sectorAllocator to be added + * @param sectorAllocator + * to be added */ void addToFreeList(SectorAllocator sectorAllocator); - + /** * When a sector is first created, it will remain at the head of the free * list until one of two conditions has been reached: + * <ol> * - * 1) The allocation has been saturated - * 2) The bit space has been filled + * <li>The allocation has been saturated.</li> + * <li>The bit space has been filled. + * <li> + * </ol> * - * In the case of (2), then it is possible that significant allocation - * space cannot be utilised - which will happen if the average allocation - * is less than 1K. In this situation, the sector can be trimmed and the - * space made available to the next sector. + * In the case of (2), then it is possible that significant allocation space + * cannot be utilized - which will happen if the average allocation is less + * than 1K. In this situation, the sector can be trimmed and the space made + * available to the next sector. * * trimSector will only be called in this condition - on the first occasion * that the allocator is removed from the freeList. 
* - * @param trim - the amount by which the sector allocation can be reduced + * @param trim + * - the amount by which the sector allocation can be reduced */ void trimSector(long trim, SectorAllocator sector); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/sector/MemoryManager.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/sector/MemoryManager.java 2011-02-07 20:30:43 UTC (rev 4181) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/sector/MemoryManager.java 2011-02-08 14:57:32 UTC (rev 4182) @@ -31,49 +31,50 @@ import org.apache.log4j.Logger; import com.bigdata.io.DirectBufferPool; -import com.bigdata.io.IByteArraySlice; /** - * The MemoryManager manages an off-heap Direct Buffer. It uses the new - * SectorAllocator to allocate slots within the address range. + * The MemoryManager manages an off-heap Direct {@link ByteBuffer}. It uses the + * new SectorAllocator to allocate slots within the address range. * - * The interface is designed to support efficient transfer between NIO - * buffers. + * The interface is designed to support efficient transfer between NIO buffers. * * The most complex aspect of the implementation is the BLOB representation, - * requiring a mapping across multiple allocation slots. This is managed - * using recursive calls in the main three methods: allocate, free and get. + * requiring a mapping across multiple allocation slots. This is managed using + * recursive calls in the main three methods: allocate, free and get. 
* * @author Martyn Cutcher - * */ public class MemoryManager implements IMemoryManager, ISectorManager { - protected static final Logger log = Logger + private static final Logger log = Logger .getLogger(MemoryManager.class); - final ByteBuffer m_resource; + private final ByteBuffer m_resource; final private ReentrantLock m_allocationLock = new ReentrantLock(); int m_allocation = 0; - final int m_sectorSize; - final int m_maxResource; + private final int m_sectorSize; + private final int m_maxResource; - final ArrayList<SectorAllocator> m_sectors = new ArrayList<SectorAllocator>(); - final ArrayList<SectorAllocator> m_free = new ArrayList<SectorAllocator>(); + private final ArrayList<SectorAllocator> m_sectors = new ArrayList<SectorAllocator>(); + private final ArrayList<SectorAllocator> m_free = new ArrayList<SectorAllocator>(); public MemoryManager(final int maxResource, final int sectorSize) { +// m_resource = DirectBufferPool.INSTANCE.acquire(); m_resource = ByteBuffer.allocateDirect(maxResource); m_sectorSize = sectorSize; m_maxResource = maxResource; } - - public class MemoryManagerResourceError extends RuntimeException { - protected MemoryManagerResourceError() {} + + protected void finalize() throws Throwable { + // release to pool. + DirectBufferPool.INSTANCE.release(m_resource); } public long allocate(final ByteBuffer data) { + if (data == null) + throw new IllegalArgumentException(); m_allocationLock.lock(); try { final int size = data.remaining(); @@ -142,28 +143,9 @@ } } - /** - * The ByteBuffer[] return enables the handling of blobs that span more - * than a single slot, without the need to create an intermediate ByteBuffer. - * - * This will support transfers directly to other direct ByteBuffers, for - * example for network IO. 
- * - * Using ByteBuffer:put the returned array can be efficiently copied to - * another ByteBuffer: - * - * ByteBuffer mybb; - * ByteBuffer[] bufs = get(addr); - * for (ByteBuffer b : bufs) { - * mybb.put(b); - * } - * - * @param addr - * @return - */ public ByteBuffer[] get(final long addr) { - int rwaddr = getAllocationAddress(addr); - int size = getAllocationSize(addr); + final int rwaddr = getAllocationAddress(addr); + final int size = getAllocationSize(addr); if (size <= SectorAllocator.BLOB_SIZE) { return new ByteBuffer[] { getBuffer(rwaddr, size) }; @@ -219,7 +201,7 @@ return ret; } - private SectorAllocator getSector(int rwaddr) { + private SectorAllocator getSector(final int rwaddr) { final int index = SectorAllocator.getSectorIndex(rwaddr); if (index >= m_sectors.size()) throw new IllegalStateException("Address: " + rwaddr + " yields index: " + index + " >= sector:size(): " + m_sectors.size()); @@ -231,7 +213,7 @@ return (int) (addr >> 32L); } - static int getAllocationSize(long addr) { + static int getAllocationSize(final long addr) { return (int) (addr & 0xFFFFL); } @@ -281,20 +263,25 @@ } public void clear() { - m_sectors.clear(); - m_free.clear(); - m_allocation = 0; + m_allocationLock.lock(); + try { + m_sectors.clear(); + m_free.clear(); + m_allocation = 0; + } finally { + m_allocationLock.unlock(); + } } - public void releaseResources() throws InterruptedException { - DirectBufferPool.INSTANCE.release(m_resource); - } +// public void releaseResources() throws InterruptedException { +// DirectBufferPool.INSTANCE.release(m_resource); +// } public void addToFreeList(final SectorAllocator sector) { m_free.add(sector); } - public void removeFromFreeList(SectorAllocator sector) { + public void removeFromFreeList(final SectorAllocator sector) { assert m_free.get(0) == sector; m_free.remove(sector); @@ -306,7 +293,7 @@ m_allocation -= trim; } - public AllocationContext createAllocationContext() { + public IMemoryManager createAllocationContext() { return 
new AllocationContext(this); } Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/sector/MemoryManagerResourceError.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/sector/MemoryManagerResourceError.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/sector/MemoryManagerResourceError.java 2011-02-08 14:57:32 UTC (rev 4182) @@ -0,0 +1,13 @@ +/** + * + */ +package com.bigdata.rwstore.sector; + +public class MemoryManagerResourceError extends RuntimeException { + /** + * + */ + private static final long serialVersionUID = 1L; + + protected MemoryManagerResourceError() {} +} \ No newline at end of file Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/sector/SectorAllocator.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/sector/SectorAllocator.java 2011-02-07 20:30:43 UTC (rev 4181) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/rwstore/sector/SectorAllocator.java 2011-02-08 14:57:32 UTC (rev 4182) @@ -31,23 +31,22 @@ import org.apache.log4j.Logger; -import com.bigdata.io.DirectBufferPool; import com.bigdata.rwstore.FixedOutputStream; import com.bigdata.rwstore.IWriteCacheManager; /** * The SectorAllocator is designed as an alternative the the standard RWStore * FixedAllocators. - * + * <p> * The idea of the SectorAllocator is to efficiently contain within a single * region as dense a usage as possible. Since a SectorAllocator is able to * allocate a full range of slot sizes, it should be able to service several - * thousand allocations and maximise disk locality on write. - * + * thousand allocations and maximize disk locality on write. 
+ * <p> * Furthermore, it presents an option to be synced with the backing store - * similarly to a MappedFile, in which case a single write for the entire * sector could be made for update. - * + * <p> * What we do not want is to run out of bits and to leave significant unused * space in the sector. This could happen if we primarily allocated small * slots - say on average 512 bytes. In this case, the maximum 1636 entries @@ -63,20 +62,19 @@ * sectors. Implying a maximum addressable store file of 64M * 64K, * = 4TB of full sectors. If the average sector only requires 32M, then the * total store would be reduced appropriately. - * + * <p> * The maximum theoretical storage is yielded by MAX_INT * AVG_SLOT_SIZE, so * 2GB * 2K (avg) would equate to the optimal maximum addressable allocations * and file size. An AVG of > 2K yields fewer allocations and an AVG of < 2K * a reduced file size. * * TODO: add parameterisation of META_SIZE for exploitation by MemoryManager. - * TODO: cache block starts in m_addresses to simplify/optimise bit2Offset * * @author Martyn Cutcher - * */ public class SectorAllocator { - protected static final Logger log = Logger + + private static final Logger log = Logger .getLogger(SectorAllocator.class); static final int getBitMask(int bits) { @@ -86,28 +84,29 @@ return ret; } - static final int SECTOR_INDEX_BITS = 16; - static final int SECTOR_OFFSET_BITS = 32-SECTOR_INDEX_BITS; - static final int SECTOR_OFFSET_MASK = getBitMask(SECTOR_OFFSET_BITS); + + private static final int SECTOR_INDEX_BITS = 16; + private static final int SECTOR_OFFSET_BITS = 32-SECTOR_INDEX_BITS; + private static final int SECTOR_OFFSET_MASK = getBitMask(SECTOR_OFFSET_BITS); - static final int META_SIZE = 8192; // 8K + private static final int META_SIZE = 8192; // 8K - final static int SECTOR_SIZE = 64 * 1024 * 1024; // 10M - final static int NUM_ENTRIES = (META_SIZE - 12) / (4 + 1); // 8K - index - address / (4 + 1) bits plus tag - final int[] BIT_MASKS = {0x1, 0x3, 
0x7, 0xF, 0xFF, 0xFFFF, 0xFFFFFFFF}; +// private final static int SECTOR_SIZE = 64 * 1024 * 1024; // 10M + private final static int NUM_ENTRIES = (META_SIZE - 12) / (4 + 1); // 8K - index - address / (4 + 1) bits plus tag +// private final int[] BIT_MASKS = {0x1, 0x3, 0x7, 0xF, 0xFF, 0xFFFF, 0xFFFFFFFF}; final static int BLOB_SIZE = 4096; - final static int BLOB_CHAIN_OFFSET = BLOB_SIZE - 4; - final static int[] ALLOC_SIZES = {64, 128, 256, 512, 1024, 2048, BLOB_SIZE}; - final static int[] ALLOC_BITS = {32, 32, 32, 32, 32, 32, 32}; - int m_index; - long m_sectorAddress; - int m_maxSectorSize; - byte[] m_tags = new byte[NUM_ENTRIES]; - int[] m_bits = new int[NUM_ENTRIES]; // 128 - sectorAddress(1) - m_tags(4) +// private final static int BLOB_CHAIN_OFFSET = BLOB_SIZE - 4; + private final static int[] ALLOC_SIZES = {64, 128, 256, 512, 1024, 2048, BLOB_SIZE}; + private final static int[] ALLOC_BITS = {32, 32, 32, 32, 32, 32, 32}; + private int m_index; + private long m_sectorAddress; + private int m_maxSectorSize; + private final byte[] m_tags = new byte[NUM_ENTRIES]; + private final int[] m_bits = new int[NUM_ENTRIES]; // 128 - sectorAddress(1) - m_tags(4) - int[] m_transientbits = new int[NUM_ENTRIES]; - int[] m_commitbits = new int[NUM_ENTRIES]; - int[] m_addresses = new int[NUM_ENTRIES]; + private int[] m_transientbits = new int[NUM_ENTRIES]; + private int[] m_commitbits = new int[NUM_ENTRIES]; + private final int[] m_addresses = new int[NUM_ENTRIES]; // maintain count against each alloc size, this provides ready access to be // able to check the minimum number of bits for all tag sizes. No @@ -118,17 +117,18 @@ // only the total number of bits, but the average number of bits for the // tag, dividing the numebr of free bits by the total (number of blocks) // for each tag. 
- int[] m_free = new int[ALLOC_SIZES.length]; - int[] m_total = new int[ALLOC_SIZES.length]; - int[] m_allocations = new int[ALLOC_SIZES.length]; - int[] m_recycles = new int[ALLOC_SIZES.length]; + final private int[] m_free = new int[ALLOC_SIZES.length]; + final private int[] m_total = new int[ALLOC_SIZES.length]; + final private int[] m_allocations = new int[ALLOC_SIZES.length]; + final private int[] m_recycles = new int[ALLOC_SIZES.length]; - final ISectorManager m_store; - boolean m_onFreeList = false; - private long m_diskAddr; + private final ISectorManager m_store; private final IWriteCacheManager m_writes; - public SectorAllocator(ISectorManager store, IWriteCacheManager writes) { + private boolean m_onFreeList = false; + private long m_diskAddr; + + public SectorAllocator(final ISectorManager store, final IWriteCacheManager writes) { m_store = store; m_writes = writes; } @@ -217,7 +217,7 @@ int allocated = 0; for (int i = 0; i < m_tags.length; i++) { if (m_tags[i] == -1) { - int block = this.ALLOC_SIZES[tag] * 32; + int block = SectorAllocator.ALLOC_SIZES[tag] * 32; if ((allocated + block) <= m_maxSectorSize) { m_tags[i] = tag; m_free[tag] += 32; @@ -553,15 +553,15 @@ m_store.addToFreeList(this); } - public static int getSectorIndex(int rwaddr) { + public static int getSectorIndex(final int rwaddr) { return (-rwaddr) >> SECTOR_OFFSET_BITS; } - public static int getSectorOffset(int rwaddr) { + public static int getSectorOffset(final int rwaddr) { return (-rwaddr) & SECTOR_OFFSET_MASK; } - public static int getBlobBlockCount(int size) { + public static int getBlobBlockCount(final int size) { final int nblocks = (size + BLOB_SIZE - 1) / BLOB_SIZE; return nblocks; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/rwstore/sector/TestMemoryManager.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/rwstore/sector/TestMemoryManager.java 2011-02-07 20:30:43 UTC (rev 
4181) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/rwstore/sector/TestMemoryManager.java 2011-02-08 14:57:32 UTC (rev 4182) @@ -10,7 +10,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import com.bigdata.rwstore.sector.MemoryManager.MemoryManagerResourceError; import com.bigdata.util.concurrent.DaemonThreadFactory; import junit.framework.TestCase; @@ -84,7 +83,7 @@ public void testAllocationContexts() { installMemoryManager(); - AllocationContext context = manager.createAllocationContext(); + final IMemoryManager context = manager.createAllocationContext(); for (int i = 0; i < 500; i++) { doStressAllocations(context, false, 5000, 5 + r.nextInt(3000)); context.clear(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <mrp...@us...> - 2011-02-07 20:30:50
|
Revision: 4181 http://bigdata.svn.sourceforge.net/bigdata/?rev=4181&view=rev Author: mrpersonick Date: 2011-02-07 20:30:43 +0000 (Mon, 07 Feb 2011) Log Message: ----------- fixing a problem with nested unions Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/Rule2BOpUtility.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sop/SOp2BOpUtility.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sop/SOpTree.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sop/SOpTreeBuilder.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestNestedUnions.java Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/Rule2BOpUtility.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/Rule2BOpUtility.java 2011-02-07 16:46:53 UTC (rev 4180) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/Rule2BOpUtility.java 2011-02-07 20:30:43 UTC (rev 4181) @@ -349,6 +349,10 @@ new NV(SliceOp.Annotations.EVALUATION_CONTEXT, BOpEvaluationContext.CONTROLLER),// })),queryHints); + + if (rule.getTailCount() == 0) { + return startOp; + } /* * First put the tails in the correct order based on the logic in @@ -569,12 +573,10 @@ for (int i = 0; i < order.length; i++) { - final int joinId = idFactory.incrementAndGet(); - // assign a bop id to the predicate Predicate<?> pred = (Predicate<?>) rule.getTail(order[i]).setBOpId( idFactory.incrementAndGet()); - + /* * Decorate the predicate with the assigned index (this is purely * informative). @@ -585,7 +587,7 @@ Annotations.ORIGINAL_INDEX, keyOrder[order[i]]); } - // decorate the predicate with the cardinality estimate. + // decorate the predicate with the cardinality estimate. 
if (cardinality != null) { pred = (Predicate<?>) pred.setProperty( Annotations.ESTIMATED_CARDINALITY, @@ -628,107 +630,9 @@ } } - // annotations for this join. - final List<NV> anns = new LinkedList<NV>(); - - anns.add(new NV(BOp.Annotations.BOP_ID, joinId)); + left = join(queryEngine, left, pred, constraints, context, + idFactory, queryHints); -// anns.add(new NV(PipelineJoin.Annotations.SELECT, -// selectVars[order[i]])); - - // No. The join just looks at the Predicate's optional annotation. -// if (pred.isOptional()) -// anns.add(new NV(PipelineJoin.Annotations.OPTIONAL, pred -// .isOptional())); - - /* - * Pull off annotations before we clear them from the predicate. - */ - final Scope scope = (Scope) pred.getProperty(Annotations.SCOPE); - - // true iff this is a quads access path. - final boolean quads = pred.getProperty(Annotations.QUADS, - Annotations.DEFAULT_QUADS); - - // pull of the Sesame dataset before we strip the annotations. - final Dataset dataset = (Dataset) pred - .getProperty(Annotations.DATASET); - - // strip off annotations that we do not want to propagate. - pred = pred.clearAnnotations(new String[] { Annotations.SCOPE, - Annotations.QUADS, Annotations.DATASET }); - - if (!constraints.isEmpty()) { -// // decorate the predicate with any constraints. -// pred = (Predicate<?>) pred.setProperty( -// IPredicate.Annotations.CONSTRAINTS, constraints -// .toArray(new IConstraint[constraints.size()])); - // add constraints to the join for that predicate. - anns.add(new NV(PipelineJoin.Annotations.CONSTRAINTS, - constraints - .toArray(new IConstraint[constraints.size()]))); - - } - - if (quads) { - - /* - * Quads mode. - */ - - if (enableDecisionTree) { - /* - * Strip off the named graph or default graph expander (in - * the long term it will simply not be generated.) 
- */ - pred = pred - .clearAnnotations(new String[] { IPredicate.Annotations.ACCESS_PATH_EXPANDER }); - - switch (scope) { - case NAMED_CONTEXTS: - left = namedGraphJoin(queryEngine, context, idFactory, - left, anns, pred, dataset, queryHints); - break; - case DEFAULT_CONTEXTS: - left = defaultGraphJoin(queryEngine, context, idFactory, - left, anns, pred, dataset, queryHints); - break; - default: - throw new AssertionError(); - } - - } else { - - /* - * This is basically the old way of handling quads query - * using expanders which were attached by - * BigdataEvaluationStrategyImpl. - */ - - final boolean scaleOut = queryEngine.isScaleOut(); - if (scaleOut) - throw new UnsupportedOperationException(); - - anns.add(new NV(Predicate.Annotations.EVALUATION_CONTEXT, - BOpEvaluationContext.ANY)); - - anns.add(new NV(PipelineJoin.Annotations.PREDICATE,pred)); - - left = applyQueryHints(new PipelineJoin(new BOp[] { left }, - anns.toArray(new NV[anns.size()])), queryHints); - - } - - } else { - - /* - * Triples or provenance mode. - */ - - left = triplesModeJoin(queryEngine, left, anns, pred, queryHints); - - } - } // if (rule.getConstraintCount() > 0) { @@ -758,7 +662,130 @@ return left; } + + public static PipelineOp join(final QueryEngine queryEngine, + PipelineOp left, Predicate pred, final AtomicInteger idFactory, + final Properties queryHints) { + + return join(queryEngine, left, pred, null, + new BOpContextBase(queryEngine), idFactory, queryHints); + + } + + public static PipelineOp join(final QueryEngine queryEngine, + PipelineOp left, Predicate pred, + final Collection<IConstraint> constraints, + final BOpContextBase context, final AtomicInteger idFactory, + final Properties queryHints) { + + final int joinId = idFactory.incrementAndGet(); + + // annotations for this join. + final List<NV> anns = new LinkedList<NV>(); + + anns.add(new NV(BOp.Annotations.BOP_ID, joinId)); +// anns.add(new NV(PipelineJoin.Annotations.SELECT, +// selectVars[order[i]])); + + // No. 
The join just looks at the Predicate's optional annotation. +// if (pred.isOptional()) +// anns.add(new NV(PipelineJoin.Annotations.OPTIONAL, pred +// .isOptional())); + + if (constraints != null && !constraints.isEmpty()) { +// // decorate the predicate with any constraints. +// pred = (Predicate<?>) pred.setProperty( +// IPredicate.Annotations.CONSTRAINTS, constraints +// .toArray(new IConstraint[constraints.size()])); + + // add constraints to the join for that predicate. + anns.add(new NV( + PipelineJoin.Annotations.CONSTRAINTS, + constraints.toArray(new IConstraint[constraints.size()]))); + + } + + /* + * Pull off annotations before we clear them from the predicate. + */ + final Scope scope = (Scope) pred.getProperty(Annotations.SCOPE); + + // true iff this is a quads access path. + final boolean quads = pred.getProperty(Annotations.QUADS, + Annotations.DEFAULT_QUADS); + + // pull of the Sesame dataset before we strip the annotations. + final Dataset dataset = (Dataset) pred + .getProperty(Annotations.DATASET); + + // strip off annotations that we do not want to propagate. + pred = pred.clearAnnotations(new String[] { Annotations.SCOPE, + Annotations.QUADS, Annotations.DATASET }); + + if (quads) { + + /* + * Quads mode. + */ + + if (enableDecisionTree) { + /* + * Strip off the named graph or default graph expander (in + * the long term it will simply not be generated.) + */ + pred = pred + .clearAnnotations(new String[] { IPredicate.Annotations.ACCESS_PATH_EXPANDER }); + + switch (scope) { + case NAMED_CONTEXTS: + left = namedGraphJoin(queryEngine, context, idFactory, + left, anns, pred, dataset, queryHints); + break; + case DEFAULT_CONTEXTS: + left = defaultGraphJoin(queryEngine, context, idFactory, + left, anns, pred, dataset, queryHints); + break; + default: + throw new AssertionError(); + } + + } else { + + /* + * This is basically the old way of handling quads query + * using expanders which were attached by + * BigdataEvaluationStrategyImpl. 
+ */ + + final boolean scaleOut = queryEngine.isScaleOut(); + if (scaleOut) + throw new UnsupportedOperationException(); + + anns.add(new NV(Predicate.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.ANY)); + + anns.add(new NV(PipelineJoin.Annotations.PREDICATE,pred)); + + left = applyQueryHints(new PipelineJoin(new BOp[] { left }, + anns.toArray(new NV[anns.size()])), queryHints); + + } + + } else { + + /* + * Triples or provenance mode. + */ + + left = triplesModeJoin(queryEngine, left, anns, pred, queryHints); + + } + + return left; + + } + /** * Generate a {@link PipelineJoin} for a triples mode access path. * Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sop/SOp2BOpUtility.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sop/SOp2BOpUtility.java 2011-02-07 16:46:53 UTC (rev 4180) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sop/SOp2BOpUtility.java 2011-02-07 20:30:43 UTC (rev 4181) @@ -40,6 +40,7 @@ import org.openrdf.query.algebra.StatementPattern; import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpContextBase; import com.bigdata.bop.BOpEvaluationContext; import com.bigdata.bop.BOpUtility; import com.bigdata.bop.IConstraint; @@ -132,7 +133,8 @@ private static boolean isNonOptionalJoinGroup(final SOpGroup sopGroup) { - return !(isUnion(sopGroup) || isOptional(sopGroup)); + return sopGroup.size() > 0 && + !(isUnion(sopGroup) || isOptional(sopGroup)); } @@ -184,14 +186,37 @@ * Start with left=<this join group> and add a SubqueryOp for each * sub group. 
*/ +// final SOpGroups children = join.getChildren(); +// if (children != null) { +// for (SOpGroup child : children) { +// if (isSingleOptional(child)) { +// // handled by the rule() conversion above +// continue; +// } +// final PipelineOp subquery = convert( +// child, idFactory, db, queryEngine, queryHints); +// final boolean optional = isOptional(child); +// final int subqueryId = idFactory.incrementAndGet(); +// left = new SubqueryOp(new BOp[]{left}, +// new NV(Predicate.Annotations.BOP_ID, subqueryId),// +// new NV(SubqueryOp.Annotations.SUBQUERY, subquery),// +// new NV(SubqueryOp.Annotations.OPTIONAL,optional)// +// ); +// if (log.isInfoEnabled()) { +// log.info("adding a subquery: " + subqueryId + "\n" + left); +// } +// } +// } final SOpGroups children = join.getChildren(); if (children != null) { - for (SOpGroup child : join.getChildren()) { - if (isSingleOptional(child)) { - // handled by the rule() conversion above + /* + * First do the non-optional subqueries (UNIONs) + */ + for (SOpGroup child : children) { + if (!isUnion(child)) continue; - } + final PipelineOp subquery = convert( child, idFactory, db, queryEngine, queryHints); final boolean optional = isOptional(child); @@ -205,6 +230,39 @@ log.info("adding a subquery: " + subqueryId + "\n" + left); } } + + /* + * Next do the optional subqueries and optional tails + */ + for (SOpGroup child : children) { + if (isUnion(child)) + continue; + + if (isSingleOptional(child)) { + final SOp sop = child.getSingletonSOp(); + final BOp bop = sop.getBOp(); + Predicate pred = (Predicate) bop.setProperty( + IPredicate.Annotations.OPTIONAL, Boolean.TRUE); + pred = pred.setBOpId(idFactory.incrementAndGet()); + left = Rule2BOpUtility.join( + queryEngine, left, pred, + idFactory, + queryHints); + } else { + final PipelineOp subquery = convert( + child, idFactory, db, queryEngine, queryHints); + final boolean optional = isOptional(child); + final int subqueryId = idFactory.incrementAndGet(); + left = new 
SubqueryOp(new BOp[]{left}, + new NV(Predicate.Annotations.BOP_ID, subqueryId),// + new NV(SubqueryOp.Annotations.SUBQUERY, subquery),// + new NV(SubqueryOp.Annotations.OPTIONAL,optional)// + ); + if (log.isInfoEnabled()) { + log.info("adding a subquery: " + subqueryId + "\n" + left); + } + } + } } for (IConstraint c : postConditionals) { @@ -313,23 +371,23 @@ } } - /* - * The way that the Sesame operator tree is parsed, optional tails - * become single-operator (predicate) join groups without any children - * of their own. - */ - final SOpGroups children = group.getChildren(); - if (children != null) { - for (SOpGroup child : group.getChildren()) { - if (isSingleOptional(child)) { - final SOp sop = child.getSingletonSOp(); - final BOp bop = sop.getBOp(); - final IPredicate pred = (IPredicate) bop.setProperty( - IPredicate.Annotations.OPTIONAL, Boolean.TRUE); - preds.add(pred); - } - } - } +// /* +// * The way that the Sesame operator tree is parsed, optional tails +// * become single-operator (predicate) join groups without any children +// * of their own. 
+// */ +// final SOpGroups children = group.getChildren(); +// if (children != null) { +// for (SOpGroup child : children) { +// if (isSingleOptional(child)) { +// final SOp sop = child.getSingletonSOp(); +// final BOp bop = sop.getBOp(); +// final IPredicate pred = (IPredicate) bop.setProperty( +// IPredicate.Annotations.OPTIONAL, Boolean.TRUE); +// preds.add(pred); +// } +// } +// } /* * Gather up all the variables used by predicates in this group Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sop/SOpTree.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sop/SOpTree.java 2011-02-07 16:46:53 UTC (rev 4180) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sop/SOpTree.java 2011-02-07 20:30:43 UTC (rev 4181) @@ -43,10 +43,14 @@ } group.add(sop); } + if (!groups.containsKey(0)) { + // need a dummy root group + groups.put(0, new LinkedList<SOp>()); + } for (Integer g : groups.keySet()) { final List<SOp> group = groups.get(g); - final int pg = group.get(0).getParentGroup(); + final int pg = group.isEmpty() ? -1 : group.get(0).getParentGroup(); allGroups.put(g, new SOpGroup(g, pg, group)); } @@ -195,7 +199,10 @@ } sb.append("SOp -> parent:").append(nl); for (Map.Entry<Integer, SOpGroup> e : this.parents.entrySet()) { - sb.append(e.getKey() + " -> " + e.getValue().getGroup()).append(nl); + sb.append(e.getKey()); + sb.append(" -> "); + sb.append(e.getValue() == null ? 
"null" : e.getValue().getGroup()); + sb.append(nl); } sb.append("SOp -> children:").append(nl); for (Map.Entry<Integer, SOpGroups> e : this.children.entrySet()) { @@ -206,7 +213,9 @@ } sb2.setLength(sb2.length()-2); sb.append(e.getKey() + " -> {" + sb2.toString() + "}"); + sb.append(nl); } + sb.setLength(sb.length()-1); return sb.toString(); } Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sop/SOpTreeBuilder.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sop/SOpTreeBuilder.java 2011-02-07 16:46:53 UTC (rev 4180) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/sop/SOpTreeBuilder.java 2011-02-07 20:30:43 UTC (rev 4181) @@ -121,6 +121,8 @@ collectSOps(sops, (LeftJoin) left, rslj, g, pg); } else if (left instanceof SingletonSet){ // do nothing + } else if (left instanceof Union){ + collectSOps(sops, (Union) left, rslj, groupId.incrementAndGet(), g); } else { throw new UnsupportedOperatorException(left); } Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestNestedUnions.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestNestedUnions.java 2011-02-07 16:46:53 UTC (rev 4180) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestNestedUnions.java 2011-02-07 20:30:43 UTC (rev 4181) @@ -28,14 +28,12 @@ import java.util.Collection; import java.util.LinkedList; -import java.util.Map; import java.util.Properties; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.log4j.Logger; -import org.openrdf.model.Literal; import org.openrdf.model.URI; import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.LiteralImpl; import org.openrdf.model.vocabulary.RDF; import org.openrdf.model.vocabulary.RDFS; import org.openrdf.query.BindingSet; 
@@ -48,17 +46,9 @@ import org.openrdf.repository.RepositoryConnection; import org.openrdf.repository.sail.SailTupleQuery; -import com.bigdata.bop.BOpUtility; -import com.bigdata.bop.PipelineOp; -import com.bigdata.bop.engine.QueryEngine; import com.bigdata.rdf.axioms.NoAxioms; -import com.bigdata.rdf.sail.sop.SOp; -import com.bigdata.rdf.sail.sop.SOp2BOpUtility; import com.bigdata.rdf.sail.sop.SOpTree; -import com.bigdata.rdf.sail.sop.SOpTree.SOpGroup; -import com.bigdata.rdf.sail.sop.SOpTree.SOpGroups; import com.bigdata.rdf.sail.sop.SOpTreeBuilder; -import com.bigdata.rdf.store.AbstractTripleStore; import com.bigdata.rdf.store.BD; import com.bigdata.rdf.vocab.NoVocabulary; @@ -229,4 +219,118 @@ } + public void testNestedUnionWithOptionals() throws Exception { + +// final Sail sail = new MemoryStore(); +// sail.initialize(); +// final Repository repo = new SailRepository(sail); + + final BigdataSail sail = getSail(); + try { + sail.initialize(); + final BigdataSailRepository repo = new BigdataSailRepository(sail); + + final RepositoryConnection cxn = repo.getConnection(); + + try { + + cxn.setAutoCommit(false); + + final ValueFactory vf = sail.getValueFactory(); + + /* + * Create some terms. + */ + final URI john = vf.createURI(BD.NAMESPACE + "john"); + final URI mary = vf.createURI(BD.NAMESPACE + "mary"); + final URI leon = vf.createURI(BD.NAMESPACE + "leon"); + final URI paul = vf.createURI(BD.NAMESPACE + "paul"); + final URI brad = vf.createURI(BD.NAMESPACE + "brad"); + final URI fred = vf.createURI(BD.NAMESPACE + "fred"); + final URI knows = vf.createURI(BD.NAMESPACE + "knows"); + + /* + * Create some statements. + */ + cxn.add(mary, knows, fred); + cxn.add(john, knows, leon); + cxn.add(john, RDFS.LABEL, vf.createLiteral("John")); + cxn.add(mary, RDF.TYPE, RDFS.RESOURCE); + + /* + * Note: The either flush() or commit() is required to flush the + * statement buffers to the database before executing any + * operations that go around the sail. 
+ */ + cxn.commit(); + + { + + String query = + "prefix bd: <"+BD.NAMESPACE+"> " + + "prefix rdf: <"+RDF.NAMESPACE+"> " + + "prefix rdfs: <"+RDFS.NAMESPACE+"> " + + "select * " + + "where { " + + " { " + + " ?a bd:knows bd:fred . " + + " } UNION { " + + " ?a bd:knows bd:leon . " + + " } " + + " OPTIONAL { ?a rdf:type ?type } " + + " OPTIONAL { ?a rdfs:label ?label } " + + "}"; + + final SailTupleQuery tupleQuery = (SailTupleQuery) + cxn.prepareTupleQuery(QueryLanguage.SPARQL, query); + tupleQuery.setIncludeInferred(false /* includeInferred */); + + if (log.isInfoEnabled()) { + + final BigdataSailTupleQuery bdTupleQuery = + (BigdataSailTupleQuery) tupleQuery; + final QueryRoot root = (QueryRoot) bdTupleQuery.getTupleExpr(); + final Projection p = (Projection) root.getArg(); + final TupleExpr tupleExpr = p.getArg(); + + log.info(tupleExpr); + + final SOpTreeBuilder stb = new SOpTreeBuilder(); + final SOpTree tree = stb.collectSOps(tupleExpr); + + log.info(tree); + log.info(query); + + final TupleQueryResult result = tupleQuery.evaluate(); + log.info("results:"); + while (result.hasNext()) { + log.info(result.next()); + } + + } + + final Collection<BindingSet> answer = new LinkedList<BindingSet>(); + answer.add(createBindingSet( + new BindingImpl("a", john), + new BindingImpl("label", vf.createLiteral("John")) + )); + answer.add(createBindingSet( + new BindingImpl("a", mary), + new BindingImpl("type", RDFS.RESOURCE) + )); + + final TupleQueryResult result = tupleQuery.evaluate(); + compare(result, answer); + + } + + } finally { + cxn.close(); + } + } finally { + sail.__tearDownUnitTest();//shutDown(); + } + + } + } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2011-02-07 16:46:59
|
Revision: 4180 http://bigdata.svn.sourceforge.net/bigdata/?rev=4180&view=rev Author: thompsonbry Date: 2011-02-07 16:46:53 +0000 (Mon, 07 Feb 2011) Log Message: ----------- javadoc for the getReadWriteConnection() and getUnisolatedConnection() methods (copied from the BigdataSail). Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSailRepository.java Modified: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSailRepository.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSailRepository.java 2011-02-04 21:02:20 UTC (rev 4179) +++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSailRepository.java 2011-02-07 16:46:53 UTC (rev 4180) @@ -79,6 +79,9 @@ } + /** + * Return a connection backed by a read-write transaction. + */ public BigdataSailRepositoryConnection getReadWriteConnection() throws RepositoryException { @@ -95,6 +98,12 @@ } + /** + * Return an unisolated connection to the database. Only one of these + * allowed at a time. + * + * @return unisolated connection to the database + */ public BigdataSailRepositoryConnection getUnisolatedConnection() throws RepositoryException { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2011-02-04 21:02:27
|
Revision: 4179 http://bigdata.svn.sourceforge.net/bigdata/?rev=4179&view=rev Author: thompsonbry Date: 2011-02-04 21:02:20 +0000 (Fri, 04 Feb 2011) Log Message: ----------- javadoc Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/DefaultExtensionFactory.java Modified: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/DefaultExtensionFactory.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/DefaultExtensionFactory.java 2011-02-04 13:01:01 UTC (rev 4178) +++ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/internal/DefaultExtensionFactory.java 2011-02-04 21:02:20 UTC (rev 4179) @@ -6,7 +6,13 @@ import com.bigdata.rdf.lexicon.LexiconRelation; /** - * Empty {@link IExtensionFactory}. + * Default {@link IExtensionFactory}. The following extensions are supported: + * <dl> + * <dt>{@link DateTimeExtension}</dt> + * <dd>Inlining literals which represent <code>xsd:dateTime</code> values into + * the statement indices.</dd> + * <dt></dt><dd></dd> + * </dl> */ public class DefaultExtensionFactory implements IExtensionFactory { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2011-02-04 13:01:07
|
Revision: 4178 http://bigdata.svn.sourceforge.net/bigdata/?rev=4178&view=rev Author: thompsonbry Date: 2011-02-04 13:01:01 +0000 (Fri, 04 Feb 2011) Log Message: ----------- Modified to first stop the lookup service and the httpd class server, in addition to ZooKeeper, before starting those services. This change was made in an effort to keep CI clean when a previous CI run was broken (e.g., due to a timeout) and thus left some services running. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/build.xml Modified: branches/QUADS_QUERY_BRANCH/build.xml =================================================================== --- branches/QUADS_QUERY_BRANCH/build.xml 2011-02-03 23:40:35 UTC (rev 4177) +++ branches/QUADS_QUERY_BRANCH/build.xml 2011-02-04 13:01:01 UTC (rev 4178) @@ -1420,6 +1420,9 @@ <!-- around if the JVM is killed. That pid file needs to be cleaned --> <!-- up before we can start a new instance. --> <antcall target="stopZookeeper" /> + <antcall target="stopLookup" /> + <antcall target="stopHttpd" /> + <antcall target="startZookeeper" /> <antcall target="startHttpd" /> <antcall target="startLookup" /> This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2011-02-03 23:40:41
|
Revision: 4177 http://bigdata.svn.sourceforge.net/bigdata/?rev=4177&view=rev Author: thompsonbry Date: 2011-02-03 23:40:35 +0000 (Thu, 03 Feb 2011) Log Message: ----------- Back port of the bug fix for https://sourceforge.net/apps/trac/bigdata/ticket/236 (Journal size doubles) to the trunk. Modified Paths: -------------- trunk/bigdata/src/java/com/bigdata/journal/FileMetadata.java trunk/bigdata/src/java/com/bigdata/journal/WORMStrategy.java Modified: trunk/bigdata/src/java/com/bigdata/journal/FileMetadata.java =================================================================== --- trunk/bigdata/src/java/com/bigdata/journal/FileMetadata.java 2011-02-03 13:04:48 UTC (rev 4176) +++ trunk/bigdata/src/java/com/bigdata/journal/FileMetadata.java 2011-02-03 23:40:35 UTC (rev 4177) @@ -113,6 +113,14 @@ final long userExtent; /** + * The initialExtent as declared to the constructor. This governs the file + * extension policy. + * + * @see Options#INITIAL_EXTENT + */ + final long initialExtent; + + /** * The #of bits out of a 64-bit long integer that are used to encode the * byte offset as an unsigned integer. The remaining bits are used to * encode the byte count (aka record length) as an unsigned integer. 
@@ -340,6 +348,8 @@ // this.readCacheCapacity = readCacheCapacity; // // this.readCacheMaxRecordSize = readCacheMaxRecordSize; + + this.initialExtent = initialExtent; this.writeCacheEnabled = writeCacheEnabled; Modified: trunk/bigdata/src/java/com/bigdata/journal/WORMStrategy.java =================================================================== --- trunk/bigdata/src/java/com/bigdata/journal/WORMStrategy.java 2011-02-03 13:04:48 UTC (rev 4176) +++ trunk/bigdata/src/java/com/bigdata/journal/WORMStrategy.java 2011-02-03 23:40:35 UTC (rev 4177) @@ -833,7 +833,9 @@ WORMStrategy(final long maximumExtent, final long minimumExtension, final FileMetadata fileMetadata) { - super(fileMetadata.extent, maximumExtent, fileMetadata.offsetBits, + // @see https://sourceforge.net/apps/trac/bigdata/ticket/236 + super(fileMetadata.initialExtent, maximumExtent, fileMetadata.offsetBits, +// super(fileMetadata.extent, maximumExtent, fileMetadata.offsetBits, fileMetadata.nextOffset, fileMetadata.bufferMode, fileMetadata.readOnly); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2011-02-03 13:04:54
|
Revision: 4176 http://bigdata.svn.sourceforge.net/bigdata/?rev=4176&view=rev Author: thompsonbry Date: 2011-02-03 13:04:48 +0000 (Thu, 03 Feb 2011) Log Message: ----------- Switched back to NOT spawning the zkServer start script, since ZooKeeper was not starting. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/build.xml Modified: branches/QUADS_QUERY_BRANCH/build.xml =================================================================== --- branches/QUADS_QUERY_BRANCH/build.xml 2011-02-02 18:59:56 UTC (rev 4175) +++ branches/QUADS_QUERY_BRANCH/build.xml 2011-02-03 13:04:48 UTC (rev 4176) @@ -1507,11 +1507,13 @@ <!-- Note: It appears to be necessary to spawn the script which starts --> <!-- zookeeper under some OS platforms (for example, Centos 5.4) even --> <!-- though it will start without being spawned under others (OSX). --> + <!-- However, this change appears to result in zookeeper not running --> + <!-- so I have backed it out for now. --> <target name="startZookeeper"> <echo message="test.zookeeper.installDir=${test.zookeeper.installDir}"/> <echo>bin/zkServer.sh start </echo> - <exec executable="bin/zkServer.sh" dir="${test.zookeeper.installDir}" spawn="true"> + <exec executable="bin/zkServer.sh" dir="${test.zookeeper.installDir}" logerror="true"> <arg value="start"/> </exec> </target> This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |