From: <tho...@us...> - 2013-09-04 20:18:38
|
Revision: 7389 http://bigdata.svn.sourceforge.net/bigdata/?rev=7389&view=rev Author: thompsonbry Date: 2013-09-04 20:18:31 +0000 (Wed, 04 Sep 2013) Log Message: ----------- I've added a simple InferenceChangeLogReporter. This class may be used to obtain the actual inferences computed during a transaction. I modified the change log listener test suite to also test this new listener. The present implementation uses a LinkedHashSet to store the ISPOs for the inferences. This should be scalable up to millions of inferences. If very large sets of inferences will be drawn, then we could substitute an HTree index for the LinkedHashSet. The existing NativeDistinctFilter already does exactly this and could simply replace the hard-coded use of the LinkedHashSet for improved scaling. In this case, the native memory associated with the HTree needs to be released, e.g., through an ICloseable protocol on the change listener. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/test/com/bigdata/rdf/sail/TestChangeSets.java Added Paths: ----------- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/changesets/InferenceChangeLogReporter.java Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/changesets/InferenceChangeLogReporter.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/changesets/InferenceChangeLogReporter.java (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/changesets/InferenceChangeLogReporter.java 2013-09-04 20:18:31 UTC (rev 7389) @@ -0,0 +1,145 @@ +package com.bigdata.rdf.changesets; + +import java.beans.Statement; +import java.util.LinkedHashSet; +import java.util.Set; + +import org.openrdf.model.Value; + +import com.bigdata.bop.rdf.filter.NativeDistinctFilter; +import com.bigdata.htree.HTree; +import com.bigdata.rdf.internal.IV; +import com.bigdata.rdf.spo.ISPO; +import 
com.bigdata.rdf.store.AbstractTripleStore; +import com.bigdata.rdf.store.BigdataStatementIterator; +import com.bigdata.rdf.store.BigdataStatementIteratorImpl; +import com.bigdata.striterator.ChunkedWrappedIterator; +import com.bigdata.striterator.IChunkedOrderedIterator; +import com.bigdata.striterator.ICloseable; + +/** + * {@link IChangeLog} implementation reports inferences as RDF {@link Statement} + * s. You install this change listener before writing on the sail connection. + * After the commit, you use {@link #addedIterator()} ( + * {@link #removedIterator()}) to visit the inferences that were added to + * (removed from) the KB by the transaction. If the transaction is aborted, + * simply discard the {@link InferenceChangeLogReporter} object. Always use a + * new instance of this object for each transaction, or call {@link #clear()} + * between transactions. + * + * TODO The present implementation uses a LinkedHashSet to store the ISPOs for + * the inferences. This should be scalable up to millions of inferences, and + * maybe the low 10s of millions. If very large sets of inferences will be + * drawn, then we could substitute an {@link HTree} index for the + * {@link LinkedHashSet}. The existing {@link NativeDistinctFilter} class + * automatically converts from a JVM hash collection to an {@link HTree} and + * could be used trivially as a replacement for the {@link LinkedHashSet} in + * this class. In this case, the native memory associated with the HTree needs + * to be released, e.g., through an {@link ICloseable} protocol on the change + * listener. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public class InferenceChangeLogReporter implements IChangeLog { + + /** + * The KB. + */ + private final AbstractTripleStore kb; + + /** New inferences. */ + private final Set<ISPO> added = new LinkedHashSet<ISPO>(); + + /** Removed inferences. 
*/ + private final Set<ISPO> removed = new LinkedHashSet<ISPO>(); + + /** + * + * @param kb + * The KB (used to resolve {@link IV}s to {@link Value}s). + */ + public InferenceChangeLogReporter(final AbstractTripleStore kb) { + this.kb = kb; + } + + /** + * Clear the internal state. This may be used to reset the listener if + * multiple commits are used for the same connection. + * <p> + * Note: It is faster to get a new {@link InferenceChangeLogReporter} than + * to clear the internal maps, but you can not replace an {@link IChangeLog} + * listener once established on a connection. + */ + public void clear() { + added.clear(); + removed.clear(); + } + + @Override + public void changeEvent(IChangeRecord record) { + final ISPO spo = record.getStatement(); + if (!spo.isInferred()) + return; + switch (record.getChangeAction()) { + case INSERTED: + added.add(spo); + break; + case REMOVED: + removed.add(spo); + break; + case UPDATED: + // ignore. statement already exists. + break; + default: + throw new AssertionError(); + } + } + + @Override + public void transactionBegin() { + } + + @Override + public void transactionPrepare() { + } + + @Override + public void transactionCommited(long commitTime) { + + } + + @Override + public void transactionAborted() { + + } + + /** + * Return iterator visiting the inferences that were added to the KB. + */ + public BigdataStatementIterator addedIterator() { + + // Wrap as chunked iterator. + final IChunkedOrderedIterator<ISPO> src = new ChunkedWrappedIterator<ISPO>( + added.iterator()); + + // Asynchronous conversion of ISPOs to Statements. + return new BigdataStatementIteratorImpl(kb, src).start(kb + .getExecutorService()); + + } + + /** + * Return iterator visiting the inferences that were removed from the KB. + */ + public BigdataStatementIterator removedIterator() { + + // Wrap as chunked iterator. 
+ final IChunkedOrderedIterator<ISPO> src = new ChunkedWrappedIterator<ISPO>( + removed.iterator()); + + // Asynchronous conversion of ISPOs to Statements. + return new BigdataStatementIteratorImpl(kb, src).start(kb + .getExecutorService()); + + } + +} Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/test/com/bigdata/rdf/sail/TestChangeSets.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/test/com/bigdata/rdf/sail/TestChangeSets.java 2013-09-04 19:56:42 UTC (rev 7388) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/test/com/bigdata/rdf/sail/TestChangeSets.java 2013-09-04 20:18:31 UTC (rev 7389) @@ -44,6 +44,7 @@ import com.bigdata.rdf.changesets.IChangeLog; import com.bigdata.rdf.changesets.IChangeRecord; import com.bigdata.rdf.changesets.InMemChangeLog; +import com.bigdata.rdf.changesets.InferenceChangeLogReporter; import com.bigdata.rdf.model.BigdataStatement; import com.bigdata.rdf.model.BigdataValueFactory; import com.bigdata.rdf.spo.ModifiedEnum; @@ -549,6 +550,10 @@ final InMemChangeLog changeLog = new InMemChangeLog(); cxn.addChangeLog(changeLog); + + final InferenceChangeLogReporter changeLog2 = new InferenceChangeLogReporter( + sail.getDatabase()); + cxn.addChangeLog(changeLog2); final BigdataValueFactory vf = (BigdataValueFactory) sail.getValueFactory(); @@ -604,6 +609,8 @@ } compare(expected, changeLog.getLastCommit(sail.getDatabase())); + assertSameIteratorAnyOrder(inferred, changeLog2.addedIterator()); + assertSameIteratorAnyOrder(new BigdataStatement[]{}, changeLog2.removedIterator()); } for (BigdataStatement stmt : upgrades) { @@ -656,6 +663,10 @@ final InMemChangeLog changeLog = new InMemChangeLog(); cxn.addChangeLog(changeLog); + final InferenceChangeLogReporter changeLog2 = new InferenceChangeLogReporter( + sail.getDatabase()); + cxn.addChangeLog(changeLog2); + final BigdataValueFactory vf = (BigdataValueFactory) sail.getValueFactory(); final String ns = 
BD.NAMESPACE; @@ -714,9 +725,14 @@ } compare(expected, changeLog.getLastCommit(sail.getDatabase())); + assertSameIteratorAnyOrder(inferredAdd, changeLog2.addedIterator()); + assertSameIteratorAnyOrder(new BigdataStatement[]{}, changeLog2.removedIterator()); } + // reset + changeLog2.clear(); + for (BigdataStatement stmt : explicitRemove) { cxn.remove(stmt); } @@ -735,7 +751,9 @@ } compare(expected, changeLog.getLastCommit(sail.getDatabase())); - + assertSameIteratorAnyOrder(new BigdataStatement[]{}, changeLog2.addedIterator()); + assertSameIteratorAnyOrder(inferredRemove, changeLog2.removedIterator()); + } if (log.isDebugEnabled()) { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |