This list is closed, nobody may subscribe to it.
2010 |
Jan
|
Feb
|
Mar
|
Apr
|
May
|
Jun
|
Jul
(139) |
Aug
(94) |
Sep
(232) |
Oct
(143) |
Nov
(138) |
Dec
(55) |
---|---|---|---|---|---|---|---|---|---|---|---|---|
2011 |
Jan
(127) |
Feb
(90) |
Mar
(101) |
Apr
(74) |
May
(148) |
Jun
(241) |
Jul
(169) |
Aug
(121) |
Sep
(157) |
Oct
(199) |
Nov
(281) |
Dec
(75) |
2012 |
Jan
(107) |
Feb
(122) |
Mar
(184) |
Apr
(73) |
May
(14) |
Jun
(49) |
Jul
(26) |
Aug
(103) |
Sep
(133) |
Oct
(61) |
Nov
(51) |
Dec
(55) |
2013 |
Jan
(59) |
Feb
(72) |
Mar
(99) |
Apr
(62) |
May
(92) |
Jun
(19) |
Jul
(31) |
Aug
(138) |
Sep
(47) |
Oct
(83) |
Nov
(95) |
Dec
(111) |
2014 |
Jan
(125) |
Feb
(60) |
Mar
(119) |
Apr
(136) |
May
(270) |
Jun
(83) |
Jul
(88) |
Aug
(30) |
Sep
(47) |
Oct
(27) |
Nov
(23) |
Dec
|
2015 |
Jan
|
Feb
|
Mar
|
Apr
|
May
|
Jun
|
Jul
|
Aug
|
Sep
(3) |
Oct
|
Nov
|
Dec
|
2016 |
Jan
|
Feb
|
Mar
(4) |
Apr
(1) |
May
|
Jun
|
Jul
|
Aug
|
Sep
|
Oct
|
Nov
|
Dec
|
From: <tho...@us...> - 2010-10-07 20:23:11
|
Revision: 3749 http://bigdata.svn.sourceforge.net/bigdata/?rev=3749&view=rev Author: thompsonbry Date: 2010-10-07 20:23:05 +0000 (Thu, 07 Oct 2010) Log Message: ----------- Trying another code change. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-10-07 20:04:21 UTC (rev 3748) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-10-07 20:23:05 UTC (rev 3749) @@ -366,28 +366,8 @@ runState = new RunState(this); - statsMap = new ConcurrentHashMap<Integer, BOpStats>(); - - for (Map.Entry<Integer, BOp> e : bopIndex.entrySet()) { - - final int bopId = e.getKey(); - - final BOp tmp = e.getValue(); - - if ((tmp instanceof PipelineOp)) { - - final PipelineOp bop = (PipelineOp) tmp; - - statsMap.put(bopId, bop.newStats()); - - } - } + statsMap = createStatsMap(bopIndex); -// final BOpStats queryStats = query.newStats(); -// -// statsMap.put((Integer) query -// .getRequiredProperty(BOp.Annotations.BOP_ID), queryStats); - if (!query.isMutation()) { final BOpStats queryStats = statsMap.get(query.getId()); @@ -417,6 +397,41 @@ } /** + * Pre-populate a map with {@link BOpStats} objects for a query. + * + * @param bopIndex + * A map of the operators in the query which have assigned + * bopIds. + * + * @return A new map with an entry for each operator with a bopId which + * associates that operator with its {@link BOpStats} object. 
+ */ + static private ConcurrentHashMap<Integer, BOpStats> createStatsMap( + final Map<Integer, BOp> bopIndex) { + + ConcurrentHashMap<Integer, BOpStats> statsMap = new ConcurrentHashMap<Integer, BOpStats>(); + + for (Map.Entry<Integer, BOp> e : bopIndex.entrySet()) { + + final int bopId = e.getKey(); + + final BOp tmp = e.getValue(); + + if ((tmp instanceof PipelineOp)) { + + final PipelineOp bop = (PipelineOp) tmp; + + statsMap.put(bopId, bop.newStats()); + + } + + } + + return statsMap; + + } + + /** * Take a chunk generated by some pass over an operator and make it * available to the target operator. How this is done depends on whether the * query is running against a standalone database or the scale-out database. This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-10-07 20:04:28
|
Revision: 3748 http://bigdata.svn.sourceforge.net/bigdata/?rev=3748&view=rev Author: thompsonbry Date: 2010-10-07 20:04:21 +0000 (Thu, 07 Oct 2010) Log Message: ----------- Another code reorganization. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-10-07 20:00:16 UTC (rev 3747) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-10-07 20:04:21 UTC (rev 3748) @@ -166,7 +166,7 @@ * * @see RunState */ - private final ReentrantLock lock; + private final ReentrantLock lock = new ReentrantLock(); /** * The run state of this query and <code>null</code> unless this is the @@ -357,10 +357,6 @@ this.query = query; - this.lock = new ReentrantLock(); - - this.runState = controller ? new RunState(this) : null; - this.bopIndex = BOpUtility.getIndex(query); /* @@ -368,6 +364,8 @@ */ if (controller) { + runState = new RunState(this); + statsMap = new ConcurrentHashMap<Integer, BOpStats>(); for (Map.Entry<Integer, BOp> e : bopIndex.entrySet()) { @@ -409,20 +407,13 @@ } else { - statsMap = null; + runState = null; // Note: only on the query controller. + statsMap = null; // Note: only on the query controller. + queryBuffer = null; // Note: only on the query controller. + queryIterator = null; // Note: only when queryBuffer is defined. - // Note: only exists on the query controller. 
- queryBuffer = null; - queryIterator = null; - } - // System.err - // .println("new RunningQuery:: queryId=" + queryId - // + ", isController=" + controller + ", queryController=" - // + clientProxy + ", queryEngine=" - // + queryEngine.getServiceUUID()); - } /** This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-10-07 20:00:22
|
Revision: 3747 http://bigdata.svn.sourceforge.net/bigdata/?rev=3747&view=rev Author: thompsonbry Date: 2010-10-07 20:00:16 +0000 (Thu, 07 Oct 2010) Log Message: ----------- Reorganized the code in an attempt to get around a weird CI problem. compile: [javac] /var/lib/hudson/jobs/Bigdata Quads Query Branch/workspace/QUADS_QUERY_BRANCH/build.xml:76: warning: 'includeantruntime' was not set, defaulting to build.sysclasspath=last; set to false for repeatable builds [javac] Compiling 1398 source files to /var/lib/hudson/jobs/Bigdata Quads Query Branch/workspace/QUADS_QUERY_BRANCH/ant-build/classes [javac] javac 1.6.0_17 [javac] /var/lib/hudson/jobs/Bigdata Quads Query Branch/workspace/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java:379: variable lock might already have been assigned [javac] lock = new ReentrantLock(); [javac] ^ [javac] /var/lib/hudson/jobs/Bigdata Quads Query Branch/workspace/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java:381: variable runState might already have been assigned [javac] runState = controller ? 
new RunState(this) : null; [javac] ^ [javac] /var/lib/hudson/jobs/Bigdata Quads Query Branch/workspace/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java:392: variable queryBuffer might already have been assigned [javac] queryBuffer = query.newBuffer(queryStats); [javac] ^ [javac] /var/lib/hudson/jobs/Bigdata Quads Query Branch/workspace/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java:397: variable queryBuffer might already have been assigned [javac] queryBuffer = null; [javac] ^ [javac] /var/lib/hudson/jobs/Bigdata Quads Query Branch/workspace/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java:404: variable queryBuffer might already have been assigned [javac] queryBuffer = null; [javac] ^ [javac] /var/lib/hudson/jobs/Bigdata Quads Query Branch/workspace/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java:410: variable queryIterator might already have been assigned [javac] queryIterator = null; [javac] ^ [javac] /var/lib/hudson/jobs/Bigdata Quads Query Branch/workspace/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java:414: variable queryIterator might already have been assigned [javac] queryIterator = new QueryResultIterator<IBindingSet[]>(this, [javac] ^ [javac] Note: Some input files use or override a deprecated API. [javac] Note: Recompile with -Xlint:deprecation for details. [javac] Note: Some input files use unchecked or unsafe operations. [javac] Note: Recompile with -Xlint:unchecked for details. 
[javac] 7 errors Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-10-07 19:52:35 UTC (rev 3746) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-10-07 20:00:16 UTC (rev 3747) @@ -357,63 +357,64 @@ this.query = query; - bopIndex = BOpUtility.getIndex(query); + this.lock = new ReentrantLock(); + this.runState = controller ? new RunState(this) : null; + + this.bopIndex = BOpUtility.getIndex(query); + /* * Setup the BOpStats object for each pipeline operator in the query. */ if (controller) { + statsMap = new ConcurrentHashMap<Integer, BOpStats>(); + for (Map.Entry<Integer, BOp> e : bopIndex.entrySet()) { + final int bopId = e.getKey(); + final BOp tmp = e.getValue(); + if ((tmp instanceof PipelineOp)) { + final PipelineOp bop = (PipelineOp) tmp; + statsMap.put(bopId, bop.newStats()); + } } - } else { - statsMap = null; - } - lock = new ReentrantLock(); +// final BOpStats queryStats = query.newStats(); +// +// statsMap.put((Integer) query +// .getRequiredProperty(BOp.Annotations.BOP_ID), queryStats); - runState = controller ? new RunState(this) : null; + if (!query.isMutation()) { - if (controller) { + final BOpStats queryStats = statsMap.get(query.getId()); - final BOpStats queryStats = query.newStats(); + queryBuffer = query.newBuffer(queryStats); - statsMap.put((Integer) query - .getRequiredProperty(BOp.Annotations.BOP_ID), queryStats); + queryIterator = new QueryResultIterator<IBindingSet[]>(this, + queryBuffer.iterator()); - if (!query.isMutation()) { - - queryBuffer = query.newBuffer(queryStats); - } else { // Note: Not used for mutation queries. 
queryBuffer = null; + queryIterator = null; } } else { + statsMap = null; + // Note: only exists on the query controller. queryBuffer = null; - - } - - if (queryBuffer == null) { - queryIterator = null; - - } else { - - queryIterator = new QueryResultIterator<IBindingSet[]>(this, - queryBuffer.iterator()); - + } // System.err This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-10-07 19:52:41
|
Revision: 3746 http://bigdata.svn.sourceforge.net/bigdata/?rev=3746&view=rev Author: thompsonbry Date: 2010-10-07 19:52:35 +0000 (Thu, 07 Oct 2010) Log Message: ----------- Kicking off another CI build in response to a weird error from Hudson. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-10-07 19:05:15 UTC (rev 3745) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-10-07 19:52:35 UTC (rev 3746) @@ -376,7 +376,6 @@ statsMap = null; } - // runStateLock = controller ? new ReentrantLock() : null; lock = new ReentrantLock(); runState = controller ? new RunState(this) : null; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-10-07 19:05:24
|
Revision: 3745 http://bigdata.svn.sourceforge.net/bigdata/?rev=3745&view=rev Author: thompsonbry Date: 2010-10-07 19:05:15 +0000 (Thu, 07 Oct 2010) Log Message: ----------- Finished UNION and integrated it into the SAIL. The new decision tree rewrites of the rules to bops is now enabled by default. Groups the UNION, STEPS, STAR (not yet implemented), and JoinGraph (not yet implemented) operators under a "controller" package. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/NV.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/StartOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Tee.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/concurrent/Haltable.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestBOpUtility.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bset/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bset/TestCopyBindingSets.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestAll.java 
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestJoinGraph.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/MockRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/join/TestPipelineJoin.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/Rule2BOpUtility.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/AbstractSubqueryOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/Steps.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/Union.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/controller/TestUnion.java Removed Paths: ------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyBindingSetOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/eval/ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/bset/TestUnionBindingSets.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/eval/ Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java 2010-10-07 16:19:16 UTC (rev 3744) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOp.java 2010-10-07 19:05:15 UTC (rev 3745) @@ -234,6 +234,8 @@ * {@link Annotations#TIMESTAMP} associated with that operation. * * @see #TIMESTAMP + * + * @todo Move to {@link IPredicate}? 
*/ String MUTATION = BOp.class.getName() + ".mutation"; @@ -244,6 +246,8 @@ * reads or writes on the database (no default). * * @see #MUTATION + * + * @todo Move to {@link IPredicate}? */ String TIMESTAMP = BOp.class.getName() + ".timestamp"; @@ -256,6 +260,18 @@ BOpEvaluationContext DEFAULT_EVALUATION_CONTEXT = BOpEvaluationContext.ANY; /** + * A boolean annotation whose value indicates whether or not this is a + * control operator (default {@value #DEFAULT_CONTROLLER}). A control + * operator is an operator which will issue subqueries for its + * arguments. Thus control operators mark a boundary in pipelined + * evaluation. Some examples of control operators include UNION, STEPS, + * and STAR (aka transitive closure). + */ + String CONTROLLER = BOp.class.getName()+".controller"; + + boolean DEFAULT_CONTROLLER = false; + + /** * For hash partitioned operators, this is the set of the member nodes * for the operator. * <p> Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2010-10-07 16:19:16 UTC (rev 3744) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2010-10-07 19:05:15 UTC (rev 3745) @@ -610,6 +610,8 @@ * long gone. The problem with specifying a hashCode() and equals() method * for BOp/BOpBase/Predicate is that we wind up with duplicate bop * exceptions being reported by BOpUtility#getIndex(BOp). + * + * Note: Both Var and Constant override hashCode() and equals(). 
*/ // /** Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-10-07 16:19:16 UTC (rev 3744) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContext.java 2010-10-07 19:05:15 UTC (rev 3745) @@ -116,6 +116,8 @@ /** * Where to write the output of the operator. + * + * @see PipelineOp.Annotations#SINK_REF */ public final IBlockingBuffer<E[]> getSink() { return sink; @@ -125,6 +127,8 @@ * Optional alternative sink for the output of the operator. This is used by * things like SPARQL optional joins to route failed joins outside of the * join group. + * + * @see PipelineOp.Annotations#ALT_SINK_REF */ public final IBlockingBuffer<E[]> getSink2() { return sink2; @@ -180,18 +184,6 @@ * When doing that, modify to automatically track the {@link BOpStats} * as the <i>source</i> is consumed. */ -// * @throws IllegalArgumentException -// * if the <i>indexManager</i> is <code>null</code> -// * @throws IllegalArgumentException -// * if the <i>indexManager</i> is is not a <em>local</em> index -// * manager. -// * @throws IllegalArgumentException -// * if the <i>readTimestamp</i> is {@link ITx#UNISOLATED} -// * (queries may not read on the unisolated indices). -// * @throws IllegalArgumentException -// * if the <i>writeTimestamp</i> is neither -// * {@link ITx#UNISOLATED} nor a read-write transaction -// * identifier. 
public BOpContext(final IRunningQuery runningQuery,final int partitionId, final BOpStats stats, final IAsynchronousIterator<E[]> source, final IBlockingBuffer<E[]> sink, final IBlockingBuffer<E[]> sink2) { @@ -199,31 +191,12 @@ super(runningQuery.getFederation(), runningQuery.getIndexManager()); this.runningQuery = runningQuery; -// if (indexManager == null) -// throw new IllegalArgumentException(); -// if (indexManager instanceof IBigdataFederation<?>) { -// /* -// * This is disallowed because predicates always read on local index -// * objects, even in scale-out. -// */ -// throw new IllegalArgumentException( -// "Expecting a local index manager, not: " -// + indexManager.getClass().toString()); -// } -// if (readTimestamp == ITx.UNISOLATED) -// throw new IllegalArgumentException(); -// if (TimestampUtility.isReadOnly(writeTimestamp)) -// throw new IllegalArgumentException(); if (stats == null) throw new IllegalArgumentException(); if (source == null) throw new IllegalArgumentException(); if (sink == null) throw new IllegalArgumentException(); -// this.fed = fed; // may be null -// this.indexManager = indexManager; -// this.readTimestamp = readTimestamp; -// this.writeTimestamp = writeTimestamp; this.partitionId = partitionId; this.stats = stats; this.source = source; @@ -266,7 +239,7 @@ if (constraints != null) { // verify constraint. - return isConsistent(constraints, bindings); + return BOpUtility.isConsistent(constraints, bindings); } @@ -339,49 +312,6 @@ } - /** - * Check constraints. - * - * @param constraints - * @param bindingSet - * - * @return <code>true</code> iff the constraints are satisfied. 
- */ - public boolean isConsistent(final IConstraint[] constraints, - final IBindingSet bindingSet) { - - for (int i = 0; i < constraints.length; i++) { - - final IConstraint constraint = constraints[i]; - - if (!constraint.accept(bindingSet)) { - - if (log.isDebugEnabled()) { - - log.debug("Rejected by " - + constraint.getClass().getSimpleName() + " : " - + bindingSet); - - } - - return false; - - } - - if (log.isTraceEnabled()) { - - log.debug("Accepted by " - + constraint.getClass().getSimpleName() + " : " - + bindingSet); - - } - - } - - return true; - - } - // /** // * Cancel the running query (normal termination). // * <p> @@ -455,4 +385,40 @@ // // } + /** + * Copy data from the source to the sink. The sink will be flushed and + * closed. The source will be closed. + */ + public void copySourceToSink() { + + // source. + final IAsynchronousIterator<IBindingSet[]> source = (IAsynchronousIterator) getSource(); + + // default sink + final IBlockingBuffer<IBindingSet[]> sink = (IBlockingBuffer) getSink(); + + final BOpStats stats = getStats(); + + try { + + // copy binding sets from the source. + BOpUtility.copy(source, sink, null/* sink2 */, + null/* constraints */, stats); + + // flush the sink. 
+ sink.flush(); + + } finally { + + sink.close(); + + if (sink2 != null) + sink2.close(); + + source.close(); + + } + + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java 2010-10-07 16:19:16 UTC (rev 3744) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java 2010-10-07 19:05:15 UTC (rev 3745) @@ -27,6 +27,7 @@ package com.bigdata.bop; +import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.LinkedHashMap; @@ -35,9 +36,13 @@ import java.util.List; import java.util.Map; +import org.apache.log4j.Logger; + import com.bigdata.bop.BOp.Annotations; import com.bigdata.bop.engine.BOpStats; import com.bigdata.btree.AbstractNode; +import com.bigdata.relation.accesspath.IAsynchronousIterator; +import com.bigdata.relation.accesspath.IBlockingBuffer; import cutthecrap.utils.striterators.Expander; import cutthecrap.utils.striterators.Filter; @@ -52,7 +57,7 @@ */ public class BOpUtility { -// private static final Logger log = Logger.getLogger(BOpUtility.class); + private static final Logger log = Logger.getLogger(BOpUtility.class); /** * Pre-order recursive visitation of the operator tree (arguments only, no @@ -468,9 +473,51 @@ } + /** + * Return the left-deep child of the operator, halting at a leaf or earlier + * if a control operator is found. + * + * @param op + * The operator. + * + * @return The child where pipeline evaluation should begin. + * + * @throws IllegalArgumentException + * if the argument is <code>null</code>. + * + * @todo This does not protect against loops in the operator tree. + * + * @todo unit tests. 
+ */ + static public BOp getPipelineStart(BOp op) { + if (op == null) + throw new IllegalArgumentException(); + + while (true) { + if (op.getProperty(BOp.Annotations.CONTROLLER, + BOp.Annotations.DEFAULT_CONTROLLER)) { + // Halt at a control operator. + return op; + } + if(op.arity()==0) { + // No children. + return op; + } + final BOp left = op.get(0); + if (left == null) { + // Halt at a leaf. + return op; + } + // Descend through the left child. + op = left; + } + + } + /** - * Combine chunks drawn from an iterator into a single chunk. + * Combine chunks drawn from an iterator into a single chunk. This is useful + * when materializing intermediate results for an all-at-once operator. * * @param itr * The iterator @@ -511,20 +558,34 @@ } if (nchunks == 0) { + return new IBindingSet[0]; + } else if (nchunks == 1) { + return list.get(0); + } else { + int n = 0; + final IBindingSet[] a = new IBindingSet[nelements]; + final Iterator<IBindingSet[]> itr2 = list.iterator(); + while (itr2.hasNext()) { + final IBindingSet[] t = itr2.next(); + System.arraycopy(t/* src */, 0/* srcPos */, a/* dest */, n/* destPos */, t.length/* length */); + n += t.length; + } + return a; + } } // toArray() @@ -618,4 +679,160 @@ // // } + /** + * Check constraints. + * + * @param constraints + * @param bindingSet + * + * @return <code>true</code> iff the constraints are satisfied. 
+ */ + static public boolean isConsistent(final IConstraint[] constraints, + final IBindingSet bindingSet) { + + for (int i = 0; i < constraints.length; i++) { + + final IConstraint constraint = constraints[i]; + + if (!constraint.accept(bindingSet)) { + + if (log.isDebugEnabled()) { + + log.debug("Rejected by " + + constraint.getClass().getSimpleName() + " : " + + bindingSet); + + } + + return false; + + } + + if (log.isTraceEnabled()) { + + log.debug("Accepted by " + + constraint.getClass().getSimpleName() + " : " + + bindingSet); + + } + + } + + return true; + + } + + /** + * Copy binding sets from the source to the sink(s). + * + * @param source + * The source. + * @param sink + * The sink (required). + * @param sink2 + * Another sink (optional). + * @param constraints + * Binding sets which fail these constraints will NOT be copied + * (optional). + * @param stats + * The {@link BOpStats#chunksIn} and {@link BOpStats#unitsIn} + * will be updated during the copy (optional). + */ + static public void copy( + final IAsynchronousIterator<IBindingSet[]> source, + final IBlockingBuffer<IBindingSet[]> sink, + final IBlockingBuffer<IBindingSet[]> sink2, + final IConstraint[] constraints, final BOpStats stats) { + + while (source.hasNext()) { + + final IBindingSet[] chunk = source.next(); + + if (stats != null) { + + stats.chunksIn.increment(); + + stats.unitsIn.add(chunk.length); + + } + + // apply optional constraints. + final IBindingSet[] tmp = applyConstraints(chunk,constraints); + +// System.err.println("Copying: "+Arrays.toString(tmp)); + + // copy accepted binding sets to the default sink. + sink.add(tmp); + + if (sink2 != null) { + // copy accepted binding sets to the alt sink. + sink2.add(tmp); + } + + } + + } + + /** + * Return a dense array containing only those {@link IBindingSet}s which + * satisfy the constraints. + * + * @param chunk + * A chunk of binding sets. + * @param constraints + * The constraints (optional). 
+ * + * @return The dense chunk of binding sets. + */ + static private IBindingSet[] applyConstraints(final IBindingSet[] chunk, + final IConstraint[] constraints) { + + if (constraints == null) { + + /* + * No constraints, copy all binding sets. + */ + + return chunk; + + } + + /* + * Copy binding sets which satisfy the constraint(s). + */ + + IBindingSet[] t = new IBindingSet[chunk.length]; + + int j = 0; + + for (int i = 0; i < chunk.length; i++) { + + final IBindingSet bindingSet = chunk[i]; + + if (BOpUtility.isConsistent(constraints, bindingSet)) { + + t[j++] = bindingSet; + + } + + } + + if (j != chunk.length) { + + // allocate exact size array. + final IBindingSet[] tmp = (IBindingSet[]) java.lang.reflect.Array + .newInstance(chunk[0].getClass(), j); + + // make a dense copy. + System.arraycopy(t/* src */, 0/* srcPos */, tmp/* dst */, + 0/* dstPos */, j/* len */); + + t = tmp; + + } + + return t; + + } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/NV.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/NV.java 2010-10-07 16:19:16 UTC (rev 3744) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/NV.java 2010-10-07 19:05:15 UTC (rev 3745) @@ -147,10 +147,10 @@ * * @param a * The array. - * + * * @return The map. */ - static public Map<String, Object> asMap(final NV[] a) { + static public Map<String, Object> asMap(final NV... 
a) { final Map<String, Object> tmp = new LinkedHashMap<String, Object>( a.length); @@ -162,7 +162,7 @@ } return tmp; - + } } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2010-10-07 16:19:16 UTC (rev 3744) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/PipelineOp.java 2010-10-07 19:05:15 UTC (rev 3745) @@ -56,7 +56,7 @@ /** * The value of the annotation is the {@link BOp.Annotations#BOP_ID} of * the ancestor in the operator tree which serves as the default sink - * for binding sets (default is the parent). + * for binding sets (optional, default is the parent). */ String SINK_REF = PipelineOp.class.getName() + ".sinkRef"; Deleted: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyBindingSetOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyBindingSetOp.java 2010-10-07 16:19:16 UTC (rev 3744) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyBindingSetOp.java 2010-10-07 19:05:15 UTC (rev 3745) @@ -1,208 +0,0 @@ -/** - -Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. 
- -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -*/ -/* - * Created on Aug 25, 2010 - */ - -package com.bigdata.bop.bset; - -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.FutureTask; - -import com.bigdata.bop.BOp; -import com.bigdata.bop.BOpContext; -import com.bigdata.bop.PipelineOp; -import com.bigdata.bop.IBindingSet; -import com.bigdata.bop.IConstraint; -import com.bigdata.bop.engine.BOpStats; -import com.bigdata.bop.engine.IChunkAccessor; -import com.bigdata.relation.accesspath.IAsynchronousIterator; -import com.bigdata.relation.accesspath.IBlockingBuffer; - -/** - * This operator copies its source to its sink. Specializations exist which are - * used to feed the the initial set of intermediate results into a pipeline ( - * {@link StartOp}) and which are used to replicate intermediate results to more - * than one sink ({@link Tee}). - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ - */ -public class CopyBindingSetOp extends PipelineOp { - - /** - * - */ - private static final long serialVersionUID = 1L; - - public interface Annotations extends PipelineOp.Annotations { - - /** - * An optional {@link IConstraint}[] which places restrictions on the - * legal patterns in the variable bindings. - */ - String CONSTRAINTS = CopyBindingSetOp.class.getName() + ".constraints"; - - } - - /** - * Deep copy constructor. - * - * @param op - */ - public CopyBindingSetOp(CopyBindingSetOp op) { - super(op); - } - - /** - * Shallow copy constructor. 
- * - * @param args - * @param annotations - */ - public CopyBindingSetOp(BOp[] args, Map<String, Object> annotations) { - super(args, annotations); - } - - /** - * @see Annotations#CONSTRAINTS - */ - public IConstraint[] constraints() { - - return getProperty(Annotations.CONSTRAINTS, null/* defaultValue */); - - } - - public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { - - return new FutureTask<Void>(new CopyTask(this, context)); - - } - - /** - * Copy the source to the sink. - * - * @todo Optimize this. When using an {@link IChunkAccessor} we should be - * able to directly output the same chunk. - */ - static private class CopyTask implements Callable<Void> { - - private final BOpContext<IBindingSet> context; - - /** - * The constraint (if any) specified for the join operator. - */ - final private IConstraint[] constraints; - - CopyTask(final CopyBindingSetOp op, - final BOpContext<IBindingSet> context) { - - this.context = context; - - this.constraints = op.constraints(); - - } - - public Void call() throws Exception { - final IAsynchronousIterator<IBindingSet[]> source = context - .getSource(); - final IBlockingBuffer<IBindingSet[]> sink = context.getSink(); - final IBlockingBuffer<IBindingSet[]> sink2 = context.getSink2(); - try { - final BOpStats stats = context.getStats(); - while (source.hasNext()) { - final IBindingSet[] chunk = source.next(); - stats.chunksIn.increment(); - stats.unitsIn.add(chunk.length); - final IBindingSet[] tmp = applyConstraints(chunk); - sink.add(tmp); - if (sink2 != null) - sink2.add(tmp); - } - sink.flush(); - if (sink2 != null) - sink2.flush(); - return null; - } finally { - sink.close(); - if (sink2 != null) - sink2.close(); - source.close(); - } - } - - private IBindingSet[] applyConstraints(final IBindingSet[] chunk) { - - if (constraints == null) { - - /* - * No constraints, copy all binding sets. - */ - - return chunk; - - } - - /* - * Copy binding sets which satisfy the constraint(s). 
- */ - - IBindingSet[] t = new IBindingSet[chunk.length]; - - int j = 0; - - for (int i = 0; i < chunk.length; i++) { - - final IBindingSet bindingSet = chunk[i]; - - if (context.isConsistent(constraints, bindingSet)) { - - t[j++] = bindingSet; - - } - - } - - if (j != chunk.length) { - - // allocate exact size array. - final IBindingSet[] tmp = (IBindingSet[]) java.lang.reflect.Array - .newInstance(chunk[0].getClass(), j); - - // make a dense copy. - System.arraycopy(t/* src */, 0/* srcPos */, tmp/* dst */, - 0/* dstPos */, j/* len */); - - t = tmp; - - } - - return t; - - } - - } // class CopyTask - -} Copied: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyOp.java (from rev 3706, branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyBindingSetOp.java) =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyOp.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/CopyOp.java 2010-10-07 19:05:15 UTC (rev 3745) @@ -0,0 +1,203 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Aug 25, 2010 + */ + +package com.bigdata.bop.bset; + +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.FutureTask; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpContext; +import com.bigdata.bop.BOpUtility; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.IConstraint; +import com.bigdata.bop.PipelineOp; +import com.bigdata.bop.engine.BOpStats; +import com.bigdata.bop.engine.IChunkAccessor; +import com.bigdata.relation.accesspath.IAsynchronousIterator; +import com.bigdata.relation.accesspath.IBlockingBuffer; +import com.bigdata.relation.accesspath.ThickAsynchronousIterator; + +/** + * This operator copies its source to its sink(s). Specializations exist which are + * used to feed the the initial set of intermediate results into a pipeline ( + * {@link StartOp}) and which are used to replicate intermediate results to more + * than one sink ({@link Tee}). + * + * @see Annotations#SINK_REF + * @see Annotations#ALT_SINK_REF + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class CopyOp extends PipelineOp { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public interface Annotations extends PipelineOp.Annotations { + + /** + * An optional {@link IConstraint}[] which places restrictions on the + * legal patterns in the variable bindings. + */ + String CONSTRAINTS = CopyOp.class.getName() + ".constraints"; + + /** + * An optional {@link IBindingSet}[] to be used <strong>instead</strong> + * of the default source. + */ + String BINDING_SETS = CopyOp.class.getName() + ".bindingSets"; + + } + + /** + * Deep copy constructor. 
+ * + * @param op + */ + public CopyOp(CopyOp op) { + super(op); + } + + /** + * Shallow copy constructor. + * + * @param args + * @param annotations + */ + public CopyOp(BOp[] args, Map<String, Object> annotations) { + super(args, annotations); + } + + /** + * @see Annotations#CONSTRAINTS + */ + public IConstraint[] constraints() { + + return getProperty(Annotations.CONSTRAINTS, null/* defaultValue */); + + } + + /** + * @see Annotations#BINDING_SETS + */ + public IBindingSet[] bindingSets() { + + return getProperty(Annotations.BINDING_SETS, null/* defaultValue */); + + } + + public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { + + return new FutureTask<Void>(new CopyTask(this, context)); + + } + + /** + * Copy the source to the sink. + * + * @todo Optimize this. When using an {@link IChunkAccessor} we should be + * able to directly output the same chunk. + */ + static private class CopyTask implements Callable<Void> { + + private final CopyOp op; + + private final BOpContext<IBindingSet> context; + + CopyTask(final CopyOp op, + final BOpContext<IBindingSet> context) { + + this.op = op; + + this.context = context; + + } + + public Void call() throws Exception { + + // source. + final IAsynchronousIterator<IBindingSet[]> source = context + .getSource(); + + // default sink + final IBlockingBuffer<IBindingSet[]> sink = context.getSink(); + + // optional altSink. + final IBlockingBuffer<IBindingSet[]> sink2 = context.getSink2(); + + final BOpStats stats = context.getStats(); + + final IConstraint[] constraints = op.constraints(); + + try { + + final IBindingSet[] bindingSets = op.bindingSets(); + + if (bindingSets != null) { + + // copy optional additional binding sets. + BOpUtility.copy( + new ThickAsynchronousIterator<IBindingSet[]>( + new IBindingSet[][] { bindingSets }), sink, + sink2, constraints, stats); + + } else { + + // copy binding sets from the source. 
+ BOpUtility.copy(source, sink, sink2, constraints, stats); + + } + + // flush the sink. + sink.flush(); + if (sink2 != null) // and the optional altSink. + sink2.flush(); + + // Done. + return null; + + } finally { + + sink.close(); + + if (sink2 != null) + sink2.close(); + + source.close(); + + } + + } + + } // class CopyTask + +} Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/StartOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/StartOp.java 2010-10-07 16:19:16 UTC (rev 3744) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/StartOp.java 2010-10-07 19:05:15 UTC (rev 3745) @@ -5,10 +5,10 @@ import com.bigdata.bop.BOp; /** - * A version of {@link CopyBindingSetOp} which is always evaluated on the query + * A version of {@link CopyOp} which is always evaluated on the query * controller. */ -public class StartOp extends CopyBindingSetOp { +public class StartOp extends CopyOp { /** * Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Tee.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Tee.java 2010-10-07 16:19:16 UTC (rev 3744) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Tee.java 2010-10-07 19:05:15 UTC (rev 3745) @@ -32,6 +32,7 @@ import com.bigdata.bop.BOp; import com.bigdata.bop.BOpEvaluationContext; import com.bigdata.bop.PipelineOp; +import com.bigdata.bop.controller.Union; import com.bigdata.bop.join.PipelineJoin; import com.bigdata.rdf.rules.TMUtility; import com.bigdata.relation.RelationFusedView; @@ -87,7 +88,7 @@ * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ -public class Tee extends CopyBindingSetOp { +public class Tee extends CopyOp { /** * Deleted: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java 
=================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java 2010-10-07 16:19:16 UTC (rev 3744) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java 2010-10-07 19:05:15 UTC (rev 3745) @@ -1,128 +0,0 @@ -/** - -Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -*/ -/* - * Created on Aug 18, 2010 - */ - -package com.bigdata.bop.bset; - -import java.util.Map; -import java.util.concurrent.FutureTask; - -import com.bigdata.bop.BOpContext; -import com.bigdata.bop.IBindingSet; -import com.bigdata.bop.PipelineOp; - -/** - * UNION(ops)[maxParallel(default all)] - * <p> - * Executes each of the operands in the union as a subqueries. Each subquery is - * run as a separate query but is linked to the parent query in which the UNION - * is being evaluated. The subqueries do not receive bindings from the parent - * and may be executed independently. By default, the subqueries are run with - * unlimited parallelism. - * <p> - * Note: UNION runs on the query controller. 
The - * {@link PipelineOp.Annotations#SINK_REF} of each child operand should be - * overridden to specify the parent of the UNION operator, thereby routing - * around the UNION operator itself. If you fail to do this, then the - * intermediate results of the subqueries will be routed through the UNION - * operator on the query controller. - * <p> - * UNION can not be used when intermediate results from other computations must - * be routed into subqueries. However, a {@link Tee} pattern may help in such - * cases. For example, a {@link Tee} may be used to create a union of pipeline - * joins for two access paths during truth maintenance. - * <p> - * For example: - * - * <pre> - * UNION([a,b,c],{}) - * </pre> - * - * Will run the subqueries <i>a</i>, <i>b</i>, and <i>c</i> in parallel. Each - * subquery will be initialized with a single empty {@link IBindingSet}. The - * output of those subqueries will be routed to the UNION operator (their - * parent) unless the subqueries explicitly override this behavior using - * {@link PipelineOp.Annotations#SINK_REF}. - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ - */ -public class Union extends PipelineOp { - - /** - * - */ - private static final long serialVersionUID = 1L; - - /** - * @param args - * Two or more operators whose union is desired. - * @param annotations - */ - public Union(final PipelineOp[] args, - final Map<String, Object> annotations) { - - super(args, annotations); - - if (args.length < 2) - throw new IllegalArgumentException(); - - } - - public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { - -// return new FutureTask<Void>(new UnionTask(this, context)); - throw new UnsupportedOperationException(); - } - -// /** -// * Pipeline union impl. -// * -// * FIXME All this does is copy its inputs to its outputs. 
Since we only run -// * one chunk of input at a time, it seems that the easiest way to implement -// * a union is to have the operators in the union just target the same sink. -// */ -// private static class UnionTask extends Haltable<Void> implements Callable<Void> { -// -// public UnionTask(// -// final Union op,// -// final BOpContext<IBindingSet> context -// ) { -// -// if (op == null) -// throw new IllegalArgumentException(); -// if (context == null) -// throw new IllegalArgumentException(); -// } -// -// public Void call() throws Exception { -// // TODO Auto-generated method stub -// throw new UnsupportedOperationException(); -// } -// -// } - -} Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/AbstractSubqueryOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/AbstractSubqueryOp.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/AbstractSubqueryOp.java 2010-10-07 19:05:15 UTC (rev 3745) @@ -0,0 +1,360 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Aug 18, 2010 + */ + +package com.bigdata.bop.controller; + +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.FutureTask; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.BOpContext; +import com.bigdata.bop.BOpEvaluationContext; +import com.bigdata.bop.BOpUtility; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.PipelineOp; +import com.bigdata.bop.bset.Tee; +import com.bigdata.bop.engine.QueryEngine; +import com.bigdata.bop.engine.RunningQuery; +import com.bigdata.relation.accesspath.IAsynchronousIterator; +import com.bigdata.util.concurrent.LatchedExecutor; + +/** + * Executes each of the operands as a subquery. The operands are evaluated in + * the order given and with the annotated parallelism. Each subquery is run as a + * separate query but is linked to the parent query in the operator is being + * evaluated. The subqueries do not receive bindings from the parent and may be + * executed independently. By default, the subqueries are run with unlimited + * parallelism. + * <p> + * Note: This operator must on the query controller. The + * {@link PipelineOp.Annotations#SINK_REF} of each child operand should be + * overridden to specify the parent of the this operator. If you fail to do + * this, then the intermediate results of the subqueries will be routed to this + * operator, which DOES NOT pass them on. This may cause unnecessary network + * traffic. It may also cause the query to block if the buffer capacity is + * limited. 
+ * <p> + * If you want to route intermediate results from other computations into + * subqueries, then consider a {@link Tee} pattern instead. + * <p> + * For example: + * + * <pre> + * SLICE[1]( + * UNION[2]([a{sinkRef=1},b{sinkRef=1},c{sinkRef=1}],{}) + * ) + * </pre> + * + * Will run the subqueries <i>a</i>, <i>b</i>, and <i>c</i> in parallel. Each + * subquery will be initialized with a single empty {@link IBindingSet}. The + * output of those subqueries MUST be explicitly routed to the SLICE operator + * using {@link PipelineOp.Annotations#SINK_REF} on each of the subqueries. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +abstract public class AbstractSubqueryOp extends PipelineOp { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public interface Annotations extends PipelineOp.Annotations { + + /** + * The maximum parallelism with which the subqueries will be evaluated + * (default is unlimited). + */ + String MAX_PARALLEL = AbstractSubqueryOp.class.getName() + + ".maxParallel"; + + int DEFAULT_MAX_PARALLEL = Integer.MAX_VALUE; + + } + + /** + * @see Annotations#MAX_PARALLEL + */ + public int getMaxParallel() { + return getProperty(Annotations.MAX_PARALLEL, + Annotations.DEFAULT_MAX_PARALLEL); + } + + /** + * Deep copy constructor. + */ + public AbstractSubqueryOp(final AbstractSubqueryOp op) { + super(op); + } + + /** + * Shallow copy constructor. + * + * @param args + * @param annotations + */ + public AbstractSubqueryOp(final BOp[] args, + final Map<String, Object> annotations) { + + super(args, annotations); + + if (!getEvaluationContext().equals(BOpEvaluationContext.CONTROLLER)) + throw new IllegalArgumentException(Annotations.EVALUATION_CONTEXT + + "=" + getEvaluationContext()); + + if (!getProperty(Annotations.CONTROLLER, Annotations.DEFAULT_CONTROLLER)) + throw new IllegalArgumentException(Annotations.CONTROLLER); + +// // The id of this operator (if any). 
+// final Integer thisId = (Integer)getProperty(Annotations.BOP_ID); +// +// for(BOp op : args) { +// +// final Integer sinkId = (Integer) op +// .getRequiredProperty(Annotations.SINK_REF); +// +// if(sinkId.equals(thisId)) +// throw new RuntimeException("Operand may not target ") +// +// } + + } + + public FutureTask<Void> eval(final BOpContext<IBindingSet> context) { + + return new FutureTask<Void>(new ControllerTask(this, context)); + + } + + /** + * Evaluates the arguments of the operator as subqueries. The arguments are + * evaluated in order. An {@link Executor} with limited parallelism to + * evaluate the arguments. If the controller operator is interrupted, then + * the subqueries are cancelled. If a subquery fails, then all subqueries + * are cancelled. + */ + private static class ControllerTask implements Callable<Void> { + + private final AbstractSubqueryOp controllerOp; + private final BOpContext<IBindingSet> context; + private final List<FutureTask<RunningQuery>> tasks = new LinkedList<FutureTask<RunningQuery>>(); + private final CountDownLatch latch; + private final int nparallel; + private final Executor executor; + + public ControllerTask(final AbstractSubqueryOp controllerOp, final BOpContext<IBindingSet> context) { + + if (controllerOp == null) + throw new IllegalArgumentException(); + + if (context == null) + throw new IllegalArgumentException(); + + this.controllerOp = controllerOp; + + this.context = context; + + this.nparallel = controllerOp.getProperty(Annotations.MAX_PARALLEL, + Annotations.DEFAULT_MAX_PARALLEL); + + this.executor = new LatchedExecutor(context.getIndexManager() + .getExecutorService(), nparallel); + + this.latch = new CountDownLatch(controllerOp.arity()); + + /* + * Create FutureTasks for each subquery. The futures are submitted + * to the Executor yet. That happens in call(). By deferring the + * evaluation until call() we gain the ability to cancel all + * subqueries if any subquery fails. 
+ */ + for (BOp op : controllerOp.args()) { + + /* + * Task runs subquery and cancels all subqueries in [tasks] if + * it fails. + */ + tasks.add(new FutureTask<RunningQuery>(new SubqueryTask(op, + context)) { + /* + * Hook future to count down the latch when the task is + * done. + */ + public void run() { + try { + super.run(); + } finally { + latch.countDown(); + } + } + }); + + } + + } + + /** + * Evaluate the subqueries with limited parallelism. + */ + public Void call() throws Exception { + + try { + + /* + * Run subqueries with limited parallelism. + */ + for (FutureTask<RunningQuery> ft : tasks) { + executor.execute(ft); + } + + /* + * Close the source. Controllers do not accept inputs from the + * pipeline. + */ + context.getSource().close(); + + /* + * Wait for all subqueries to complete. + */ + latch.await(); + + /* + * Get the futures, throwing out any errors. + */ + for (FutureTask<RunningQuery> ft : tasks) + ft.get(); + + // Now that we know the subqueries ran Ok, flush the sink. + context.getSink().flush(); + + // Done. + return null; + + } finally { + + // Cancel any tasks which are still running. + cancelTasks(); + + context.getSink().close(); + + if (context.getSink2() != null) + context.getSink2().close(); + + } + + } + + /** + * Cancel any running tasks. + */ + private void cancelTasks() { + + for (FutureTask<RunningQuery> ft : tasks) + ft.cancel(true/* mayInterruptIfRunning */); + + } + + /** + * Run a subquery. + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + */ + private class SubqueryTask implements Callable<RunningQuery> { + + /** + * The evaluation context for the parent query. + */ + private final BOpContext<IBindingSet> parentContext; + + /** + * The root operator for the subquery. 
+ */ + private final BOp subQueryOp; + + public SubqueryTask(final BOp subQuery, + final BOpContext<IBindingSet> parentContext) { + + this.subQueryOp = subQuery; + + this.parentContext = parentContext; + + } + + public RunningQuery call() throws Exception { + + IAsynchronousIterator<IBindingSet[]> subquerySolutionItr = null; + try { + + final QueryEngine queryEngine = parentContext.getRunningQuery() + .getQueryEngine(); + + final RunningQuery runningQuery = queryEngine + .eval(subQueryOp); + + // Iterator visiting the subquery solutions. + subquerySolutionItr = runningQuery.iterator(); + + // Copy solutions from the subquery to the query. + BOpUtility.copy(subquerySolutionItr, parentContext + .getSink(), null/* sink2 */, null/* constraints */, + null/* stats */); + + // wait for the subquery. + runningQuery.get(); + + // done. + return runningQuery; + + } catch (Throwable t) { + + // If a subquery fails, then cancel all of the subqueries. + ControllerTask.this.cancelTasks(); + + // rethrow the exception. 
+ throw new RuntimeException(t); + + } finally { + + if (subquerySolutionItr != null) + subquerySolutionItr.close(); + + } + + } + + } // SubqueryTask + + } // ControllerTask + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/AbstractSubqueryOp.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/eval/JoinGraph.java 2010-09-30 20:39:15 UTC (rev 3706) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/JoinGraph.java 2010-10-07 19:05:15 UTC (rev 3745) @@ -25,7 +25,7 @@ * Created on Aug 16, 2010 */ -package com.bigdata.bop.eval; +package com.bigdata.bop.controller; import java.io.Serializable; import java.util.LinkedList; Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/Steps.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/Steps.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/Steps.java 2010-10-07 19:05:15 UTC (rev 3745) @@ -0,0 +1,84 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Aug 18, 2010 + */ + +package com.bigdata.bop.controller; + +import java.util.Map; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.PipelineOp; + +/** + * STEPS(ops) + * + * <pre> + * STEPS([a,b,c],{}) + * </pre> + * + * Will run the subqueries <i>a</i>, <i>b</i>, and <i>c</i> in sequence. Each + * subquery will be initialized with a single empty {@link IBindingSet}. The + * output of those subqueries will be routed to the STEPS operator (their + * parent) unless the subqueries explicitly override this behavior using + * {@link PipelineOp.Annotations#SINK_REF}. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class Steps extends AbstractSubqueryOp { + + /** + * + */ + private static final long serialVersionUID = 1L; + + /** + * Deep copy constructor. + */ + public Steps(Steps op) { + super(op); + } + + /** + * Shallow copy constructor. + * + * @param args + * Two or more operators whose union is desired. 
+ * @param annotations + */ + public Steps(final BOp[] args, + final Map<String, Object> annotations) { + + super(args, annotations); + + if (getMaxParallel() != 1) + throw new IllegalArgumentException(Annotations.MAX_PARALLEL + "=" + + getMaxParallel()); + + } + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/Steps.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Copied: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/Union.java (from rev 3740, branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java) =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/Union.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/controller/Union.java 2010-10-07 19:05:15 UTC (rev 3745) @@ -0,0 +1,83 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Aug 18, 2010 + */ + +package com.bigdata.bop.controller; + +import java.util.Map; + +import com.bigdata.bop.BOp; +import com.bigdata.bop.IBindingSet; +import com.bigdata.bop.PipelineOp; + +/** + * UNION(ops)[maxParallel(default all)] + * <pre> + * UNION([a,b,c],{}) + * </pre> + * + * Will run the subqueries <i>a</i>, <i>b</i>, and <i>c</i> in parallel. Each + * subquery will be initialized with a single empty {@link IBindingSet}. The + * output of those subqueries will be routed to the UNION operator (their + * parent) unless the subqueries explicitly override this behavior using + * {@link PipelineOp.Annotations#SINK_REF}. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class Union extends AbstractSubqueryOp { + + /** + * + */ + private static final long serialVersionUID = 1L; + + /** + * Deep copy constructor. + * + * @param op + */ + public Union(final Union op) { + super(op); + } + + /** + * Shallow copy constructor. + * + * @param args + * Two or more operators whose union is desired. + * @param annotations + */ + public Union(final BOp[] args, final Map<String, Object> annotations) { + + super(args, annotations); + + if (args.length < 2) + throw new IllegalArgumentException(); + + } + +} Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java 2010-10-07 16:19:16 UTC (rev 3744) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/IRunningQuery.java 2010-10-07 19:05:15 UTC (rev 3745) @@ -56,6 +56,11 @@ IIndexManager getIndexManager(); /** + * The query engine. 
This may be used to submit subqueries for evaluation. + */ + QueryEngine getQueryEngine(); + + /** * Cancel the running query (normal termination). * <p> * Note: This method provides a means for an operator to indicate that the Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-10-07 16:19:16 UTC (rev 3744) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-10-07 ... [truncated message content] |
From: <tho...@us...> - 2010-10-07 16:19:22
|
Revision: 3744 http://bigdata.svn.sourceforge.net/bigdata/?rev=3744&view=rev Author: thompsonbry Date: 2010-10-07 16:19:16 +0000 (Thu, 07 Oct 2010) Log Message: ----------- Corrected the error message to indicate the use of "file:" to locate the log4j configuration file. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/config/LogUtil.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/config/LogUtil.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/config/LogUtil.java 2010-10-06 16:59:41 UTC (rev 3743) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/config/LogUtil.java 2010-10-07 16:19:16 UTC (rev 3744) @@ -25,9 +25,6 @@ package com.bigdata.util.config; -import java.util.Set; -import java.util.HashSet; - import org.apache.log4j.Logger; import org.apache.log4j.PropertyConfigurator; import org.apache.log4j.xml.DOMConfigurator; @@ -66,11 +63,11 @@ System.out.println (" set system property " +"'-Dlog4j.configuration=" - +"bigdata/src/resources/logging/log4j.properties" + +"file:bigdata/src/resources/logging/log4j.properties" +"\n and/or \n" +" set system property " +"'-Dlog4j.primary.configuration=" - +"<installDir>/" + +"file:<installDir>/" +"bigdata/src/resources/logging/log4j.properties'"); } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <ble...@us...> - 2010-10-06 16:59:48
|
Revision: 3743 http://bigdata.svn.sourceforge.net/bigdata/?rev=3743&view=rev Author: blevine218 Date: 2010-10-06 16:59:41 +0000 (Wed, 06 Oct 2010) Log Message: ----------- Cleaning up log output during unit tests - Removed some hard-coded log levels - Removed some printlns - Reduced some log levels from DEBUG or TRACE to ERROR Modified Paths: -------------- branches/maven_scaleout/bigdata-core/src/test/java/com/bigdata/boot/BootComponentTest.java branches/maven_scaleout/bigdata-core/src/test/java/com/bigdata/cache/TestHardReferenceQueue.java branches/maven_scaleout/bigdata-core/src/test/java/com/bigdata/util/TestEntryUtil.java Modified: branches/maven_scaleout/bigdata-core/src/test/java/com/bigdata/boot/BootComponentTest.java =================================================================== --- branches/maven_scaleout/bigdata-core/src/test/java/com/bigdata/boot/BootComponentTest.java 2010-10-06 16:36:38 UTC (rev 3742) +++ branches/maven_scaleout/bigdata-core/src/test/java/com/bigdata/boot/BootComponentTest.java 2010-10-06 16:59:41 UTC (rev 3743) @@ -73,9 +73,12 @@ // @BeforeClass public static void initAll() { public static void initAll() { - System.setProperty("log4j.configuration", DataFinder.bestPath("var/config/logging/log4j.properties") ); - logger = LogUtil.getLog4jLogger - ( (BootComponentTest.class).getName() ); + String log4jConfigFile = System.getProperty("log4j.configuration"); + + if (log4jConfigFile == null) { + System.setProperty("log4j.configuration", DataFinder.bestPath("var/config/logging/log4j.properties") ); + } + logger = LogUtil.getLog4jLogger( (BootComponentTest.class).getName() ); } // Kills any lingering processes Modified: branches/maven_scaleout/bigdata-core/src/test/java/com/bigdata/cache/TestHardReferenceQueue.java =================================================================== --- branches/maven_scaleout/bigdata-core/src/test/java/com/bigdata/cache/TestHardReferenceQueue.java 2010-10-06 16:36:38 UTC (rev 3742) +++ 
branches/maven_scaleout/bigdata-core/src/test/java/com/bigdata/cache/TestHardReferenceQueue.java 2010-10-06 16:59:41 UTC (rev 3743) @@ -80,7 +80,7 @@ new HardReferenceQueue<String>(new MyListener<String>(), 0); fail("Expecting: " + IllegalArgumentException.class); } catch (IllegalArgumentException ex) { - System.err.println("Ignoring expectedRefs exception: " + ex); + //System.err.println("Ignoring expectedRefs exception: " + ex); } } @@ -100,7 +100,7 @@ cache.add(null); fail("Expecting: " + IllegalArgumentException.class); } catch (IllegalArgumentException ex) { - System.err.println("Ignoring expectedRefs exception: " + ex); + //System.err.println("Ignoring expectedRefs exception: " + ex); } } Modified: branches/maven_scaleout/bigdata-core/src/test/java/com/bigdata/util/TestEntryUtil.java =================================================================== --- branches/maven_scaleout/bigdata-core/src/test/java/com/bigdata/util/TestEntryUtil.java 2010-10-06 16:36:38 UTC (rev 3742) +++ branches/maven_scaleout/bigdata-core/src/test/java/com/bigdata/util/TestEntryUtil.java 2010-10-06 16:59:41 UTC (rev 3743) @@ -20,6 +20,8 @@ import org.junit.Test; public class TestEntryUtil { + private static Level TEST_LOG_LEVEL = Level.ERROR; + @Test public void testGetEntryByType_with_null_args() throws SecurityException, NoSuchMethodException, @@ -75,14 +77,14 @@ public void testDisplayEntryEntryLogger() { EntryUtil.displayEntry( new Address(), - getLevelLogger(Level.DEBUG)); + getLevelLogger(TEST_LOG_LEVEL)); } @Test public void testDisplayEntryEntryStringLogger() { EntryUtil.displayEntry( new Location(), "Label", - getLevelLogger(Level.DEBUG)); + getLevelLogger(TEST_LOG_LEVEL)); } @Test public void testDisplayEntryStringEntryStringLogger() { @@ -90,7 +92,7 @@ "Prefix", new Comment("This is a comment."), "Label", - getLevelLogger(Level.DEBUG)); + getLevelLogger(TEST_LOG_LEVEL)); } @Test public void testDisplayEntryStringEntryStringLogger_null() { @@ -98,7 +100,7 @@ null, null, 
null, - getLevelLogger(Level.DEBUG)); + getLevelLogger(TEST_LOG_LEVEL)); } private static void assertNotEquivalentEntries(Entry entry1, Entry entry2) { @@ -106,7 +108,7 @@ EntryUtil.compareEntries( entry1, entry2, - getLevelLogger(Level.TRACE))); + getLevelLogger(TEST_LOG_LEVEL))); } @Test public void testCompareEntries_not_equal_null() { @@ -135,7 +137,7 @@ EntryUtil.compareEntries( entry1, entry2, - getLevelLogger(Level.TRACE))); + getLevelLogger(TEST_LOG_LEVEL))); } @Test public void testCompareEntries_equal_null() { @@ -196,7 +198,7 @@ EntryUtil.compareEntrySets("Equivalent", entries1, entries2, - getLevelLogger(Level.TRACE))); + getLevelLogger(TEST_LOG_LEVEL))); } private static void assertNotEquivalentSets(Entry[] entries1, Entry[] entries2) { @@ -204,7 +206,7 @@ EntryUtil.compareEntrySets("Not equivalent", entries1, entries2, - getLevelLogger(Level.TRACE))); + getLevelLogger(TEST_LOG_LEVEL))); } @Test public void testCompareEntrySets_unequiv_null() { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <ble...@us...> - 2010-10-06 16:36:44
|
Revision: 3742 http://bigdata.svn.sourceforge.net/bigdata/?rev=3742&view=rev Author: blevine218 Date: 2010-10-06 16:36:38 +0000 (Wed, 06 Oct 2010) Log Message: ----------- remove hard-coded log levels Modified Paths: -------------- branches/maven_scaleout/bigdata-core/src/test/java/com/bigdata/btree/TestRemoveAll.java Modified: branches/maven_scaleout/bigdata-core/src/test/java/com/bigdata/btree/TestRemoveAll.java =================================================================== --- branches/maven_scaleout/bigdata-core/src/test/java/com/bigdata/btree/TestRemoveAll.java 2010-10-06 15:32:00 UTC (rev 3741) +++ branches/maven_scaleout/bigdata-core/src/test/java/com/bigdata/btree/TestRemoveAll.java 2010-10-06 16:36:38 UTC (rev 3742) @@ -88,14 +88,14 @@ } - assertTrue(btree.dump(Level.DEBUG,System.err)); + assertTrue(btree.dump(Level.ERROR,System.err)); assertSameIterator(new Object[] { v1, v2, v3, v4, v5, v6, v7, v8 }, btree.rangeIterator()); btree.removeAll(); - assertTrue(btree.dump(Level.DEBUG,System.err)); + assertTrue(btree.dump(Level.ERROR,System.err)); assertSameIterator(new Object[] {}, btree.rangeIterator()); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <ble...@us...> - 2010-10-06 15:32:06
|
Revision: 3741 http://bigdata.svn.sourceforge.net/bigdata/?rev=3741&view=rev Author: blevine218 Date: 2010-10-06 15:32:00 +0000 (Wed, 06 Oct 2010) Log Message: ----------- remove hard-coded log levels Modified Paths: -------------- branches/maven_scaleout/bigdata-core/src/test/java/com/bigdata/service/StressTestConcurrent.java branches/maven_scaleout/bigdata-core/src/test/java/com/bigdata/service/TestMove.java Modified: branches/maven_scaleout/bigdata-core/src/test/java/com/bigdata/service/StressTestConcurrent.java =================================================================== --- branches/maven_scaleout/bigdata-core/src/test/java/com/bigdata/service/StressTestConcurrent.java 2010-10-06 14:44:41 UTC (rev 3740) +++ branches/maven_scaleout/bigdata-core/src/test/java/com/bigdata/service/StressTestConcurrent.java 2010-10-06 15:32:00 UTC (rev 3741) @@ -779,9 +779,6 @@ private void setupLBSForMove(final IDataService targetService) throws IOException { - // explicitly set the log level for the load balancer. - LoadBalancerService.log.setLevel(Level.INFO); -EmbeddedLoadBalancer.logger.setLevel(Level.INFO); //BTM final AbstractEmbeddedLoadBalancerService lbs = ((AbstractEmbeddedLoadBalancerService) ((EmbeddedFederation<?>) fed) //BTM .getLoadBalancerService()); Modified: branches/maven_scaleout/bigdata-core/src/test/java/com/bigdata/service/TestMove.java =================================================================== --- branches/maven_scaleout/bigdata-core/src/test/java/com/bigdata/service/TestMove.java 2010-10-06 14:44:41 UTC (rev 3740) +++ branches/maven_scaleout/bigdata-core/src/test/java/com/bigdata/service/TestMove.java 2010-10-06 15:32:00 UTC (rev 3741) @@ -289,9 +289,6 @@ System.err.println("Setting up LBS for move."); - // explicitly set the log level for the load balancer. 
- LoadBalancerService.log.setLevel(Level.INFO); -EmbeddedLoadBalancer.logger.setLevel(Level.INFO); //BTM final AbstractEmbeddedLoadBalancerService lbs = ((AbstractEmbeddedLoadBalancerService) ((EmbeddedFederation) fed) //BTM .getLoadBalancerService()); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-10-06 14:44:49
|
Revision: 3740 http://bigdata.svn.sourceforge.net/bigdata/?rev=3740&view=rev Author: thompsonbry Date: 2010-10-06 14:44:41 +0000 (Wed, 06 Oct 2010) Log Message: ----------- Added support for PipelineOp.Annotations#SINK_REF (to override the default sink in support of routing around a union operator). Modified RunningQuery to permit the default sink and the optional sink to target the same operator. Modified the FederatedQueryEngine to use a thread pool to materialize chunks. Added unit tests for optional joins for the query engine and federated query engine. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java 2010-10-06 13:12:02 UTC (rev 3739) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/bset/Union.java 2010-10-06 14:44:41 UTC (rev 3740) @@ -28,17 +28,11 @@ package com.bigdata.bop.bset; import java.util.Map; -import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; -import com.bigdata.bop.BOp; import com.bigdata.bop.BOpContext; +import com.bigdata.bop.IBindingSet; import com.bigdata.bop.PipelineOp; -import com.bigdata.bop.IBindingSet; -import com.bigdata.bop.join.PipelineJoin; -import com.bigdata.rdf.rules.TMUtility; -import com.bigdata.relation.RelationFusedView; -import 
com.bigdata.util.concurrent.Haltable; /** * UNION(ops)[maxParallel(default all)] @@ -49,19 +43,30 @@ * and may be executed independently. By default, the subqueries are run with * unlimited parallelism. * <p> - * UNION is useful when independent queries are evaluated and their outputs are - * merged. Outputs from the UNION operator flow to the parent operator and will - * be mapped across shards or nodes as appropriate for the parent. UNION runs on - * the query controller. In order to avoid routing intermediate results through - * the controller, the {@link PipelineOp.Annotations#SINK_REF} of each - * child operand should be overridden to specify the parent of the UNION - * operator. + * Note: UNION runs on the query controller. The + * {@link PipelineOp.Annotations#SINK_REF} of each child operand should be + * overridden to specify the parent of the UNION operator, thereby routing + * around the UNION operator itself. If you fail to do this, then the + * intermediate results of the subqueries will be routed through the UNION + * operator on the query controller. * <p> - * UNION can not be used when the intermediate results must be routed into the - * subqueries. However, a {@link Tee} pattern may help in such cases. For - * example, a {@link Tee} may be used to create a union of pipeline joins for - * two access paths during truth maintenance. + * UNION can not be used when intermediate results from other computations must + * be routed into subqueries. However, a {@link Tee} pattern may help in such + * cases. For example, a {@link Tee} may be used to create a union of pipeline + * joins for two access paths during truth maintenance. + * <p> + * For example: * + * <pre> + * UNION([a,b,c],{}) + * </pre> + * + * Will run the subqueries <i>a</i>, <i>b</i>, and <i>c</i> in parallel. Each + * subquery will be initialized with a single empty {@link IBindingSet}. 
The + * output of those subqueries will be routed to the UNION operator (their + * parent) unless the subqueries explicitly override this behavior using + * {@link PipelineOp.Annotations#SINK_REF}. + * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-10-06 13:12:02 UTC (rev 3739) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-10-06 14:44:41 UTC (rev 3740) @@ -393,6 +393,12 @@ } + protected void execute(final Runnable r) { + + localIndexManager.getExecutorService().execute(r); + + } + /** * Runnable submits chunks available for evaluation against running queries. * @@ -438,7 +444,7 @@ if (log.isDebugEnabled()) log.debug("Running chunk: " + chunk); // execute task. - localIndexManager.getExecutorService().execute(ft); + execute(ft); } catch (RejectedExecutionException ex) { // shutdown of the pool (should be an unbounded // pool). Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-10-06 13:12:02 UTC (rev 3739) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-10-06 14:44:41 UTC (rev 3740) @@ -873,8 +873,7 @@ * target it with a message. (The sink will be null iff there is no * parent for this operator.) */ - sinkId = p == null ? null : (Integer) p - .getRequiredProperty(PipelineOp.Annotations.BOP_ID); + sinkId = getEffectiveDefaultSink(bop, p); // altSink (null when not specified). 
altSinkId = (Integer) op @@ -889,12 +888,12 @@ + bop); } - if (sinkId != null && altSinkId != null - && sinkId.intValue() == altSinkId.intValue()) { - throw new RuntimeException( - "The primary and alternative sink may not be the same operator: " - + bop); - } +// if (sinkId != null && altSinkId != null +// && sinkId.intValue() == altSinkId.intValue()) { +// throw new RuntimeException( +// "The primary and alternative sink may not be the same operator: " +// + bop); +// } /* * Setup the BOpStats object. For some operators, e.g., SliceOp, @@ -932,6 +931,38 @@ } /** + * Return the effective default sink. + * + * @param bop + * The operator. + * @param p + * The parent of that operator, if any. + */ + private Integer getEffectiveDefaultSink(final BOp bop, final BOp p) { + + if (bop == null) + throw new IllegalArgumentException(); + + Integer sink; + + // Explictly specified sink? + sink = (Integer) bop.getProperty(PipelineOp.Annotations.SINK_REF); + + if (sink == null) { + if (p == null) { + // No parent, so no sink. + return null; + } + // The parent is the sink. + sink = (Integer) p + .getRequiredProperty(PipelineOp.Annotations.BOP_ID); + } + + return sink; + + } + + /** * Evaluate the {@link IChunkMessage}. 
*/ public void run() { Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java 2010-10-06 13:12:02 UTC (rev 3739) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java 2010-10-06 14:44:41 UTC (rev 3740) @@ -31,13 +31,10 @@ import java.nio.ByteBuffer; import java.rmi.RemoteException; import java.util.UUID; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.FutureTask; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicReference; @@ -99,23 +96,23 @@ */ private final IQueryClient clientProxy; - /** - * A queue of {@link IChunkMessage}s which needs to have their data - * materialized so an operator can consume those data on this node. - * This queue is drained by the {@link MaterializeChunksTask}. - */ - final private BlockingQueue<IChunkMessage<?>> chunkMaterializationQueue = new LinkedBlockingQueue<IChunkMessage<?>>(); +// /** +// * A queue of {@link IChunkMessage}s which needs to have their data +// * materialized so an operator can consume those data on this node. +// * This queue is drained by the {@link MaterializeChunksTask}. +// */ +// final private BlockingQueue<IChunkMessage<?>> chunkMaterializationQueue = new LinkedBlockingQueue<IChunkMessage<?>>(); /** - * The service on which we run {@link MaterializeChunksTask}. This is + * The service used to accept {@link IChunkMessage} for evaluation. This is * started by {@link #init()}. 
*/ - private final AtomicReference<ExecutorService> materializeChunksService = new AtomicReference<ExecutorService>(); + private final AtomicReference<ExecutorService> acceptTaskService = new AtomicReference<ExecutorService>(); - /** - * The {@link Future} for the task draining the {@link #chunkMaterializationQueue}. - */ - private final AtomicReference<FutureTask<Void>> materializeChunksFuture = new AtomicReference<FutureTask<Void>>(); +// /** +// * The {@link Future} for the task draining the {@link #chunkMaterializationQueue}. +// */ +// private final AtomicReference<FutureTask<Void>> acceptMessageTaskFuture = new AtomicReference<FutureTask<Void>>(); @Override public UUID getServiceUUID() { @@ -229,17 +226,16 @@ /* * The proxy for this query engine when used as a query controller. * + * Note: DGC is relied on to clean up the exported proxy when the + * query engine dies. * - * Should the data services expose their query engine in this - * manner? - * - * @todo We need to unexport the proxy as well when the service is - * shutdown. This should follow the same pattern as DataService -> - * DataServer. E.g., a QueryEngineServer class. + * @todo There should be an explicit "QueryEngineServer" which is + * used as the front end for SPARQL queries. It should have an + * explicitly configured Exporter for its proxy. 
*/ this.clientProxy = (IQueryClient) ((JiniFederation<?>) fed) - .getProxy(this, false/* enableDGC */); + .getProxy(this, true/* enableDGC */); } else { @@ -275,26 +271,11 @@ public void init() { super.init(); - - final FutureTask<Void> ft = new FutureTask<Void>( - new MaterializeChunksTask(), (Void) null); - - if (materializeChunksFuture.compareAndSet(null/* expect */, ft)) { - materializeChunksService.set(Executors - .newSingleThreadExecutor(new DaemonThreadFactory( - FederatedQueryEngine.class - + ".materializeChunksService"))); + acceptTaskService.set(Executors + .newCachedThreadPool(new DaemonThreadFactory( + FederatedQueryEngine.class + ".acceptService"))); -// getIndexManager().getExecutorService().execute(ft); - materializeChunksService.get().execute(ft); - - } else { - - throw new IllegalStateException("Already running"); - - } - } /** @@ -305,21 +286,14 @@ @Override protected void didShutdown() { - // stop materializing chunks. - final Future<?> f = materializeChunksFuture.get(); - if (f != null) { - f.cancel(true/* mayInterruptIfRunning */); - } - - // stop the service on which we ran the MaterializeChunksTask. - final ExecutorService s = materializeChunksService.get(); + // stop the service which is accepting messages. + final ExecutorService s = acceptTaskService.get(); if (s != null) { s.shutdownNow(); } // Clear the references. - materializeChunksFuture.set(null); - materializeChunksService.set(null); + acceptTaskService.set(null); } @@ -331,63 +305,50 @@ @Override public void shutdownNow() { - // stop materializing chunks. - final Future<?> f = materializeChunksFuture.get(); - if (f != null) - f.cancel(true/* mayInterruptIfRunning */); + // stop the service which is accepting messages. + final ExecutorService s = acceptTaskService.get(); + if (s != null) { + s.shutdownNow(); + } + // Clear the references. + acceptTaskService.set(null); + super.shutdownNow(); } /** - * Runnable materializes chunks and makes them available for further - * processing. 
+ * Materialize an {@link IChunkMessage} for processing and place it on the + * queue of accepted messages. * - * @todo multiple threads for materializing chunks, not just one. can - * be multiple {@link MaterializeChunksTask}s running. + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> */ - private class MaterializeChunksTask implements Runnable { + private class MaterializeMessageTask implements Runnable { + private final IChunkMessage<?> msg; + + public MaterializeMessageTask(final IChunkMessage<?> msg) { + this.msg = msg; + } + public void run() { - if(log.isInfoEnabled()) - log.info("running: " + this); - while (true) { - try { - final IChunkMessage<?> msg = chunkMaterializationQueue.take(); - if(log.isDebugEnabled()) - log.debug("msg=" + msg); - try { - if(!accept(msg)) { - if(log.isDebugEnabled()) - log.debug("dropping: " + msg); - continue; - } - if(log.isDebugEnabled()) - log.debug("accepted: " + msg); - /* - * @todo The type warning here is because the rest of - * the API does not know what to do with messages for - * chunks other than IBindingSet[], e.g., IElement[], - * etc. 
- */ - FederatedQueryEngine.this - .acceptChunk((IChunkMessage) msg); - } catch(Throwable t) { - if(InnerCause.isInnerCause(t, InterruptedException.class)) { - log.warn("Interrupted."); - return; - } - throw new RuntimeException(t); - } - } catch (InterruptedException e) { + try { + if (!accept(msg)) { + if (log.isDebugEnabled()) + log.debug("dropping: " + msg); + return; + } + if (log.isDebugEnabled()) + log.debug("accepted: " + msg); + FederatedQueryEngine.this.acceptChunk((IChunkMessage) msg); + } catch (Throwable t) { + if (InnerCause.isInnerCause(t, InterruptedException.class)) { log.warn("Interrupted."); return; - } catch (Throwable ex) { - // log and continue - log.error(ex, ex); - continue; } + throw new RuntimeException(t); } } @@ -410,59 +371,30 @@ if (q == null) { - /* - * This code path handles the message the first time a chunk is - * observed on a node for a query. Since we do not broadcast the - * query to all nodes, the node has to resolve the query from the - * query controller. - * - * @todo Track recently terminated queries and do not recreate them. - */ - // true iff this is the query controller final boolean isController = getServiceUUID().equals( msg.getQueryController().getServiceUUID()); - - if(isController) { + + if (isController) { /* * @todo This would indicate that the query had been * concurrently terminated and cleared from the set of - * runningQueries and that we were not retaining metadata about - * queries which had been terminated. + * runningQueries and that we were not retaining metadata + * about queries which had been terminated. */ throw new AssertionError( "Query not running on controller: thisService=" + getServiceUUID() + ", msg=" + msg); } - /* - * Request the query from the query controller (RMI). - * - * @todo RMI is too expensive. Apply a memoizer pattern to avoid - * race conditions. 
- */ - final PipelineOp query = msg.getQueryController() - .getQuery(msg.getQueryId()); - - q = newRunningQuery(FederatedQueryEngine.this, queryId, - false/* controller */, msg.getQueryController(), query); - - final RunningQuery tmp = runningQueries.putIfAbsent(queryId, q); + // Get the query declaration from the query controller. + q = getDeclaredQuery(queryId); - if(tmp != null) { - - // another thread won this race. - q = (FederatedRunningQuery) tmp; - - } - } -// if(q == null) -// throw new RuntimeException(ERR_QUERY_NOT_RUNNING + queryId); - if (!q.isCancelled() && !msg.isMaterialized()) { + // materialize the chunk for this message. msg.materialize(q); } @@ -470,9 +402,44 @@ return !q.isCancelled(); } - - } // MaterializeChunksTask + /** + * This code path handles the message the first time a chunk is observed + * on a node for a query. Since we do not broadcast the query to all + * nodes, the node has to resolve the query from the query controller. + * + * @throws RemoteException + * + * @todo Track recently terminated queries and do not recreate them. + */ + private FederatedRunningQuery getDeclaredQuery(final UUID queryId) + throws RemoteException { + + /* + * Request the query from the query controller (RMI). + */ + final PipelineOp query = msg.getQueryController().getQuery( + msg.getQueryId()); + + FederatedRunningQuery q = newRunningQuery( + FederatedQueryEngine.this, queryId, false/* controller */, + msg.getQueryController(), query); + + final RunningQuery tmp = runningQueries.putIfAbsent(queryId, q); + + if (tmp != null) { + + // another thread won this race. + q = (FederatedRunningQuery) tmp; + + } + + return q; + + } + + } + public void declareQuery(final IQueryDecl queryDecl) { final UUID queryId = queryDecl.getQueryId(); @@ -489,11 +456,22 @@ if (msg == null) throw new IllegalArgumentException(); + if(log.isDebugEnabled()) + log.debug("msg=" + msg); + assertRunning(); - // queue up message to be materialized or otherwise handled later. 
- chunkMaterializationQueue.add(msg); + /* + * Schedule task to materialized or otherwise handle the message. + */ + + final Executor s = acceptTaskService.get(); + if (s == null) + throw new RuntimeException("Not running"); + + s.execute(new MaterializeMessageTask(msg)); + } /** Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java 2010-10-06 13:12:02 UTC (rev 3739) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java 2010-10-06 14:44:41 UTC (rev 3740) @@ -44,9 +44,7 @@ import com.bigdata.bop.ArrayBindingSet; import com.bigdata.bop.BOp; -import com.bigdata.bop.BOpContext; import com.bigdata.bop.BOpEvaluationContext; -import com.bigdata.bop.PipelineOp; import com.bigdata.bop.Constant; import com.bigdata.bop.HashBindingSet; import com.bigdata.bop.IBindingSet; @@ -55,12 +53,14 @@ import com.bigdata.bop.IVariable; import com.bigdata.bop.IVariableOrConstant; import com.bigdata.bop.NV; +import com.bigdata.bop.PipelineOp; import com.bigdata.bop.Var; import com.bigdata.bop.ap.E; import com.bigdata.bop.ap.Predicate; import com.bigdata.bop.ap.R; import com.bigdata.bop.bset.ConditionalRoutingOp; import com.bigdata.bop.bset.StartOp; +import com.bigdata.bop.constraint.EQ; import com.bigdata.bop.constraint.EQConstant; import com.bigdata.bop.fed.TestFederatedQueryEngine; import com.bigdata.bop.join.PipelineJoin; @@ -1143,7 +1143,6 @@ BOpEvaluationContext.CONTROLLER),// })); - // @todo the KEY_ORDER should be bound before evaluation. 
final Predicate<?> pred1Op = new Predicate<E>(new IVariableOrConstant[] { Var.var("x"), Var.var("y") }, NV .asMap(new NV[] {// @@ -1411,16 +1410,212 @@ return nsuccess; } - + /** - * @todo Write unit tests for optional joins, including where an alternative - * sink is specified in the {@link BOpContext} and is used when the - * join fails. + * Unit test for optional join. Two joins are used and target a + * {@link SliceOp}. The 2nd join is marked as optional. Intermediate results + * which do not succeed on the optional join are forwarded to the + * {@link SliceOp} which is the target specified by the + * {@link PipelineOp.Annotations#ALT_SINK_REF}. + * + * @todo Write unit test for optional join groups. Here the goal is to + * verify that intermediate results may skip more than one join. This + * was a problem for the old query evaluation approach since binding + * sets had to cascade through the query one join at a time. However, + * the new query engine design should handle this case. 
*/ - public void test_query_join2_optionals() { + public void test_query_join2_optionals() throws Exception { - fail("write test"); + final int startId = 1; + final int joinId1 = 2; + final int predId1 = 3; + final int joinId2 = 4; + final int predId2 = 5; + final int sliceId = 6; + + final IVariable<?> x = Var.var("x"); + final IVariable<?> y = Var.var("y"); + final IVariable<?> z = Var.var("z"); + + final PipelineOp startOp = new StartOp(new BOp[] {}, + NV.asMap(new NV[] {// + new NV(Predicate.Annotations.BOP_ID, startId),// + new NV(SliceOp.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.CONTROLLER),// + })); + + final Predicate<?> pred1Op = new Predicate<E>( + new IVariableOrConstant[] { x, y }, NV + .asMap(new NV[] {// + new NV(Predicate.Annotations.RELATION_NAME, + new String[] { namespace }),// + new NV(Predicate.Annotations.BOP_ID, predId1),// + new NV(Predicate.Annotations.TIMESTAMP, ITx.READ_COMMITTED),// + })); + + final Predicate<?> pred2Op = new Predicate<E>( + new IVariableOrConstant[] { y, z }, NV + .asMap(new NV[] {// + new NV(Predicate.Annotations.RELATION_NAME, + new String[] { namespace }),// + new NV(Predicate.Annotations.BOP_ID, predId2),// + new NV(Predicate.Annotations.TIMESTAMP, ITx.READ_COMMITTED),// + })); + + final PipelineOp join1Op = new PipelineJoin<E>(// + startOp, pred1Op,// + NV.asMap(new NV[] {// + new NV(Predicate.Annotations.BOP_ID, joinId1),// + })); + final PipelineOp join2Op = new PipelineJoin<E>(// + join1Op, pred2Op,// + NV.asMap(new NV[] {// + new NV(Predicate.Annotations.BOP_ID, joinId2),// + // constraint x == z + new NV(PipelineJoin.Annotations.CONSTRAINTS,new IConstraint[]{ + new EQ(x,z) + }), + // join is optional. + new NV(PipelineJoin.Annotations.OPTIONAL,true),// + // optional target is the same as the default target. 
+ new NV(PipelineOp.Annotations.ALT_SINK_REF,sliceId),// + })); + + final PipelineOp sliceOp = new SliceOp(// + new BOp[]{join2Op}, + NV.asMap(new NV[] {// + new NV(BOp.Annotations.BOP_ID, sliceId),// + new NV(BOp.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.CONTROLLER),// + })); + + final PipelineOp query = sliceOp; + + // start the query. + final UUID queryId = UUID.randomUUID(); + final IChunkMessage<IBindingSet> initialChunkMessage; + { + + final IBindingSet initialBindings = new HashBindingSet(); + +// initialBindings.set(Var.var("x"), new Constant<String>("Mary")); + + initialChunkMessage = new LocalChunkMessage<IBindingSet>(queryEngine, + queryId, startId,// + -1, // partitionId + newBindingSetIterator(initialBindings)); + } + final RunningQuery runningQuery = queryEngine.eval(queryId, query, + initialChunkMessage); + + // verify solutions. + { + + // the expected solutions. + final IBindingSet[] expected = new IBindingSet[] {// + // two solutions where the 2nd join succeeds. + new ArrayBindingSet(// + new IVariable[] { x, y, z },// + new IConstant[] { new Constant<String>("Leon"), + new Constant<String>("Paul"), + new Constant<String>("Leon") }// + ), + new ArrayBindingSet(// + new IVariable[] { x, y, z },// + new IConstant[] { new Constant<String>("Paul"), + new Constant<String>("Leon"), + new Constant<String>("Paul") }// + ), + // plus anything we read from the first access path which did not join. 
+ new ArrayBindingSet(// + new IVariable[] { Var.var("x"), Var.var("y") },// + new IConstant[] { new Constant<String>("John"), + new Constant<String>("Mary") }// + ), + new ArrayBindingSet(// + new IVariable[] { Var.var("x"), Var.var("y") },// + new IConstant[] { new Constant<String>("Mary"), + new Constant<String>("Paul") }// + ) + }; + + assertSameSolutionsAnyOrder(expected, + new Dechunkerator<IBindingSet>(runningQuery.iterator())); + +// new E("John", "Mary"),// [0] +// new E("Leon", "Paul"),// [1] +// new E("Mary", "Paul"),// [2] +// new E("Paul", "Leon"),// [3] + } + + // Wait until the query is done. + runningQuery.get(); + final Map<Integer, BOpStats> statsMap = runningQuery.getStats(); + { + // validate the stats map. + assertNotNull(statsMap); + assertEquals(4, statsMap.size()); + if (log.isInfoEnabled()) + log.info(statsMap.toString()); + } + + // validate the stats for the start operator. + { + final BOpStats stats = statsMap.get(startId); + assertNotNull(stats); + if (log.isInfoEnabled()) + log.info("start: " + stats.toString()); + + // verify query solution stats details. + assertEquals(1L, stats.chunksIn.get()); + assertEquals(1L, stats.unitsIn.get()); + assertEquals(1L, stats.unitsOut.get()); + assertEquals(1L, stats.chunksOut.get()); + } + + // validate the stats for the 1st join operator. + { + final BOpStats stats = statsMap.get(joinId1); + assertNotNull(stats); + if (log.isInfoEnabled()) + log.info("join1: " + stats.toString()); + + // verify query solution stats details. + assertEquals(1L, stats.chunksIn.get()); + assertEquals(1L, stats.unitsIn.get()); + assertEquals(4L, stats.unitsOut.get()); + assertEquals(1L, stats.chunksOut.get()); + } + + // validate the stats for the 2nd join operator. + { + final BOpStats stats = statsMap.get(joinId2); + assertNotNull(stats); + if (log.isInfoEnabled()) + log.info("join2: " + stats.toString()); + + // verify query solution stats details. 
+// assertEquals(1L, stats.chunksIn.get()); + assertEquals(4L, stats.unitsIn.get()); + assertEquals(4L, stats.unitsOut.get()); +// assertEquals(1L, stats.chunksOut.get()); + } + + // Validate stats for the sliceOp. + { + final BOpStats stats = statsMap.get(sliceId); + assertNotNull(stats); + if (log.isInfoEnabled()) + log.info("slice: " + stats.toString()); + + // verify query solution stats details. +// assertEquals(2L, stats.chunksIn.get()); + assertEquals(4L, stats.unitsIn.get()); + assertEquals(4L, stats.unitsOut.get()); +// assertEquals(1L, stats.chunksOut.get()); + } + } /** Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java 2010-10-06 13:12:02 UTC (rev 3739) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java 2010-10-06 14:44:41 UTC (rev 3740) @@ -35,7 +35,6 @@ import com.bigdata.bop.ArrayBindingSet; import com.bigdata.bop.BOp; -import com.bigdata.bop.BOpContext; import com.bigdata.bop.BOpEvaluationContext; import com.bigdata.bop.Constant; import com.bigdata.bop.HashBindingSet; @@ -51,6 +50,7 @@ import com.bigdata.bop.ap.Predicate; import com.bigdata.bop.ap.R; import com.bigdata.bop.bset.StartOp; +import com.bigdata.bop.constraint.EQ; import com.bigdata.bop.constraint.EQConstant; import com.bigdata.bop.engine.BOpStats; import com.bigdata.bop.engine.IChunkMessage; @@ -1014,7 +1014,6 @@ new String[] { namespace }),// new NV(Predicate.Annotations.BOP_ID, predId1),// new NV(Predicate.Annotations.TIMESTAMP, ITx.READ_COMMITTED),// -// new NV(Predicate.Annotations.KEY_ORDER,R.primaryKeyOrder),// })); final Predicate<?> pred2Op = new Predicate<E>(new IVariableOrConstant[] { @@ -1024,7 +1023,6 @@ new String[] { namespace }),// new NV(Predicate.Annotations.BOP_ID, predId2),// new 
NV(Predicate.Annotations.TIMESTAMP, ITx.READ_COMMITTED),// -// new NV(Predicate.Annotations.KEY_ORDER,R.primaryKeyOrder),// })); final PipelineOp join1Op = new PipelineJoin<E>(// @@ -1172,14 +1170,229 @@ } /** - * @todo Write unit tests for optional joins, including where an alternative - * sink is specified in the {@link BOpContext} and is used when the - * join fails. - * */ - public void test_query_join2_optionals() { + * Unit test for optional join. Two joins are used and target a + * {@link SliceOp}. The 2nd join is marked as optional. Intermediate results + * which do not succeed on the optional join are forwarded to the + * {@link SliceOp} which is the target specified by the + * {@link PipelineOp.Annotations#ALT_SINK_REF}. + * + * @todo Write unit test for optional join groups. Here the goal is to + * verify that intermediate results may skip more than one join. This + * was a problem for the old query evaluation approach since binding + * sets had to cascade through the query one join at a time. However, + * the new query engine design should handle this case. 
+ */ + public void test_query_join2_optionals() throws Exception { - fail("write test"); + final int startId = 1; + final int joinId1 = 2; + final int predId1 = 3; + final int joinId2 = 4; + final int predId2 = 5; + final int sliceId = 6; + + final IVariable<?> x = Var.var("x"); + final IVariable<?> y = Var.var("y"); + final IVariable<?> z = Var.var("z"); + + final PipelineOp startOp = new StartOp(new BOp[] {}, + NV.asMap(new NV[] {// + new NV(Predicate.Annotations.BOP_ID, startId),// + new NV(SliceOp.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.CONTROLLER),// + })); + + final Predicate<?> pred1Op = new Predicate<E>( + new IVariableOrConstant[] { x, y }, NV + .asMap(new NV[] {// + new NV(Predicate.Annotations.RELATION_NAME, + new String[] { namespace }),// + new NV(Predicate.Annotations.BOP_ID, predId1),// + new NV(Predicate.Annotations.TIMESTAMP, ITx.READ_COMMITTED),// + })); + + final Predicate<?> pred2Op = new Predicate<E>( + new IVariableOrConstant[] { y, z }, NV + .asMap(new NV[] {// + new NV(Predicate.Annotations.RELATION_NAME, + new String[] { namespace }),// + new NV(Predicate.Annotations.BOP_ID, predId2),// + new NV(Predicate.Annotations.TIMESTAMP, ITx.READ_COMMITTED),// + })); + + final PipelineOp join1Op = new PipelineJoin<E>(// + startOp, pred1Op,// + NV.asMap(new NV[] {// + new NV(Predicate.Annotations.BOP_ID, joinId1),// + // Note: shard-partitioned joins! + new NV( Predicate.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.SHARDED),// + })); + final PipelineOp join2Op = new PipelineJoin<E>(// + join1Op, pred2Op,// + NV.asMap(new NV[] {// + new NV(Predicate.Annotations.BOP_ID, joinId2),// + // Note: shard-partitioned joins! + new NV( Predicate.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.SHARDED),// + // constraint x == z + new NV(PipelineJoin.Annotations.CONSTRAINTS,new IConstraint[]{ + new EQ(x,z) + }), + // join is optional. 
+ new NV(PipelineJoin.Annotations.OPTIONAL,true),// + // optional target is the same as the default target. + new NV(PipelineOp.Annotations.ALT_SINK_REF,sliceId),// + })); + + final PipelineOp sliceOp = new SliceOp(// + new BOp[]{join2Op}, + NV.asMap(new NV[] {// + new NV(BOp.Annotations.BOP_ID, sliceId),// + new NV(BOp.Annotations.EVALUATION_CONTEXT, + BOpEvaluationContext.CONTROLLER),// + })); + + final PipelineOp query = sliceOp; + + // start the query. + final UUID queryId = UUID.randomUUID(); + final IChunkMessage<IBindingSet> initialChunkMessage; + { + + final IBindingSet initialBindings = new HashBindingSet(); + +// initialBindings.set(Var.var("x"), new Constant<String>("Mary")); + + initialChunkMessage = new LocalChunkMessage<IBindingSet>(queryEngine, + queryId, startId,// + -1, // partitionId + newBindingSetIterator(initialBindings)); + } + final RunningQuery runningQuery = queryEngine.eval(queryId, query, + initialChunkMessage); + + // verify solutions. + { + + // the expected solutions. + final IBindingSet[] expected = new IBindingSet[] {// + // solutions where the 2nd join succeeds. + new ArrayBindingSet(// + new IVariable[] { x, y, z },// + new IConstant[] { new Constant<String>("John"), + new Constant<String>("Mary"), + new Constant<String>("John") }// + ), + new ArrayBindingSet(// + new IVariable[] { x, y, z },// + new IConstant[] { new Constant<String>("Mary"), + new Constant<String>("John"), + new Constant<String>("Mary") }// + ), + new ArrayBindingSet(// + new IVariable[] { x, y, z },// + new IConstant[] { new Constant<String>("Leon"), + new Constant<String>("Paul"), + new Constant<String>("Leon") }// + ), + new ArrayBindingSet(// + new IVariable[] { x, y, z },// + new IConstant[] { new Constant<String>("Paul"), + new Constant<String>("Leon"), + new Constant<String>("Paul") }// + ), + /* + * Plus anything we read from the first access path which + * did not pass the 2nd join. 
+ */ + new ArrayBindingSet(// + new IVariable[] { Var.var("x"), Var.var("y") },// + new IConstant[] { new Constant<String>("Mary"), + new Constant<String>("Paul") }// + ), + }; + + TestQueryEngine.assertSameSolutionsAnyOrder(expected, + new Dechunkerator<IBindingSet>(runningQuery.iterator())); + +// // partition0 +// new E("John", "Mary"),// +// new E("Leon", "Paul"),// +// // partition1 +// new E("Mary", "John"),// +// new E("Mary", "Paul"),// +// new E("Paul", "Leon"),// + } + + // Wait until the query is done. + runningQuery.get(); + final Map<Integer, BOpStats> statsMap = runningQuery.getStats(); + { + // validate the stats map. + assertNotNull(statsMap); + assertEquals(4, statsMap.size()); + if (log.isInfoEnabled()) + log.info(statsMap.toString()); + } + + // validate the stats for the start operator. + { + final BOpStats stats = statsMap.get(startId); + assertNotNull(stats); + if (log.isInfoEnabled()) + log.info("start: " + stats.toString()); + + // verify query solution stats details. + assertEquals(1L, stats.chunksIn.get()); + assertEquals(1L, stats.unitsIn.get()); + assertEquals(1L, stats.unitsOut.get()); + assertEquals(1L, stats.chunksOut.get()); + } + + // validate the stats for the 1st join operator. + { + final BOpStats stats = statsMap.get(joinId1); + assertNotNull(stats); + if (log.isInfoEnabled()) + log.info("join1: " + stats.toString()); + + // verify query solution stats details. + assertEquals(2L, stats.chunksIn.get()); + assertEquals(2L, stats.unitsIn.get()); + assertEquals(5L, stats.unitsOut.get()); + assertEquals(2L, stats.chunksOut.get()); + } + + // validate the stats for the 2nd join operator. + { + final BOpStats stats = statsMap.get(joinId2); + assertNotNull(stats); + if (log.isInfoEnabled()) + log.info("join2: " + stats.toString()); + + // verify query solution stats details. 
+// assertEquals(1L, stats.chunksIn.get()); + assertEquals(5L, stats.unitsIn.get()); + assertEquals(5L, stats.unitsOut.get()); +// assertEquals(1L, stats.chunksOut.get()); + } + + // Validate stats for the sliceOp. + { + final BOpStats stats = statsMap.get(sliceId); + assertNotNull(stats); + if (log.isInfoEnabled()) + log.info("slice: " + stats.toString()); + + // verify query solution stats details. +// assertEquals(2L, stats.chunksIn.get()); + assertEquals(5L, stats.unitsIn.get()); + assertEquals(5L, stats.unitsOut.get()); +// assertEquals(1L, stats.chunksOut.get()); + } + } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-10-06 13:12:09
|
Revision: 3739 http://bigdata.svn.sourceforge.net/bigdata/?rev=3739&view=rev Author: thompsonbry Date: 2010-10-06 13:12:02 +0000 (Wed, 06 Oct 2010) Log Message: ----------- Fixed a bug in FederatedQueryEngine where it was applying this.clientProxy in newRunningQuery(...) rather than the argument passed to that method. This was causing errors when the query crossed to another query engine (a peer) since the peer was attempting to communicate with itself as the query controller rather than the query engine against which the query was originally submitted. Modified both QueryEngine and FederatedQueryEngine to use private services for running queries and for materializing chunks. This makes it easier to identify the threads in a JVM which are specific to the query engine control logic. Fixed problems in Haltable where isError() or isCancelled could report false (they were being invoked without a lock held which made the update of the termination condition non-atomic with respect to those tests). Modified Haltable to not use a static CANCELLED exception object since that was providing a stack trace for when the class was loaded rather than when Haltable#cancel(mayInterruptIfRunning) was invoked. It now has a private 'cancelled' field. 
Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StartOpMessage.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/concurrent/Haltable.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-10-06 13:02:06 UTC (rev 3738) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-10-06 13:12:02 UTC (rev 3739) @@ -32,6 +32,8 @@ import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingQueue; @@ -51,6 +53,7 @@ import com.bigdata.resources.IndexManager; import com.bigdata.service.IBigdataFederation; import com.bigdata.service.IDataService; +import com.bigdata.util.concurrent.DaemonThreadFactory; /** * A class managing execution of concurrent queries against a local @@ -343,7 +346,12 @@ if (engineFuture.compareAndSet(null/* expect */, ft)) { - localIndexManager.getExecutorService().execute(ft); + engineService.set(Executors + .newSingleThreadExecutor(new DaemonThreadFactory( + QueryEngine.class + ".engineService"))); + + engineService.get().execute(ft); +// 
localIndexManager.getExecutorService().execute(ft); } else { @@ -354,11 +362,16 @@ } /** - * The {@link Future} for the query engine. + * The service on which we run the query engine. This is started by {@link #init()}. */ - private final AtomicReference<FutureTask<Void>> engineFuture = new AtomicReference<FutureTask<Void>>(); + private final AtomicReference<ExecutorService> engineService = new AtomicReference<ExecutorService>(); /** + * The {@link Future} for the query engine. This is set by {@link #init()}. + */ + private final AtomicReference<FutureTask<Void>> engineFuture = new AtomicReference<FutureTask<Void>>(); + + /** * Volatile flag is set for normal termination. When set, no new queries * will be accepted but existing queries will run to completion. */ @@ -524,9 +537,20 @@ // stop the query engine. final Future<?> f = engineFuture.get(); - if (f != null) + if (f != null) { f.cancel(true/* mayInterruptIfRunning */); + } + + // stop the service on which we ran the query engine. + final ExecutorService s = engineService.get(); + if (s != null) { + s.shutdownNow(); + } + // clear references. + engineFuture.set(null); + engineService.set(null); + } /** @@ -549,6 +573,12 @@ if (f != null) f.cancel(true/* mayInterruptIfRunning */); + // stop the service on which we ran the query engine. + final ExecutorService s = engineService.get(); + if (s != null) { + s.shutdownNow(); + } + // halt any running queries. for(RunningQuery q : runningQueries.values()) { @@ -556,6 +586,10 @@ } + // clear references. 
+ engineFuture.set(null); + engineService.set(null); + } /** Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java 2010-10-06 13:02:06 UTC (rev 3738) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunState.java 2010-10-06 13:12:02 UTC (rev 3739) @@ -398,7 +398,7 @@ messagesConsumed(msg.bopId, msg.nmessages); if (TableLog.tableLog.isInfoEnabled()) { - TableLog.tableLog.info(getTableRow("startOp", msg.serviceId, + TableLog.tableLog.info(getTableRow("startOp", msg.runningOnServiceId, msg.bopId, msg.partitionId, msg.nmessages/* fanIn */, null/* cause */, null/* stats */)); } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-10-06 13:02:06 UTC (rev 3738) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java 2010-10-06 13:12:02 UTC (rev 3739) @@ -204,7 +204,7 @@ public void setDeadline(final long deadline) { if (!controller) - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException(ERR_NOT_CONTROLLER); if (deadline <= 0) throw new IllegalArgumentException(); @@ -291,8 +291,10 @@ /** * Return the current statistics for the query and <code>null</code> unless - * this is the query controller. For {@link PipelineOp} operator - * which is evaluated there will be a single entry in this map. + * this is the query controller. There will be a single entry in the map for + * each distinct {@link PipelineOp}. The map entries are inserted when we + * first begin to run an instance of that operator on some + * {@link IChunkMessage}. 
*/ public Map<Integer/* bopId */, BOpStats> getStats() { @@ -511,7 +513,9 @@ try { // verify still running. - future.halted(); + if (future.isDone()) { + throw new RuntimeException("Query is done", future.getCause()); + } // add chunk to be consumed. chunksIn.add(msg); @@ -535,7 +539,7 @@ void startQuery(final IChunkMessage<IBindingSet> msg) { if (!controller) - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException(ERR_NOT_CONTROLLER); if (msg == null) throw new IllegalArgumentException(); @@ -576,7 +580,7 @@ public void startOp(final StartOpMessage msg) { if (!controller) - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException(ERR_NOT_CONTROLLER); if (msg == null) throw new IllegalArgumentException(); @@ -618,7 +622,7 @@ public void haltOp(final HaltOpMessage msg) { if (!controller) - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException(ERR_NOT_CONTROLLER); if (msg == null) throw new IllegalArgumentException(); @@ -1083,7 +1087,7 @@ public IAsynchronousIterator<IBindingSet[]> iterator() { if (!controller) - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException(ERR_NOT_CONTROLLER); if (queryIterator == null) throw new UnsupportedOperationException(); Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StartOpMessage.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StartOpMessage.java 2010-10-06 13:02:06 UTC (rev 3738) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/StartOpMessage.java 2010-10-06 13:12:02 UTC (rev 3739) @@ -33,7 +33,7 @@ final public int partitionId; /** The node on which the operator will execute. */ - final public UUID serviceId; + final public UUID runningOnServiceId; /** * The #of {@link IChunkMessage} accepted as the input for the operator. 
@@ -45,7 +45,8 @@ final public int nmessages; public StartOpMessage(final UUID queryId, final int opId, - final int partitionId, final UUID serviceId, final int nmessages) { + final int partitionId, final UUID runningOnServiceId, + final int nmessages) { if (queryId == null) throw new IllegalArgumentException(); @@ -59,7 +60,7 @@ this.partitionId = partitionId; - this.serviceId = serviceId; + this.runningOnServiceId = runningOnServiceId; this.nmessages = nmessages; @@ -67,7 +68,7 @@ public String toString() { return getClass().getName() + "{queryId=" + queryId + ",bopId=" + bopId - + ",partitionId=" + partitionId + ",serviceId=" + serviceId + + ",partitionId=" + partitionId + ",serviceId=" + runningOnServiceId + ",nchunks=" + nmessages + "}"; } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java 2010-10-06 13:02:06 UTC (rev 3738) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedQueryEngine.java 2010-10-06 13:12:02 UTC (rev 3739) @@ -33,6 +33,8 @@ import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingQueue; @@ -57,6 +59,7 @@ import com.bigdata.service.ResourceService; import com.bigdata.service.jini.JiniFederation; import com.bigdata.util.InnerCause; +import com.bigdata.util.concurrent.DaemonThreadFactory; /** * An {@link IBigdataFederation} aware {@link QueryEngine}. @@ -95,14 +98,21 @@ * The proxy for this query engine when used as a query controller. 
*/ private final IQueryClient clientProxy; - + /** * A queue of {@link IChunkMessage}s which needs to have their data * materialized so an operator can consume those data on this node. + * This queue is drained by the {@link MaterializeChunksTask}. */ final private BlockingQueue<IChunkMessage<?>> chunkMaterializationQueue = new LinkedBlockingQueue<IChunkMessage<?>>(); /** + * The service on which we run {@link MaterializeChunksTask}. This is + * started by {@link #init()}. + */ + private final AtomicReference<ExecutorService> materializeChunksService = new AtomicReference<ExecutorService>(); + + /** * The {@link Future} for the task draining the {@link #chunkMaterializationQueue}. */ private final AtomicReference<FutureTask<Void>> materializeChunksFuture = new AtomicReference<FutureTask<Void>>(); @@ -270,8 +280,14 @@ new MaterializeChunksTask(), (Void) null); if (materializeChunksFuture.compareAndSet(null/* expect */, ft)) { - - getIndexManager().getExecutorService().execute(ft); + + materializeChunksService.set(Executors + .newSingleThreadExecutor(new DaemonThreadFactory( + FederatedQueryEngine.class + + ".materializeChunksService"))); + +// getIndexManager().getExecutorService().execute(ft); + materializeChunksService.get().execute(ft); } else { @@ -288,12 +304,23 @@ */ @Override protected void didShutdown() { - + // stop materializing chunks. final Future<?> f = materializeChunksFuture.get(); - if (f != null) + if (f != null) { f.cancel(true/* mayInterruptIfRunning */); + } + // stop the service on which we ran the MaterializeChunksTask. + final ExecutorService s = materializeChunksService.get(); + if (s != null) { + s.shutdownNow(); + } + + // Clear the references. 
+ materializeChunksFuture.set(null); + materializeChunksService.set(null); + } /** @@ -516,8 +543,8 @@ final boolean controller, final IQueryClient clientProxy, final PipelineOp query) { - return new FederatedRunningQuery(this, queryId, controller, - this.clientProxy, query); + return new FederatedRunningQuery(this/*queryEngine*/, queryId, controller, + clientProxy, query); } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/concurrent/Haltable.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/concurrent/Haltable.java 2010-10-06 13:02:06 UTC (rev 3738) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/util/concurrent/Haltable.java 2010-10-06 13:12:02 UTC (rev 3739) @@ -69,11 +69,11 @@ private final transient static Logger log = Logger .getLogger(Haltable.class); - /** - * Exception used to indicate a {@link #cancel(boolean) cancelled} - * computation. - */ - private static Throwable CANCELLED = new InterruptedException("CANCELLED"); +// /** +// * Exception used to indicate a {@link #cancel(boolean) cancelled} +// * computation. +// */ +// private static final Throwable CANCELLED = new InterruptedException("CANCELLED"); /** * Lock guarding the {@link #halted} condition and the various non-volatile, @@ -87,20 +87,20 @@ final private Condition halted = lock.newCondition(); /** - * The result of the computation. + * The result of the computation. This is guarded by the {@link #lock} . */ private V result = null; /** * The first cause as set by {@link #halt(Throwable)}. */ - private Throwable firstCause = null; + private volatile Throwable firstCause = null; /** * Flag is set <code>true</code> if the process was halted by a * {@link Throwable} not included in the set of normal termination causes. */ - private boolean error = false; + private volatile boolean error = false; /** * Set to <code>true</code> iff the process should halt. 
@@ -108,6 +108,12 @@ private volatile boolean halt = false; /** + * Set to <code>true</code> iff the process was {@link #cancel(boolean) + * cancelled}. + */ + private volatile boolean cancelled = false; + + /** * Halt (normal termination). */ final public void halt(final V v) { @@ -173,34 +179,56 @@ } /** - * Return unless processing has been halted. - * <p> - * This method may be used to detect asynchronous termination of the - * process. It will throw out the wrapper first cause if the process is - * halted. The method should be invoked from within the execution of the - * process itself so that it may notice asynchronous termination. + * Return unless processing has been halted. The method should be invoked + * from within the execution of the process itself so that it may notice + * asynchronous termination. It will throw out the wrapper first cause if + * the process is halted. The method is <code>protected</code> since the + * semantics are those of testing for unexpected termination of the process + * from within the process. External processes should use {@link #isDone()}. * * @throws RuntimeException * wrapping the {@link #firstCause} iff processing has been * halted. */ - final public void halted() { + final protected void halted() { if (halt) { - if (firstCause == null) + if (firstCause == null) { + /* + * Note: this is an error since there is an expectation by the + * process when it invokes halted() that the process is still + * running (since it invoked halted() it must be running). Since + * it is running, + */ throw new RuntimeException(); + + } throw new RuntimeException(firstCause); } } final public boolean cancel(final boolean mayInterruptIfRunning) { + lock.lock(); + try { + + final Throwable t = new InterruptedException(); - halt(CANCELLED); + halt(t); - // return true if this was the firstCause. - return (firstCause == CANCELLED); + if (firstCause == t) { + // iff this was the firstCause. 
+ cancelled = true; + return true; + } + return false; + + } finally { + + lock.unlock(); + + } } final public V get() throws InterruptedException, ExecutionException { @@ -209,8 +237,11 @@ while (!halt) { halted.await(); } - if (firstCause == CANCELLED) - throw new CancellationException(); + if(cancelled) { + final CancellationException t = new CancellationException(); + t.initCause(firstCause); + throw t; + } if (error) throw new ExecutionException(firstCause); return result; @@ -260,13 +291,25 @@ */ final public boolean isError() { - return halt && error; + // Note: lock required for atomic visibility for [halt AND error]. + lock.lock(); + try { + return halt && error; + } finally { + lock.unlock(); + } } public boolean isCancelled() { - return halt && firstCause == CANCELLED; + // Note: lock required for atomic visibility for [halt AND cancelled]. + lock.lock(); + try { + return halt && cancelled; + } finally { + lock.unlock(); + } } @@ -333,8 +376,8 @@ * termination. */ protected boolean isNormalTerminationCause(final Throwable cause) { - if (CANCELLED == cause) - return true; +// if (InnerCause.isInnerCause(cause, CancelledException.class)) +// return true; if (InnerCause.isInnerCause(cause, InterruptedException.class)) return true; if (InnerCause.isInnerCause(cause, CancellationException.class)) Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java 2010-10-06 13:02:06 UTC (rev 3738) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java 2010-10-06 13:12:02 UTC (rev 3739) @@ -906,7 +906,7 @@ { // validate the stats map. 
assertNotNull(statsMap); - assertEquals(3, statsMap.size()); + assertEquals("statsMap.size()", 3, statsMap.size()); if (log.isInfoEnabled()) log.info(statsMap.toString()); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-10-06 13:02:12
|
Revision: 3738 http://bigdata.svn.sourceforge.net/bigdata/?rev=3738&view=rev Author: thompsonbry Date: 2010-10-06 13:02:06 +0000 (Wed, 06 Oct 2010) Log Message: ----------- Applying serialVersionUID to be compatible with the implied serialVersionUID in the trunk. That serialVersionUID is now explicit in both the trunk and the QUADS_QUERY_BRANCH. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/vocab/BaseVocabulary.java Modified: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/vocab/BaseVocabulary.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/vocab/BaseVocabulary.java 2010-10-06 13:00:41 UTC (rev 3737) +++ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/vocab/BaseVocabulary.java 2010-10-06 13:02:06 UTC (rev 3738) @@ -64,6 +64,8 @@ final static public Logger log = Logger.getLogger(BaseVocabulary.class); + private static final long serialVersionUID = 1560142397515291331L; + /** * The database that is the authority for the defined terms and term * identifiers. This will be <code>null</code> when the de-serialization This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-10-06 13:00:47
|
Revision: 3737 http://bigdata.svn.sourceforge.net/bigdata/?rev=3737&view=rev Author: thompsonbry Date: 2010-10-06 13:00:41 +0000 (Wed, 06 Oct 2010) Log Message: ----------- Applying the as-found serialVersionUID to BaseVocabulary. The class did not have an explicit serialVersionUID. This change should be backward compatible with existing stores created from the trunk. Modified Paths: -------------- trunk/bigdata-rdf/src/java/com/bigdata/rdf/vocab/BaseVocabulary.java Modified: trunk/bigdata-rdf/src/java/com/bigdata/rdf/vocab/BaseVocabulary.java =================================================================== --- trunk/bigdata-rdf/src/java/com/bigdata/rdf/vocab/BaseVocabulary.java 2010-10-06 02:34:14 UTC (rev 3736) +++ trunk/bigdata-rdf/src/java/com/bigdata/rdf/vocab/BaseVocabulary.java 2010-10-06 13:00:41 UTC (rev 3737) @@ -65,6 +65,11 @@ final static public Logger log = Logger.getLogger(BaseVocabulary.class); /** + * The serialVersionUID as reported by the trunk on Oct 6, 2010. + */ + private static final long serialVersionUID = 1560142397515291331L; + + /** * The database that is the authority for the defined terms and term * identifiers. This will be <code>null</code> when the de-serialization * ctor is used. This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <mrp...@us...> - 2010-10-06 02:34:21
|
Revision: 3736 http://bigdata.svn.sourceforge.net/bigdata/?rev=3736&view=rev Author: mrpersonick Date: 2010-10-06 02:34:14 +0000 (Wed, 06 Oct 2010) Log Message: ----------- change sets notification for truth maintenance add and remove Modified Paths: -------------- branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/SPOAssertionBuffer.java branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/SPORetractionBuffer.java branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/TruthMaintenance.java branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/StatementWriter.java branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java branches/CHANGE_SET_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestChangeSets.java Added Paths: ----------- branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/changesets/StatementWriter.java Modified: branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/SPOAssertionBuffer.java =================================================================== --- branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/SPOAssertionBuffer.java 2010-10-05 20:30:37 UTC (rev 3735) +++ branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/SPOAssertionBuffer.java 2010-10-06 02:34:14 UTC (rev 3736) @@ -29,11 +29,15 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicLong; +import com.bigdata.rdf.internal.IV; +import com.bigdata.rdf.model.BigdataBNode; +import com.bigdata.rdf.sail.changesets.IChangeLog; import com.bigdata.rdf.spo.ISPO; import com.bigdata.rdf.spo.ISPOAssertionBuffer; import com.bigdata.rdf.spo.JustificationWriter; @@ -101,6 
+105,10 @@ * {@link Justification}s for entailments. */ protected final boolean justify; + + protected final IChangeLog changeLog; + + protected final Map<IV, BigdataBNode> bnodes; /** * Create a buffer. @@ -126,6 +134,17 @@ AbstractTripleStore db, IElementFilter<ISPO> filter, int capacity, boolean justified) { + this(focusStore, db, filter, capacity, justified, + null/* changeLog */, null/* bnodes */); + + } + + public SPOAssertionBuffer(AbstractTripleStore focusStore, + AbstractTripleStore db, IElementFilter<ISPO> filter, int capacity, + boolean justified, + final IChangeLog changeLog, final Map<IV, BigdataBNode> bnodes + ) { + super(db, filter, capacity); if (focusStore == null) @@ -142,6 +161,10 @@ justifications = justified ? new Justification[capacity] : null; + this.changeLog = changeLog; + + this.bnodes = bnodes; + } /** @@ -180,12 +203,28 @@ if (numJustifications == 0) { - // batch insert statements into the focusStore. - n = db.addStatements( + if (changeLog == null) { + + // batch insert statements into the focusStore. + n = db.addStatements( focusStore, true/* copyOnly */, new ChunkedArrayIterator<ISPO>(numStmts, stmts, null/*keyOrder*/), null/*filter*/); + + } else { + + n = com.bigdata.rdf.sail.changesets. + StatementWriter.addStatements( + db, + focusStore, + true/* copyOnly */, + null/* filter */, + new ChunkedArrayIterator<ISPO>(numStmts, stmts, null/*keyOrder*/), + changeLog, + bnodes); + + } } else { @@ -209,7 +248,8 @@ // task will write SPOs on the statement indices. tasks.add(new StatementWriter(getTermDatabase(), focusStore, false/* copyOnly */, new ChunkedArrayIterator<ISPO>( - numStmts, stmts, null/*keyOrder*/), nwritten)); + numStmts, stmts, null/*keyOrder*/), nwritten, + changeLog, bnodes)); // task will write justifications on the justifications index. 
final AtomicLong nwrittenj = new AtomicLong(); Modified: branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/SPORetractionBuffer.java =================================================================== --- branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/SPORetractionBuffer.java 2010-10-05 20:30:37 UTC (rev 3735) +++ branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/SPORetractionBuffer.java 2010-10-06 02:34:14 UTC (rev 3736) @@ -27,6 +27,11 @@ package com.bigdata.rdf.inf; +import java.util.Map; +import com.bigdata.rdf.internal.IV; +import com.bigdata.rdf.model.BigdataBNode; +import com.bigdata.rdf.sail.changesets.IChangeLog; +import com.bigdata.rdf.sail.changesets.StatementWriter; import com.bigdata.rdf.spo.ISPO; import com.bigdata.rdf.spo.SPO; import com.bigdata.rdf.store.AbstractTripleStore; @@ -50,6 +55,10 @@ private final AbstractTripleStore store; private final boolean computeClosureForStatementIdentifiers; + protected final IChangeLog changeLog; + + protected final Map<IV, BigdataBNode> bnodes; + /** * @param store * The database from which the statement will be removed when the @@ -63,6 +72,15 @@ public SPORetractionBuffer(AbstractTripleStore store, int capacity, boolean computeClosureForStatementIdentifiers) { + this(store, capacity, computeClosureForStatementIdentifiers, + null/* changeLog */, null/* bnodes */); + + } + + public SPORetractionBuffer(AbstractTripleStore store, int capacity, + boolean computeClosureForStatementIdentifiers, + final IChangeLog changeLog, final Map<IV, BigdataBNode> bnodes) { + super(store, null/*filter*/, capacity); if (store == null) @@ -72,14 +90,34 @@ this.computeClosureForStatementIdentifiers = computeClosureForStatementIdentifiers; + this.changeLog = changeLog; + + this.bnodes = bnodes; + } public int flush() { if (isEmpty()) return 0; - long n = store.removeStatements(new ChunkedArrayIterator<ISPO>(numStmts,stmts, + final long n; + + if (changeLog == null) { + + n = 
store.removeStatements(new ChunkedArrayIterator<ISPO>(numStmts,stmts, null/*keyOrder*/), computeClosureForStatementIdentifiers); + + } else { + + n = StatementWriter.removeStatements( + store, + new ChunkedArrayIterator<ISPO>( + numStmts,stmts,null/*keyOrder*/), + computeClosureForStatementIdentifiers, + changeLog, + bnodes); + + } // reset the counter. numStmts = 0; Modified: branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/TruthMaintenance.java =================================================================== --- branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/TruthMaintenance.java 2010-10-05 20:30:37 UTC (rev 3735) +++ branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/TruthMaintenance.java 2010-10-06 02:34:14 UTC (rev 3736) @@ -47,21 +47,27 @@ package com.bigdata.rdf.inf; +import java.util.Map; import java.util.Properties; import org.apache.log4j.Logger; import org.apache.log4j.MDC; import com.bigdata.journal.TemporaryStore; +import com.bigdata.rdf.internal.IV; +import com.bigdata.rdf.model.BigdataBNode; +import com.bigdata.rdf.model.BigdataStatement; import com.bigdata.rdf.model.StatementEnum; import com.bigdata.rdf.rio.IStatementBuffer; import com.bigdata.rdf.rules.InferenceEngine; +import com.bigdata.rdf.sail.changesets.IChangeLog; import com.bigdata.rdf.spo.ExplicitSPOFilter; import com.bigdata.rdf.spo.ISPO; import com.bigdata.rdf.spo.SPO; import com.bigdata.rdf.spo.SPOArrayIterator; import com.bigdata.rdf.spo.SPOKeyOrder; import com.bigdata.rdf.store.AbstractTripleStore; +import com.bigdata.rdf.store.BigdataStatementIterator; import com.bigdata.rdf.store.IRawTripleStore; import com.bigdata.rdf.store.TempTripleStore; import com.bigdata.relation.accesspath.IElementFilter; @@ -234,8 +240,21 @@ static public int applyExistingStatements( final AbstractTripleStore focusStore, final AbstractTripleStore database, - final IElementFilter<ISPO> filter) { + final IElementFilter<ISPO> filter + ) { + + return 
applyExistingStatements(focusStore, database, filter, + null/* changeLog */, null/* bnodes */); + } + + static public int applyExistingStatements( + final AbstractTripleStore focusStore, + final AbstractTripleStore database, + final IElementFilter<ISPO> filter, + final IChangeLog changeLog, final Map<IV, BigdataBNode> bnodes + ) { + if(INFO) log.info("Filtering statements already known to the database"); @@ -248,7 +267,7 @@ final IChunkedOrderedIterator<ISPO> itr = focusStore.getAccessPath( SPOKeyOrder.SPO, ExplicitSPOFilter.INSTANCE).iterator(); - + int nremoved = 0; int nupgraded = 0; @@ -266,7 +285,8 @@ */ final SPOAssertionBuffer assertionBuffer = new SPOAssertionBuffer( - database, database, filter, capacity, false/* justified */); + database, database, filter, capacity, false/* justified */, + changeLog, bnodes); /* * This buffer will retract statements from the tempStore that are @@ -290,7 +310,7 @@ for(int i=0; i<chunk.length; i++) { final SPO spo = (SPO)chunk[i]; - + // Lookup the statement in the database. final ISPO tmp = database.getStatement(spo.s, spo.p, spo.o); @@ -365,6 +385,13 @@ */ public ClosureStats assertAll(final TempTripleStore tempStore) { + return assertAll(tempStore, null/* changeLog */, null/* bnodes */); + + } + + public ClosureStats assertAll(final TempTripleStore tempStore, + final IChangeLog changeLog, final Map<IV, BigdataBNode> bnodes) { + if (tempStore == null) { throw new IllegalArgumentException(); @@ -409,7 +436,7 @@ * consistent if we change our mind about that practice. 
*/ - applyExistingStatements(tempStore, database, inferenceEngine.doNotAddFilter); + applyExistingStatements(tempStore, database, inferenceEngine.doNotAddFilter, changeLog, bnodes); final ClosureStats stats = inferenceEngine.computeClosure(tempStore); @@ -429,7 +456,8 @@ // tempStore.dumpStore(database,true,true,false,true); final long ncopied = tempStore.copyStatements(database, - null/* filter */, true /* copyJustifications */); + null/* filter */, true /* copyJustifications */, + changeLog, bnodes); // database.dumpStore(database,true,true,false,true); @@ -478,6 +506,13 @@ */ public ClosureStats retractAll(final TempTripleStore tempStore) { + return retractAll(tempStore, null/* changeLog */, null/* bnodes */); + + } + + public ClosureStats retractAll(final TempTripleStore tempStore, + final IChangeLog changeLog, final Map<IV, BigdataBNode> bnodes) { + final long begin = System.currentTimeMillis(); final ClosureStats stats = new ClosureStats(); @@ -512,7 +547,7 @@ } // do truth maintenance. - retractAll(stats, tempStore, 0); + retractAll(stats, tempStore, 0, changeLog, bnodes); MDC.remove("depth"); @@ -591,7 +626,8 @@ * explicit statements to be retracted. */ private void retractAll(final ClosureStats stats, - final TempTripleStore tempStore, final int depth) { + final TempTripleStore tempStore, final int depth, + final IChangeLog changeLog, final Map<IV, BigdataBNode> bnodes) { MDC.put("depth", "depth=" + depth); @@ -640,7 +676,9 @@ database, // the persistent db. null, //filter @todo was inferenceEngine.doNotAddFilter, capacity,// - false // justify + false,// justify + changeLog, + bnodes ); /* @@ -657,7 +695,8 @@ * identifiers. 
*/ final SPORetractionBuffer retractionBuffer = new SPORetractionBuffer( - database, capacity, false/* computeClosureForStatementIdentifiers */); + database, capacity, false/* computeClosureForStatementIdentifiers */, + changeLog, bnodes); /* * Note: when we enter this method recursively statements in the @@ -964,7 +1003,7 @@ * Recursive processing. */ - retractAll(stats, focusStore, depth + 1); + retractAll(stats, focusStore, depth + 1, changeLog, bnodes); } Modified: branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/StatementWriter.java =================================================================== --- branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/StatementWriter.java 2010-10-05 20:30:37 UTC (rev 3735) +++ branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/StatementWriter.java 2010-10-06 02:34:14 UTC (rev 3736) @@ -1,9 +1,17 @@ package com.bigdata.rdf.spo; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicLong; - +import org.apache.log4j.Logger; +import com.bigdata.rdf.internal.IV; +import com.bigdata.rdf.model.BigdataBNode; +import com.bigdata.rdf.model.BigdataStatement; +import com.bigdata.rdf.sail.changesets.ChangeRecord; +import com.bigdata.rdf.sail.changesets.IChangeLog; +import com.bigdata.rdf.sail.changesets.IChangeRecord.ChangeAction; import com.bigdata.rdf.store.AbstractTripleStore; +import com.bigdata.rdf.store.BigdataStatementIteratorImpl; import com.bigdata.relation.accesspath.IElementFilter; import com.bigdata.striterator.IChunkedOrderedIterator; @@ -18,6 +26,8 @@ */ public class StatementWriter implements Callable<Long>{ + protected static final Logger log = Logger.getLogger(StatementWriter.class); + private final AbstractTripleStore database; private final AbstractTripleStore statementStore; private final boolean copyOnly; @@ -27,6 +37,10 @@ * Incremented by the #of statements written on the statements indices. 
*/ public final AtomicLong nwritten; + + private final IChangeLog changeLog; + + private final Map<IV, BigdataBNode> bnodes; /** * @param database @@ -51,7 +65,17 @@ public StatementWriter(AbstractTripleStore database, AbstractTripleStore statementStore, boolean copyOnly, IChunkedOrderedIterator<ISPO> itr, AtomicLong nwritten) { - + + this(database, statementStore, copyOnly, itr, nwritten, + null/* changeLog */, null/* bnodes */); + + } + + public StatementWriter(final AbstractTripleStore database, + final AbstractTripleStore statementStore, final boolean copyOnly, + final IChunkedOrderedIterator<ISPO> itr, final AtomicLong nwritten, + final IChangeLog changeLog, final Map<IV, BigdataBNode> bnodes) { + if (database == null) throw new IllegalArgumentException(); @@ -73,6 +97,10 @@ this.itr = itr; this.nwritten = nwritten; + + this.changeLog = changeLog; + + this.bnodes = bnodes; } @@ -85,11 +113,30 @@ final long begin = System.currentTimeMillis(); - nwritten.addAndGet(database.addStatements(statementStore, copyOnly, - itr, null/* filter */)); + final long n; + + if (changeLog == null) { + + n = database.addStatements(statementStore, copyOnly, + itr, null/* filter */); + + } else { + n = com.bigdata.rdf.sail.changesets.StatementWriter.addStatements( + database, + statementStore, + copyOnly, + null/* filter */, + itr, + changeLog, + bnodes); + + } + + nwritten.addAndGet(n); + return System.currentTimeMillis() - begin; } - + } Modified: branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java =================================================================== --- branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java 2010-10-05 20:30:37 UTC (rev 3735) +++ branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java 2010-10-06 02:34:14 UTC (rev 3736) @@ -87,6 +87,7 @@ import com.bigdata.rdf.lexicon.ITermIndexCodes; import com.bigdata.rdf.lexicon.ITextIndexer; 
import com.bigdata.rdf.lexicon.LexiconRelation; +import com.bigdata.rdf.model.BigdataBNode; import com.bigdata.rdf.model.BigdataResource; import com.bigdata.rdf.model.BigdataStatement; import com.bigdata.rdf.model.BigdataURI; @@ -102,6 +103,7 @@ import com.bigdata.rdf.rules.MatchRule; import com.bigdata.rdf.rules.RDFJoinNexusFactory; import com.bigdata.rdf.rules.RuleContextEnum; +import com.bigdata.rdf.sail.changesets.IChangeLog; import com.bigdata.rdf.spo.BulkCompleteConverter; import com.bigdata.rdf.spo.BulkFilterConverter; import com.bigdata.rdf.spo.ExplicitSPOFilter; @@ -2982,6 +2984,18 @@ final AbstractTripleStore dst,// final IElementFilter<ISPO> filter,// final boolean copyJustifications// + ) { + + return copyStatements(dst, filter, copyJustifications, + null/* changeLog */, null /* bnodes */); + + } + + public long copyStatements(// + final AbstractTripleStore dst,// + final IElementFilter<ISPO> filter,// + final boolean copyJustifications,// + final IChangeLog changeLog, final Map<IV, BigdataBNode> bnodes ) { if (dst == this) @@ -2995,9 +3009,25 @@ if (!copyJustifications) { - // add statements to the target store. - return dst - .addStatements(dst, true/* copyOnly */, itr, null/* filter */); + if (changeLog == null) { + + // add statements to the target store. + return dst + .addStatements(dst, true/* copyOnly */, itr, null/* filter */); + + } else { + + return com.bigdata.rdf.sail.changesets. + StatementWriter.addStatements( + dst, + dst, + true/* copyOnly */, + null/* filter */, + itr, + changeLog, + bnodes); + + } } else { @@ -3020,8 +3050,8 @@ final AtomicLong nwritten = new AtomicLong(); // task will write SPOs on the statement indices. - tasks.add(new StatementWriter(this, dst, true/* copyOnly */, - itr, nwritten)); + tasks.add(new StatementWriter(dst, dst, true/* copyOnly */, + itr, nwritten, changeLog, bnodes)); // task will write justifications on the justifications index. 
final AtomicLong nwrittenj = new AtomicLong(); Modified: branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java =================================================================== --- branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java 2010-10-05 20:30:37 UTC (rev 3735) +++ branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java 2010-10-06 02:34:14 UTC (rev 3736) @@ -132,6 +132,7 @@ import com.bigdata.rdf.sail.changesets.ChangeRecord; import com.bigdata.rdf.sail.changesets.IChangeLog; import com.bigdata.rdf.sail.changesets.IChangeRecord; +import com.bigdata.rdf.sail.changesets.StatementWriter; import com.bigdata.rdf.sail.changesets.IChangeRecord.ChangeAction; import com.bigdata.rdf.spo.ExplicitSPOFilter; import com.bigdata.rdf.spo.ISPO; @@ -2329,35 +2330,42 @@ } else { - final IAccessPath<ISPO> ap = - database.getAccessPath(s, p, o, c); - - final IChunkedOrderedIterator<ISPO> itr = ap.iterator(); + final IChunkedOrderedIterator<ISPO> itr = + database.getAccessPath(s, p, o, c).iterator(); - if (itr.hasNext()) { - - final BigdataStatementIteratorImpl itr2 = - new BigdataStatementIteratorImpl(database, bnodes2, itr) - .start(database.getExecutorService()); - - final BigdataStatement[] stmts = - new BigdataStatement[database.getChunkCapacity()]; - - int i = 0; - while (i < stmts.length && itr2.hasNext()) { - stmts[i++] = itr2.next(); - if (i == stmts.length) { - // process stmts[] - n += removeAndNotify(stmts, i); - i = 0; - } - } - if (i > 0) { - n += removeAndNotify(stmts, i); - } - - } + n = StatementWriter.removeStatements(database, itr, + true/* computeClosureForStatementIdentifiers */, + changeLog, bnodes2); +// final IAccessPath<ISPO> ap = +// database.getAccessPath(s, p, o, c); +// +// final IChunkedOrderedIterator<ISPO> itr = ap.iterator(); +// +// if (itr.hasNext()) { +// +// final BigdataStatementIteratorImpl itr2 = +// new 
BigdataStatementIteratorImpl(database, bnodes2, itr) +// .start(database.getExecutorService()); +// +// final BigdataStatement[] stmts = +// new BigdataStatement[database.getChunkCapacity()]; +// +// int i = 0; +// while (i < stmts.length && itr2.hasNext()) { +// stmts[i++] = itr2.next(); +// if (i == stmts.length) { +// // process stmts[] +// n += removeAndNotify(stmts, i); +// i = 0; +// } +// } +// if (i > 0) { +// n += removeAndNotify(stmts, i); +// } +// +// } + } } @@ -2367,69 +2375,69 @@ } - private long removeAndNotify(final BigdataStatement[] stmts, final int numStmts) { - - final SPO[] tmp = new SPO[numStmts]; +// private long removeAndNotify(final BigdataStatement[] stmts, final int numStmts) { +// +// final SPO[] tmp = new SPO[numStmts]; +// +// for (int i = 0; i < tmp.length; i++) { +// +// final BigdataStatement stmt = stmts[i]; +// +// /* +// * Note: context position is not passed when statement identifiers +// * are in use since the statement identifier is assigned based on +// * the {s,p,o} triple. +// */ +// +// final SPO spo = new SPO(stmt); +// +// if (log.isDebugEnabled()) +// log.debug("adding: " + stmt.toString() + " (" + spo + ")"); +// +// if(!spo.isFullyBound()) { +// +// throw new AssertionError("Not fully bound? : " + spo); +// +// } +// +// tmp[i] = spo; +// +// } +// +// /* +// * Note: When handling statement identifiers, we clone tmp[] to avoid a +// * side-effect on its order so that we can unify the assigned statement +// * identifiers below. +// * +// * Note: In order to report back the [ISPO#isModified()] flag, we also +// * need to clone tmp[] to avoid a side effect on its order. Therefore we +// * now always clone tmp[]. +// */ +//// final long nwritten = writeSPOs(sids ? 
tmp.clone() : tmp, numStmts); +// final long nwritten = database.removeStatements(tmp.clone(), numStmts); +// +// // Copy the state of the isModified() flag +// { +// +// for (int i = 0; i < numStmts; i++) { +// +// if (tmp[i].isModified()) { +// +// stmts[i].setModified(true); +// +// changeLog.changeEvent( +// new ChangeRecord(stmts[i], ChangeAction.REMOVED)); +// +// } +// +// } +// +// } +// +// return nwritten; +// +// } - for (int i = 0; i < tmp.length; i++) { - - final BigdataStatement stmt = stmts[i]; - - /* - * Note: context position is not passed when statement identifiers - * are in use since the statement identifier is assigned based on - * the {s,p,o} triple. - */ - - final SPO spo = new SPO(stmt); - - if (log.isDebugEnabled()) - log.debug("adding: " + stmt.toString() + " (" + spo + ")"); - - if(!spo.isFullyBound()) { - - throw new AssertionError("Not fully bound? : " + spo); - - } - - tmp[i] = spo; - - } - - /* - * Note: When handling statement identifiers, we clone tmp[] to avoid a - * side-effect on its order so that we can unify the assigned statement - * identifiers below. - * - * Note: In order to report back the [ISPO#isModified()] flag, we also - * need to clone tmp[] to avoid a side effect on its order. Therefore we - * now always clone tmp[]. - */ -// final long nwritten = writeSPOs(sids ? tmp.clone() : tmp, numStmts); - final long nwritten = database.removeStatements(tmp.clone(), numStmts); - - // Copy the state of the isModified() flag - { - - for (int i = 0; i < numStmts; i++) { - - if (tmp[i].isModified()) { - - stmts[i].setModified(true); - - changeLog.changeEvent( - new ChangeRecord(stmts[i], ChangeAction.REMOVED)); - - } - - } - - } - - return nwritten; - - } - public synchronized CloseableIteration<? extends Resource, SailException> getContextIDs() throws SailException { @@ -2695,7 +2703,9 @@ if(getTruthMaintenance()) { // do TM, writing on the database. 
- tm.assertAll((TempTripleStore)assertBuffer.getStatementStore()); + tm.assertAll( + (TempTripleStore)assertBuffer.getStatementStore(), + changeLog, bnodes2); // must be reallocated on demand. assertBuffer = null; @@ -2712,7 +2722,8 @@ if(getTruthMaintenance()) { // do TM, writing on the database. - tm.retractAll((TempTripleStore)retractBuffer.getStatementStore()); + tm.retractAll((TempTripleStore)retractBuffer.getStatementStore(), + changeLog, bnodes2); // must be re-allocated on demand. retractBuffer = null; Added: branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/changesets/StatementWriter.java =================================================================== --- branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/changesets/StatementWriter.java (rev 0) +++ branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/changesets/StatementWriter.java 2010-10-06 02:34:14 UTC (rev 3736) @@ -0,0 +1,196 @@ +package com.bigdata.rdf.sail.changesets; + +import java.util.Iterator; +import java.util.Map; +import org.apache.log4j.Logger; +import com.bigdata.rdf.internal.IV; +import com.bigdata.rdf.model.BigdataBNode; +import com.bigdata.rdf.model.BigdataStatement; +import com.bigdata.rdf.sail.changesets.IChangeRecord.ChangeAction; +import com.bigdata.rdf.spo.ISPO; +import com.bigdata.rdf.spo.SPO; +import com.bigdata.rdf.store.AbstractTripleStore; +import com.bigdata.rdf.store.BigdataStatementIteratorImpl; +import com.bigdata.relation.accesspath.IElementFilter; +import com.bigdata.striterator.ChunkedArrayIterator; +import com.bigdata.striterator.IChunkedOrderedIterator; + +public class StatementWriter { + + protected static final Logger log = Logger.getLogger(StatementWriter.class); + + public static long addStatements(final AbstractTripleStore database, + final AbstractTripleStore statementStore, + final boolean copyOnly, + final IElementFilter<ISPO> filter, + final IChunkedOrderedIterator<ISPO> itr, + final IChangeLog 
changeLog, + final Map<IV, BigdataBNode> bnodes) { + + long n = 0; + + if (itr.hasNext()) { + + final BigdataStatementIteratorImpl itr2 = + new BigdataStatementIteratorImpl(database, bnodes, itr) + .start(database.getExecutorService()); + + final BigdataStatement[] stmts = + new BigdataStatement[database.getChunkCapacity()]; + + int i = 0; + while ((i = nextChunk(itr2, stmts)) > 0) { + n += addStatements(database, statementStore, copyOnly, filter, + stmts, i, changeLog); + } + + } + + return n; + + } + + private static long addStatements(final AbstractTripleStore database, + final AbstractTripleStore statementStore, + final boolean copyOnly, + final IElementFilter<ISPO> filter, + final BigdataStatement[] stmts, + final int numStmts, + final IChangeLog changeLog) { + + final SPO[] tmp = allocateSPOs(stmts, numStmts); + + final long n = database.addStatements(statementStore, copyOnly, + new ChunkedArrayIterator<ISPO>(numStmts, tmp.clone(), + null/* keyOrder */), filter); + + // Copy the state of the isModified() flag and notify changeLog + for (int i = 0; i < numStmts; i++) { + + if (tmp[i].isModified()) { + + stmts[i].setModified(true); + + changeLog.changeEvent( + new ChangeRecord(stmts[i], ChangeAction.ADDED)); + + } + + } + + return n; + + } + + public static long removeStatements(final AbstractTripleStore database, + final IChunkedOrderedIterator<ISPO> itr, + final boolean computeClosureForStatementIdentifiers, + final IChangeLog changeLog, + final Map<IV, BigdataBNode> bnodes) { + + long n = 0; + + if (itr.hasNext()) { + + final BigdataStatementIteratorImpl itr2 = + new BigdataStatementIteratorImpl(database, bnodes, itr) + .start(database.getExecutorService()); + + final BigdataStatement[] stmts = + new BigdataStatement[database.getChunkCapacity()]; + + int i = 0; + while ((i = nextChunk(itr2, stmts)) > 0) { + n += removeStatements(database, stmts, i, + computeClosureForStatementIdentifiers, changeLog); + } + + } + + return n; + + } + + private static long 
removeStatements(final AbstractTripleStore database, + final BigdataStatement[] stmts, + final int numStmts, + final boolean computeClosureForStatementIdentifiers, + final IChangeLog changeLog) { + + final SPO[] tmp = allocateSPOs(stmts, numStmts); + + final long n = database.removeStatements( + new ChunkedArrayIterator<ISPO>(numStmts, tmp.clone(), + null/* keyOrder */), + computeClosureForStatementIdentifiers); + + // Copy the state of the isModified() flag and notify changeLog + for (int i = 0; i < numStmts; i++) { + + if (tmp[i].isModified()) { + + stmts[i].setModified(true); + + changeLog.changeEvent( + new ChangeRecord(stmts[i], ChangeAction.REMOVED)); + + } + + } + + return n; + + } + + private static int nextChunk(final Iterator<BigdataStatement> itr, + final BigdataStatement[] stmts) { + + assert stmts != null && stmts.length > 0; + + int i = 0; + while (itr.hasNext()) { + stmts[i++] = itr.next(); + if (i == stmts.length) { + // stmts[] is full + return i; + } + } + + /* + * stmts[] is empty (i = 0) or partially + * full (i > 0 && i < stmts.length) + */ + return i; + + } + + private static SPO[] allocateSPOs(final BigdataStatement[] stmts, + final int numStmts) { + + final SPO[] tmp = new SPO[numStmts]; + + for (int i = 0; i < tmp.length; i++) { + + final BigdataStatement stmt = stmts[i]; + + final SPO spo = new SPO(stmt); + + if (log.isDebugEnabled()) + log.debug("writing: " + stmt.toString() + " (" + spo + ")"); + + if(!spo.isFullyBound()) { + + throw new AssertionError("Not fully bound? 
: " + spo); + + } + + tmp[i] = spo; + + } + + return tmp; + + + } + +} Modified: branches/CHANGE_SET_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestChangeSets.java =================================================================== --- branches/CHANGE_SET_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestChangeSets.java 2010-10-05 20:30:37 UTC (rev 3735) +++ branches/CHANGE_SET_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestChangeSets.java 2010-10-06 02:34:14 UTC (rev 3736) @@ -292,7 +292,7 @@ } - public void testTruthMaintenance() throws Exception { + public void testTMAdd() throws Exception { final BigdataSail sail = getSail(getTriplesWithInference()); sail.initialize(); @@ -363,6 +363,113 @@ } + public void testTMRetract() throws Exception { + + final BigdataSail sail = getSail(getTriplesWithInference()); + sail.initialize(); + final BigdataSailRepository repo = new BigdataSailRepository(sail); + final BigdataSailRepositoryConnection cxn = + (BigdataSailRepositoryConnection) repo.getConnection(); + cxn.setAutoCommit(false); + + final TestChangeLog changeLog = new TestChangeLog(); + cxn.setChangeLog(changeLog); + + try { + + final BigdataValueFactory vf = (BigdataValueFactory) sail.getValueFactory(); + + final String ns = BD.NAMESPACE; + + final URI a = vf.createURI(ns+"A"); + final URI b = vf.createURI(ns+"B"); + final URI c = vf.createURI(ns+"C"); + + final BigdataStatement[] explicitAdd = new BigdataStatement[] { + vf.createStatement(a, RDFS.SUBCLASSOF, b), + vf.createStatement(b, RDFS.SUBCLASSOF, c), + }; + + final BigdataStatement[] inferredAdd = new BigdataStatement[] { + vf.createStatement(a, RDF.TYPE, RDFS.CLASS), + vf.createStatement(a, RDFS.SUBCLASSOF, RDFS.RESOURCE), + vf.createStatement(a, RDFS.SUBCLASSOF, a), + vf.createStatement(a, RDFS.SUBCLASSOF, c), + vf.createStatement(b, RDF.TYPE, RDFS.CLASS), + vf.createStatement(b, RDFS.SUBCLASSOF, RDFS.RESOURCE), + vf.createStatement(b, RDFS.SUBCLASSOF, b), + vf.createStatement(c, 
RDF.TYPE, RDFS.CLASS), + vf.createStatement(c, RDFS.SUBCLASSOF, RDFS.RESOURCE), + vf.createStatement(c, RDFS.SUBCLASSOF, c), + }; + + final BigdataStatement[] explicitRemove = new BigdataStatement[] { + vf.createStatement(b, RDFS.SUBCLASSOF, c), + }; + + final BigdataStatement[] inferredRemove = new BigdataStatement[] { + vf.createStatement(a, RDFS.SUBCLASSOF, c), + vf.createStatement(c, RDF.TYPE, RDFS.CLASS), + vf.createStatement(c, RDFS.SUBCLASSOF, RDFS.RESOURCE), + vf.createStatement(c, RDFS.SUBCLASSOF, c), + }; + +/**/ + cxn.setNamespace("ns", ns); + + for (BigdataStatement stmt : explicitAdd) { + cxn.add(stmt); + } + + cxn.commit();// + + { + + final Collection<IChangeRecord> expected = + new LinkedList<IChangeRecord>(); + for (BigdataStatement stmt : explicitAdd) { + expected.add(new ChangeRecord(stmt, ChangeAction.ADDED)); + } + for (BigdataStatement stmt : inferredAdd) { + expected.add(new ChangeRecord(stmt, ChangeAction.ADDED)); + } + + compare(expected, changeLog.getChangeSet()); + + } + + for (BigdataStatement stmt : explicitRemove) { + cxn.remove(stmt); + } + + cxn.commit();// + + { + + final Collection<IChangeRecord> expected = + new LinkedList<IChangeRecord>(); + for (BigdataStatement stmt : explicitRemove) { + expected.add(new ChangeRecord(stmt, ChangeAction.REMOVED)); + } + for (BigdataStatement stmt : inferredRemove) { + expected.add(new ChangeRecord(stmt, ChangeAction.REMOVED)); + } + + compare(expected, changeLog.getChangeSet()); + + } + + if (log.isDebugEnabled()) { + log.debug("\n" + sail.getDatabase().dumpStore(true, true, false)); + } + + } finally { + cxn.close(); + sail.__tearDownUnitTest(); + } + + } + private void compare(final Collection<IChangeRecord> expected, final Collection<IChangeRecord> actual) { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-10-05 20:30:44
|
Revision: 3735 http://bigdata.svn.sourceforge.net/bigdata/?rev=3735&view=rev Author: thompsonbry Date: 2010-10-05 20:30:37 +0000 (Tue, 05 Oct 2010) Log Message: ----------- Removed unused imports. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-10-05 20:30:15 UTC (rev 3734) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-10-05 20:30:37 UTC (rev 3735) @@ -42,24 +42,12 @@ import org.apache.log4j.Logger; import com.bigdata.bop.BOp; -import com.bigdata.bop.BOpContextBase; import com.bigdata.bop.IBindingSet; import com.bigdata.bop.PipelineOp; -import com.bigdata.bop.ap.Predicate; -import com.bigdata.bop.cost.DiskCostModel; -import com.bigdata.bop.cost.ScanCostReport; -import com.bigdata.btree.AbstractBTree; import com.bigdata.btree.BTree; -import com.bigdata.btree.IBTreeStatistics; import com.bigdata.btree.IndexSegment; -import com.bigdata.btree.UnisolatedReadWriteIndex; import com.bigdata.btree.view.FusedView; import com.bigdata.journal.IIndexManager; -import com.bigdata.journal.Journal; -import com.bigdata.journal.NoSuchIndexException; -import com.bigdata.journal.TimestampUtility; -import com.bigdata.relation.IRelation; -import com.bigdata.relation.accesspath.IAccessPath; import com.bigdata.resources.IndexManager; import com.bigdata.service.IBigdataFederation; import com.bigdata.service.IDataService; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-10-05 20:30:21
|
Revision: 3734 http://bigdata.svn.sourceforge.net/bigdata/?rev=3734&view=rev Author: thompsonbry Date: 2010-10-05 20:30:15 +0000 (Tue, 05 Oct 2010) Log Message: ----------- Removed unused imports. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/spo/TestSPOPredicate.java Modified: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/spo/TestSPOPredicate.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/spo/TestSPOPredicate.java 2010-10-05 20:29:26 UTC (rev 3733) +++ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/spo/TestSPOPredicate.java 2010-10-05 20:30:15 UTC (rev 3734) @@ -28,17 +28,13 @@ package com.bigdata.rdf.spo; -import java.util.HashMap; - import junit.framework.TestCase2; import com.bigdata.bop.BOp; import com.bigdata.bop.Constant; import com.bigdata.bop.IPredicate; -import com.bigdata.bop.IVariableOrConstant; import com.bigdata.bop.NV; import com.bigdata.bop.Var; -import com.bigdata.bop.ap.Predicate; import com.bigdata.rdf.internal.IV; import com.bigdata.rdf.internal.TermId; import com.bigdata.rdf.internal.VTE; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-10-05 20:29:32
|
Revision: 3733 http://bigdata.svn.sourceforge.net/bigdata/?rev=3733&view=rev Author: thompsonbry Date: 2010-10-05 20:29:26 +0000 (Tue, 05 Oct 2010) Log Message: ----------- Also removed the unit tests which were verifying the old implementations of hashCode() and equals() for SPOPredicate. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/spo/TestSPOPredicate.java Modified: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/spo/TestSPOPredicate.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/spo/TestSPOPredicate.java 2010-10-05 20:28:22 UTC (rev 3732) +++ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/spo/TestSPOPredicate.java 2010-10-05 20:29:26 UTC (rev 3733) @@ -182,130 +182,130 @@ } - /** - * Verify equality testing with same impl. - */ - public void test_equalsSameImpl() { - - final Var<IV> u = Var.var("u"); - - final SPOPredicate p1 = new SPOPredicate(relation,u, rdfsSubClassOf, rdfsResource); - - final SPOPredicate p2 = new SPOPredicate(relation,u, rdfType, rdfsClass); - - log.info(p1.toString()); - - log.info(p2.toString()); - - assertTrue(p1.equals(new SPOPredicate(relation, u, rdfsSubClassOf, rdfsResource))); - - assertTrue(p2.equals(new SPOPredicate(relation, u, rdfType, rdfsClass))); - - assertFalse(p1.equals(p2)); - - assertFalse(p2.equals(p1)); - - } +// /** +// * Verify equality testing with same impl. 
+// */ +// public void test_equalsSameImpl() { +// +// final Var<IV> u = Var.var("u"); +// +// final SPOPredicate p1 = new SPOPredicate(relation,u, rdfsSubClassOf, rdfsResource); +// +// final SPOPredicate p2 = new SPOPredicate(relation,u, rdfType, rdfsClass); +// +// log.info(p1.toString()); +// +// log.info(p2.toString()); +// +// assertTrue(p1.equals(new SPOPredicate(relation, u, rdfsSubClassOf, rdfsResource))); +// +// assertTrue(p2.equals(new SPOPredicate(relation, u, rdfType, rdfsClass))); +// +// assertFalse(p1.equals(p2)); +// +// assertFalse(p2.equals(p1)); +// +// } +// +// public void test_equalsDifferentImpl() { +// +// final Var<IV> u = Var.var("u"); +// +// final SPOPredicate p1 = new SPOPredicate(relation, u, rdfType, rdfsClass); +// +// final Predicate p2 = new Predicate(new IVariableOrConstant[] { u, +// rdfType, rdfsClass }, new NV( +// Predicate.Annotations.RELATION_NAME, new String[] { relation })); +// +// log.info(p1.toString()); +// +// log.info(p2.toString()); +// +// assertTrue(p1.equals(p2)); +// +// assertTrue(p2.equals(p1)); +// +// } +// +// /** +// * Note: {@link HashMap} support will breaks unless the {@link IPredicate} +// * class defines <code>equals(Object o)</code>. If it just defines +// * <code>equals(IPredicate)</code> then {@link Object#equals(Object)} will +// * be invoked instead! 
+// */ +// public void test_hashMapSameImpl() { +// +// final Var<IV> u = Var.var("u"); +// +// final SPOPredicate p1 = new SPOPredicate(relation, u, rdfsSubClassOf, +// rdfsResource); +// +// final Predicate<?> p1b = new Predicate(new IVariableOrConstant[] { u, +// rdfsSubClassOf, rdfsResource }, new NV(Predicate.Annotations.RELATION_NAME, +// new String[] { relation })); +// +// final SPOPredicate p2 = new SPOPredicate(relation, u, rdfType, +// rdfsClass); +// +// final Predicate<?> p2b = new Predicate(new IVariableOrConstant[] { u, +// rdfType, rdfsClass }, new NV(Predicate.Annotations.RELATION_NAME, +// new String[] { relation })); +// +// // p1 and p1b compare as equal. +// assertTrue(p1.equals(p1)); +// assertTrue(p1.equals(p1b)); +// assertTrue(p1b.equals(p1)); +// assertTrue(p1b.equals(p1b)); +// +// // {p1,p1b} not equal {p2,p2b} +// assertFalse(p1.equals(p2)); +// assertFalse(p1.equals(p2b)); +// assertFalse(p1b.equals(p2)); +// assertFalse(p1b.equals(p2b)); +// +// // {p1,p1b} have the same hash code. +// assertEquals(p1.hashCode(), p1b.hashCode()); +// +// // {p2,p2b} have the same hash code. 
+// assertEquals(p2.hashCode(), p2b.hashCode()); +// +// final HashMap<IPredicate,String> map = new HashMap<IPredicate,String>(); +// +// assertFalse(map.containsKey(p1)); +// assertFalse(map.containsKey(p2)); +// assertFalse(map.containsKey(p1b)); +// assertFalse(map.containsKey(p2b)); +// +// assertEquals(0,map.size()); +// assertNull(map.put(p1,"p1")); +// assertEquals(1,map.size()); +// assertEquals("p1",map.put(p1,"p1")); +// assertEquals(1,map.size()); +// +// assertTrue(p1.equals(p1b)); +// assertTrue(p1b.equals(p1)); +// assertTrue(p1.hashCode()==p1b.hashCode()); +// assertEquals("p1",map.put(p1b,"p1")); +// assertEquals(1,map.size()); +// +// assertTrue(map.containsKey(p1)); +// assertTrue(map.containsKey(p1b)); +// assertFalse(map.containsKey(p2)); +// assertFalse(map.containsKey(p2b)); +// +// assertEquals("p1",map.get(p1)); +// assertEquals("p1",map.get(p1b)); +// +// map.put(p2,"p2"); +// +// assertTrue(map.containsKey(p1)); +// assertTrue(map.containsKey(p1b)); +// assertTrue(map.containsKey(p2)); +// assertTrue(map.containsKey(p2b)); +// +// assertEquals("p2",map.get(p2)); +// assertEquals("p2",map.get(p2b)); +// +// } - public void test_equalsDifferentImpl() { - - final Var<IV> u = Var.var("u"); - - final SPOPredicate p1 = new SPOPredicate(relation, u, rdfType, rdfsClass); - - final Predicate p2 = new Predicate(new IVariableOrConstant[] { u, - rdfType, rdfsClass }, new NV( - Predicate.Annotations.RELATION_NAME, new String[] { relation })); - - log.info(p1.toString()); - - log.info(p2.toString()); - - assertTrue(p1.equals(p2)); - - assertTrue(p2.equals(p1)); - - } - - /** - * Note: {@link HashMap} support will breaks unless the {@link IPredicate} - * class defines <code>equals(Object o)</code>. If it just defines - * <code>equals(IPredicate)</code> then {@link Object#equals(Object)} will - * be invoked instead! 
- */ - public void test_hashMapSameImpl() { - - final Var<IV> u = Var.var("u"); - - final SPOPredicate p1 = new SPOPredicate(relation, u, rdfsSubClassOf, - rdfsResource); - - final Predicate<?> p1b = new Predicate(new IVariableOrConstant[] { u, - rdfsSubClassOf, rdfsResource }, new NV(Predicate.Annotations.RELATION_NAME, - new String[] { relation })); - - final SPOPredicate p2 = new SPOPredicate(relation, u, rdfType, - rdfsClass); - - final Predicate<?> p2b = new Predicate(new IVariableOrConstant[] { u, - rdfType, rdfsClass }, new NV(Predicate.Annotations.RELATION_NAME, - new String[] { relation })); - - // p1 and p1b compare as equal. - assertTrue(p1.equals(p1)); - assertTrue(p1.equals(p1b)); - assertTrue(p1b.equals(p1)); - assertTrue(p1b.equals(p1b)); - - // {p1,p1b} not equal {p2,p2b} - assertFalse(p1.equals(p2)); - assertFalse(p1.equals(p2b)); - assertFalse(p1b.equals(p2)); - assertFalse(p1b.equals(p2b)); - - // {p1,p1b} have the same hash code. - assertEquals(p1.hashCode(), p1b.hashCode()); - - // {p2,p2b} have the same hash code. 
- assertEquals(p2.hashCode(), p2b.hashCode()); - - final HashMap<IPredicate,String> map = new HashMap<IPredicate,String>(); - - assertFalse(map.containsKey(p1)); - assertFalse(map.containsKey(p2)); - assertFalse(map.containsKey(p1b)); - assertFalse(map.containsKey(p2b)); - - assertEquals(0,map.size()); - assertNull(map.put(p1,"p1")); - assertEquals(1,map.size()); - assertEquals("p1",map.put(p1,"p1")); - assertEquals(1,map.size()); - - assertTrue(p1.equals(p1b)); - assertTrue(p1b.equals(p1)); - assertTrue(p1.hashCode()==p1b.hashCode()); - assertEquals("p1",map.put(p1b,"p1")); - assertEquals(1,map.size()); - - assertTrue(map.containsKey(p1)); - assertTrue(map.containsKey(p1b)); - assertFalse(map.containsKey(p2)); - assertFalse(map.containsKey(p2b)); - - assertEquals("p1",map.get(p1)); - assertEquals("p1",map.get(p1b)); - - map.put(p2,"p2"); - - assertTrue(map.containsKey(p1)); - assertTrue(map.containsKey(p1b)); - assertTrue(map.containsKey(p2)); - assertTrue(map.containsKey(p2b)); - - assertEquals("p2",map.get(p2)); - assertEquals("p2",map.get(p2b)); - - } - } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-10-05 20:28:28
|
Revision: 3732 http://bigdata.svn.sourceforge.net/bigdata/?rev=3732&view=rev Author: thompsonbry Date: 2010-10-05 20:28:22 +0000 (Tue, 05 Oct 2010) Log Message: ----------- Bug fix for "duplicate bop" reporting problem. I've simply taken out the definition of equals() and hashCode() for Predicate. Those methods were once used to cache access paths, but they have not been used in that manner for a long time. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPOPredicate.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2010-10-05 20:18:36 UTC (rev 3731) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2010-10-05 20:28:22 UTC (rev 3732) @@ -603,4 +603,95 @@ } + /* + * Note: I've played around with a few hash functions and senses of + * equality. Predicate (before the bops were introduced) used to have a + * hashCode() and equals() which was used to cache access paths, but that is + * long gone. The problem with specifying a hashCode() and equals() method + * for BOp/BOpBase/Predicate is that we wind up with duplicate bop + * exceptions being reported by BOpUtility#getIndex(BOp). + */ + +// /** +// * <code>true</code> if all arguments and annotations are the same. 
+// */ +// public boolean equals(final Object other) { +// +// if (this == other) +// return true; +// +// if (!(other instanceof BOp)) +// return false; +// +// final BOp o = (BOp) other; +// +// final int arity = arity(); +// +// if (arity != o.arity()) +// return false; +// +// for (int i = 0; i < arity; i++) { +// +// final BOp x = get(i); +// +// final BOp y = o.get(i); +// +// /* +// * X Y +// * same same : continue (includes null == null); +// * null other : return false; +// * !null other : if(!x.equals(y)) return false. +// */ +// if (x != y || x == null || !(x.equals(y))) { +//// && (// +//// (x != null && !(x.equals(y))) || // +//// (y != null && !(y.equals(x))))// +//// ) { +// +// return false; +// +// } +// +// } +// +// return annotations.equals(o.annotations()); +// +// } +// +// /** +// * The hash code is based on the hash of the operands plus the optional +// * {@link BOp.Annotations#BOP_ID}. It is cached. +// */ +// public int hashCode() { +// +// int h = hash; +// +// if (h == 0) { +// +// final int n = arity(); +// +// for (int i = 0; i < n; i++) { +// +// h = 31 * h + get(i).hashCode(); +// +// } +// +// Integer id = (Integer) getProperty(Annotations.BOP_ID); +// +// if (id != null) +// h = 31 * h + id.intValue(); +// +// hash = h; +// +// } +// +// return h; +// +// } +// +// /** +// * Caches the hash code. +// */ +// private int hash = 0; + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java 2010-10-05 20:18:36 UTC (rev 3731) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java 2010-10-05 20:28:22 UTC (rev 3732) @@ -413,7 +413,7 @@ * trees whose sinks target a descendant, which is another way * to create a loop. 
*/ - throw new DuplicateBOpException("id=" + t.getId() + ", root=" + throw new DuplicateBOpException("dup=" + t + ", root=" + toString(op)); } } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java 2010-10-05 20:18:36 UTC (rev 3731) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java 2010-10-05 20:28:22 UTC (rev 3732) @@ -543,63 +543,70 @@ } - public boolean equals(final Object other) { - - if (this == other) - return true; - - if(!(other instanceof IPredicate<?>)) - return false; - - final IPredicate<?> o = (IPredicate<?>)other; - - final int arity = arity(); - - if(arity != o.arity()) return false; - - for (int i = 0; i < arity; i++) { - - final IVariableOrConstant<?> x = get(i); - - final IVariableOrConstant<?> y = o.get(i); - - if (x != y && !(x.equals(y))) { - - return false; - - } - - } - - return true; - - } + /* + * Intentionally removed. See BOpBase. + * + * hashCode() and equals() for Predicate were once used to cache access + * paths, but that code was history long before we developed the bop model. 
+ */ - public int hashCode() { - - int h = hash; +// public boolean equals(final Object other) { +// +// if (this == other) +// return true; +// +// if(!(other instanceof IPredicate<?>)) +// return false; +// +// final IPredicate<?> o = (IPredicate<?>)other; +// +// final int arity = arity(); +// +// if(arity != o.arity()) return false; +// +// for (int i = 0; i < arity; i++) { +// +// final IVariableOrConstant<?> x = get(i); +// +// final IVariableOrConstant<?> y = o.get(i); +// +// if (x != y && !(x.equals(y))) { +// +// return false; +// +// } +// +// } +// +// return true; +// +// } +// +// public int hashCode() { +// +// int h = hash; +// +// if (h == 0) { +// +// final int n = arity(); +// +// for (int i = 0; i < n; i++) { +// +// h = 31 * h + get(i).hashCode(); +// +// } +// +// hash = h; +// +// } +// +// return h; +// +// } +// +// /** +// * Caches the hash code. +// */ +// private int hash = 0; - if (h == 0) { - - final int n = arity(); - - for (int i = 0; i < n; i++) { - - h = 31 * h + get(i).hashCode(); - - } - - hash = h; - - } - - return h; - - } - - /** - * Caches the hash code. - */ - private int hash = 0; - } Modified: branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPOPredicate.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPOPredicate.java 2010-10-05 20:18:36 UTC (rev 3731) +++ branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPOPredicate.java 2010-10-05 20:28:22 UTC (rev 3732) @@ -79,9 +79,6 @@ * @param s * @param p * @param o - * - * @deprecated Only used by the unit tests. They should use the shallow - * copy constructor form. */ public SPOPredicate(final String relationName, final IVariableOrConstant<IV> s, This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <ble...@us...> - 2010-10-05 20:18:42
|
Revision: 3731 http://bigdata.svn.sourceforge.net/bigdata/?rev=3731&view=rev Author: blevine218 Date: 2010-10-05 20:18:36 +0000 (Tue, 05 Oct 2010) Log Message: ----------- Execute unit tests using surefire plugin (part 2) - Unit tests now run as part of the build lifecycle. - Unit tests can be skipped by adding -DskipTests to the mvn command-line - log4j.configuration and log4j.path now point to new, less verbose log4j.properties file located in .../testing/conf/log4j.properties Modified Paths: -------------- branches/maven_scaleout/bigdata-core/pom.xml branches/maven_scaleout/bigdata-integ/pom.xml branches/maven_scaleout/pom.xml Added Paths: ----------- branches/maven_scaleout/bigdata-core/src/test/deploy/testing/conf/log4j.properties Modified: branches/maven_scaleout/bigdata-core/pom.xml =================================================================== --- branches/maven_scaleout/bigdata-core/pom.xml 2010-10-05 19:57:25 UTC (rev 3730) +++ branches/maven_scaleout/bigdata-core/pom.xml 2010-10-05 20:18:36 UTC (rev 3731) @@ -14,38 +14,37 @@ <name>Bigdata Core</name> <url>http://www.bigdata.com</url> + <description>The Bigdata Core</description> - <properties> - <!-- group ID for non-public bigdata dependencies. --> + <properties> + <!-- group ID for non-public bigdata dependencies. --> <thirdParty.groupId>com.bigdata.thirdparty</thirdParty.groupId> - - <!-- These properties are used to set system properties that are required by the unit - tests. See the surefire plugin configuration below. Any of these can be overridden on - the command-line by adding -D<property-name>=<propert-value>. - --> - - <!-- Skip running unit tests by default until we get the kinks worked out. Can be overridden - by adding '-DskipTests=false' to the command-line --> - <skipTests>true</skipTests> - - <!-- This directory is created when the deployment tarball is created - during the 'package' phase by the maven assembly plugin. 
If the naming - convention as defined in src/main/assembly/deploy.xml changes, this property - needs to change as well. --> + + <!-- + These properties are used to set system properties that are required by the unit tests. See the surefire plugin configuration below. Any of these can be overridden on the command-line by + adding -D<property-name>=<propert-value>. + --> + + <!-- + This directory is created when the deployment tarball is created during the 'package' phase by the maven assembly plugin. If the naming convention as defined in + src/main/assembly/deploy.xml changes, this property needs to change as well. + + TODO: May want to see if these can be promoted to the parent POM and shared with the bigdata-integ module + --> <deploy.root.dir>${project.build.directory}/${project.artifactId}-${project.version}-deploy</deploy.root.dir> - + <deploy.dir>${deploy.root.dir}/${project.artifactId}-${project.version}</deploy.dir> <app.home>${deploy.dir}</app.home> <test.dir>${deploy.dir}/testing</test.dir> - <deploy.conf.dir>${test.dir}/conf</deploy.conf.dir> + <test.conf.dir>${test.dir}/conf</test.conf.dir> <deploy.lib>${deploy.dir}/lib</deploy.lib> <test.codebase.dir>${deploy.lib.dl}</test.codebase.dir> <test.codebase.port>23333</test.codebase.port> - <java.security.policy>${deploy.conf.dir}/policy.all</java.security.policy> - <log4j.configuration>${deploy.dir}/var/config/logging/log4j.properties</log4j.configuration> + <java.security.policy>${test.conf.dir}/policy.all</java.security.policy> + <log4j.configuration>${test.conf.dir}/log4j.properties</log4j.configuration> <java.net.preferIPv4Stack>true</java.net.preferIPv4Stack> <default.nic>eth0</default.nic> - + <!-- Set to empty string to indicate "unset." 
Application code will set a reasonable default --> <federation.name></federation.name> </properties> @@ -56,8 +55,10 @@ <artifactId>maven-compiler-plugin</artifactId> <configuration> <compilerArguments> - <!-- Apparently Javac may compile java source files inside jars put on the classpath. Weird. Zookeeper 3.2.1 jar contained classes and sources, and under some circumstances, the - java files were getting recompiled and put into the bigdata jar. This setting forces javac to only look for source in the current maven source directory. --> + <!-- + Apparently Javac may compile java source files inside jars put on the classpath. Weird. Zookeeper 3.2.1 jar contained classes and sources, and under some circumstances, the + java files were getting recompiled and put into the bigdata jar. This setting forces javac to only look for source in the current maven source directory. + --> <sourcepath>${project.build.sourceDirectory}</sourcepath> </compilerArguments> </configuration> @@ -110,9 +111,10 @@ </executions> </plugin> - <!-- Configure the surefire plugin to run in the 'integration-test' phase rather than the 'test' phase. - This allows us to run the unit tests after packaging has been done, but before the build artifacts - are installed during the 'install' phase --> + <!-- + Configure the surefire plugin to run in the 'integration-test' phase rather than the 'test' phase. This allows us to run the unit tests after packaging has been done, but before the + build artifacts are installed during the 'install' phase + --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> @@ -127,10 +129,11 @@ <goal>test</goal> </goals> <configuration> + <testFailureIgnore>true</testFailureIgnore> <skip>false</skip> <!-- Run only the top-level suite which in turn runs the other suites and tests --> <test>com.bigdata.TestAll</test> - + <!-- These system properties are required by the unit tests. 
--> <systemPropertyVariables> <java.security.policy>${java.security.policy}</java.security.policy> @@ -141,7 +144,7 @@ <log4j.path>${log4j.configuration}</log4j.path> <default.nic>${default.nic}</default.nic> <federation.name>${federation.name}</federation.name> - + <classserver.jar>${deploy.lib}/classserver.jar</classserver.jar> <colt.jar>${deploy.lib}/colt.jar</colt.jar> <ctc_utils.jar>${deploy.lib}/ctc_utils.jar</ctc_utils.jar> @@ -429,8 +432,10 @@ <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.15</version> - <!-- These exclusions are to address the fact that 1.2.15 added new features that depends on Sun specific jars, but these jars cannot be made available due to Sun's click-through requirement - on them. We aren't using the new features anyway, so they are safe to exclude. log4j should have made these optional in their POM. --> + <!-- + These exclusions are to address the fact that 1.2.15 added new features that depends on Sun specific jars, but these jars cannot be made available due to Sun's click-through + requirement on them. We aren't using the new features anyway, so they are safe to exclude. log4j should have made these optional in their POM. + --> <exclusions> <exclusion> <groupId>javax.mail</groupId> @@ -590,7 +595,13 @@ <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-report-plugin</artifactId> - <version>2.5</version> + <reportSets> + <reportSet> + <reports> + <report>report-only</report> + </reports> + </reportSet> + </reportSets> </plugin> <plugin> Added: branches/maven_scaleout/bigdata-core/src/test/deploy/testing/conf/log4j.properties =================================================================== --- branches/maven_scaleout/bigdata-core/src/test/deploy/testing/conf/log4j.properties (rev 0) +++ branches/maven_scaleout/bigdata-core/src/test/deploy/testing/conf/log4j.properties 2010-10-05 20:18:36 UTC (rev 3731) @@ -0,0 +1,35 @@ +# Default log4j configuration. 
See the individual classes for the +# specific loggers, but generally they are named for the class in +# which they are defined. + +# Default log4j configuration for testing purposes. +# +# You probably want to set the default log level to ERROR. + +#log4j.rootCategory=ALL, dest2 +log4j.rootCategory=ERROR, dest2 + +# Loggers. +# Note: logging here at INFO or DEBUG will significantly impact throughput! +#log4j.logger.com.bigdata=ERROR +#log4j.logger.org.apache=ERROR +#log4j.junit=ERROR + +# dest1 +log4j.appender.dest1=org.apache.log4j.ConsoleAppender +log4j.appender.dest1.layout=org.apache.log4j.PatternLayout +log4j.appender.dest1.layout.ConversionPattern=%-5p: %r %l: %m%n +#log4j.appender.dest1.layout.ConversionPattern=%-5p: %m%n +#log4j.appender.dest1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n +#log4j.appender.dest1.layout.ConversionPattern=%-4r(%d) [%t] %-5p %c(%l:%M) %x - %m%n + +# dest2 includes the thread name and elapsed milliseconds. +# Note: %r is elapsed milliseconds. +# Note: %t is the thread name. +# See http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/PatternLayout.html +log4j.appender.dest2=org.apache.log4j.ConsoleAppender +log4j.appender.dest2.layout=org.apache.log4j.PatternLayout +log4j.appender.dest2.layout.ConversionPattern=%-5p: %r %X{hostname} %X{serviceUUID} %X{taskname} %X{timestamp} %X{resources} %t %l: %m%n + + + Modified: branches/maven_scaleout/bigdata-integ/pom.xml =================================================================== --- branches/maven_scaleout/bigdata-integ/pom.xml 2010-10-05 19:57:25 UTC (rev 3730) +++ branches/maven_scaleout/bigdata-integ/pom.xml 2010-10-05 20:18:36 UTC (rev 3731) @@ -10,6 +10,7 @@ <groupId>com.bigdata</groupId> <artifactId>bigdata-integration-test</artifactId> <name>bigdata Integration Tests</name> + <description>The Bigdata Integration Tests</description> <!-- Note: Most properties include the "integ." 
prefix so that they @@ -221,7 +222,6 @@ <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-report-plugin</artifactId> - <version>2.6</version> <reportSets> <reportSet> <id>integration-tests</id> Modified: branches/maven_scaleout/pom.xml =================================================================== --- branches/maven_scaleout/pom.xml 2010-10-05 19:57:25 UTC (rev 3730) +++ branches/maven_scaleout/pom.xml 2010-10-05 20:18:36 UTC (rev 3731) @@ -1,5 +1,4 @@ -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.bigdata</groupId> @@ -49,13 +48,13 @@ <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> - <version>2.5</version> + <version>2.6</version> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-failsafe-plugin</artifactId> - <version>2.5</version> + <version>2.6</version> </plugin> <plugin> @@ -84,11 +83,11 @@ <dependencyManagement> <dependencies> <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>4.8.1</version> - <scope>test</scope> - </dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.8.1</version> + <scope>test</scope> + </dependency> <!-- slf4j --> @@ -106,7 +105,10 @@ <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-report-plugin</artifactId> - <version>2.5</version> + <version>2.6</version> + <configuration> + <xrefLocation>${project.reporting.outputDirectory}/../xref-test</xrefLocation> + </configuration> </plugin> </plugins> </reporting> @@ -148,14 
+150,16 @@ <configuration> <jdk>1.5</jdk> <!-- - To use clover with bigdata, you should - add the following to your maven settings - file, .m2/settings.xml <profile> - <id>bigdata-clover</id> <activation> - <activeByDefault>true</activeByDefault> - </activation> <properties> - <maven.clover.licenseLocation>/your/path/to/clover.license</maven.clover.licenseLocation> - </properties> </profile> + To use clover with bigdata, you should add the following to your maven settings file, .m2/settings.xml + <profile> + <id>bigdata-clover</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + <properties> + <maven.clover.licenseLocation>/your/path/to/clover.license</maven.clover.licenseLocation> + </properties> + </profile> --> </configuration> </plugin> This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2010-10-05 19:57:35
|
Revision: 3730 http://bigdata.svn.sourceforge.net/bigdata/?rev=3730&view=rev Author: thompsonbry Date: 2010-10-05 19:57:25 +0000 (Tue, 05 Oct 2010) Log Message: ----------- UNION is still not implemented. It is the only remaining feature to be code complete for scale-out quads query. The decision tree rewrites in Rule2BOpUtility are still disabled by default. However, if you enable it and then run TestBigdataSailWithQuads, only one additional test fails ("Basic - Var 2") due to a "duplicate bop exception". The problem goes back to the hashCode() and equals() implementations of Predicate. I will address those in a second commit. I also plan to look at some optimizations, including: deferring some aspects of default graph and named graph rewrites to run time; improved pipelining in the QueryEngine (chaining BlockingBuffers together in pipes). - Constants are no longer cloned during deep copy of operator trees. Also, both Constant and Var now extend a common base class which disallows copy-on-write mutations to their annotations. - Hooked in cost model for SCALE-OUT. This requires a mixture of models to approximate the shard views. Changed from a scalar 'cost' into a ScanCostReport and a SubqueryCostReport. The cost reports are attached as annotations to the query plan by Rule2BOpUtility. - Added a shard split handler for the xxxC indices. Added a test suite for that split handler. Refactored the existing split handler test suites to have one test suite per split handler implementation. - Fixed a bug in DataSetJoin. It was not constraining the output solutions correctly. - Removed the "NESTED_SUBQUERY" option, the implementation class (NestedSubqueryWithJoinThreadsTask), and all of the logic to test that join throughout the various test suites. - Cleaned up logic to obtain an AccessPath (all code paths). There was a broken assumption built into the refactor that the IKeyOrder could be specified on the IPredicate as an annotation. 
This does not work because we select the IKeyOrder, and hence the IAccessPath, based on the asBound predicate during joins in order to always read on the most efficient access path. This is also true in scale-out where we map each binding set across the predicate for the target join in order to obtain the asBound state for that predicate and then push the binding set to the shard on which that asBound predicate will read (or write). - Fixed mapping over shards. It assumed that the target IKeyOrder was fixed in advance. In fact, it should be determined at runtime for normal pipeline joins. - Moved the Rule2BOpUtility and associated classes to com.bigdata.sail since they are SPARQL specific. - Renamed ISolutionExpander to IAccessPathExpander - An occasional test failure appears in TestQueryEngine#test_queryJoin2_concurrentStressTest() where it reports one failure ("nerror=1, ncancel=0, ntimeout=0, ninterrupt=0, nsuccess=999"). I have not tracked this down, but I have noticed this off and on for several days. - TestFederatedQueryEngine is hanging / having controller identity issues. This shows up on test_query_join_2shards_nothingBoundOnAccessPath(). Once it happens the query engine does not tear down correctly and CI will hang. 
Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/architecture/query-cost-model.xls branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BufferAnnotations.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Constant.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IPredicate.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Var.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/RunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/FederatedRunningQuery.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/shards/Algorithm_FullyBoundPredicate.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/shards/Algorithm_NestedLocatorScan.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/shards/Bundle.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/fed/shards/MapBindingSetsOverShardsBuffer.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/AbstractBTree.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/AbstractTuple.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/BTree.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/BTreeStatistics.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/BTreeUtilizationReport.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/ILocalBTreeView.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/IndexSegment.java 
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/btree/UnisolatedReadWriteIndex.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/AbstractRelation.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/AbstractResource.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/IRelation.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/RelationFusedView.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/AccessPath.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/BlockingBuffer.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/EmptyAccessPathExpander.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/AbstractJoinNexus.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/DefaultEvaluationPlan2.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/DefaultRangeCountFactory.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/DefaultRuleTaskFactory.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/IJoinNexus.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/NoReorderEvaluationPlan.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/pipeline/DistributedJoinMasterTask.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/search/FullTextIndex.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/AbstractScaleOutFederation.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/service/IBigdataClient.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/striterator/ChunkedWrappedIterator.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/striterator/IChunkedIterator.java branches/QUADS_QUERY_BRANCH/bigdata/src/resources/logging/log4j.properties 
branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/TestBOpUtility.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/ap/TestPredicate.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/engine/TestQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/TestFederatedQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/bop/fed/shards/TestMapBindingSetsOverShards.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/relation/locator/TestDefaultResourceLocator.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/relation/rule/TestRule.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/resources/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/resources/TestSegSplitter.java branches/QUADS_QUERY_BRANCH/bigdata-jini/src/test/com/bigdata/bop/fed/jini/TestJiniFederatedQueryEngine.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/bop/rdf/join/DataSetJoin.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/OwlSameAsPropertiesExpandingIterator.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/lexicon/LexiconRelation.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/load/MappedRDFDataLoadMaster.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/magic/MagicRelation.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/rules/RDFJoinNexus.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/DefaultGraphSolutionExpander.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/NamedGraphSolutionExpander.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPOAccessPath.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPOPredicate.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPORelation.java 
branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/bop/rdf/join/TestDataSetJoin.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/rules/TestContextAdvancer.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/rules/TestDatabaseAtOnceClosure.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/rules/TestOptionals.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/rules/TestRuleExpansion.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/rules/TestSlice.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/spo/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/spo/TestSPORelation.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/store/StressTestCentos.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataEvaluationStrategyImpl.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/FreeTextSearchExpander.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestAll.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestNamedGraphs.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestOptionals.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/tck/BigdataConnectionTest.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/tck/BigdataSparqlTest.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/tck/BigdataStoreTest.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/stress/LoadClosureAndQueryTest.java branches/QUADS_QUERY_BRANCH/ctc-striterators/src/java/cutthecrap/utils/striterators/NOPFilter.java Added Paths: ----------- 
branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ImmutableBOp.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/cost/ScanCostReport.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/cost/SubqueryCostReport.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/IAccessPathExpander.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/resources/AbstractTestSegSplitter.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/resources/TestFixedLengthPrefixShardSplits.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/resources/TestSparseRowStoreSplitHandler.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/XXXCShardSplitHandler.java branches/QUADS_QUERY_BRANCH/bigdata-rdf/src/test/com/bigdata/rdf/spo/TestXXXCShardSplitHandler.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/DGExpander.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/DataSetSummary.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/Rule2BOpUtility.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestBigdataSailWithQuads.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestBigdataSailWithQuadsWithoutInlining.java Removed Paths: ------------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/Rule2BOpUtility.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/ISolutionExpander.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/rule/eval/NestedSubqueryWithJoinThreadsTask.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestBigdataSailWithQuads.java branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestBigdataSailWithQuadsAndPipelineJoins.java 
branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestBigdataSailWithQuadsAndPipelineJoinsWithoutInlining.java Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/architecture/query-cost-model.xls =================================================================== (Binary files differ) Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2010-10-05 02:33:11 UTC (rev 3729) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpBase.java 2010-10-05 19:57:25 UTC (rev 3730) @@ -37,6 +37,7 @@ import java.util.Map; import com.bigdata.bop.constraint.EQ; +import com.bigdata.btree.Tuple; /** * Abstract base class for {@link BOp}s. @@ -411,10 +412,12 @@ * The name. * @param value * The value. + * + * @return The old value. */ - protected void _setProperty(final String name, final Object value) { + protected Object _setProperty(final String name, final Object value) { - annotations.put(name,value); + return annotations.put(name,value); } @@ -472,7 +475,7 @@ final BOpBase tmp = this.clone(); - if (tmp.annotations.put(name, value) != null) + if (tmp._setProperty(name, value) != null) throw new IllegalStateException("Already set: name=" + name + ", value=" + value); @@ -480,6 +483,49 @@ } + /** + * Clear the named annotation. + * + * @param name + * The annotation. + * @return A copy of this {@link BOp} in which the named annotation has been + * removed. + */ + public BOpBase clearProperty(final String name) { + + if (name == null) + throw new IllegalArgumentException(); + + final BOpBase tmp = this.clone(); + + tmp._clearProperty(name); + + return tmp; + + } + + /** + * Strips off the named annotations. + * + * @param names + * The annotations to be removed. + * + * @return A copy of this {@link BOp} in which the specified annotations do not appear. 
+ */ + public BOp clearAnnotations(final String[] names) { + + final BOpBase tmp = this.clone(); + + for(String name : names) { + + tmp._clearProperty(name); + + } + + return tmp; + + } + public int getId() { return (Integer) getRequiredProperty(Annotations.BOP_ID); @@ -489,14 +535,23 @@ public String toString() { final StringBuilder sb = new StringBuilder(); -// sb.append(getClass().getName()); - sb.append(super.toString()); + sb.append(getClass().getName()); +// sb.append(super.toString()); + final Integer bopId = (Integer) getProperty(Annotations.BOP_ID); + if (bopId != null) { + sb.append("[" + bopId + "]"); + } sb.append("("); - for (int i = 0; i < args.length; i++) { - final BOp t = args[i]; - if (i > 0) + int nwritten = 0; + for (BOp t : args) { + if (nwritten > 0) sb.append(','); sb.append(t.getClass().getSimpleName()); + final Integer tid = (Integer) t.getProperty(Annotations.BOP_ID); + if (tid != null) { + sb.append("[" + tid + "]"); + } + nwritten++; } sb.append(")"); annotationsToString(sb); @@ -510,11 +565,16 @@ sb.append("["); boolean first = true; for (Map.Entry<String, Object> e : annotations.entrySet()) { - if (!first) + if (first) + sb.append(" "); + else sb.append(", "); if (e.getValue() != null && e.getValue().getClass().isArray()) { sb.append(e.getKey() + "=" + Arrays.toString((Object[]) e.getValue())); + } else if (e.getKey() == IPredicate.Annotations.FLAGS) { + sb.append(e.getKey() + "=" + + Tuple.flagString((Integer) e.getValue())); } else { sb.append(e.getKey() + "=" + e.getValue()); } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java 2010-10-05 02:33:11 UTC (rev 3729) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpContextBase.java 2010-10-05 19:57:25 UTC (rev 3730) @@ -30,25 +30,15 @@ import java.util.concurrent.Executor; import 
java.util.concurrent.ExecutorService; -import org.apache.log4j.Logger; - import com.bigdata.bop.engine.QueryEngine; -import com.bigdata.btree.IIndex; import com.bigdata.btree.ILocalBTreeView; -import com.bigdata.btree.IRangeQuery; import com.bigdata.journal.IIndexManager; -import com.bigdata.journal.TimestampUtility; -import com.bigdata.relation.AbstractRelation; import com.bigdata.relation.IRelation; -import com.bigdata.relation.accesspath.AccessPath; import com.bigdata.relation.accesspath.IAccessPath; import com.bigdata.relation.locator.IResourceLocator; import com.bigdata.relation.rule.IRule; -import com.bigdata.relation.rule.ISolutionExpander; import com.bigdata.relation.rule.eval.IJoinNexus; -import com.bigdata.service.DataService; import com.bigdata.service.IBigdataFederation; -import com.bigdata.striterator.IKeyOrder; /** * Base class for the bigdata operation evaluation context (NOT serializable). @@ -58,11 +48,16 @@ */ public class BOpContextBase { - static private final transient Logger log = Logger.getLogger(BOpContextBase.class); +// static private final transient Logger log = Logger.getLogger(BOpContextBase.class); -// private final QueryEngine queryEngine; - + /** + * The federation iff running in scale-out. + */ private final IBigdataFederation<?> fed; + + /** + * The <strong>local</strong> index manager. + */ private final IIndexManager indexManager; /** @@ -104,8 +99,11 @@ /** * Core constructor. + * * @param fed + * The federation iff running in scale-out. * @param indexManager + * The <strong>local</strong> index manager. */ public BOpContextBase(final IBigdataFederation<?> fed, final IIndexManager indexManager) { @@ -232,7 +230,7 @@ * conversion is done. It has much of the same logic (this also * handles remote access paths now). * - * @todo Support mutable relation views. + * @todo Support mutable relation views (no - just fix truth maintenance). 
*/ // @SuppressWarnings("unchecked") public <E> IAccessPath<E> getAccessPath(final IRelation<E> relation, @@ -244,147 +242,149 @@ if (predicate == null) throw new IllegalArgumentException(); - /* - * FIXME This should be as assigned by the query planner so the query is - * fully declarative. - */ - final IKeyOrder<E> keyOrder; - { - final IKeyOrder<E> tmp = predicate.getKeyOrder(); - if (tmp != null) { - // use the specified index. - keyOrder = tmp; - } else { - // ask the relation for the best index. - keyOrder = relation.getKeyOrder(predicate); - } - } + return relation.getAccessPath(indexManager/* localIndexManager */, + relation.getKeyOrder(predicate), predicate); - if (keyOrder == null) - throw new RuntimeException("No access path: " + predicate); +// /* +// * Note: ALWAYS use the "perfect" index. +// */ +// final IKeyOrder<E> keyOrder = relation.getKeyOrder(predicate); +//// { +//// final IKeyOrder<E> tmp = predicate.getKeyOrder(); +//// if (tmp != null) { +//// // use the specified index. +//// keyOrder = tmp; +//// } else { +//// // ask the relation for the best index. +//// keyOrder = relation.getKeyOrder(predicate); +//// } +//// } +//// +//// if (keyOrder == null) +//// throw new RuntimeException("No access path: " + predicate); +// +// final int partitionId = predicate.getPartitionId(); +// +// final long timestamp = (Long) predicate +// .getRequiredProperty(BOp.Annotations.TIMESTAMP); +// +// final int flags = predicate.getProperty( +// IPredicate.Annotations.FLAGS, +// IPredicate.Annotations.DEFAULT_FLAGS) +// | (TimestampUtility.isReadOnly(timestamp) ? 
IRangeQuery.READONLY +// : 0); +// +// final int chunkOfChunksCapacity = predicate.getProperty( +// BufferAnnotations.CHUNK_OF_CHUNKS_CAPACITY, +// BufferAnnotations.DEFAULT_CHUNK_OF_CHUNKS_CAPACITY); +// +// final int chunkCapacity = predicate.getProperty( +// BufferAnnotations.CHUNK_CAPACITY, +// BufferAnnotations.DEFAULT_CHUNK_CAPACITY); +// +// final int fullyBufferedReadThreshold = predicate.getProperty( +// IPredicate.Annotations.FULLY_BUFFERED_READ_THRESHOLD, +// IPredicate.Annotations.DEFAULT_FULLY_BUFFERED_READ_THRESHOLD); +// +// if (partitionId != -1) { +// +// /* +// * Note: This handles a read against a local index partition. For +// * scale-out, the [indexManager] will be the data service's local +// * index manager. +// * +// * Note: Expanders ARE NOT applied in this code path. Expanders +// * require a total view of the relation, which is not available +// * during scale-out pipeline joins. Likewise, the [backchain] +// * property will be ignored since it is handled by an expander. +// * +// * @todo Replace this with IRelation#getAccessPathForIndexPartition() +// */ +//// return ((AbstractRelation<?>) relation) +//// .getAccessPathForIndexPartition(indexManager, +//// (IPredicate) predicate); +// +// /* +// * @todo This is an error since expanders are currently ignored on +// * shard-wise access paths. While it is possible to enable expanders +// * for shard-wise access paths. +// */ +// if (predicate.getSolutionExpander() != null) +// throw new IllegalArgumentException(); +// +// final String namespace = relation.getNamespace();//predicate.getOnlyRelationName(); +// +// // The name of the desired index partition. +// final String name = DataService.getIndexPartitionName(namespace +// + "." + keyOrder.getIndexName(), partitionId); +// +// // MUST be a local index view. 
+// final ILocalBTreeView ndx = (ILocalBTreeView) indexManager +// .getIndex(name, timestamp); +// +// return new AccessPath<E>(relation, indexManager, timestamp, +// predicate, keyOrder, ndx, flags, chunkOfChunksCapacity, +// chunkCapacity, fullyBufferedReadThreshold).init(); +// +// } +// +//// accessPath = relation.getAccessPath((IPredicate) predicate); +// +// // Decide on a local or remote view of the index. +// final IIndexManager indexManager; +// if (predicate.isRemoteAccessPath()) { +// // use federation in scale-out for a remote access path. +// indexManager = fed != null ? fed : this.indexManager; +// } else { +// indexManager = this.indexManager; +// } +// +// // Obtain the index. +// final String fqn = AbstractRelation.getFQN(relation, keyOrder); +// final IIndex ndx = AbstractRelation.getIndex(indexManager, fqn, timestamp); +// +// if (ndx == null) { +// +// throw new IllegalArgumentException("no index? relation=" +// + relation.getNamespace() + ", timestamp=" + timestamp +// + ", keyOrder=" + keyOrder + ", pred=" + predicate +// + ", indexManager=" + getIndexManager()); +// +// } +// +// // Obtain the access path for that relation and index. +// final IAccessPath<E> accessPath = ((AbstractRelation<E>) relation) +// .newAccessPath(relation, indexManager, timestamp, predicate, +// keyOrder, ndx, flags, chunkOfChunksCapacity, +// chunkCapacity, fullyBufferedReadThreshold); +// +// // optionally wrap with an expander pattern. +// return expander(predicate, accessPath); - final int partitionId = predicate.getPartitionId(); - - final long timestamp = (Long) predicate - .getRequiredProperty(BOp.Annotations.TIMESTAMP); - - final int flags = predicate.getProperty( - IPredicate.Annotations.FLAGS, - IPredicate.Annotations.DEFAULT_FLAGS) - | (TimestampUtility.isReadOnly(timestamp) ? 
IRangeQuery.READONLY - : 0); - - final int chunkOfChunksCapacity = predicate.getProperty( - BufferAnnotations.CHUNK_OF_CHUNKS_CAPACITY, - BufferAnnotations.DEFAULT_CHUNK_OF_CHUNKS_CAPACITY); - - final int chunkCapacity = predicate.getProperty( - BufferAnnotations.CHUNK_CAPACITY, - BufferAnnotations.DEFAULT_CHUNK_CAPACITY); - - final int fullyBufferedReadThreshold = predicate.getProperty( - IPredicate.Annotations.FULLY_BUFFERED_READ_THRESHOLD, - IPredicate.Annotations.DEFAULT_FULLY_BUFFERED_READ_THRESHOLD); - - if (partitionId != -1) { - - /* - * Note: This handles a read against a local index partition. For - * scale-out, the [indexManager] will be the data service's local - * index manager. - * - * Note: Expanders ARE NOT applied in this code path. Expanders - * require a total view of the relation, which is not available - * during scale-out pipeline joins. Likewise, the [backchain] - * property will be ignored since it is handled by an expander. - * - * @todo Replace this with IRelation#getAccessPathForIndexPartition() - */ -// return ((AbstractRelation<?>) relation) -// .getAccessPathForIndexPartition(indexManager, -// (IPredicate) predicate); - - /* - * @todo This is an error since expanders are currently ignored on - * shard-wise access paths. While it is possible to enable expanders - * for shard-wise access paths. - */ - if (predicate.getSolutionExpander() != null) - throw new IllegalArgumentException(); - - final String namespace = relation.getNamespace();//predicate.getOnlyRelationName(); - - // The name of the desired index partition. - final String name = DataService.getIndexPartitionName(namespace - + "." + keyOrder.getIndexName(), partitionId); - - // MUST be a local index view. 
- final ILocalBTreeView ndx = (ILocalBTreeView) indexManager - .getIndex(name, timestamp); - - return new AccessPath<E>(relation, indexManager, timestamp, - predicate, keyOrder, ndx, flags, chunkOfChunksCapacity, - chunkCapacity, fullyBufferedReadThreshold).init(); - - } - -// accessPath = relation.getAccessPath((IPredicate) predicate); - - // Decide on a local or remote view of the index. - final IIndexManager indexManager; - if (predicate.isRemoteAccessPath()) { - // use federation in scale-out for a remote access path. - indexManager = fed != null ? fed : this.indexManager; - } else { - indexManager = this.indexManager; - } - - // Obtain the index. - final String fqn = AbstractRelation.getFQN(relation, keyOrder); - final IIndex ndx = AbstractRelation.getIndex(indexManager, fqn, timestamp); - - if (ndx == null) { - - throw new IllegalArgumentException("no index? relation=" - + relation.getNamespace() + ", timestamp=" + timestamp - + ", keyOrder=" + keyOrder + ", pred=" + predicate - + ", indexManager=" + getIndexManager()); - - } - - // Obtain the access path for that relation and index. - final IAccessPath<E> accessPath = ((AbstractRelation<E>) relation) - .newAccessPath(relation, indexManager, timestamp, predicate, - keyOrder, ndx, flags, chunkOfChunksCapacity, - chunkCapacity, fullyBufferedReadThreshold); - - // optionally wrap with an expander pattern. - return expander(predicate, accessPath); - } - /** - * Optionally wrap with an expander pattern. - * - * @param predicate - * @param accessPath - * @return - * @param <E> - */ - private <E> IAccessPath<E> expander(final IPredicate<E> predicate, - final IAccessPath<E> accessPath) { +// /** +// * Optionally wrap with an expander pattern. 
+// * +// * @param predicate +// * @param accessPath +// * @return +// * @param <E> +// */ +// private <E> IAccessPath<E> expander(final IPredicate<E> predicate, +// final IAccessPath<E> accessPath) { +// +// final ISolutionExpander<E> expander = predicate.getSolutionExpander(); +// +// if (expander != null) { +// +// // allow the predicate to wrap the access path +// return expander.getAccessPath(accessPath); +// +// } +// +// return accessPath; +// +// } - final ISolutionExpander<E> expander = predicate.getSolutionExpander(); - - if (expander != null) { - - // allow the predicate to wrap the access path - return expander.getAccessPath(accessPath); - - } - - return accessPath; - - } - } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java 2010-10-05 02:33:11 UTC (rev 3729) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BOpUtility.java 2010-10-05 19:57:25 UTC (rev 3730) @@ -413,7 +413,8 @@ * trees whose sinks target a descendant, which is another way * to create a loop. */ - throw new DuplicateBOpException(t.toString()); + throw new DuplicateBOpException("id=" + t.getId() + ", root=" + + toString(op)); } } // wrap to ensure immutable and thread-safe. @@ -528,4 +529,93 @@ } // toArray() + /** + * Pretty print a bop. + * + * @param bop + * The bop. + * + * @return The formatted representation. + */ + public static String toString(final BOp bop) { + + final StringBuilder sb = new StringBuilder(); + + toString(bop, sb, 0); + + // chop off the last \n + sb.setLength(sb.length() - 1); + + return sb.toString(); + + } + + private static void toString(final BOp bop, final StringBuilder sb, + final int indent) { + + sb.append(indent(indent)).append( + bop == null ? 
"<null>" : bop.toString()).append('\n'); + + if (bop == null) + return; + + for (BOp arg : bop.args()) { + + if (arg.arity() > 0) { + + toString(arg, sb, indent+1); + + } + + } + + } + + /** + * Returns a string that may be used to indent a dump of the nodes in + * the tree. + * + * @param height + * The height. + * + * @return A string suitable for indent at that height. + */ + private static String indent(final int height) { + + if( height == -1 ) { + + // The height is not defined. + + return ""; + + } + + return ws.substring(0, height * 4); + + } + + private static final transient String ws = " "; + +// /** +// * Verify that all bops from the identified <i>startId</i> to the root are +// * {@link PipelineOp}s and have an assigned {@link BOp.Annotations#BOP_ID}. +// * This is required in order for us to be able to target messages to those +// * operators. +// * +// * @param startId +// * The {@link BOp.Annotations#BOP_ID} at which the query will +// * start. This is typically the left-most descendant. +// * @param root +// * The root of the operator tree. +// * +// * @throws RuntimeException +// * if the operator tree does not meet any of the criteria for +// * pipeline evaluation. 
+// */ +// public static void verifyPipline(final int startId, final BOp root) { +// +// throw new UnsupportedOperationException(); +// +// } + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BufferAnnotations.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BufferAnnotations.java 2010-10-05 02:33:11 UTC (rev 3729) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/BufferAnnotations.java 2010-10-05 19:57:25 UTC (rev 3730) @@ -55,7 +55,7 @@ int DEFAULT_CHUNK_OF_CHUNKS_CAPACITY = 100; /** - * Sets the capacity of the {@link IBuffer}s used to accumulate a chunk of + * Sets the capacity of the {@link IBuffer}[]s used to accumulate a chunk of * {@link IBindingSet}s (default {@value #CHUNK_CAPACITY}). Partial chunks * may be automatically combined into full chunks. * Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Constant.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Constant.java 2010-10-05 02:33:11 UTC (rev 3729) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Constant.java 2010-10-05 19:57:25 UTC (rev 3730) @@ -29,7 +29,7 @@ * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ -final public class Constant<E> extends BOpBase implements IConstant<E> { +final public class Constant<E> extends ImmutableBOp implements IConstant<E> { /** * @@ -84,6 +84,15 @@ } + /** + * Clone is overridden to reduce heap churn. 
+ */ + final public Constant<E> clone() { + + return this; + + } + public String toString() { return value.toString(); @@ -150,5 +159,5 @@ throw new UnsupportedOperationException(); } - + } Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IPredicate.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IPredicate.java 2010-10-05 02:33:11 UTC (rev 3729) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/IPredicate.java 2010-10-05 19:57:25 UTC (rev 3730) @@ -45,7 +45,7 @@ import com.bigdata.relation.accesspath.ElementFilter; import com.bigdata.relation.accesspath.IAccessPath; import com.bigdata.relation.rule.IRule; -import com.bigdata.relation.rule.ISolutionExpander; +import com.bigdata.relation.rule.IAccessPathExpander; import com.bigdata.relation.rule.eval.IEvaluationPlan; import com.bigdata.relation.rule.eval.pipeline.JoinMasterTask; import com.bigdata.striterator.IKeyOrder; @@ -81,15 +81,15 @@ */ String RELATION_NAME = "relationName"; - /** - * The {@link IKeyOrder} which will be used to read on the relation. - * <p> - * Note: This is generally assigned by the query optimizer. The query - * optimizer considers the possible indices for each {@link IPredicate} - * in a query and the possible join orders and then specifies the order - * of the joins and the index to use for each join. - */ - String KEY_ORDER = "keyOrder"; +// /** +// * The {@link IKeyOrder} which will be used to read on the relation. +// * <p> +// * Note: This is generally assigned by the query optimizer. The query +// * optimizer considers the possible indices for each {@link IPredicate} +// * in a query and the possible join orders and then specifies the order +// * of the joins and the index to use for each join. 
+// */ +// String KEY_ORDER = "keyOrder"; /** * <code>true</code> iff the predicate is optional (the right operand of @@ -160,17 +160,28 @@ * Access path expander pattern. This allows you to wrap or replace the * {@link IAccessPath}. * <p> - * Note: You MUST be extremely careful when using this feature in - * scale-out. Access path expanders in scale-out are logically - * consistent with used with a {@link #REMOTE_ACCESS_PATH}, but remote - * access paths often lack the performance of a local access path. + * Note: Access path expanders in scale-out are logically consistent + * when used with a {@link #REMOTE_ACCESS_PATH}. However, remote access + * paths often lack the performance of a local access path. In order for + * the expander to be consistent with a local access path it MUST NOT + * rewrite the predicate in such a manner as to read on data onto found + * on the shard onto which the predicate was mapped during query + * evaluation. * <p> - * In order for the expander to be consistent with a local access path - * it MUST NOT rewrite the predicate in such a manner as to read on data - * onto found on the shard onto which the predicate was mapped during - * query evaluation. + * In general, scale-out query depends on binding sets being mapped onto + * the shards against which they will read or write. When an expander + * changes the bindings on an {@link IPredicate}, it typically changes + * the access path which will be used. This is only supported when the + * access path is {@link #REMOTE_ACCESS_PATH}. + * <p> + * Note: If an expander generates nested {@link IAccessPath}s then it + * typically must strip off both the {@link #ACCESS_PATH_EXPANDER} and + * the {@link #ACCESS_PATH_EXPANDER} from the {@link IPredicate} before + * generating the inner {@link IAccessPath} and then layer the + * {@link #ACCESS_PATH_FILTER} back onto the expanded visitation + * pattern. 
* - * @see ISolutionExpander + * @see IAccessPathExpander */ String ACCESS_PATH_EXPANDER = "accessPathExpander"; @@ -335,7 +346,7 @@ * enforced using {@link IConstraint}s on the {@link IRule}. * <p> * More control over the behavior of optionals may be gained through the use - * of an {@link ISolutionExpander} pattern. + * of an {@link IAccessPathExpander} pattern. * * @return <code>true</code> iff this predicate is optional when evaluating * a JOIN. @@ -348,14 +359,14 @@ * Returns the object that may be used to selectively override the * evaluation of the predicate. * - * @return The {@link ISolutionExpander}. + * @return The {@link IAccessPathExpander}. * * @see Annotations#ACCESS_PATH_EXPANDER * - * @todo replace with {@link ISolutionExpander#getAccessPath(IAccessPath)}, - * which is the only method declared by {@link ISolutionExpander}. + * @todo Replace with {@link IAccessPathExpander#getAccessPath(IAccessPath)} + * , which is the only method declared by {@link IAccessPathExpander}? */ - public ISolutionExpander<E> getSolutionExpander(); + public IAccessPathExpander<E> getAccessPathExpander(); // /** // * An optional constraint on the visitable elements. @@ -386,32 +397,32 @@ */ public IFilter getAccessPathFilter(); - /** - * Return the {@link IKeyOrder} assigned to this {@link IPredicate} by the - * query optimizer. - * - * @return The assigned {@link IKeyOrder} or <code>null</code> if the query - * optimizer has not assigned the {@link IKeyOrder} yet. - * - * @see Annotations#KEY_ORDER - */ - public IKeyOrder<E> getKeyOrder(); +// /** +// * Return the {@link IKeyOrder} assigned to this {@link IPredicate} by the +// * query optimizer. +// * +// * @return The assigned {@link IKeyOrder} or <code>null</code> if the query +// * optimizer has not assigned the {@link IKeyOrder} yet. 
+// * +// * @see Annotations#KEY_ORDER +// */ +// public IKeyOrder<E> getKeyOrder(); +// +// /** +// * Set the {@link IKeyOrder} annotation on the {@link IPredicate}, returning +// * a new {@link IPredicate} in which the annotation takes on the given +// * binding. +// * +// * @param keyOrder +// * The {@link IKeyOrder}. +// * +// * @return The new {@link IPredicate}. +// * +// * @see Annotations#KEY_ORDER +// */ +// public IPredicate<E> setKeyOrder(final IKeyOrder<E> keyOrder); /** - * Set the {@link IKeyOrder} annotation on the {@link IPredicate}, returning - * a new {@link IPredicate} in which the annotation takes on the given - * binding. - * - * @param keyOrder - * The {@link IKeyOrder}. - * - * @return The new {@link IPredicate}. - * - * @see Annotations#KEY_ORDER - */ - public IPredicate<E> setKeyOrder(final IKeyOrder<E> keyOrder); - - /** * Figure out if all positions in the predicate which are required to form * the key for this access path are bound in the predicate. */ @@ -487,6 +498,9 @@ /** * Return a new instance in which all occurrences of the given variable have * been replaced by the specified constant. + * <p> + * Note: The optimal {@link IKeyOrder} often changes when binding a variable + * on a predicate. * * @param var * The variable. @@ -504,6 +518,9 @@ /** * Return a new instance in which all occurrences of the variable appearing * in the binding set have been replaced by their bound values. + * <p> + * Note: The optimal {@link IKeyOrder} often changes when binding a variable + * on a predicate. * * @param bindingSet * The binding set. 
Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ImmutableBOp.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ImmutableBOp.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ImmutableBOp.java 2010-10-05 19:57:25 UTC (rev 3730) @@ -0,0 +1,76 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Oct 5, 2010 + */ + +package com.bigdata.bop; + +import java.util.Map; + +/** + * Base class for immutable operators such as {@link Var} and {@link Constant}. + * These operators do not deep copy their data and do not permit decoration with + * annotations. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +abstract public class ImmutableBOp extends BOpBase { + + /** + * + */ + private static final long serialVersionUID = 1L; + + /** + * @param op + */ + public ImmutableBOp(BOpBase op) { + super(op); + } + + /** + * @param args + * @param annotations + */ + public ImmutableBOp(BOp[] args, Map<String, Object> annotations) { + super(args, annotations); + } + + /* + * Overrides for the copy-on-write mutation API. 
+ */ + + @Override + protected Object _setProperty(String name,Object value) { + throw new UnsupportedOperationException(); + } + + @Override + protected void _clearProperty(String name) { + throw new UnsupportedOperationException(); + } + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ImmutableBOp.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Var.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Var.java 2010-10-05 02:33:11 UTC (rev 3729) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/Var.java 2010-10-05 19:57:25 UTC (rev 3730) @@ -32,7 +32,7 @@ * limit the binding patterns across the rule. (In fact, variables can be * "named" by their index into the binding set for most purposes.) */ -final public class Var<E> extends BOpBase implements IVariable<E>, +final public class Var<E> extends ImmutableBOp implements IVariable<E>, Comparable<IVariable<E>> { private static final long serialVersionUID = -7100443208125002485L; Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java 2010-10-05 02:33:11 UTC (rev 3729) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/ap/Predicate.java 2010-10-05 19:57:25 UTC (rev 3730) @@ -28,7 +28,6 @@ package com.bigdata.bop.ap; -import java.util.Iterator; import java.util.Map; import com.bigdata.bop.AbstractAccessPathOp; @@ -44,11 +43,11 @@ import com.bigdata.bop.NV; import com.bigdata.relation.accesspath.ElementFilter; import com.bigdata.relation.accesspath.IElementFilter; -import com.bigdata.relation.rule.ISolutionExpander; +import 
com.bigdata.relation.rule.IAccessPathExpander; import com.bigdata.striterator.IKeyOrder; -import cutthecrap.utils.striterators.FilterBase; import cutthecrap.utils.striterators.IFilter; +import cutthecrap.utils.striterators.NOPFilter; /** * A generic implementation of an immutable {@link IPredicate}. @@ -140,7 +139,7 @@ public Predicate(final IVariableOrConstant<?>[] values, final String relationName, final int partitionId, final boolean optional, final IElementFilter<E> constraint, - final ISolutionExpander<E> expander, final long timestamp) { + final IAccessPathExpander<E> expander, final long timestamp) { this(values, NV.asMap(new NV[] {// new NV(Annotations.RELATION_NAME,new String[]{relationName}),// @@ -258,9 +257,9 @@ } @SuppressWarnings("unchecked") - final public ISolutionExpander<E> getSolutionExpander() { + final public IAccessPathExpander<E> getAccessPathExpander() { - return (ISolutionExpander<E>) getProperty(Annotations.ACCESS_PATH_EXPANDER); + return (IAccessPathExpander<E>) getProperty(Annotations.ACCESS_PATH_EXPANDER); } @@ -325,18 +324,12 @@ final int arity = arity(); +// boolean modified = false; + for (int i = 0; i < arity; i++) { final IVariableOrConstant<?> t = (IVariableOrConstant<?>) get(i); -// if (t == null) { -// /* -// * Note: t != null handles the case where the [c] position of an -// * SPO is allowed to be null. 
-// */ -// continue; -// } - if (t.isConstant()) continue; @@ -350,6 +343,8 @@ } tmp.set(i, val.clone()); + +// modified = true; } @@ -363,22 +358,22 @@ } - @SuppressWarnings("unchecked") - public IKeyOrder<E> getKeyOrder() { - - return (IKeyOrder<E>) getProperty(Annotations.KEY_ORDER); - - } - - public Predicate<E> setKeyOrder(final IKeyOrder<E> keyOrder) { - - final Predicate<E> tmp = this.clone(); - - tmp._setProperty(Annotations.KEY_ORDER, keyOrder); - - return tmp; - - } +// @SuppressWarnings("unchecked") +// public IKeyOrder<E> getKeyOrder() { +// +// return (IKeyOrder<E>) getProperty(Annotations.KEY_ORDER); +// +// } +// +// public Predicate<E> setKeyOrder(final IKeyOrder<E> keyOrder) { +// +// final Predicate<E> tmp = this.clone(); +// +// tmp._setProperty(Annotations.KEY_ORDER, keyOrder); +// +// return tmp; +// +// } @SuppressWarnings("unchecked") public Predicate<E> clone() { @@ -486,38 +481,23 @@ /* * Wrap the filter. */ - _setProperty(name, new FilterBase() { - - @Override - protected Iterator filterOnce(Iterator src, Object context) { - return src; - } - - }.addFilter(current).addFilter(filter)); + _setProperty(name, new NOPFilter().addFilter(current) + .addFilter(filter)); } } - + /** - * Strips off the named annotations. - * - * @param names - * The annotations to be removed. - * - * @return A new predicate in which the specified annotations do not appear. + * Strengthened return type. 
+ * <p> + * {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public Predicate<E> clearAnnotations(final String[] names) { - - final Predicate<E> tmp = this.clone(); - - for(String name : names) { - - tmp._clearProperty(name); - - } - - return tmp; + return (Predicate<E>) super.clearAnnotations(names); + } public String toString() { @@ -533,7 +513,15 @@ final StringBuilder sb = new StringBuilder(); sb.append(getClass().getName()); + + final Integer bopId = (Integer) getProperty(Annotations.BOP_ID); + if (bopId != null) { + + sb.append("[" + bopId + "]"); + + } + sb.append("("); for (int i = 0; i < arity; i++) { Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/cost/ScanCostReport.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/cost/ScanCostReport.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/cost/ScanCostReport.java 2010-10-05 19:57:25 UTC (rev 3730) @@ -0,0 +1,76 @@ +package com.bigdata.bop.cost; + + +/** + * A report on the expected cost of an index key range scan. + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + * @version $Id$ + */ +public class ScanCostReport { + + /** + * The fast range count. + */ + public final long rangeCount; + + /** + * The #of index partitions in the scan. + */ + public final long shardCount; + + /** + * The expected cost of the scan (milliseconds). + */ + public final double cost; + + /** + * + * @param rangeCount + * The fast range count. + * @param cost + * The expected cost for the scan (milliseconds). + */ + public ScanCostReport(final long rangeCount, final double cost) { + + this.rangeCount = rangeCount; + + this.shardCount = 1; + + this.cost = cost; + + } + + /** + * + * @param rangeCount + * The fast range count. + * @param shardCount + * The #of index partitions in the scan. + * @param cost + * The expected cost for the scan (milliseconds). 
+ */ + public ScanCostReport(final long rangeCount, final long shardCount, + final double cost) { + + this.rangeCount = rangeCount; + + this.shardCount = shardCount; + + this.cost = cost; + + } + + /** + * Human readable representation. + */ + public String toString() { + return super.toString() + // + "{rangeCount=" + rangeCount + // + ",shardCount=" + shardCount + // + ",cost=" + cost + // + "}"; + } + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/cost/ScanCostReport.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/cost/SubqueryCostReport.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/cost/SubqueryCostReport.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/cost/SubqueryCostReport.java 2010-10-05 19:57:25 UTC (rev 3730) @@ -0,0 +1,74 @@ +package com.bigdata.bop.cost; + +import java.io.Serializable; + +/** + * Subquery cost report. + */ +public class SubqueryCostReport implements Serializable { + + /** + * + */ + private static final long serialVersionUID = 1L; + + /** The #of graphs against which subqueries will be issued. */ + public final int ngraphs; + + /** The #of samples to be taken. */ + public final int limit; + + /** The #of samples taken. */ + public final int nsamples; + + /** + * An estimated range count based on the samples and adjusted for the + * #of graphs. + */ + public final long rangeCount; + + /** + * An estimated cost (latency in milliseconds) based on the samples and + * adjusted for the #of graphs. + */ + public final double subqueryCost; + + /** + * + * @param ngraphs + * The #of graphs against which subqueries will be issued. + * @param limit + * The #of samples to be taken. + * @param nsamples + * The #of samples taken. 
+ * @param rangeCount + * An estimated range count based on the samples and adjusted + * for the #of graphs. + * @param subqueryCost + * An estimated cost (latency in milliseconds) based on the + * samples and adjusted for the #of graphs. + */ + public SubqueryCostReport(final int ngraphs, final int limit, + final int nsamples, final long rangeCount, + final double subqueryCost) { + this.ngraphs = ngraphs; + this.limit = limit; + this.nsamples = nsamples; + this.rangeCount = rangeCount; + this.subqueryCost = subqueryCost; + } + + /** + * Human readable representation. + */ + public String toString() { + return super.toString() + // + "{ngraphs=" + ngraphs + // + ",limit=" + limit + // + ",nsamples=" + nsamples + // + ",rangeCount=" + rangeCount + // + ",subqueryCost=" + subqueryCost + // + "}"; + } + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/cost/SubqueryCostReport.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-10-05 02:33:11 UTC (rev 3729) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2010-10-05 19:57:25 UTC (rev 3730) @@ -46,8 +46,8 @@ import com.bigdata.bop.IBindingSet; import com.bigdata.bop.PipelineOp; import com.bigdata.bop.ap.Predicate; -import com.bigdata.bop.cost.BTreeCostModel; import com.bigdata.bop.cost.DiskCostModel; +import com.bigdata.bop.cost.ScanCostReport; import com.bigdata.btree.AbstractBTree; import com.bigdata.btree.BTree; import com.bigdata.btree.IBTreeStatistics; @@ -443,6 +443,10 @@ // pool). log.warn("Dropping chunk: queryId=" + queryId); continue; + } catch (Throwable ex) { + // halt that query. 
+ q.halt(ex); + continue; } } catch (InterruptedException e) { ... [truncated message content] |
From: <mrp...@us...> - 2010-10-05 02:33:17
|
Revision: 3729 http://bigdata.svn.sourceforge.net/bigdata/?rev=3729&view=rev Author: mrpersonick Date: 2010-10-05 02:33:11 +0000 (Tue, 05 Oct 2010) Log Message: ----------- incremental progress on change sets: simple add/remove Modified Paths: -------------- branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/rio/StatementBuffer.java branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/changesets/ChangeRecord.java branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/changesets/IChangeRecord.java branches/CHANGE_SET_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/ProxyBigdataSailTestCase.java branches/CHANGE_SET_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestChangeSets.java Added Paths: ----------- branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPOIndexMutation.java Modified: branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/rio/StatementBuffer.java =================================================================== --- branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/rio/StatementBuffer.java 2010-10-04 23:12:46 UTC (rev 3728) +++ branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/rio/StatementBuffer.java 2010-10-05 02:33:11 UTC (rev 3729) @@ -48,6 +48,9 @@ import com.bigdata.rdf.model.BigdataValue; import com.bigdata.rdf.model.BigdataValueFactory; import com.bigdata.rdf.model.StatementEnum; +import com.bigdata.rdf.sail.changesets.ChangeRecord; +import com.bigdata.rdf.sail.changesets.IChangeLog; +import com.bigdata.rdf.sail.changesets.IChangeRecord.ChangeAction; import com.bigdata.rdf.spo.ISPO; import com.bigdata.rdf.spo.SPO; import com.bigdata.rdf.store.AbstractTripleStore; @@ -252,6 +255,16 @@ private boolean readOnly = false; + public void setChangeLog(final IChangeLog changeLog) { + + this.changeLog = changeLog; + + } + + protected IChangeLog changeLog; + 
+ + /** * Create a buffer that converts Sesame {@link Value} objects to {@link SPO}s * and writes on the <i>database</i> when it is {@link #flush()}ed. This @@ -297,7 +310,7 @@ */ public StatementBuffer(final TempTripleStore statementStore, final AbstractTripleStore database, final int capacity) { - + if (database == null) throw new IllegalArgumentException(); @@ -362,7 +375,7 @@ * @todo this implementation always returns ZERO (0). */ public long flush() { - + log.info(""); /* @@ -874,6 +887,13 @@ if (tmp[i].isModified()) { stmts[i].setModified(true); + + if (changeLog != null) { + + changeLog.changeEvent( + new ChangeRecord(stmts[i], ChangeAction.ADDED)); + + } } Added: branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPOIndexMutation.java =================================================================== --- branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPOIndexMutation.java (rev 0) +++ branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPOIndexMutation.java 2010-10-05 02:33:11 UTC (rev 3729) @@ -0,0 +1,11 @@ +package com.bigdata.rdf.spo; + +public enum SPOIndexMutation { + + ADDED, + + REMOVED, + + TYPE_CHANGE + +} Modified: branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java =================================================================== --- branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java 2010-10-04 23:12:46 UTC (rev 3728) +++ branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java 2010-10-05 02:33:11 UTC (rev 3729) @@ -129,8 +129,10 @@ import com.bigdata.rdf.rio.StatementBuffer; import com.bigdata.rdf.rules.BackchainAccessPath; import com.bigdata.rdf.rules.InferenceEngine; +import com.bigdata.rdf.sail.changesets.ChangeRecord; import com.bigdata.rdf.sail.changesets.IChangeLog; import com.bigdata.rdf.sail.changesets.IChangeRecord; +import 
com.bigdata.rdf.sail.changesets.IChangeRecord.ChangeAction; import com.bigdata.rdf.spo.ExplicitSPOFilter; import com.bigdata.rdf.spo.ISPO; import com.bigdata.rdf.spo.InferredSPOFilter; @@ -1447,6 +1449,8 @@ // FIXME bnodes : must also track the reverse mapping [bnodes2]. assertBuffer.setBNodeMap(bnodes); + + assertBuffer.setChangeLog(changeLog); } @@ -2278,7 +2282,7 @@ } // #of explicit statements removed. - final long n; + long n = 0; if (getTruthMaintenance()) { @@ -2319,7 +2323,42 @@ * buffered). */ - n = database.removeStatements(s, p, o, c); + if (changeLog == null) { + + n = database.removeStatements(s, p, o, c); + + } else { + + final IAccessPath<ISPO> ap = + database.getAccessPath(s, p, o, c); + + final IChunkedOrderedIterator<ISPO> itr = ap.iterator(); + + if (itr.hasNext()) { + + final BigdataStatementIteratorImpl itr2 = + new BigdataStatementIteratorImpl(database, bnodes2, itr) + .start(database.getExecutorService()); + + final BigdataStatement[] stmts = + new BigdataStatement[database.getChunkCapacity()]; + + int i = 0; + while (i < stmts.length && itr2.hasNext()) { + stmts[i++] = itr2.next(); + if (i == stmts.length) { + // process stmts[] + n += removeAndNotify(stmts, i); + i = 0; + } + } + if (i > 0) { + n += removeAndNotify(stmts, i); + } + + } + + } } @@ -2327,7 +2366,70 @@ return (int) Math.min(Integer.MAX_VALUE, n); } + + private long removeAndNotify(final BigdataStatement[] stmts, final int numStmts) { + + final SPO[] tmp = new SPO[numStmts]; + for (int i = 0; i < tmp.length; i++) { + + final BigdataStatement stmt = stmts[i]; + + /* + * Note: context position is not passed when statement identifiers + * are in use since the statement identifier is assigned based on + * the {s,p,o} triple. + */ + + final SPO spo = new SPO(stmt); + + if (log.isDebugEnabled()) + log.debug("adding: " + stmt.toString() + " (" + spo + ")"); + + if(!spo.isFullyBound()) { + + throw new AssertionError("Not fully bound? 
: " + spo); + + } + + tmp[i] = spo; + + } + + /* + * Note: When handling statement identifiers, we clone tmp[] to avoid a + * side-effect on its order so that we can unify the assigned statement + * identifiers below. + * + * Note: In order to report back the [ISPO#isModified()] flag, we also + * need to clone tmp[] to avoid a side effect on its order. Therefore we + * now always clone tmp[]. + */ +// final long nwritten = writeSPOs(sids ? tmp.clone() : tmp, numStmts); + final long nwritten = database.removeStatements(tmp.clone(), numStmts); + + // Copy the state of the isModified() flag + { + + for (int i = 0; i < numStmts; i++) { + + if (tmp[i].isModified()) { + + stmts[i].setModified(true); + + changeLog.changeEvent( + new ChangeRecord(stmts[i], ChangeAction.REMOVED)); + + } + + } + + } + + return nwritten; + + } + public synchronized CloseableIteration<? extends Resource, SailException> getContextIDs() throws SailException { @@ -2420,6 +2522,12 @@ // discard the write set. database.abort(); + if (changeLog != null) { + + changeLog.transactionAborted(); + + } + } /** @@ -2444,6 +2552,12 @@ database.commit(); + if (changeLog != null) { + + changeLog.transactionCommited(); + + } + } // /** @@ -3327,8 +3441,18 @@ * @param log * the change log */ - public void setChangeLog(final IChangeLog log) { + public void setChangeLog(final IChangeLog changeLog) { + + this.changeLog = changeLog; + + if (assertBuffer != null) { + + assertBuffer.setChangeLog(changeLog); + + } } + + private IChangeLog changeLog; } Modified: branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/changesets/ChangeRecord.java =================================================================== --- branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/changesets/ChangeRecord.java 2010-10-04 23:12:46 UTC (rev 3728) +++ branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/changesets/ChangeRecord.java 2010-10-05 02:33:11 UTC (rev 3729) @@ -1,7 +1,11 @@ 
package com.bigdata.rdf.sail.changesets; +import java.util.Comparator; import com.bigdata.rdf.model.BigdataStatement; import com.bigdata.rdf.model.StatementEnum; +import com.bigdata.rdf.spo.ISPO; +import com.bigdata.rdf.spo.SPO; +import com.bigdata.rdf.spo.SPOComparator; public class ChangeRecord implements IChangeRecord { @@ -44,4 +48,55 @@ return stmt; } + + @Override + public boolean equals(Object o) { + + if (o == this) + return true; + + if (o == null || o instanceof IChangeRecord == false) + return false; + + final IChangeRecord rec = (IChangeRecord) o; + + final BigdataStatement stmt2 = rec.getStatement(); + + // statements are equal + if (stmt == stmt2 || + (stmt != null && stmt2 != null && stmt.equals(stmt2))) { + + // actions are equal + return action == rec.getChangeAction(); + + } + + return false; + + } + + public String toString() { + + StringBuilder sb = new StringBuilder(); + + sb.append(action).append(": ").append(stmt); + + return sb.toString(); + + } + + public static final Comparator<IChangeRecord> COMPARATOR = + new Comparator<IChangeRecord>() { + + public int compare(final IChangeRecord r1, final IChangeRecord r2) { + + final ISPO spo1 = new SPO(r1.getStatement()); + final ISPO spo2 = new SPO(r2.getStatement()); + + return SPOComparator.INSTANCE.compare(spo1, spo2); + + } + + }; + } Modified: branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/changesets/IChangeRecord.java =================================================================== --- branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/changesets/IChangeRecord.java 2010-10-04 23:12:46 UTC (rev 3728) +++ branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/changesets/IChangeRecord.java 2010-10-05 02:33:11 UTC (rev 3729) @@ -106,14 +106,14 @@ */ ChangeAction getChangeAction(); - /** - * If the change action is {@link ChangeAction#TYPE_CHANGE}, this method - * will return the old statement type of the focus statement. 
The - * new statement type is available on the focus statement itself. - * - * @return - * the old statement type of the focus statement - */ - StatementEnum getOldStatementType(); +// /** +// * If the change action is {@link ChangeAction#TYPE_CHANGE}, this method +// * will return the old statement type of the focus statement. The +// * new statement type is available on the focus statement itself. +// * +// * @return +// * the old statement type of the focus statement +// */ +// StatementEnum getOldStatementType(); } Modified: branches/CHANGE_SET_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/ProxyBigdataSailTestCase.java =================================================================== --- branches/CHANGE_SET_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/ProxyBigdataSailTestCase.java 2010-10-04 23:12:46 UTC (rev 3728) +++ branches/CHANGE_SET_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/ProxyBigdataSailTestCase.java 2010-10-05 02:33:11 UTC (rev 3729) @@ -274,8 +274,8 @@ return bindingSet; } - protected void compare(final TupleQueryResult result, - final Collection<BindingSet> answer) + protected void compare(final TupleQueryResult actual, + final Collection<BindingSet> expected) throws QueryEvaluationException { try { @@ -285,13 +285,13 @@ int resultCount = 0; int nmatched = 0; - while (result.hasNext()) { - BindingSet bindingSet = result.next(); + while (actual.hasNext()) { + BindingSet bindingSet = actual.next(); resultCount++; boolean match = false; if(log.isInfoEnabled()) log.info(bindingSet); - Iterator<BindingSet> it = answer.iterator(); + Iterator<BindingSet> it = expected.iterator(); while (it.hasNext()) { if (it.next().equals(bindingSet)) { it.remove(); @@ -304,7 +304,7 @@ extraResults.add(bindingSet); } } - missingResults = answer; + missingResults = expected; for (BindingSet bs : extraResults) { if (log.isInfoEnabled()) { @@ -326,7 +326,7 @@ } finally { - result.close(); + actual.close(); } Modified: 
branches/CHANGE_SET_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestChangeSets.java =================================================================== --- branches/CHANGE_SET_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestChangeSets.java 2010-10-04 23:12:46 UTC (rev 3728) +++ branches/CHANGE_SET_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestChangeSets.java 2010-10-05 02:33:11 UTC (rev 3729) @@ -26,14 +26,21 @@ package com.bigdata.rdf.sail; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedList; +import java.util.List; import java.util.Map; +import java.util.Properties; import org.apache.log4j.Logger; import org.openrdf.model.URI; import org.openrdf.model.vocabulary.RDF; import org.openrdf.model.vocabulary.RDFS; +import org.openrdf.query.BindingSet; +import com.bigdata.rdf.axioms.NoAxioms; +import com.bigdata.rdf.axioms.OwlAxioms; import com.bigdata.rdf.model.BigdataStatement; import com.bigdata.rdf.model.BigdataValueFactory; import com.bigdata.rdf.sail.changesets.ChangeRecord; @@ -41,6 +48,8 @@ import com.bigdata.rdf.sail.changesets.IChangeRecord; import com.bigdata.rdf.sail.changesets.IChangeRecord.ChangeAction; import com.bigdata.rdf.store.BD; +import com.bigdata.rdf.vocab.NoVocabulary; +import com.bigdata.rdf.vocab.RDFSVocabulary; /** * @author <a href="mailto:mrp...@us...">Mike Personick</a> @@ -50,6 +59,44 @@ protected static final Logger log = Logger.getLogger(TestChangeSets.class); + public Properties getTriplesNoInference() { + + Properties props = super.getProperties(); + + // triples with sids + props.setProperty(BigdataSail.Options.QUADS, "false"); + props.setProperty(BigdataSail.Options.STATEMENT_IDENTIFIERS, "true"); + + // no inference + props.setProperty(BigdataSail.Options.TRUTH_MAINTENANCE, "false"); + props.setProperty(BigdataSail.Options.AXIOMS_CLASS, NoAxioms.class.getName()); + props.setProperty(BigdataSail.Options.VOCABULARY_CLASS, 
NoVocabulary.class.getName()); + props.setProperty(BigdataSail.Options.JUSTIFY, "false"); + props.setProperty(BigdataSail.Options.TEXT_INDEX, "false"); + + return props; + + } + + public Properties getTriplesWithInference() { + + Properties props = super.getProperties(); + + // triples with sids + props.setProperty(BigdataSail.Options.QUADS, "false"); + props.setProperty(BigdataSail.Options.STATEMENT_IDENTIFIERS, "true"); + + // no inference + props.setProperty(BigdataSail.Options.TRUTH_MAINTENANCE, "true"); + props.setProperty(BigdataSail.Options.AXIOMS_CLASS, OwlAxioms.class.getName()); + props.setProperty(BigdataSail.Options.VOCABULARY_CLASS, RDFSVocabulary.class.getName()); + props.setProperty(BigdataSail.Options.JUSTIFY, "true"); + props.setProperty(BigdataSail.Options.TEXT_INDEX, "false"); + + return props; + + } + /** * */ @@ -63,9 +110,9 @@ super(arg0); } - public void testChangeSets() throws Exception { + public void testSimpleAdd() throws Exception { - final BigdataSail sail = getSail(); + final BigdataSail sail = getSail(getTriplesNoInference()); sail.initialize(); final BigdataSailRepository repo = new BigdataSailRepository(sail); final BigdataSailRepositoryConnection cxn = @@ -85,6 +132,188 @@ final URI b = vf.createURI(ns+"B"); final URI c = vf.createURI(ns+"C"); + final BigdataStatement[] stmts = new BigdataStatement[] { + vf.createStatement(a, RDFS.SUBCLASSOF, b), + vf.createStatement(b, RDFS.SUBCLASSOF, c), + }; + + final BigdataStatement[] stmts2 = new BigdataStatement[] { + vf.createStatement(a, RDFS.SUBCLASSOF, c), + }; + +/**/ + cxn.setNamespace("ns", ns); + + // add the stmts[] + + for (BigdataStatement stmt : stmts) { + cxn.add(stmt); + } + + cxn.commit();// + + { // should see all of the stmts[] added + + final Collection<IChangeRecord> expected = + new LinkedList<IChangeRecord>(); + for (BigdataStatement stmt : stmts) { + expected.add(new ChangeRecord(stmt, ChangeAction.ADDED)); + } + + compare(expected, changeLog.getChangeSet()); + + } + + 
// add the stmts[] again + + for (BigdataStatement stmt : stmts) { + cxn.add(stmt); + } + + cxn.commit();// + + { // shouldn't see any change records + + compare(new LinkedList<IChangeRecord>(), changeLog.getChangeSet()); + + } + + // add the stmts2[] + + for (BigdataStatement stmt : stmts2) { + cxn.add(stmt); + } + + cxn.commit();// + + { // should see all of the stmts2[] added + + final Collection<IChangeRecord> expected = + new LinkedList<IChangeRecord>(); + for (BigdataStatement stmt : stmts2) { + expected.add(new ChangeRecord(stmt, ChangeAction.ADDED)); + } + + compare(expected, changeLog.getChangeSet()); + + } + + if (log.isDebugEnabled()) { + log.debug("\n" + sail.getDatabase().dumpStore(true, true, false)); + } + + } finally { + cxn.close(); + sail.__tearDownUnitTest(); + } + + } + + public void testSimpleRemove() throws Exception { + + final BigdataSail sail = getSail(getTriplesNoInference()); + sail.initialize(); + final BigdataSailRepository repo = new BigdataSailRepository(sail); + final BigdataSailRepositoryConnection cxn = + (BigdataSailRepositoryConnection) repo.getConnection(); + cxn.setAutoCommit(false); + + final TestChangeLog changeLog = new TestChangeLog(); + cxn.setChangeLog(changeLog); + + try { + + final BigdataValueFactory vf = (BigdataValueFactory) sail.getValueFactory(); + + final String ns = BD.NAMESPACE; + + final URI a = vf.createURI(ns+"A"); + final URI b = vf.createURI(ns+"B"); + final URI c = vf.createURI(ns+"C"); + + final BigdataStatement[] stmts = new BigdataStatement[] { + vf.createStatement(a, RDFS.SUBCLASSOF, b), + vf.createStatement(b, RDFS.SUBCLASSOF, c), + }; + +/**/ + cxn.setNamespace("ns", ns); + + // add the stmts[] + + for (BigdataStatement stmt : stmts) { + cxn.add(stmt); + } + + cxn.commit();// + + // remove the stmts[] + + for (BigdataStatement stmt : stmts) { + cxn.remove(stmt); + } + + cxn.commit();// + + if (log.isDebugEnabled()) { + log.debug("\ndump store:\n" + sail.getDatabase().dumpStore(true, true, false)); + 
} + + { // should see all of the stmts[] removed + + final Collection<IChangeRecord> expected = + new LinkedList<IChangeRecord>(); + for (BigdataStatement stmt : stmts) { + expected.add(new ChangeRecord(stmt, ChangeAction.REMOVED)); + } + + compare(expected, changeLog.getChangeSet()); + + } + + // remove the stmts[] again + + for (BigdataStatement stmt : stmts) { + cxn.remove(stmt); + } + + cxn.commit();// + + { // shouldn't see any change records + + compare(new LinkedList<IChangeRecord>(), changeLog.getChangeSet()); + + } + + } finally { + cxn.close(); + sail.__tearDownUnitTest(); + } + + } + + public void testTruthMaintenance() throws Exception { + + final BigdataSail sail = getSail(getTriplesWithInference()); + sail.initialize(); + final BigdataSailRepository repo = new BigdataSailRepository(sail); + final BigdataSailRepositoryConnection cxn = + (BigdataSailRepositoryConnection) repo.getConnection(); + cxn.setAutoCommit(false); + + final TestChangeLog changeLog = new TestChangeLog(); + cxn.setChangeLog(changeLog); + + try { + + final BigdataValueFactory vf = (BigdataValueFactory) sail.getValueFactory(); + + final String ns = BD.NAMESPACE; + + final URI a = vf.createURI(ns+"A"); + final URI b = vf.createURI(ns+"B"); + final URI c = vf.createURI(ns+"C"); + final BigdataStatement[] explicit = new BigdataStatement[] { vf.createStatement(a, RDFS.SUBCLASSOF, b), vf.createStatement(b, RDFS.SUBCLASSOF, c), @@ -134,11 +363,52 @@ } - private void compare(final Collection<IChangeRecord> expected, + private void compare(final Collection<IChangeRecord> expected, final Collection<IChangeRecord> actual) { - fail(); + final Collection<IChangeRecord> extra = new LinkedList<IChangeRecord>(); + Collection<IChangeRecord> missing = new LinkedList<IChangeRecord>(); + + int resultCount = 0; + int nmatched = 0; + for (IChangeRecord rec : actual) { + resultCount++; + boolean match = false; + if(log.isInfoEnabled()) + log.info(rec); + Iterator<IChangeRecord> it = expected.iterator(); + 
while (it.hasNext()) { + if (it.next().equals(rec)) { + it.remove(); + match = true; + nmatched++; + break; + } + } + if (match == false) { + extra.add(rec); + } + } + missing = expected; + + for (IChangeRecord rec : extra) { + if (log.isInfoEnabled()) { + log.info("extra result: " + rec); + } + } + for (IChangeRecord rec : missing) { + if (log.isInfoEnabled()) { + log.info("missing result: " + rec); + } + } + + if (!extra.isEmpty() || !missing.isEmpty()) { + fail("matchedResults=" + nmatched + ", extraResults=" + + extra.size() + ", missingResults=" + + missing.size()); + } + } /** @@ -165,12 +435,16 @@ public synchronized void changeEvent(final IChangeRecord record) { + System.err.println(record); + uncommitted.put(record.getStatement(), record); } public synchronized void transactionCommited() { + System.err.println("transaction committed"); + committed.clear(); committed.putAll(uncommitted); @@ -181,6 +455,8 @@ public synchronized void transactionAborted() { + System.err.println("transaction aborted"); + uncommitted.clear(); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <btm...@us...> - 2010-10-04 23:12:53
|
Revision: 3728 http://bigdata.svn.sourceforge.net/bigdata/?rev=3728&view=rev Author: btmurphy Date: 2010-10-04 23:12:46 +0000 (Mon, 04 Oct 2010) Log Message: ----------- [branch dev-btm]: hand-merged changeset 3542 from trunk --> dev-btm Modified Paths: -------------- branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/load/MappedRDFDataLoadMaster.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/load/MappedRDFFileLoadTask.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/load/SingleResourceReaderTask.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/rio/AsynchronousStatementBufferFactory.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/rio/BasicRioLoader.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/rio/IRioLoader.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/rio/PresortRioLoader.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/store/DataLoader.java branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/util/Splitter.java branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/rio/AbstractRIOTestCase.java branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/rio/EDSAsyncLoader.java branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/rio/TestAsynchronousStatementBufferFactory.java branches/dev-btm/bigdata-sails/src/test/com/bigdata/rdf/stress/LoadClosureAndQueryTest.java Modified: branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/load/MappedRDFDataLoadMaster.java =================================================================== --- branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/load/MappedRDFDataLoadMaster.java 2010-10-04 20:53:59 UTC (rev 3727) +++ branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/load/MappedRDFDataLoadMaster.java 2010-10-04 23:12:46 UTC (rev 3728) @@ -272,6 +272,17 @@ // /** {@value #DEFAULT_MAX_TRIES} */ // int DEFAULT_MAX_TRIES = 3; + /** + * The value that will be used for the graph/context co-ordinate when + * loading data represented in a triple format 
into a quad store. + */ + String DEFAULT_GRAPH = "defaultGraph" ; + + /** + * TODO Should we always enforce a real value? i.e. provide a real default + * or abort the load. + */ + String DEFAULT_DEFAULT_GRAPH = null ; } /** @@ -400,6 +411,12 @@ private transient RDFFormat rdfFormat; /** + * The value that will be used for the graph/context co-ordinate when + * loading data represented in a triple format into a quad store. + */ + public final String defaultGraph ; + + /** * Force the load of the NxParser integration class and its registration * of the NQuadsParser#nquads RDFFormat. * @@ -494,6 +511,8 @@ sb.append(", " + ConfigurationOptions.RDF_FORMAT + "=" + rdfFormat); + sb.append(", " + ConfigurationOptions.DEFAULT_GRAPH + "=" + defaultGraph) ; + sb.append(", " + ConfigurationOptions.FORCE_OVERFLOW_BEFORE_CLOSURE + "=" + forceOverflowBeforeClosure); @@ -624,6 +643,10 @@ } + defaultGraph = (String) config.getEntry(component, + ConfigurationOptions.DEFAULT_GRAPH, String.class, + ConfigurationOptions.DEFAULT_DEFAULT_GRAPH); + rejectedExecutionDelay = (Long) config.getEntry( component, ConfigurationOptions.REJECTED_EXECUTION_DELAY, Long.TYPE, @@ -1005,6 +1028,7 @@ jobState.ontology,//file jobState.ontology.getPath(),//baseURI jobState.getRDFFormat(),// + jobState.defaultGraph, jobState.ontologyFileFilter // ); Modified: branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/load/MappedRDFFileLoadTask.java =================================================================== --- branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/load/MappedRDFFileLoadTask.java 2010-10-04 20:53:59 UTC (rev 3727) +++ branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/load/MappedRDFFileLoadTask.java 2010-10-04 23:12:46 UTC (rev 3728) @@ -213,7 +213,8 @@ jobState.producerChunkSize,// jobState.valuesInitialCapacity,// jobState.bnodesInitialCapacity,// - jobState.getRDFFormat(), // + jobState.getRDFFormat(), // + jobState.defaultGraph, parserOptions,// false, // deleteAfter is handled by 
the master! jobState.parserPoolSize, // Modified: branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/load/SingleResourceReaderTask.java =================================================================== --- branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/load/SingleResourceReaderTask.java 2010-10-04 20:53:59 UTC (rev 3727) +++ branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/load/SingleResourceReaderTask.java 2010-10-04 23:12:46 UTC (rev 3728) @@ -23,7 +23,6 @@ * load and data verify is just the behavior of the {@link IStatementBuffer}. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ */ public class SingleResourceReaderTask implements Runnable { @@ -186,7 +185,7 @@ // run the parser. // @todo reuse the same underlying parser instance? - loader.loadRdf(reader, baseURL, rdfFormat, parserOptions); + loader.loadRdf(reader, baseURL, rdfFormat, null, parserOptions); success = true; Modified: branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/rio/AsynchronousStatementBufferFactory.java =================================================================== --- branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/rio/AsynchronousStatementBufferFactory.java 2010-10-04 20:53:59 UTC (rev 3727) +++ branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/rio/AsynchronousStatementBufferFactory.java 2010-10-04 23:12:46 UTC (rev 3728) @@ -177,7 +177,6 @@ * Note: This DOES NOT support SIDS. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ * * FIXME Modify to support SIDs. We basically need to loop in the * {@link #workflowLatch_bufferTerm2Id} workflow state until all SIDs have been @@ -305,7 +304,6 @@ * write API for BTree and friends). * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ * * @param <S> * The generic type of the statement objects. @@ -356,8 +354,14 @@ * The default {@link RDFFormat}. 
*/ private final RDFFormat defaultFormat; - + /** + * The value that will be used for the graph/context co-ordinate when + * loading data represented in a triple format into a quad store. + */ + private final String defaultGraph; + + /** * Options for the {@link RDFParser}. */ private final RDFParserOptions parserOptions; @@ -1363,7 +1367,6 @@ * load and data verify is just the behavior of the {@link IStatementBuffer}. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ */ protected class ParserTask implements Callable<Void> { @@ -1423,7 +1426,7 @@ try { // run the parser. new PresortRioLoader(buffer).loadRdf(reader, baseURL, - rdfFormat, parserOptions); + rdfFormat, defaultGraph, parserOptions); } finally { reader.close(); } @@ -1490,6 +1493,9 @@ * {@link BNode}s parsed from a single document. * @param defaultFormat * The default {@link RDFFormat} which will be assumed. + * @param defaultGraph + * The value that will be used for the graph/context co-ordinate when + * loading data represented in a triple format into a quad store. * @param parserOptions * Options for the {@link RDFParser}. * @param deleteAfter @@ -1529,6 +1535,7 @@ final int valuesInitialCapacity,// final int bnodesInitialCapacity, // final RDFFormat defaultFormat,// + final String defaultGraph,// final RDFParserOptions parserOptions,// final boolean deleteAfter,// final int parserPoolSize,// @@ -1566,6 +1573,8 @@ this.defaultFormat = defaultFormat; + this.defaultGraph = defaultGraph; + this.parserOptions = parserOptions; this.deleteAfter = deleteAfter; @@ -2250,7 +2259,6 @@ * Task deletes a resource from the local file system. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ */ protected class DeleteTask implements Runnable { @@ -2712,7 +2720,6 @@ * the array is given by {@link Split#fromIndex}. 
* * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ */ static private class Term2IdWriteProcAsyncResultHandler implements @@ -2859,7 +2866,6 @@ * * @author <a href="mailto:tho...@us...">Bryan * Thompson</a> - * @version $Id$ * * @todo something similar for the SIDs */ @@ -2990,7 +2996,6 @@ * * @author <a href="mailto:tho...@us...">Bryan * Thompson</a> - * @version $Id$ */ static class AsyncId2TermIndexWriteTask implements Callable<Void> { @@ -3140,7 +3145,6 @@ * * @author <a href="mailto:tho...@us...">Bryan * Thompson</a> - * @version $Id$ */ static class AsyncSPOIndexWriteTask implements Callable<Void> { @@ -3277,7 +3281,6 @@ * * @author <a href="mailto:tho...@us...">Bryan * Thompson</a> - * @version $Id$ * @param <S> * @param <F> */ @@ -4083,7 +4086,6 @@ * * @author <a href="mailto:tho...@us...">Bryan * Thompson</a> - * @version $Id$ */ private class BufferOtherWritesTask implements Callable<Void> { @@ -4153,7 +4155,6 @@ * * @author <a href="mailto:tho...@us...">Bryan * Thompson</a> - * @version $Id$ */ private class ParserThreadPoolExecutor extends ThreadPoolExecutor { Modified: branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/rio/BasicRioLoader.java =================================================================== --- branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/rio/BasicRioLoader.java 2010-10-04 20:53:59 UTC (rev 3727) +++ branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/rio/BasicRioLoader.java 2010-10-04 23:12:46 UTC (rev 3728) @@ -41,7 +41,6 @@ * Parses data but does not load it into the indices. 
* * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ */ public class BasicRioLoader implements IRioLoader { @@ -74,6 +73,8 @@ private final ValueFactory valueFactory; + protected String defaultGraph; + public BasicRioLoader(final ValueFactory valueFactory) { if (valueFactory == null) @@ -153,18 +154,20 @@ } final public void loadRdf(final InputStream is, final String baseURI, - final RDFFormat rdfFormat, final RDFParserOptions options) + final RDFFormat rdfFormat, final String defaultGraph, + final RDFParserOptions options) throws Exception { - loadRdf2(is, baseURI, rdfFormat, options); + loadRdf2(is, baseURI, rdfFormat, defaultGraph, options); } final public void loadRdf(final Reader reader, final String baseURI, - final RDFFormat rdfFormat, final RDFParserOptions options) + final RDFFormat rdfFormat, final String defaultGraph, + final RDFParserOptions options) throws Exception { - loadRdf2(reader, baseURI, rdfFormat, options); + loadRdf2(reader, baseURI, rdfFormat, defaultGraph, options); } @@ -180,7 +183,7 @@ * @throws Exception */ protected void loadRdf2(final Object source, final String baseURI, - final RDFFormat rdfFormat, final RDFParserOptions options) + final RDFFormat rdfFormat, final String defaultGraph, final RDFParserOptions options) throws Exception { if (source == null) @@ -198,6 +201,8 @@ if (log.isInfoEnabled()) log.info("format=" + rdfFormat + ", options=" + options); + this.defaultGraph = defaultGraph ; + final RDFParser parser = getParser(rdfFormat); // apply options to the parser @@ -212,7 +217,7 @@ // Note: reset so that rates are correct for each source loaded. 
stmtsAdded = 0; - + try { before(); Modified: branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/rio/IRioLoader.java =================================================================== --- branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/rio/IRioLoader.java 2010-10-04 20:53:59 UTC (rev 3727) +++ branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/rio/IRioLoader.java 2010-10-04 23:12:46 UTC (rev 3728) @@ -37,7 +37,6 @@ * Interface for parsing RDF data using the Sesame RIO parser. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ */ public interface IRioLoader { @@ -72,12 +71,14 @@ * The base URL for those data. * @param rdfFormat * The interchange format. + * @param defaultGraph + * The default graph. * @param options * Options to be applied to the {@link RDFParser}. * @throws Exception */ public void loadRdf(Reader reader, String baseURL, RDFFormat rdfFormat, - RDFParserOptions options) throws Exception; + String defaultGraph, RDFParserOptions options) throws Exception; /** * Parse RDF data. @@ -88,11 +89,13 @@ * The base URL for those data. * @param rdfFormat * The interchange format. + * @param defaultGraph + * The default graph. * @param options * Options to be applied to the {@link RDFParser}. 
* @throws Exception */ public void loadRdf(InputStream is, String baseURI, RDFFormat rdfFormat, - RDFParserOptions options) throws Exception; + String defaultGraph, RDFParserOptions options) throws Exception; } Modified: branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/rio/PresortRioLoader.java =================================================================== --- branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/rio/PresortRioLoader.java 2010-10-04 20:53:59 UTC (rev 3727) +++ branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/rio/PresortRioLoader.java 2010-10-04 23:12:46 UTC (rev 3728) @@ -23,17 +23,19 @@ */ package com.bigdata.rdf.rio; +import org.openrdf.model.Resource; import org.openrdf.model.Statement; import org.openrdf.model.Value; import org.openrdf.rio.RDFHandler; import org.openrdf.rio.RDFHandlerException; +import com.bigdata.rdf.model.BigdataURI; + /** * Statement handler for the RIO RDF Parser that writes on a * {@link StatementBuffer}. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ */ public class PresortRioLoader extends BasicRioLoader implements RDFHandler { @@ -45,6 +47,12 @@ final protected IStatementBuffer<?> buffer; /** + * The value that will be used for the graph/context co-ordinate when + * loading data represented in a triple format into a quad store. + */ + private BigdataURI defaultGraphURI = null ; + + /** * Sets up parser to load RDF. * * @param buffer @@ -58,7 +66,7 @@ this.buffer = buffer; } - + /** * bulk insert the buffered data into the store. */ @@ -87,8 +95,11 @@ public RDFHandler newRDFHandler() { + defaultGraphURI = null != defaultGraph && 4 == buffer.getDatabase ().getSPOKeyArity () + ? 
buffer.getDatabase ().getValueFactory ().createURI ( defaultGraph ) + : null + ; return this; - } public void handleStatement( final Statement stmt ) { @@ -98,9 +109,13 @@ log.debug(stmt); } - + + Resource graph = stmt.getContext() ; + if ( null == graph + && null != defaultGraphURI ) // only true when we know we are loading a quad store + graph = defaultGraphURI ; // buffer the write (handles overflow). - buffer.add( stmt.getSubject(), stmt.getPredicate(), stmt.getObject(), stmt.getContext() ); + buffer.add( stmt.getSubject(), stmt.getPredicate(), stmt.getObject(), graph ); stmtsAdded++; Modified: branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/store/DataLoader.java =================================================================== --- branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/store/DataLoader.java 2010-10-04 20:53:59 UTC (rev 3727) +++ branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/store/DataLoader.java 2010-10-04 23:12:46 UTC (rev 3728) @@ -638,7 +638,7 @@ final LoadStats totals = new LoadStats(); - loadData3(totals, reader, baseURL, rdfFormat, true/*endOfBatch*/); + loadData3(totals, reader, baseURL, rdfFormat, null, true/*endOfBatch*/); return totals; @@ -666,7 +666,7 @@ final LoadStats totals = new LoadStats(); - loadData3(totals, is, baseURL, rdfFormat, true/* endOfBatch */); + loadData3(totals, is, baseURL, rdfFormat, null, true/* endOfBatch */); return totals; @@ -702,7 +702,7 @@ final LoadStats totals = new LoadStats(); - loadData3(totals, is, baseURL, rdfFormat, true/*endOfBatch*/); + loadData3(totals, is, baseURL, rdfFormat, null, true/*endOfBatch*/); return totals; @@ -773,7 +773,7 @@ final File file = new File(resource); if(file.exists()) { loadFiles(totals, 0/* depth */, file.toURI().toURL(), - baseURL, rdfFormat, filter, endOfBatch); + baseURL, rdfFormat, null, filter, endOfBatch); return; } // not on the file system, try the classpath next @@ -807,7 +807,7 @@ try { - loadData3(totals, reader, baseURL, rdfFormat, 
endOfBatch); + loadData3(totals, reader, baseURL, rdfFormat, null, endOfBatch); } catch (Exception ex) { @@ -833,6 +833,9 @@ * The format of the file (optional, when not specified the * format is deduced for each file in turn using the * {@link RDFFormat} static methods). + * @param defaultGraph + * The value that will be used for the graph/context co-ordinate when + * loading data represented in a triple format into a quad store. * @param filter * A filter selecting the file names that will be loaded * (optional). When specified, the filter MUST accept directories @@ -843,7 +846,8 @@ * @throws IOException */ public LoadStats loadFiles(final URL url, final String baseURI, - final RDFFormat rdfFormat, final FilenameFilter filter) + final RDFFormat rdfFormat, final String defaultGraph, + final FilenameFilter filter) throws IOException { if (url == null) @@ -851,8 +855,8 @@ final LoadStats totals = new LoadStats(); - loadFiles(totals, 0/* depth */, url, baseURI, - rdfFormat, filter, true/* endOfBatch */); + loadFiles(totals, 0/* depth */, url, baseURI, rdfFormat, defaultGraph, filter, true/* endOfBatch */ + ); return totals; @@ -860,7 +864,8 @@ protected void loadFiles(final LoadStats totals, final int depth, final URL url, final String baseURI, final RDFFormat rdfFormat, - final FilenameFilter filter, final boolean endOfBatch) + final String defaultGraph, final FilenameFilter filter, + final boolean endOfBatch) throws IOException { // Legacy behavior - allow local files and directories for now, @@ -886,7 +891,7 @@ final File f = files[i]; loadFiles(totals, depth + 1, f.toURI().toURL(), baseURI, - rdfFormat, filter, + rdfFormat, defaultGraph, filter, (depth == 0 && i < files.length ? false : endOfBatch)); } @@ -930,7 +935,7 @@ final String s = baseURI != null ? 
baseURI : url.toURI() .toString(); - loadData3(totals, reader, s, rdfFormat, endOfBatch); + loadData3(totals, reader, s, rdfFormat, defaultGraph, endOfBatch); return; @@ -962,7 +967,7 @@ */ protected void loadData3(final LoadStats totals, final Object source, final String baseURL, final RDFFormat rdfFormat, - final boolean endOfBatch) throws IOException { + final String defaultGraph, final boolean endOfBatch) throws IOException { final long begin = System.currentTimeMillis(); @@ -985,11 +990,10 @@ } // Setup the loader. - final PresortRioLoader loader = new PresortRioLoader(buffer); + final PresortRioLoader loader = new PresortRioLoader( buffer ); // @todo review: disable auto-flush - caller will handle flush of the buffer. // loader.setFlush(false); - // add listener to log progress. loader.addRioLoaderListener( new RioLoaderListener() { @@ -1013,12 +1017,12 @@ if(source instanceof Reader) { - loader.loadRdf((Reader) source, baseURL, rdfFormat, parserOptions); + loader.loadRdf((Reader) source, baseURL, rdfFormat, defaultGraph, parserOptions); } else if (source instanceof InputStream) { loader.loadRdf((InputStream) source, baseURL, rdfFormat, - parserOptions); + defaultGraph, parserOptions); } else throw new AssertionError(); @@ -1368,7 +1372,7 @@ dataLoader.loadFiles(totals, 0/* depth */, fileOrDir.toURI().toURL(), baseURI, - rdfFormat, filter, true/* endOfBatch */ + rdfFormat, null, filter, true/* endOfBatch */ ); } Modified: branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/util/Splitter.java =================================================================== --- branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/util/Splitter.java 2010-10-04 20:53:59 UTC (rev 3727) +++ branches/dev-btm/bigdata-rdf/src/java/com/bigdata/rdf/util/Splitter.java 2010-10-04 23:12:46 UTC (rev 3728) @@ -99,7 +99,6 @@ * </pre> * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ */ public class Splitter { @@ -606,7 +605,6 @@ * Tasks parses an RDF 
document, writing a new file every N statements. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ */ protected class ParserTask implements Callable<Void> { @@ -714,7 +712,7 @@ try { // run the parser. new MyLoader(buffer).loadRdf(reader, baseURL, - defaultRDFFormat, s.parserOptions); + defaultRDFFormat, null, s.parserOptions); } finally { reader.close(); } @@ -737,7 +735,6 @@ * {@link StatementBuffer}. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ */ private static class MyLoader extends BasicRioLoader implements RDFHandler @@ -843,7 +840,6 @@ * * @author <a href="mailto:tho...@us...">Bryan * Thompson</a> - * @version $Id$ * @param <S> */ private class MyStatementBuffer implements Modified: branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/rio/AbstractRIOTestCase.java =================================================================== --- branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/rio/AbstractRIOTestCase.java 2010-10-04 20:53:59 UTC (rev 3727) +++ branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/rio/AbstractRIOTestCase.java 2010-10-04 23:12:46 UTC (rev 3728) @@ -401,7 +401,7 @@ }); - loader.loadRdf((Reader) reader, baseURI, rdfFormat, options); + loader.loadRdf((Reader) reader, baseURI, rdfFormat, null, options); if (log.isInfoEnabled()) log.info("Done: " + resource); @@ -681,7 +681,7 @@ loader.loadRdf(new BufferedReader(new InputStreamReader( new FileInputStream(resource))), baseURI, rdfFormat, - options); + null, options); if(log.isInfoEnabled()) log.info("End of reparse: nerrors=" + nerrs + ", file=" Modified: branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/rio/EDSAsyncLoader.java =================================================================== --- branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/rio/EDSAsyncLoader.java 2010-10-04 20:53:59 UTC (rev 3727) +++ branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/rio/EDSAsyncLoader.java 2010-10-04 23:12:46 UTC (rev 
3728) @@ -52,7 +52,6 @@ * </pre> * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ */ public class EDSAsyncLoader { @@ -161,6 +160,7 @@ valuesInitialCapacity,// bnodesInitialCapacity,// RDFFormat.RDFXML, // defaultFormat + null, // defaultGraph parserOptions, // parserOptions false, // deleteAfter poolSize, // parserPoolSize, Modified: branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/rio/TestAsynchronousStatementBufferFactory.java =================================================================== --- branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/rio/TestAsynchronousStatementBufferFactory.java 2010-10-04 20:53:59 UTC (rev 3727) +++ branches/dev-btm/bigdata-rdf/src/test/com/bigdata/rdf/rio/TestAsynchronousStatementBufferFactory.java 2010-10-04 23:12:46 UTC (rev 3728) @@ -400,6 +400,7 @@ valuesInitialCapacity,// bnodesInitialCapacity,// RDFFormat.RDFXML, // defaultFormat + null, // defaultGraph parserOptions, // false, // deleteAfter parallel?5:1, // parserPoolSize, Modified: branches/dev-btm/bigdata-sails/src/test/com/bigdata/rdf/stress/LoadClosureAndQueryTest.java =================================================================== --- branches/dev-btm/bigdata-sails/src/test/com/bigdata/rdf/stress/LoadClosureAndQueryTest.java 2010-10-04 20:53:59 UTC (rev 3727) +++ branches/dev-btm/bigdata-sails/src/test/com/bigdata/rdf/stress/LoadClosureAndQueryTest.java 2010-10-04 23:12:46 UTC (rev 3728) @@ -1204,7 +1204,7 @@ try { dataLoader.loadFiles(dataDir.toURI().toURL(), null/* baseURI */, - null/* rdfFormat */, filter); + null/* rdfFormat */, null, /* defaultGraph */filter); } catch (IOException ex) { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <btm...@us...> - 2010-10-04 20:54:05
|
Revision: 3727 http://bigdata.svn.sourceforge.net/bigdata/?rev=3727&view=rev Author: btmurphy Date: 2010-10-04 20:53:59 +0000 (Mon, 04 Oct 2010) Log Message: ----------- [branch dev-btm]: hand-merged changeset 3528 from trunk --> dev-btm Modified Paths: -------------- branches/dev-btm/src/resources/config/bigdataCluster.config branches/dev-btm/src/resources/config/bigdataCluster16.config branches/dev-btm/src/resources/config/bigdataStandalone.config Modified: branches/dev-btm/src/resources/config/bigdataCluster.config =================================================================== --- branches/dev-btm/src/resources/config/bigdataCluster.config 2010-10-04 20:44:17 UTC (rev 3726) +++ branches/dev-btm/src/resources/config/bigdataCluster.config 2010-10-04 20:53:59 UTC (rev 3727) @@ -768,10 +768,11 @@ * have for your applications! */ "-Xmx1600m",// was 800 - /* Optionally, grab all/most of the max heap at once. This makes sense for - * DS but is less necessary for other bigdata services. + /* Pre-allocation of the DS heap is no longer recommended. + * + * See https://sourceforge.net/apps/trac/bigdata/ticket/157 + "-Xms800m", */ - "-Xms800m", // 1/2 of the max heap is a good value. /* * This option will keep the JVM "alive" even when it is memory starved * but perform of a memory starved JVM is terrible. Modified: branches/dev-btm/src/resources/config/bigdataCluster16.config =================================================================== --- branches/dev-btm/src/resources/config/bigdataCluster16.config 2010-10-04 20:44:17 UTC (rev 3726) +++ branches/dev-btm/src/resources/config/bigdataCluster16.config 2010-10-04 20:53:59 UTC (rev 3727) @@ -813,12 +813,11 @@ * http://blogs.msdn.com/ntdebugging/archive/2009/02/06/microsoft-windows-dynamic-cache-service.aspx */ "-Xmx9G", // Note: out of 32 available! - /* Optionally, grab all/most of the max heap at once. This makes sense for - * DS, but is less necessary for other bigdata services. 
If the machine is - * dedicated to the DataService then use the maximum heap. Otherwise 1/2 of - * the maximum heap is a good value. - */ + /* Pre-allocation of the DS heap is no longer recommended. + * + * See https://sourceforge.net/apps/trac/bigdata/ticket/157 "-Xms9G", + */ /* * FIXME This might not be required, so that should be tested. * However, you don't want the JVM to just die if it is being Modified: branches/dev-btm/src/resources/config/bigdataStandalone.config =================================================================== --- branches/dev-btm/src/resources/config/bigdataStandalone.config 2010-10-04 20:44:17 UTC (rev 3726) +++ branches/dev-btm/src/resources/config/bigdataStandalone.config 2010-10-04 20:53:59 UTC (rev 3727) @@ -781,10 +781,11 @@ * have for your applications! */ "-Xmx4g",// was 800 - /* Optionally, grab all/most of the max heap at once. This makes sense for - * DS but is less necessary for other bigdata services. + /* Pre-allocation of the DS heap is no longer recommended. + * + * See https://sourceforge.net/apps/trac/bigdata/ticket/157 + "-Xms2G", */ - "-Xms2G", // 1/2 of the max heap is a good value. /* * This option will keep the JVM "alive" even when it is memory starved * but perform of a memory starved JVM is terrible. This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <btm...@us...> - 2010-10-04 20:44:23
|
Revision: 3726 http://bigdata.svn.sourceforge.net/bigdata/?rev=3726&view=rev Author: btmurphy Date: 2010-10-04 20:44:17 +0000 (Mon, 04 Oct 2010) Log Message: ----------- [branch dev-btm]: hand-merged changeset 3516 from trunk --> dev-btm Modified Paths: -------------- branches/dev-btm/src/resources/config/bigdataCluster16.config Modified: branches/dev-btm/src/resources/config/bigdataCluster16.config =================================================================== --- branches/dev-btm/src/resources/config/bigdataCluster16.config 2010-10-04 20:36:31 UTC (rev 3725) +++ branches/dev-btm/src/resources/config/bigdataCluster16.config 2010-10-04 20:44:17 UTC (rev 3726) @@ -1298,11 +1298,11 @@ static private namespace = "U"+univNum+""; // minimum #of data services to run. - static private minDataServices = bigdata.dataServiceCount; +// static private minDataServices = bigdata.dataServiceCount; // unused // How long the master will wait to discover the minimum #of data // services that you specified (ms). - static private awaitDataServicesTimeout = 8000; +// static private awaitDataServicesTimeout = 8000; // unused. /* Multiplier for the scatter effect. */ This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <btm...@us...> - 2010-10-04 20:36:37
|
Revision: 3725 http://bigdata.svn.sourceforge.net/bigdata/?rev=3725&view=rev Author: btmurphy Date: 2010-10-04 20:36:31 +0000 (Mon, 04 Oct 2010) Log Message: ----------- [branch dev-btm]: hand-merged changeset 3503 from trunk --> dev-btm Modified Paths: -------------- branches/dev-btm/bigdata-sails/src/samples/com/bigdata/samples/fastload.properties Modified: branches/dev-btm/bigdata-sails/src/samples/com/bigdata/samples/fastload.properties =================================================================== --- branches/dev-btm/bigdata-sails/src/samples/com/bigdata/samples/fastload.properties 2010-10-04 20:30:13 UTC (rev 3724) +++ branches/dev-btm/bigdata-sails/src/samples/com/bigdata/samples/fastload.properties 2010-10-04 20:36:31 UTC (rev 3725) @@ -1,7 +1,8 @@ -# Be very careful when you use this configuration! This turns off incremental -# inference for load and retract, so you must explicitly force these operations, -# which requires punching through the SAIL layer. Of course, if you are not -# using inference then this is just the ticket and quite fast. +# This configuration turns off incremental inference for load and retract, so +# you must explicitly force these operations if you want to compute the closure +# of the knowledge base. Forcing the closure requires punching through the SAIL +# layer. Of course, if you are not using inference then this configuration is +# just the ticket and is quite fast. # set the initial and maximum extent of the journal com.bigdata.journal.AbstractJournal.initialExtent=209715200 This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |