From: <tho...@us...> - 2014-03-23 17:47:02
|
Revision: 8011
          http://sourceforge.net/p/bigdata/code/8011
Author:   thompsonbry
Date:     2014-03-23 17:46:59 +0000 (Sun, 23 Mar 2014)

Log Message:
-----------
Added AbstractTripleStore::getStatements(IChunkedOrderedIterator<BigdataTriplePattern> triplePatterns), returning a BigdataStatementIterator, as a partial fix for #866 (parallel streaming resolution of triple patterns to ground statements). This commit includes unit tests of the new feature in TestTripleStore. Handing off to MikeP for the rest of that ticket.

Modified Paths:
--------------
    branches/RDR/bigdata/src/java/com/bigdata/striterator/AbstractChunkedResolverator.java
    branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java
    branches/RDR/bigdata-rdf/src/test/com/bigdata/rdf/store/TestTripleStore.java

Added Paths:
-----------
    branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/store/BigdataTriplePattern.java
    branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/store/BigdataTriplePatternMaterializer.java

Modified: branches/RDR/bigdata/src/java/com/bigdata/striterator/AbstractChunkedResolverator.java =================================================================== --- branches/RDR/bigdata/src/java/com/bigdata/striterator/AbstractChunkedResolverator.java 2014-03-23 16:10:08 UTC (rev 8010) +++ branches/RDR/bigdata/src/java/com/bigdata/striterator/AbstractChunkedResolverator.java 2014-03-23 17:46:59 UTC (rev 8011) @@ -260,7 +260,15 @@ throw new RuntimeException(t); } - assert converted.length == chunk.length; + /** + * Note: This is no longer true. Some conversions can now + * expand or reduce the size of the chunk. + * + * @see <a href="http://trac.bigdata.com/ticket/866" > + * Efficient batch remove of a collection of triple + * patterns </a> + */ +// assert converted.length == chunk.length; // Note: Throws BufferClosedException if closed.
buffer.add(converted); Modified: branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java =================================================================== --- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java 2014-03-23 16:10:08 UTC (rev 8010) +++ branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java 2014-03-23 17:46:59 UTC (rev 8011) @@ -165,6 +165,7 @@ import com.bigdata.sparse.GlobalRowStoreUtil; import com.bigdata.striterator.ChunkedArrayIterator; import com.bigdata.striterator.ChunkedConvertingIterator; +import com.bigdata.striterator.ChunkedWrappedIterator; import com.bigdata.striterator.DelegateChunkedIterator; import com.bigdata.striterator.EmptyChunkedIterator; import com.bigdata.striterator.IChunkedIterator; @@ -2706,7 +2707,35 @@ return asStatementIterator(getAccessPath(s, p, o, c).iterator()); } + + /** + * Efficient batched, streaming resolution of triple patterns to statements + * spanned by those triple patterns that are present in the data. + * <p> + * Note: If the input contains triple patterns that have a high cardinality + * in the data, then a large number of statements may be returned. + * + * @param triplePatterns + * A collection of triple patterns or fully bound statements. If + * this collection contains triple patterns that have a high + * cardinality in the data, then a large number of statements may + * be returned. + * + * @return An iterator from which the materialized statements spanned by + * those triple patterns may be read. 
+ * + * @see <a href="http://trac.bigdata.com/ticket/866" > Efficient batch + * remove of a collection of triple patterns </a> + */ + public BigdataStatementIterator getStatements( + final IChunkedOrderedIterator<BigdataTriplePattern> triplePatterns) { + return asStatementIterator(new ChunkedWrappedIterator<ISPO>( + new BigdataTriplePatternMaterializer(this, triplePatterns) + .start(getExecutorService()))); + + } + final public BigdataValue asValue(final Value value) { return getValueFactory().asValue(value); Added: branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/store/BigdataTriplePattern.java =================================================================== --- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/store/BigdataTriplePattern.java (rev 0) +++ branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/store/BigdataTriplePattern.java 2014-03-23 17:46:59 UTC (rev 8011) @@ -0,0 +1,91 @@ +/* + +Copyright (C) SYSTAP, LLC 2006-2014. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +*/ +package com.bigdata.rdf.store; + +import com.bigdata.rdf.model.BigdataResource; +import com.bigdata.rdf.model.BigdataURI; +import com.bigdata.rdf.model.BigdataValue; + +/** + * A simple class that represents a triple (or quad) pattern. 
+ * + * @see <a href="http://trac.bigdata.com/ticket/866" > Efficient batch remove of + * a collection of triple patterns </a> + */ +public class BigdataTriplePattern { + +// private static final long serialVersionUID = 1L; + + private final BigdataResource s; + private final BigdataURI p; + private final BigdataValue o; + private final BigdataResource c; + + public BigdataTriplePattern(final BigdataResource subject, + final BigdataURI predicate, final BigdataValue object) { + + this(subject, predicate, object, (BigdataResource) null); + + } + + public BigdataTriplePattern(final BigdataResource subject, + final BigdataURI predicate, final BigdataValue object, + final BigdataResource context) { + + this.s = subject; + + this.p = predicate; + + this.o = object; + + this.c = context; + + } + + final public BigdataResource getSubject() { + + return s; + + } + + final public BigdataURI getPredicate() { + + return p; + + } + + final public BigdataValue getObject() { + + return o; + + } + + final public BigdataResource getContext() { + + return c; + + } + +} Added: branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/store/BigdataTriplePatternMaterializer.java =================================================================== --- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/store/BigdataTriplePatternMaterializer.java (rev 0) +++ branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/store/BigdataTriplePatternMaterializer.java 2014-03-23 17:46:59 UTC (rev 8011) @@ -0,0 +1,289 @@ +/* + +Copyright (C) SYSTAP, LLC 2006-2014. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. 
+ +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +*/ +package com.bigdata.rdf.store; + +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.system.SystemUtil; + +import com.bigdata.rdf.spo.ISPO; +import com.bigdata.rdf.spo.SPOAccessPath; +import com.bigdata.relation.accesspath.BlockingBuffer; +import com.bigdata.striterator.AbstractChunkedResolverator; +import com.bigdata.striterator.IChunkedOrderedIterator; +import com.bigdata.util.concurrent.LatchedExecutor; + +import cutthecrap.utils.striterators.ICloseableIterator; + +/** + * Efficient batched, streaming resolution of triple patterns to statements + * spanned by those triple patterns that are present in the data. + * <p> + * Note: If the input contains triple patterns that have a high cardinality + * in the data, then a large number of statements may be returned. + * + * @param triplePatterns + * A collection of triple patterns or fully bound statements. If + * this collection contains triple patterns that have a high + * cardinality in the data, then a large number of statements may + * be returned. 
+ * + * @return An iterator from which the materialized statements spanned by + * those triple patterns may be read. + * + * @see <a href="http://trac.bigdata.com/ticket/866" > Efficient batch + * remove of a collection of triple patterns </a> + */ +public class BigdataTriplePatternMaterializer + extends + AbstractChunkedResolverator<BigdataTriplePattern, ISPO, AbstractTripleStore> + implements ICloseableIterator<ISPO> { +// implements IChunkedOrderedIterator<ISPO> { + + private final int nthreads; + + public BigdataTriplePatternMaterializer(final AbstractTripleStore db, + final IChunkedOrderedIterator<BigdataTriplePattern> src) { + this(db, src, 4/* nthreads */); + } + + public BigdataTriplePatternMaterializer(final AbstractTripleStore db, + final IChunkedOrderedIterator<BigdataTriplePattern> src, + final int nthreads) { + + super(db, src, new BlockingBuffer<ISPO[]>( + db.getChunkOfChunksCapacity(), + db.getChunkCapacity(), + db.getChunkTimeout(), + TimeUnit.MILLISECONDS)); + + if (nthreads < 0) + throw new IllegalArgumentException(); + + // At least 1 thread. At most ncpus*2. + this.nthreads = Math.max( + Math.min(nthreads, SystemUtil.numProcessors() * 2), 1); + + } + + @Override + public BigdataTriplePatternMaterializer start(final ExecutorService service) { + + helperService.set(new LatchedExecutor(service, nthreads)); + + super.start(service); + + return this; + + } + private final AtomicReference<LatchedExecutor> helperService = new AtomicReference<LatchedExecutor>(); + + @Override + protected ISPO[] resolveChunk(final BigdataTriplePattern[] chunk) { + + final LatchedExecutor helperService = this.helperService.get(); + + if (helperService == null) + throw new IllegalStateException(); + + /* + * The output will be at most sizeof(chunk) arrays. Each array will have + * one or more statements. Any triple patterns that have no intersection + * in the data will be dropped and will not put anything into this + * output queue. 
+ */ + final BlockingQueue<ISPO[]> out = new ArrayBlockingQueue<ISPO[]>( + chunk.length); + + final List<FutureTask<Long>> tasks = new LinkedList<FutureTask<Long>>(); + + try { + + final CountDownLatch latch = new CountDownLatch(chunk.length); + + /* + * Create FutureTasks for each subquery. The futures are not + * submitted to the Executor yet. That happens in call(). By + * deferring the evaluation until call() we gain the ability to + * cancel all subqueries if any subquery fails. + */ + for (BigdataTriplePattern stmt : chunk) { + + /* + * Task runs subquery and cancels all subqueries in [tasks] if + * it fails. + */ + final FutureTask<Long> ft = new FutureTask<Long>( + new ResolveTriplePatternTask(stmt, out)) { + /* + * Hook future to count down the latch when the task is + * done. + */ + public void run() { + try { + super.run(); + } finally { + latch.countDown(); + } + } + }; + + tasks.add(ft); + + } + + /* + * Run triple pattern resolution with limited parallelism. + */ + for (FutureTask<Long> ft : tasks) { + helperService.execute(ft); + } + + /* + * Wait for all tasks to complete. + */ + latch.await(); + + /* + * Check futures, counting the #of solutions. + */ + long nfound = 0L; + for (FutureTask<Long> ft : tasks) { + nfound += ft.get(); + if (nfound > Integer.MAX_VALUE) + throw new UnsupportedOperationException(); + } + + /* + * Convert into a single ISPO[] chunk. + */ + final ISPO[] dest = new ISPO[(int) nfound]; + int destPos = 0; + ISPO[] src = null; + while ((src = out.poll()) != null) { + System.arraycopy(src/* src */, 0/* srcPos */, dest, destPos, + src.length); + destPos += src.length; + } + + return dest; + + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } finally { + + // Cancel any tasks which are still running. 
+ for (FutureTask<Long> ft : tasks) + ft.cancel(true/* mayInterruptIfRunning */); + + } + + } + + /** + * Resolve a triple pattern to the statements that it spans in the data. + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + */ + private class ResolveTriplePatternTask implements Callable<Long> { + + private final BigdataTriplePattern stmt; + private final BlockingQueue<ISPO[]> out; + + public ResolveTriplePatternTask(final BigdataTriplePattern stmt, + final BlockingQueue<ISPO[]> out) { + this.stmt = stmt; + this.out = out; + } + + @Override + public Long call() throws Exception { + /* + * TODO What about closure over the SIDs? + * + * final IChunkedOrderedIterator<ISPO> itr = + * database.computeClosureForStatementIdentifiers( + * database.getAccessPath(s, p, o, c).iterator()); + */ + final SPOAccessPath ap = (SPOAccessPath) state.getAccessPath(stmt.getSubject(), + stmt.getPredicate(), stmt.getObject(), stmt.getContext()); + +// if(ap.isFullyBoundForKey()) { +// /* +// * Optimize when triple pattern is a fully bound statement. +// * In this case, the output is either that statement (with IVs +// * resolved) or the triple pattern is dropped. +// */ +// final IChunkedOrderedIterator<ISPO> itr = ap.iterator(); +// try { +// if (!itr.hasNext()) +// return 0L; +// final ISPO spo = itr.next(); +// out.add(new ISPO[]{spo}); +// return 1L; +// } finally { +// itr.close(); +// } +// } else { + long n = 0L; + final IChunkedOrderedIterator<ISPO> itr = ap.iterator(); + try { + while (itr.hasNext()) { + final ISPO[] a = itr.nextChunk(); +// if (true) { +// // verify no null array elements. 
+// for (int i = 0; i < a.length; i++) { +// if (a[i] == null) +// throw new AssertionError(Arrays.toString(a)); +// } +// } + out.add(a); + n += a.length; + } + return n; + } finally { + itr.close(); + } +// } + + } + + } + +} Modified: branches/RDR/bigdata-rdf/src/test/com/bigdata/rdf/store/TestTripleStore.java =================================================================== --- branches/RDR/bigdata-rdf/src/test/com/bigdata/rdf/store/TestTripleStore.java 2014-03-23 16:10:08 UTC (rev 8010) +++ branches/RDR/bigdata-rdf/src/test/com/bigdata/rdf/store/TestTripleStore.java 2014-03-23 17:46:59 UTC (rev 8011) @@ -46,11 +46,14 @@ import org.openrdf.model.vocabulary.RDF; import org.openrdf.model.vocabulary.RDFS; import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.parser.sparql.FOAF; import com.bigdata.rdf.axioms.NoAxioms; import com.bigdata.rdf.internal.IV; import com.bigdata.rdf.lexicon.DumpLexicon; +import com.bigdata.rdf.lexicon.Id2TermWriteProc; import com.bigdata.rdf.lexicon.LexiconRelation; +import com.bigdata.rdf.lexicon.Term2IdWriteProc; import com.bigdata.rdf.model.BigdataBNode; import com.bigdata.rdf.model.BigdataLiteral; import com.bigdata.rdf.model.BigdataURI; @@ -1453,6 +1456,137 @@ } + /** + * Unit test of the batched parallel resolution of triple patterns. + * + * @see <a href="http://trac.bigdata.com/ticket/866" > Efficient batch + * remove of a collection of triple patterns </a> + */ + public void test_getStatementsUsingTriplePatterns() { + + final Properties properties = super.getProperties(); + + // override the default axiom model. + properties.setProperty( + com.bigdata.rdf.store.AbstractTripleStore.Options.AXIOMS_CLASS, + NoAxioms.class.getName()); + + final AbstractTripleStore store = getStore(properties); + + try { + + // verify nothing in the store. 
+ assertSameIterator(new Statement[] {}, store.getAccessPath(NULL, + NULL, NULL).iterator()); + + final BigdataValueFactory f = store.getValueFactory(); + + final BigdataURI A = f.createURI("http://www.bigdata.com/A"); + final BigdataURI B = f.createURI("http://www.bigdata.com/B"); + final BigdataURI C = f.createURI("http://www.bigdata.com/C"); + final BigdataURI Person = f.asValue(FOAF.PERSON); + final BigdataURI rdfType = f.asValue(RDF.TYPE); + final BigdataURI foafKnows = f.asValue(FOAF.KNOWS); + + { + + final IStatementBuffer<Statement> buffer = new StatementBuffer<Statement>( + store, 100); + + buffer.add(A, rdfType, Person); + buffer.add(B, rdfType, Person); + buffer.add(C, rdfType, Person); + + buffer.add(A, foafKnows, B); + buffer.add(B, foafKnows, A); + buffer.add(B, foafKnows, C); + buffer.add(C, foafKnows, B); + + buffer.flush(); + + } + + // Empty input. + { + final BigdataTriplePattern[] triplePatterns = new BigdataTriplePattern[] {}; + assertSameStatements( + new Statement[] {// + },// + store.getStatements(new ChunkedArrayIterator<BigdataTriplePattern>( + triplePatterns))); + } + + // Single pattern matching one statement. + { + final BigdataTriplePattern[] triplePatterns = new BigdataTriplePattern[] {// + new BigdataTriplePattern(A, rdfType, null),// + }; + assertSameStatements( + new Statement[] {// + new StatementImpl(A, rdfType, Person),// + },// + store.getStatements(new ChunkedArrayIterator<BigdataTriplePattern>( + triplePatterns))); + } + + // Single pattern matching three statements. + { + final BigdataTriplePattern[] triplePatterns = new BigdataTriplePattern[] {// + new BigdataTriplePattern(null, rdfType, Person),// + }; + assertSameStatements( + new Statement[] {// + new StatementImpl(A, rdfType, Person),// + new StatementImpl(B, rdfType, Person),// + new StatementImpl(C, rdfType, Person),// + },// + store.getStatements(new ChunkedArrayIterator<BigdataTriplePattern>( + triplePatterns))); + } + + // Two patterns matching various statements. 
+ { + final BigdataTriplePattern[] triplePatterns = new BigdataTriplePattern[] {// + new BigdataTriplePattern(A, foafKnows, null),// + new BigdataTriplePattern(null, rdfType, Person),// + }; + assertSameIteratorAnyOrder( + new Statement[] {// + new StatementImpl(A, foafKnows, B),// + new StatementImpl(A, rdfType, Person),// + new StatementImpl(B, rdfType, Person),// + new StatementImpl(C, rdfType, Person),// + },// + store.getStatements(new ChunkedArrayIterator<BigdataTriplePattern>( + triplePatterns))); + } + + // Three patterns, two of which match various statements. + { + final BigdataTriplePattern[] triplePatterns = new BigdataTriplePattern[] {// + new BigdataTriplePattern(A, foafKnows, null),// + new BigdataTriplePattern(null, rdfType, Person),// + new BigdataTriplePattern(rdfType, foafKnows, null),// no match + }; + assertSameIteratorAnyOrder( + new Statement[] {// + new StatementImpl(A, foafKnows, B),// + new StatementImpl(A, rdfType, Person),// + new StatementImpl(B, rdfType, Person),// + new StatementImpl(C, rdfType, Person),// + },// + store.getStatements(new ChunkedArrayIterator<BigdataTriplePattern>( + triplePatterns))); + } + + } finally { + + store.__tearDownUnitTest(); + + } + + } + private String getVeryLargeURI() { final int len = 1024000; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |