|
From: <mrp...@us...> - 2013-10-28 13:23:06
|
Revision: 7486
http://bigdata.svn.sourceforge.net/bigdata/?rev=7486&view=rev
Author: mrpersonick
Date: 2013-10-28 13:22:57 +0000 (Mon, 28 Oct 2013)
Log Message:
-----------
fixing some comments
Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/paths/ArbitraryLengthPathOp.java
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/paths/ArbitraryLengthPathOp.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/paths/ArbitraryLengthPathOp.java 2013-10-25 21:18:46 UTC (rev 7485)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/paths/ArbitraryLengthPathOp.java 2013-10-28 13:22:57 UTC (rev 7486)
@@ -326,6 +326,17 @@
final boolean noInput = chunkIn == null || chunkIn.length == 0 ||
(chunkIn.length == 1 && chunkIn[0].isEmpty());
+ /*
+ * We need to keep a collection of parent solutions to join
+ * against the output from the fixed point operation.
+ */
+ final Map<IConstant<?>, List<IBindingSet>> parentSolutionsToJoin =
+ noInput ? null : new LinkedHashMap<IConstant<?>, List<IBindingSet>>();
+
+ /*
+ * The join var is what we use to join the parent solutions to the
+ * output from the fixed point operation.
+ */
final IVariable<?> joinVar = gearing.inVar != null ?
gearing.inVar :
(gearing.outVar != null ? gearing.outVar : null);
@@ -334,29 +345,22 @@
log.debug("join var: " + joinVar);
}
- /*
- * Fix cardinality problem here
- */
- final Map<IConstant<?>, List<IBindingSet>> chunkInBySolutionKey =
- noInput ? null :
- new LinkedHashMap<IConstant<?>, List<IBindingSet>>();
-
if (!noInput) {
for (IBindingSet parentSolutionIn : chunkIn) {
- final IConstant<?> key = joinVar != null ? parentSolutionIn.get(joinVar) : null; //newSolutionKey(gearing, parentSolutionIn);
+ final IConstant<?> key = joinVar != null ? parentSolutionIn.get(joinVar) : null;
if (log.isDebugEnabled()) {
log.debug("adding parent solution for joining: " + parentSolutionIn);
log.debug("join key: " + key);
}
- if (!chunkInBySolutionKey.containsKey(key)) {
- chunkInBySolutionKey.put(key, new ArrayList<IBindingSet>());
+ if (!parentSolutionsToJoin.containsKey(key)) {
+ parentSolutionsToJoin.put(key, new ArrayList<IBindingSet>());
}
- chunkInBySolutionKey.get(key).add(parentSolutionIn);
+ parentSolutionsToJoin.get(key).add(parentSolutionIn);
}
@@ -628,13 +632,21 @@
}
+ /*
+ * Add the necessary zero-length path solutions for the case where
+ * there are variables on both sides of the operator.
+ */
if (lowerBound == 0 && (gearing.inVar != null && gearing.outVar != null)) {
- final Map<SolutionKey, IBindingSet> zlps = new LinkedHashMap<SolutionKey, IBindingSet>();
+ final Map<SolutionKey, IBindingSet> zlps =
+ new LinkedHashMap<SolutionKey, IBindingSet>();
for (IBindingSet bs : solutionsOut.values()) {
- // is this right??
+ /*
+ * Do not handle the case where the out var is bound by
+ * the incoming solutions.
+ */
if (bs.isBound(gearing.outVar)) {
continue;
@@ -735,9 +747,10 @@
final IConstant<?> key = joinVar != null ? bs.get(joinVar) : null;
- if (key != null && chunkInBySolutionKey.containsKey(key)) {
+ if (key != null && parentSolutionsToJoin.containsKey(key)) {
- final List<IBindingSet> parentSolutionsIn = chunkInBySolutionKey.get(key);
+ final List<IBindingSet> parentSolutionsIn =
+ parentSolutionsToJoin.get(key);
if (log.isDebugEnabled()) {
log.debug("join key: " + key);
@@ -818,13 +831,13 @@
/*
* Always do the null solutions if there are any.
*/
- if (chunkInBySolutionKey.containsKey(null)) {
+ if (parentSolutionsToJoin.containsKey(null)) {
/*
* Join the null solutions. These solutions represent
* a cross product (no shared variables with the ALP node).
*/
- final List<IBindingSet> nullSolutions = chunkInBySolutionKey.get(null);
+ final List<IBindingSet> nullSolutions = parentSolutionsToJoin.get(null);
if (log.isDebugEnabled()) {
log.debug("null solutions to join: " + nullSolutions);
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <tho...@us...> - 2014-07-13 12:23:33
|
Revision: 8540
http://sourceforge.net/p/bigdata/code/8540
Author: thompsonbry
Date: 2014-07-13 12:23:29 +0000 (Sun, 13 Jul 2014)
Log Message:
-----------
Looking back at the thread dump attached to the ticket, I see very little in the way of stack traces through bigdata. The only one in the property path code is this.
{{{
"com.bigdata.journal.Journal.executorService5" - Thread t@43
java.lang.Thread.State: RUNNABLE
at com.bigdata.bop.bindingSet.ListBindingSet.copy(ListBindingSet.java:290)
at com.bigdata.bop.bindingSet.ListBindingSet.<init>(ListBindingSet.java:267)
at com.bigdata.bop.bindingSet.ListBindingSet.clone(ListBindingSet.java:325)
at com.bigdata.bop.bindingSet.ListBindingSet.clone(ListBindingSet.java:43)
at com.bigdata.bop.paths.ArbitraryLengthPathOp$ArbitraryLengthPathTask.processChunk(ArbitraryLengthPathOp.java:511)
at com.bigdata.bop.paths.ArbitraryLengthPathOp$ArbitraryLengthPathTask.call(ArbitraryLengthPathOp.java:270)
at com.bigdata.bop.paths.ArbitraryLengthPathOp$ArbitraryLengthPathTask.call(ArbitraryLengthPathOp.java:196)
at java.util.concurrent.FutureTask.run(FutureTask.java:273)
at com.bigdata.bop.engine.ChunkedRunningQuery$ChunkTask.call(ChunkedRunningQuery.java:1281)
at com.bigdata.bop.engine.ChunkedRunningQuery$ChunkTaskWrapper.run(ChunkedRunningQuery.java:836)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:482)
at java.util.concurrent.FutureTask.run(FutureTask.java:273)
at com.bigdata.concurrent.FutureTaskMon.run(FutureTaskMon.java:63)
at com.bigdata.bop.engine.ChunkedRunningQuery$ChunkFutureTask.run(ChunkedRunningQuery.java:731)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1156)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:626)
at java.lang.Thread.run(Thread.java:804)
}}}
I've added tests for interrupts to two locations in the processChunk() code. One corresponds to the point where this stack trace passes through processChunk(). The other corresponds to the point where the initial solutions are flowing into the property path operator. Both check for an interrupt every 10 solutions.
Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/paths/ArbitraryLengthPathOp.java
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/paths/ArbitraryLengthPathOp.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/paths/ArbitraryLengthPathOp.java 2014-07-12 01:23:36 UTC (rev 8539)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/paths/ArbitraryLengthPathOp.java 2014-07-13 12:23:29 UTC (rev 8540)
@@ -52,6 +52,7 @@
import com.bigdata.bop.engine.AbstractRunningQuery;
import com.bigdata.bop.engine.IRunningQuery;
import com.bigdata.bop.engine.QueryEngine;
+import com.bigdata.bop.join.JVMDistinctFilter;
import cutthecrap.utils.striterators.ICloseableIterator;
@@ -83,6 +84,10 @@
* solutions from the subquery with those in the parent context.
*
* @author <a href="mailto:mpe...@us...">Mike Personick</a>
+ *
+ * TODO There should be two versions of this operator. One for the JVM
+ * heap and another for the native heap. This will help when large
+ * amounts of data are materialized by the internal collections.
*/
public class ArbitraryLengthPathOp extends PipelineOp {
@@ -187,6 +192,7 @@
}
+ @Override
public FutureTask<Void> eval(final BOpContext<IBindingSet> context) {
return new FutureTask<Void>(new ArbitraryLengthPathTask(this, context));
@@ -251,6 +257,7 @@
}
+ @Override
public Void call() throws Exception {
try {
@@ -346,10 +353,21 @@
}
if (!noInput) {
+
+ long chunksIn = 0L;
+
+ for (IBindingSet parentSolutionIn : chunkIn) {
+
+ /**
+ * @see <a href="http://trac.bigdata.com/ticket/865"
+ * >OutOfMemoryError instead of Timeout for SPARQL
+ * Property Paths </a>
+ */
+ if (chunksIn++ % 10 == 0 && Thread.interrupted()) {
+ throw new InterruptedException();
+ }
- for (IBindingSet parentSolutionIn : chunkIn) {
-
- final IConstant<?> key = joinVar != null ? parentSolutionIn.get(joinVar) : null;
+ final IConstant<?> key = joinVar != null ? parentSolutionIn.get(joinVar) : null;
if (log.isDebugEnabled()) {
log.debug("adding parent solution for joining: " + parentSolutionIn);
@@ -451,13 +469,16 @@
try {
- /*
- * TODO replace with code that does the PipelineJoins manually
- */
+ /*
+ * TODO Replace with code that does the PipelineJoins
+ * manually. Unrolling these iterations can be a major
+ * performance benefit. Another possibility is to use
+ * the GASEngine to expand the paths.
+ */
runningSubquery = queryEngine.eval(subquery,
nextRoundInput.toArray(new IBindingSet[nextRoundInput.size()]));
- long count = 0L;
+ long subqueryChunksOut = 0L; // #of chunks read from subquery
try {
// Declare the child query to the parent.
@@ -476,7 +497,14 @@
for (IBindingSet bs : chunk) {
- count++;
+ /**
+ * @see <a href="http://trac.bigdata.com/ticket/865"
+ * >OutOfMemoryError instead of Timeout for SPARQL
+ * Property Paths </a>
+ */
+ if (subqueryChunksOut++ % 10 == 0 && Thread.interrupted()) {
+ throw new InterruptedException();
+ }
if (log.isDebugEnabled()) {
log.debug("round " + i + " solution: " + bs);
@@ -532,7 +560,7 @@
if (log.isDebugEnabled()) {
log.debug("done with round " + i +
- ", count=" + count +
+ ", count=" + subqueryChunksOut +
", totalBefore=" + sizeBefore +
", totalAfter=" + solutionsOut.size() +
", totalNew=" + (solutionsOut.size() - sizeBefore));
@@ -1116,6 +1144,7 @@
}
+ @Override
public String toString() {
final StringBuilder sb = new StringBuilder();
@@ -1144,7 +1173,11 @@
}
/**
- * Lifted directly from the JVMDistinctBindingSetsOp.
+ * Lifted directly from the {@link JVMDistinctFilter}.
+ *
+ * TODO Refactor to use {@link JVMDistinctFilter} directly iff possible
+ * (e.g., a chain of the AALP operator followed by the DISTINCT
+ * solutions operator)
*/
private final static class SolutionKey {
@@ -1157,10 +1190,12 @@
this.hash = java.util.Arrays.hashCode(vals);
}
+ @Override
public int hashCode() {
return hash;
}
+ @Override
public boolean equals(final Object o) {
if (this == o)
return true;
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|