From: <mrp...@us...> - 2010-10-05 02:33:17
|
Revision: 3729 http://bigdata.svn.sourceforge.net/bigdata/?rev=3729&view=rev Author: mrpersonick Date: 2010-10-05 02:33:11 +0000 (Tue, 05 Oct 2010) Log Message: ----------- incremental progress on change sets: simple add/remove Modified Paths: -------------- branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/rio/StatementBuffer.java branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/changesets/ChangeRecord.java branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/changesets/IChangeRecord.java branches/CHANGE_SET_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/ProxyBigdataSailTestCase.java branches/CHANGE_SET_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestChangeSets.java Added Paths: ----------- branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPOIndexMutation.java Modified: branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/rio/StatementBuffer.java =================================================================== --- branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/rio/StatementBuffer.java 2010-10-04 23:12:46 UTC (rev 3728) +++ branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/rio/StatementBuffer.java 2010-10-05 02:33:11 UTC (rev 3729) @@ -48,6 +48,9 @@ import com.bigdata.rdf.model.BigdataValue; import com.bigdata.rdf.model.BigdataValueFactory; import com.bigdata.rdf.model.StatementEnum; +import com.bigdata.rdf.sail.changesets.ChangeRecord; +import com.bigdata.rdf.sail.changesets.IChangeLog; +import com.bigdata.rdf.sail.changesets.IChangeRecord.ChangeAction; import com.bigdata.rdf.spo.ISPO; import com.bigdata.rdf.spo.SPO; import com.bigdata.rdf.store.AbstractTripleStore; @@ -252,6 +255,16 @@ private boolean readOnly = false; + public void setChangeLog(final IChangeLog changeLog) { + + this.changeLog = changeLog; + + } + + protected IChangeLog changeLog; + + + /** * Create a buffer that converts Sesame {@link Value} objects to {@link SPO}s * and writes on the <i>database</i> when it is {@link #flush()}ed. This @@ -297,7 +310,7 @@ */ public StatementBuffer(final TempTripleStore statementStore, final AbstractTripleStore database, final int capacity) { - + if (database == null) throw new IllegalArgumentException(); @@ -362,7 +375,7 @@ * @todo this implementation always returns ZERO (0). */ public long flush() { - + log.info(""); /* @@ -874,6 +887,13 @@ if (tmp[i].isModified()) { stmts[i].setModified(true); + + if (changeLog != null) { + + changeLog.changeEvent( + new ChangeRecord(stmts[i], ChangeAction.ADDED)); + + } } Added: branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPOIndexMutation.java =================================================================== --- branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPOIndexMutation.java (rev 0) +++ branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPOIndexMutation.java 2010-10-05 02:33:11 UTC (rev 3729) @@ -0,0 +1,11 @@ +package com.bigdata.rdf.spo; + +public enum SPOIndexMutation { + + ADDED, + + REMOVED, + + TYPE_CHANGE + +} Modified: branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java =================================================================== --- branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java 2010-10-04 23:12:46 UTC (rev 3728) +++ branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java 2010-10-05 02:33:11 UTC (rev 3729) @@ -129,8 +129,10 @@ import com.bigdata.rdf.rio.StatementBuffer; import com.bigdata.rdf.rules.BackchainAccessPath; import com.bigdata.rdf.rules.InferenceEngine; +import com.bigdata.rdf.sail.changesets.ChangeRecord; import com.bigdata.rdf.sail.changesets.IChangeLog; import com.bigdata.rdf.sail.changesets.IChangeRecord; +import com.bigdata.rdf.sail.changesets.IChangeRecord.ChangeAction; import com.bigdata.rdf.spo.ExplicitSPOFilter; import com.bigdata.rdf.spo.ISPO; import com.bigdata.rdf.spo.InferredSPOFilter; @@ -1447,6 +1449,8 @@ // FIXME bnodes : must also track the reverse mapping [bnodes2]. assertBuffer.setBNodeMap(bnodes); + + assertBuffer.setChangeLog(changeLog); } @@ -2278,7 +2282,7 @@ } // #of explicit statements removed. - final long n; + long n = 0; if (getTruthMaintenance()) { @@ -2319,7 +2323,42 @@ * buffered). */ - n = database.removeStatements(s, p, o, c); + if (changeLog == null) { + + n = database.removeStatements(s, p, o, c); + + } else { + + final IAccessPath<ISPO> ap = + database.getAccessPath(s, p, o, c); + + final IChunkedOrderedIterator<ISPO> itr = ap.iterator(); + + if (itr.hasNext()) { + + final BigdataStatementIteratorImpl itr2 = + new BigdataStatementIteratorImpl(database, bnodes2, itr) + .start(database.getExecutorService()); + + final BigdataStatement[] stmts = + new BigdataStatement[database.getChunkCapacity()]; + + int i = 0; + while (i < stmts.length && itr2.hasNext()) { + stmts[i++] = itr2.next(); + if (i == stmts.length) { + // process stmts[] + n += removeAndNotify(stmts, i); + i = 0; + } + } + if (i > 0) { + n += removeAndNotify(stmts, i); + } + + } + + } } @@ -2327,7 +2366,70 @@ return (int) Math.min(Integer.MAX_VALUE, n); } + + private long removeAndNotify(final BigdataStatement[] stmts, final int numStmts) { + + final SPO[] tmp = new SPO[numStmts]; + for (int i = 0; i < tmp.length; i++) { + + final BigdataStatement stmt = stmts[i]; + + /* + * Note: context position is not passed when statement identifiers + * are in use since the statement identifier is assigned based on + * the {s,p,o} triple. + */ + + final SPO spo = new SPO(stmt); + + if (log.isDebugEnabled()) + log.debug("adding: " + stmt.toString() + " (" + spo + ")"); + + if(!spo.isFullyBound()) { + + throw new AssertionError("Not fully bound? : " + spo); + + } + + tmp[i] = spo; + + } + + /* + * Note: When handling statement identifiers, we clone tmp[] to avoid a + * side-effect on its order so that we can unify the assigned statement + * identifiers below. + * + * Note: In order to report back the [ISPO#isModified()] flag, we also + * need to clone tmp[] to avoid a side effect on its order. Therefore we + * now always clone tmp[]. + */ +// final long nwritten = writeSPOs(sids ? tmp.clone() : tmp, numStmts); + final long nwritten = database.removeStatements(tmp.clone(), numStmts); + + // Copy the state of the isModified() flag + { + + for (int i = 0; i < numStmts; i++) { + + if (tmp[i].isModified()) { + + stmts[i].setModified(true); + + changeLog.changeEvent( + new ChangeRecord(stmts[i], ChangeAction.REMOVED)); + + } + + } + + } + + return nwritten; + + } + public synchronized CloseableIteration<? extends Resource, SailException> getContextIDs() throws SailException { @@ -2420,6 +2522,12 @@ // discard the write set. database.abort(); + if (changeLog != null) { + + changeLog.transactionAborted(); + + } + } /** @@ -2444,6 +2552,12 @@ database.commit(); + if (changeLog != null) { + + changeLog.transactionCommited(); + + } + } // /** @@ -3327,8 +3441,18 @@ * @param log * the change log */ - public void setChangeLog(final IChangeLog log) { + public void setChangeLog(final IChangeLog changeLog) { + + this.changeLog = changeLog; + + if (assertBuffer != null) { + + assertBuffer.setChangeLog(changeLog); + + } } + + private IChangeLog changeLog; } Modified: branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/changesets/ChangeRecord.java =================================================================== --- branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/changesets/ChangeRecord.java 2010-10-04 23:12:46 UTC (rev 3728) +++ branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/changesets/ChangeRecord.java 2010-10-05 02:33:11 UTC (rev 3729) @@ -1,7 +1,11 @@ package com.bigdata.rdf.sail.changesets; +import java.util.Comparator; import com.bigdata.rdf.model.BigdataStatement; import com.bigdata.rdf.model.StatementEnum; +import com.bigdata.rdf.spo.ISPO; +import com.bigdata.rdf.spo.SPO; +import com.bigdata.rdf.spo.SPOComparator; public class ChangeRecord implements IChangeRecord { @@ -44,4 +48,55 @@ return stmt; } + + @Override + public boolean equals(Object o) { + + if (o == this) + return true; + + if (o == null || o instanceof IChangeRecord == false) + return false; + + final IChangeRecord rec = (IChangeRecord) o; + + final BigdataStatement stmt2 = rec.getStatement(); + + // statements are equal + if (stmt == stmt2 || + (stmt != null && stmt2 != null && stmt.equals(stmt2))) { + + // actions are equal + return action == rec.getChangeAction(); + + } + + return false; + + } + + public String toString() { + + StringBuilder sb = new StringBuilder(); + + sb.append(action).append(": ").append(stmt); + + return sb.toString(); + + } + + public static final Comparator<IChangeRecord> COMPARATOR = + new Comparator<IChangeRecord>() { + + public int compare(final IChangeRecord r1, final IChangeRecord r2) { + + final ISPO spo1 = new SPO(r1.getStatement()); + final ISPO spo2 = new SPO(r2.getStatement()); + + return SPOComparator.INSTANCE.compare(spo1, spo2); + + } + + }; + } Modified: branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/changesets/IChangeRecord.java =================================================================== --- branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/changesets/IChangeRecord.java 2010-10-04 23:12:46 UTC (rev 3728) +++ branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/changesets/IChangeRecord.java 2010-10-05 02:33:11 UTC (rev 3729) @@ -106,14 +106,14 @@ */ ChangeAction getChangeAction(); - /** - * If the change action is {@link ChangeAction#TYPE_CHANGE}, this method - * will return the old statement type of the focus statement. The - * new statement type is available on the focus statement itself. - * - * @return - * the old statement type of the focus statement - */ - StatementEnum getOldStatementType(); +// /** +// * If the change action is {@link ChangeAction#TYPE_CHANGE}, this method +// * will return the old statement type of the focus statement. The +// * new statement type is available on the focus statement itself. +// * +// * @return +// * the old statement type of the focus statement +// */ +// StatementEnum getOldStatementType(); } Modified: branches/CHANGE_SET_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/ProxyBigdataSailTestCase.java =================================================================== --- branches/CHANGE_SET_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/ProxyBigdataSailTestCase.java 2010-10-04 23:12:46 UTC (rev 3728) +++ branches/CHANGE_SET_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/ProxyBigdataSailTestCase.java 2010-10-05 02:33:11 UTC (rev 3729) @@ -274,8 +274,8 @@ return bindingSet; } - protected void compare(final TupleQueryResult result, - final Collection<BindingSet> answer) + protected void compare(final TupleQueryResult actual, + final Collection<BindingSet> expected) throws QueryEvaluationException { try { @@ -285,13 +285,13 @@ int resultCount = 0; int nmatched = 0; - while (result.hasNext()) { - BindingSet bindingSet = result.next(); + while (actual.hasNext()) { + BindingSet bindingSet = actual.next(); resultCount++; boolean match = false; if(log.isInfoEnabled()) log.info(bindingSet); - Iterator<BindingSet> it = answer.iterator(); + Iterator<BindingSet> it = expected.iterator(); while (it.hasNext()) { if (it.next().equals(bindingSet)) { it.remove(); @@ -304,7 +304,7 @@ extraResults.add(bindingSet); } } - missingResults = answer; + missingResults = expected; for (BindingSet bs : extraResults) { if (log.isInfoEnabled()) { @@ -326,7 +326,7 @@ } finally { - result.close(); + actual.close(); } Modified: branches/CHANGE_SET_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestChangeSets.java =================================================================== --- branches/CHANGE_SET_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestChangeSets.java 2010-10-04 23:12:46 UTC (rev 3728) +++ branches/CHANGE_SET_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestChangeSets.java 2010-10-05 02:33:11 UTC (rev 3729) @@ -26,14 +26,21 @@ package com.bigdata.rdf.sail; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedList; +import java.util.List; import java.util.Map; +import java.util.Properties; import org.apache.log4j.Logger; import org.openrdf.model.URI; import org.openrdf.model.vocabulary.RDF; import org.openrdf.model.vocabulary.RDFS; +import org.openrdf.query.BindingSet; +import com.bigdata.rdf.axioms.NoAxioms; +import com.bigdata.rdf.axioms.OwlAxioms; import com.bigdata.rdf.model.BigdataStatement; import com.bigdata.rdf.model.BigdataValueFactory; import com.bigdata.rdf.sail.changesets.ChangeRecord; @@ -41,6 +48,8 @@ import com.bigdata.rdf.sail.changesets.IChangeRecord; import com.bigdata.rdf.sail.changesets.IChangeRecord.ChangeAction; import com.bigdata.rdf.store.BD; +import com.bigdata.rdf.vocab.NoVocabulary; +import com.bigdata.rdf.vocab.RDFSVocabulary; /** * @author <a href="mailto:mrp...@us...">Mike Personick</a> @@ -50,6 +59,44 @@ protected static final Logger log = Logger.getLogger(TestChangeSets.class); + public Properties getTriplesNoInference() { + + Properties props = super.getProperties(); + + // triples with sids + props.setProperty(BigdataSail.Options.QUADS, "false"); + props.setProperty(BigdataSail.Options.STATEMENT_IDENTIFIERS, "true"); + + // no inference + props.setProperty(BigdataSail.Options.TRUTH_MAINTENANCE, "false"); + props.setProperty(BigdataSail.Options.AXIOMS_CLASS, NoAxioms.class.getName()); + props.setProperty(BigdataSail.Options.VOCABULARY_CLASS, NoVocabulary.class.getName()); + props.setProperty(BigdataSail.Options.JUSTIFY, "false"); + props.setProperty(BigdataSail.Options.TEXT_INDEX, "false"); + + return props; + + } + + public Properties getTriplesWithInference() { + + Properties props = super.getProperties(); + + // triples with sids + props.setProperty(BigdataSail.Options.QUADS, "false"); + props.setProperty(BigdataSail.Options.STATEMENT_IDENTIFIERS, "true"); + + // no inference + props.setProperty(BigdataSail.Options.TRUTH_MAINTENANCE, "true"); + props.setProperty(BigdataSail.Options.AXIOMS_CLASS, OwlAxioms.class.getName()); + props.setProperty(BigdataSail.Options.VOCABULARY_CLASS, RDFSVocabulary.class.getName()); + props.setProperty(BigdataSail.Options.JUSTIFY, "true"); + props.setProperty(BigdataSail.Options.TEXT_INDEX, "false"); + + return props; + + } + /** * */ @@ -63,9 +110,9 @@ super(arg0); } - public void testChangeSets() throws Exception { + public void testSimpleAdd() throws Exception { - final BigdataSail sail = getSail(); + final BigdataSail sail = getSail(getTriplesNoInference()); sail.initialize(); final BigdataSailRepository repo = new BigdataSailRepository(sail); final BigdataSailRepositoryConnection cxn = @@ -85,6 +132,188 @@ final URI b = vf.createURI(ns+"B"); final URI c = vf.createURI(ns+"C"); + final BigdataStatement[] stmts = new BigdataStatement[] { + vf.createStatement(a, RDFS.SUBCLASSOF, b), + vf.createStatement(b, RDFS.SUBCLASSOF, c), + }; + + final BigdataStatement[] stmts2 = new BigdataStatement[] { + vf.createStatement(a, RDFS.SUBCLASSOF, c), + }; + +/**/ + cxn.setNamespace("ns", ns); + + // add the stmts[] + + for (BigdataStatement stmt : stmts) { + cxn.add(stmt); + } + + cxn.commit();// + + { // should see all of the stmts[] added + + final Collection<IChangeRecord> expected = + new LinkedList<IChangeRecord>(); + for (BigdataStatement stmt : stmts) { + expected.add(new ChangeRecord(stmt, ChangeAction.ADDED)); + } + + compare(expected, changeLog.getChangeSet()); + + } + + // add the stmts[] again + + for (BigdataStatement stmt : stmts) { + cxn.add(stmt); + } + + cxn.commit();// + + { // shouldn't see any change records + + compare(new LinkedList<IChangeRecord>(), changeLog.getChangeSet()); + + } + + // add the stmts2[] + + for (BigdataStatement stmt : stmts2) { + cxn.add(stmt); + } + + cxn.commit();// + + { // should see all of the stmts2[] added + + final Collection<IChangeRecord> expected = + new LinkedList<IChangeRecord>(); + for (BigdataStatement stmt : stmts2) { + expected.add(new ChangeRecord(stmt, ChangeAction.ADDED)); + } + + compare(expected, changeLog.getChangeSet()); + + } + + if (log.isDebugEnabled()) { + log.debug("\n" + sail.getDatabase().dumpStore(true, true, false)); + } + + } finally { + cxn.close(); + sail.__tearDownUnitTest(); + } + + } + + public void testSimpleRemove() throws Exception { + + final BigdataSail sail = getSail(getTriplesNoInference()); + sail.initialize(); + final BigdataSailRepository repo = new BigdataSailRepository(sail); + final BigdataSailRepositoryConnection cxn = + (BigdataSailRepositoryConnection) repo.getConnection(); + cxn.setAutoCommit(false); + + final TestChangeLog changeLog = new TestChangeLog(); + cxn.setChangeLog(changeLog); + + try { + + final BigdataValueFactory vf = (BigdataValueFactory) sail.getValueFactory(); + + final String ns = BD.NAMESPACE; + + final URI a = vf.createURI(ns+"A"); + final URI b = vf.createURI(ns+"B"); + final URI c = vf.createURI(ns+"C"); + + final BigdataStatement[] stmts = new BigdataStatement[] { + vf.createStatement(a, RDFS.SUBCLASSOF, b), + vf.createStatement(b, RDFS.SUBCLASSOF, c), + }; + +/**/ + cxn.setNamespace("ns", ns); + + // add the stmts[] + + for (BigdataStatement stmt : stmts) { + cxn.add(stmt); + } + + cxn.commit();// + + // remove the stmts[] + + for (BigdataStatement stmt : stmts) { + cxn.remove(stmt); + } + + cxn.commit();// + + if (log.isDebugEnabled()) { + log.debug("\ndump store:\n" + sail.getDatabase().dumpStore(true, true, false)); + } + + { // should see all of the stmts[] removed + + final Collection<IChangeRecord> expected = + new LinkedList<IChangeRecord>(); + for (BigdataStatement stmt : stmts) { + expected.add(new ChangeRecord(stmt, ChangeAction.REMOVED)); + } + + compare(expected, changeLog.getChangeSet()); + + } + + // remove the stmts[] again + + for (BigdataStatement stmt : stmts) { + cxn.remove(stmt); + } + + cxn.commit();// + + { // shouldn't see any change records + + compare(new LinkedList<IChangeRecord>(), changeLog.getChangeSet()); + + } + + } finally { + cxn.close(); + sail.__tearDownUnitTest(); + } + + } + + public void testTruthMaintenance() throws Exception { + + final BigdataSail sail = getSail(getTriplesWithInference()); + sail.initialize(); + final BigdataSailRepository repo = new BigdataSailRepository(sail); + final BigdataSailRepositoryConnection cxn = + (BigdataSailRepositoryConnection) repo.getConnection(); + cxn.setAutoCommit(false); + + final TestChangeLog changeLog = new TestChangeLog(); + cxn.setChangeLog(changeLog); + + try { + + final BigdataValueFactory vf = (BigdataValueFactory) sail.getValueFactory(); + + final String ns = BD.NAMESPACE; + + final URI a = vf.createURI(ns+"A"); + final URI b = vf.createURI(ns+"B"); + final URI c = vf.createURI(ns+"C"); + final BigdataStatement[] explicit = new BigdataStatement[] { vf.createStatement(a, RDFS.SUBCLASSOF, b), vf.createStatement(b, RDFS.SUBCLASSOF, c), @@ -134,11 +363,52 @@ } - private void compare(final Collection<IChangeRecord> expected, + private void compare(final Collection<IChangeRecord> expected, final Collection<IChangeRecord> actual) { - fail(); + final Collection<IChangeRecord> extra = new LinkedList<IChangeRecord>(); + Collection<IChangeRecord> missing = new LinkedList<IChangeRecord>(); + + int resultCount = 0; + int nmatched = 0; + for (IChangeRecord rec : actual) { + resultCount++; + boolean match = false; + if(log.isInfoEnabled()) + log.info(rec); + Iterator<IChangeRecord> it = expected.iterator(); + while (it.hasNext()) { + if (it.next().equals(rec)) { + it.remove(); + match = true; + nmatched++; + break; + } + } + if (match == false) { + extra.add(rec); + } + } + missing = expected; + + for (IChangeRecord rec : extra) { + if (log.isInfoEnabled()) { + log.info("extra result: " + rec); + } + } + for (IChangeRecord rec : missing) { + if (log.isInfoEnabled()) { + log.info("missing result: " + rec); + } + } + + if (!extra.isEmpty() || !missing.isEmpty()) { + fail("matchedResults=" + nmatched + ", extraResults=" + + extra.size() + ", missingResults=" + + missing.size()); + } + } /** @@ -165,12 +435,16 @@ public synchronized void changeEvent(final IChangeRecord record) { + System.err.println(record); + uncommitted.put(record.getStatement(), record); } public synchronized void transactionCommited() { + System.err.println("transaction committed"); + committed.clear(); committed.putAll(uncommitted); @@ -181,6 +455,8 @@ public synchronized void transactionAborted() { + System.err.println("transaction aborted"); + uncommitted.clear(); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |