From: <mrp...@us...> - 2010-10-06 02:34:21
|
Revision: 3736 http://bigdata.svn.sourceforge.net/bigdata/?rev=3736&view=rev Author: mrpersonick Date: 2010-10-06 02:34:14 +0000 (Wed, 06 Oct 2010) Log Message: ----------- change sets notification for truth maintenance add and remove Modified Paths: -------------- branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/SPOAssertionBuffer.java branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/SPORetractionBuffer.java branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/TruthMaintenance.java branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/StatementWriter.java branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java branches/CHANGE_SET_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestChangeSets.java Added Paths: ----------- branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/changesets/StatementWriter.java Modified: branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/SPOAssertionBuffer.java =================================================================== --- branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/SPOAssertionBuffer.java 2010-10-05 20:30:37 UTC (rev 3735) +++ branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/SPOAssertionBuffer.java 2010-10-06 02:34:14 UTC (rev 3736) @@ -29,11 +29,15 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicLong; +import com.bigdata.rdf.internal.IV; +import com.bigdata.rdf.model.BigdataBNode; +import com.bigdata.rdf.sail.changesets.IChangeLog; import com.bigdata.rdf.spo.ISPO; import com.bigdata.rdf.spo.ISPOAssertionBuffer; import com.bigdata.rdf.spo.JustificationWriter; @@ -101,6 +105,10 @@ * {@link Justification}s for entailments. */ protected final boolean justify; + + protected final IChangeLog changeLog; + + protected final Map<IV, BigdataBNode> bnodes; /** * Create a buffer. @@ -126,6 +134,17 @@ AbstractTripleStore db, IElementFilter<ISPO> filter, int capacity, boolean justified) { + this(focusStore, db, filter, capacity, justified, + null/* changeLog */, null/* bnodes */); + + } + + public SPOAssertionBuffer(AbstractTripleStore focusStore, + AbstractTripleStore db, IElementFilter<ISPO> filter, int capacity, + boolean justified, + final IChangeLog changeLog, final Map<IV, BigdataBNode> bnodes + ) { + super(db, filter, capacity); if (focusStore == null) @@ -142,6 +161,10 @@ justifications = justified ? new Justification[capacity] : null; + this.changeLog = changeLog; + + this.bnodes = bnodes; + } /** @@ -180,12 +203,28 @@ if (numJustifications == 0) { - // batch insert statements into the focusStore. - n = db.addStatements( + if (changeLog == null) { + + // batch insert statements into the focusStore. + n = db.addStatements( focusStore, true/* copyOnly */, new ChunkedArrayIterator<ISPO>(numStmts, stmts, null/*keyOrder*/), null/*filter*/); + + } else { + + n = com.bigdata.rdf.sail.changesets. + StatementWriter.addStatements( + db, + focusStore, + true/* copyOnly */, + null/* filter */, + new ChunkedArrayIterator<ISPO>(numStmts, stmts, null/*keyOrder*/), + changeLog, + bnodes); + + } } else { @@ -209,7 +248,8 @@ // task will write SPOs on the statement indices. tasks.add(new StatementWriter(getTermDatabase(), focusStore, false/* copyOnly */, new ChunkedArrayIterator<ISPO>( - numStmts, stmts, null/*keyOrder*/), nwritten)); + numStmts, stmts, null/*keyOrder*/), nwritten, + changeLog, bnodes)); // task will write justifications on the justifications index. final AtomicLong nwrittenj = new AtomicLong(); Modified: branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/SPORetractionBuffer.java =================================================================== --- branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/SPORetractionBuffer.java 2010-10-05 20:30:37 UTC (rev 3735) +++ branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/SPORetractionBuffer.java 2010-10-06 02:34:14 UTC (rev 3736) @@ -27,6 +27,11 @@ package com.bigdata.rdf.inf; +import java.util.Map; +import com.bigdata.rdf.internal.IV; +import com.bigdata.rdf.model.BigdataBNode; +import com.bigdata.rdf.sail.changesets.IChangeLog; +import com.bigdata.rdf.sail.changesets.StatementWriter; import com.bigdata.rdf.spo.ISPO; import com.bigdata.rdf.spo.SPO; import com.bigdata.rdf.store.AbstractTripleStore; @@ -50,6 +55,10 @@ private final AbstractTripleStore store; private final boolean computeClosureForStatementIdentifiers; + protected final IChangeLog changeLog; + + protected final Map<IV, BigdataBNode> bnodes; + /** * @param store * The database from which the statement will be removed when the @@ -63,6 +72,15 @@ public SPORetractionBuffer(AbstractTripleStore store, int capacity, boolean computeClosureForStatementIdentifiers) { + this(store, capacity, computeClosureForStatementIdentifiers, + null/* changeLog */, null/* bnodes */); + + } + + public SPORetractionBuffer(AbstractTripleStore store, int capacity, + boolean computeClosureForStatementIdentifiers, + final IChangeLog changeLog, final Map<IV, BigdataBNode> bnodes) { + super(store, null/*filter*/, capacity); if (store == null) @@ -72,14 +90,34 @@ this.computeClosureForStatementIdentifiers = computeClosureForStatementIdentifiers; + this.changeLog = changeLog; + + this.bnodes = bnodes; + } public int flush() { if (isEmpty()) return 0; - long n = store.removeStatements(new ChunkedArrayIterator<ISPO>(numStmts,stmts, + final long n; + + if (changeLog == null) { + + n = store.removeStatements(new ChunkedArrayIterator<ISPO>(numStmts,stmts, null/*keyOrder*/), computeClosureForStatementIdentifiers); + + } else { + + n = StatementWriter.removeStatements( + store, + new ChunkedArrayIterator<ISPO>( + numStmts,stmts,null/*keyOrder*/), + computeClosureForStatementIdentifiers, + changeLog, + bnodes); + + } // reset the counter. numStmts = 0; Modified: branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/TruthMaintenance.java =================================================================== --- branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/TruthMaintenance.java 2010-10-05 20:30:37 UTC (rev 3735) +++ branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/TruthMaintenance.java 2010-10-06 02:34:14 UTC (rev 3736) @@ -47,21 +47,27 @@ package com.bigdata.rdf.inf; +import java.util.Map; import java.util.Properties; import org.apache.log4j.Logger; import org.apache.log4j.MDC; import com.bigdata.journal.TemporaryStore; +import com.bigdata.rdf.internal.IV; +import com.bigdata.rdf.model.BigdataBNode; +import com.bigdata.rdf.model.BigdataStatement; import com.bigdata.rdf.model.StatementEnum; import com.bigdata.rdf.rio.IStatementBuffer; import com.bigdata.rdf.rules.InferenceEngine; +import com.bigdata.rdf.sail.changesets.IChangeLog; import com.bigdata.rdf.spo.ExplicitSPOFilter; import com.bigdata.rdf.spo.ISPO; import com.bigdata.rdf.spo.SPO; import com.bigdata.rdf.spo.SPOArrayIterator; import com.bigdata.rdf.spo.SPOKeyOrder; import com.bigdata.rdf.store.AbstractTripleStore; +import com.bigdata.rdf.store.BigdataStatementIterator; import com.bigdata.rdf.store.IRawTripleStore; import com.bigdata.rdf.store.TempTripleStore; import com.bigdata.relation.accesspath.IElementFilter; @@ -234,8 +240,21 @@ static public int applyExistingStatements( final AbstractTripleStore focusStore, final AbstractTripleStore database, - final IElementFilter<ISPO> filter) { + final IElementFilter<ISPO> filter + ) { + + return applyExistingStatements(focusStore, database, filter, + null/* changeLog */, null/* bnodes */); + } + + static public int applyExistingStatements( + final AbstractTripleStore focusStore, + final AbstractTripleStore database, + final IElementFilter<ISPO> filter, + final IChangeLog changeLog, final Map<IV, BigdataBNode> bnodes + ) { + if(INFO) log.info("Filtering statements already known to the database"); @@ -248,7 +267,7 @@ final IChunkedOrderedIterator<ISPO> itr = focusStore.getAccessPath( SPOKeyOrder.SPO, ExplicitSPOFilter.INSTANCE).iterator(); - + int nremoved = 0; int nupgraded = 0; @@ -266,7 +285,8 @@ */ final SPOAssertionBuffer assertionBuffer = new SPOAssertionBuffer( - database, database, filter, capacity, false/* justified */); + database, database, filter, capacity, false/* justified */, + changeLog, bnodes); /* * This buffer will retract statements from the tempStore that are @@ -290,7 +310,7 @@ for(int i=0; i<chunk.length; i++) { final SPO spo = (SPO)chunk[i]; - + // Lookup the statement in the database. final ISPO tmp = database.getStatement(spo.s, spo.p, spo.o); @@ -365,6 +385,13 @@ */ public ClosureStats assertAll(final TempTripleStore tempStore) { + return assertAll(tempStore, null/* changeLog */, null/* bnodes */); + + } + + public ClosureStats assertAll(final TempTripleStore tempStore, + final IChangeLog changeLog, final Map<IV, BigdataBNode> bnodes) { + if (tempStore == null) { throw new IllegalArgumentException(); @@ -409,7 +436,7 @@ * consistent if we change our mind about that practice. */ - applyExistingStatements(tempStore, database, inferenceEngine.doNotAddFilter); + applyExistingStatements(tempStore, database, inferenceEngine.doNotAddFilter, changeLog, bnodes); final ClosureStats stats = inferenceEngine.computeClosure(tempStore); @@ -429,7 +456,8 @@ // tempStore.dumpStore(database,true,true,false,true); final long ncopied = tempStore.copyStatements(database, - null/* filter */, true /* copyJustifications */); + null/* filter */, true /* copyJustifications */, + changeLog, bnodes); // database.dumpStore(database,true,true,false,true); @@ -478,6 +506,13 @@ */ public ClosureStats retractAll(final TempTripleStore tempStore) { + return retractAll(tempStore, null/* changeLog */, null/* bnodes */); + + } + + public ClosureStats retractAll(final TempTripleStore tempStore, + final IChangeLog changeLog, final Map<IV, BigdataBNode> bnodes) { + final long begin = System.currentTimeMillis(); final ClosureStats stats = new ClosureStats(); @@ -512,7 +547,7 @@ } // do truth maintenance. - retractAll(stats, tempStore, 0); + retractAll(stats, tempStore, 0, changeLog, bnodes); MDC.remove("depth"); @@ -591,7 +626,8 @@ * explicit statements to be retracted. */ private void retractAll(final ClosureStats stats, - final TempTripleStore tempStore, final int depth) { + final TempTripleStore tempStore, final int depth, + final IChangeLog changeLog, final Map<IV, BigdataBNode> bnodes) { MDC.put("depth", "depth=" + depth); @@ -640,7 +676,9 @@ database, // the persistent db. null, //filter @todo was inferenceEngine.doNotAddFilter, capacity,// - false // justify + false,// justify + changeLog, + bnodes ); /* @@ -657,7 +695,8 @@ * identifiers. */ final SPORetractionBuffer retractionBuffer = new SPORetractionBuffer( - database, capacity, false/* computeClosureForStatementIdentifiers */); + database, capacity, false/* computeClosureForStatementIdentifiers */, + changeLog, bnodes); /* * Note: when we enter this method recursively statements in the @@ -964,7 +1003,7 @@ * Recursive processing. */ - retractAll(stats, focusStore, depth + 1); + retractAll(stats, focusStore, depth + 1, changeLog, bnodes); } Modified: branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/StatementWriter.java =================================================================== --- branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/StatementWriter.java 2010-10-05 20:30:37 UTC (rev 3735) +++ branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/StatementWriter.java 2010-10-06 02:34:14 UTC (rev 3736) @@ -1,9 +1,17 @@ package com.bigdata.rdf.spo; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicLong; - +import org.apache.log4j.Logger; +import com.bigdata.rdf.internal.IV; +import com.bigdata.rdf.model.BigdataBNode; +import com.bigdata.rdf.model.BigdataStatement; +import com.bigdata.rdf.sail.changesets.ChangeRecord; +import com.bigdata.rdf.sail.changesets.IChangeLog; +import com.bigdata.rdf.sail.changesets.IChangeRecord.ChangeAction; import com.bigdata.rdf.store.AbstractTripleStore; +import com.bigdata.rdf.store.BigdataStatementIteratorImpl; import com.bigdata.relation.accesspath.IElementFilter; import com.bigdata.striterator.IChunkedOrderedIterator; @@ -18,6 +26,8 @@ */ public class StatementWriter implements Callable<Long>{ + protected static final Logger log = Logger.getLogger(StatementWriter.class); + private final AbstractTripleStore database; private final AbstractTripleStore statementStore; private final boolean copyOnly; @@ -27,6 +37,10 @@ * Incremented by the #of statements written on the statements indices. */ public final AtomicLong nwritten; + + private final IChangeLog changeLog; + + private final Map<IV, BigdataBNode> bnodes; /** * @param database @@ -51,7 +65,17 @@ public StatementWriter(AbstractTripleStore database, AbstractTripleStore statementStore, boolean copyOnly, IChunkedOrderedIterator<ISPO> itr, AtomicLong nwritten) { - + + this(database, statementStore, copyOnly, itr, nwritten, + null/* changeLog */, null/* bnodes */); + + } + + public StatementWriter(final AbstractTripleStore database, + final AbstractTripleStore statementStore, final boolean copyOnly, + final IChunkedOrderedIterator<ISPO> itr, final AtomicLong nwritten, + final IChangeLog changeLog, final Map<IV, BigdataBNode> bnodes) { + if (database == null) throw new IllegalArgumentException(); @@ -73,6 +97,10 @@ this.itr = itr; this.nwritten = nwritten; + + this.changeLog = changeLog; + + this.bnodes = bnodes; } @@ -85,11 +113,30 @@ final long begin = System.currentTimeMillis(); - nwritten.addAndGet(database.addStatements(statementStore, copyOnly, - itr, null/* filter */)); + final long n; + + if (changeLog == null) { + + n = database.addStatements(statementStore, copyOnly, + itr, null/* filter */); + + } else { + n = com.bigdata.rdf.sail.changesets.StatementWriter.addStatements( + database, + statementStore, + copyOnly, + null/* filter */, + itr, + changeLog, + bnodes); + + } + + nwritten.addAndGet(n); + return System.currentTimeMillis() - begin; } - + } Modified: branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java =================================================================== --- branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java 2010-10-05 20:30:37 UTC (rev 3735) +++ branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java 2010-10-06 02:34:14 UTC (rev 3736) @@ -87,6 +87,7 @@ import com.bigdata.rdf.lexicon.ITermIndexCodes; import com.bigdata.rdf.lexicon.ITextIndexer; import com.bigdata.rdf.lexicon.LexiconRelation; +import com.bigdata.rdf.model.BigdataBNode; import com.bigdata.rdf.model.BigdataResource; import com.bigdata.rdf.model.BigdataStatement; import com.bigdata.rdf.model.BigdataURI; @@ -102,6 +103,7 @@ import com.bigdata.rdf.rules.MatchRule; import com.bigdata.rdf.rules.RDFJoinNexusFactory; import com.bigdata.rdf.rules.RuleContextEnum; +import com.bigdata.rdf.sail.changesets.IChangeLog; import com.bigdata.rdf.spo.BulkCompleteConverter; import com.bigdata.rdf.spo.BulkFilterConverter; import com.bigdata.rdf.spo.ExplicitSPOFilter; @@ -2982,6 +2984,18 @@ final AbstractTripleStore dst,// final IElementFilter<ISPO> filter,// final boolean copyJustifications// + ) { + + return copyStatements(dst, filter, copyJustifications, + null/* changeLog */, null /* bnodes */); + + } + + public long copyStatements(// + final AbstractTripleStore dst,// + final IElementFilter<ISPO> filter,// + final boolean copyJustifications,// + final IChangeLog changeLog, final Map<IV, BigdataBNode> bnodes ) { if (dst == this) @@ -2995,9 +3009,25 @@ if (!copyJustifications) { - // add statements to the target store. - return dst - .addStatements(dst, true/* copyOnly */, itr, null/* filter */); + if (changeLog == null) { + + // add statements to the target store. + return dst + .addStatements(dst, true/* copyOnly */, itr, null/* filter */); + + } else { + + return com.bigdata.rdf.sail.changesets. + StatementWriter.addStatements( + dst, + dst, + true/* copyOnly */, + null/* filter */, + itr, + changeLog, + bnodes); + + } } else { @@ -3020,8 +3050,8 @@ final AtomicLong nwritten = new AtomicLong(); // task will write SPOs on the statement indices. - tasks.add(new StatementWriter(this, dst, true/* copyOnly */, - itr, nwritten)); + tasks.add(new StatementWriter(dst, dst, true/* copyOnly */, + itr, nwritten, changeLog, bnodes)); // task will write justifications on the justifications index. final AtomicLong nwrittenj = new AtomicLong(); Modified: branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java =================================================================== --- branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java 2010-10-05 20:30:37 UTC (rev 3735) +++ branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java 2010-10-06 02:34:14 UTC (rev 3736) @@ -132,6 +132,7 @@ import com.bigdata.rdf.sail.changesets.ChangeRecord; import com.bigdata.rdf.sail.changesets.IChangeLog; import com.bigdata.rdf.sail.changesets.IChangeRecord; +import com.bigdata.rdf.sail.changesets.StatementWriter; import com.bigdata.rdf.sail.changesets.IChangeRecord.ChangeAction; import com.bigdata.rdf.spo.ExplicitSPOFilter; import com.bigdata.rdf.spo.ISPO; @@ -2329,35 +2330,42 @@ } else { - final IAccessPath<ISPO> ap = - database.getAccessPath(s, p, o, c); - - final IChunkedOrderedIterator<ISPO> itr = ap.iterator(); + final IChunkedOrderedIterator<ISPO> itr = + database.getAccessPath(s, p, o, c).iterator(); - if (itr.hasNext()) { - - final BigdataStatementIteratorImpl itr2 = - new BigdataStatementIteratorImpl(database, bnodes2, itr) - .start(database.getExecutorService()); - - final BigdataStatement[] stmts = - new BigdataStatement[database.getChunkCapacity()]; - - int i = 0; - while (i < stmts.length && itr2.hasNext()) { - stmts[i++] = itr2.next(); - if (i == stmts.length) { - // process stmts[] - n += removeAndNotify(stmts, i); - i = 0; - } - } - if (i > 0) { - n += removeAndNotify(stmts, i); - } - - } + n = StatementWriter.removeStatements(database, itr, + true/* computeClosureForStatementIdentifiers */, + changeLog, bnodes2); +// final IAccessPath<ISPO> ap = +// database.getAccessPath(s, p, o, c); +// +// final IChunkedOrderedIterator<ISPO> itr = ap.iterator(); +// +// if (itr.hasNext()) { +// +// final BigdataStatementIteratorImpl itr2 = +// new BigdataStatementIteratorImpl(database, bnodes2, itr) +// .start(database.getExecutorService()); +// +// final BigdataStatement[] stmts = +// new BigdataStatement[database.getChunkCapacity()]; +// +// int i = 0; +// while (i < stmts.length && itr2.hasNext()) { +// stmts[i++] = itr2.next(); +// if (i == stmts.length) { +// // process stmts[] +// n += removeAndNotify(stmts, i); +// i = 0; +// } +// } +// if (i > 0) { +// n += removeAndNotify(stmts, i); +// } +// +// } + } } @@ -2367,69 +2375,69 @@ } - private long removeAndNotify(final BigdataStatement[] stmts, final int numStmts) { - - final SPO[] tmp = new SPO[numStmts]; +// private long removeAndNotify(final BigdataStatement[] stmts, final int numStmts) { +// +// final SPO[] tmp = new SPO[numStmts]; +// +// for (int i = 0; i < tmp.length; i++) { +// +// final BigdataStatement stmt = stmts[i]; +// +// /* +// * Note: context position is not passed when statement identifiers +// * are in use since the statement identifier is assigned based on +// * the {s,p,o} triple. +// */ +// +// final SPO spo = new SPO(stmt); +// +// if (log.isDebugEnabled()) +// log.debug("adding: " + stmt.toString() + " (" + spo + ")"); +// +// if(!spo.isFullyBound()) { +// +// throw new AssertionError("Not fully bound? : " + spo); +// +// } +// +// tmp[i] = spo; +// +// } +// +// /* +// * Note: When handling statement identifiers, we clone tmp[] to avoid a +// * side-effect on its order so that we can unify the assigned statement +// * identifiers below. +// * +// * Note: In order to report back the [ISPO#isModified()] flag, we also +// * need to clone tmp[] to avoid a side effect on its order. Therefore we +// * now always clone tmp[]. +// */ +//// final long nwritten = writeSPOs(sids ? tmp.clone() : tmp, numStmts); +// final long nwritten = database.removeStatements(tmp.clone(), numStmts); +// +// // Copy the state of the isModified() flag +// { +// +// for (int i = 0; i < numStmts; i++) { +// +// if (tmp[i].isModified()) { +// +// stmts[i].setModified(true); +// +// changeLog.changeEvent( +// new ChangeRecord(stmts[i], ChangeAction.REMOVED)); +// +// } +// +// } +// +// } +// +// return nwritten; +// +// } - for (int i = 0; i < tmp.length; i++) { - - final BigdataStatement stmt = stmts[i]; - - /* - * Note: context position is not passed when statement identifiers - * are in use since the statement identifier is assigned based on - * the {s,p,o} triple. - */ - - final SPO spo = new SPO(stmt); - - if (log.isDebugEnabled()) - log.debug("adding: " + stmt.toString() + " (" + spo + ")"); - - if(!spo.isFullyBound()) { - - throw new AssertionError("Not fully bound? : " + spo); - - } - - tmp[i] = spo; - - } - - /* - * Note: When handling statement identifiers, we clone tmp[] to avoid a - * side-effect on its order so that we can unify the assigned statement - * identifiers below. - * - * Note: In order to report back the [ISPO#isModified()] flag, we also - * need to clone tmp[] to avoid a side effect on its order. Therefore we - * now always clone tmp[]. - */ -// final long nwritten = writeSPOs(sids ? tmp.clone() : tmp, numStmts); - final long nwritten = database.removeStatements(tmp.clone(), numStmts); - - // Copy the state of the isModified() flag - { - - for (int i = 0; i < numStmts; i++) { - - if (tmp[i].isModified()) { - - stmts[i].setModified(true); - - changeLog.changeEvent( - new ChangeRecord(stmts[i], ChangeAction.REMOVED)); - - } - - } - - } - - return nwritten; - - } - public synchronized CloseableIteration<? extends Resource, SailException> getContextIDs() throws SailException { @@ -2695,7 +2703,9 @@ if(getTruthMaintenance()) { // do TM, writing on the database. - tm.assertAll((TempTripleStore)assertBuffer.getStatementStore()); + tm.assertAll( + (TempTripleStore)assertBuffer.getStatementStore(), + changeLog, bnodes2); // must be reallocated on demand. assertBuffer = null; @@ -2712,7 +2722,8 @@ if(getTruthMaintenance()) { // do TM, writing on the database. - tm.retractAll((TempTripleStore)retractBuffer.getStatementStore()); + tm.retractAll((TempTripleStore)retractBuffer.getStatementStore(), + changeLog, bnodes2); // must be re-allocated on demand. retractBuffer = null; Added: branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/changesets/StatementWriter.java =================================================================== --- branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/changesets/StatementWriter.java (rev 0) +++ branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/changesets/StatementWriter.java 2010-10-06 02:34:14 UTC (rev 3736) @@ -0,0 +1,196 @@ +package com.bigdata.rdf.sail.changesets; + +import java.util.Iterator; +import java.util.Map; +import org.apache.log4j.Logger; +import com.bigdata.rdf.internal.IV; +import com.bigdata.rdf.model.BigdataBNode; +import com.bigdata.rdf.model.BigdataStatement; +import com.bigdata.rdf.sail.changesets.IChangeRecord.ChangeAction; +import com.bigdata.rdf.spo.ISPO; +import com.bigdata.rdf.spo.SPO; +import com.bigdata.rdf.store.AbstractTripleStore; +import com.bigdata.rdf.store.BigdataStatementIteratorImpl; +import com.bigdata.relation.accesspath.IElementFilter; +import com.bigdata.striterator.ChunkedArrayIterator; +import com.bigdata.striterator.IChunkedOrderedIterator; + +public class StatementWriter { + + protected static final Logger log = Logger.getLogger(StatementWriter.class); + + public static long addStatements(final AbstractTripleStore database, + final AbstractTripleStore statementStore, + final boolean copyOnly, + final IElementFilter<ISPO> filter, + final IChunkedOrderedIterator<ISPO> itr, + final IChangeLog changeLog, + final Map<IV, BigdataBNode> bnodes) { + + long n = 0; + + if (itr.hasNext()) { + + final BigdataStatementIteratorImpl itr2 = + new BigdataStatementIteratorImpl(database, bnodes, itr) + .start(database.getExecutorService()); + + final BigdataStatement[] stmts = + new BigdataStatement[database.getChunkCapacity()]; + + int i = 0; + while ((i = nextChunk(itr2, stmts)) > 0) { + n += addStatements(database, statementStore, copyOnly, filter, + stmts, i, changeLog); + } + + } + + return n; + + } + + private static long addStatements(final AbstractTripleStore database, + final AbstractTripleStore statementStore, + final boolean copyOnly, + final IElementFilter<ISPO> filter, + final BigdataStatement[] stmts, + final int numStmts, + final IChangeLog changeLog) { + + final SPO[] tmp = allocateSPOs(stmts, numStmts); + + final long n = database.addStatements(statementStore, copyOnly, + new ChunkedArrayIterator<ISPO>(numStmts, tmp.clone(), + null/* keyOrder */), filter); + + // Copy the state of the isModified() flag and notify changeLog + for (int i = 0; i < numStmts; i++) { + + if (tmp[i].isModified()) { + + stmts[i].setModified(true); + + changeLog.changeEvent( + new ChangeRecord(stmts[i], ChangeAction.ADDED)); + + } + + } + + return n; + + } + + public static long removeStatements(final AbstractTripleStore database, + final IChunkedOrderedIterator<ISPO> itr, + final boolean computeClosureForStatementIdentifiers, + final IChangeLog changeLog, + final Map<IV, BigdataBNode> bnodes) { + + long n = 0; + + if (itr.hasNext()) { + + final BigdataStatementIteratorImpl itr2 = + new BigdataStatementIteratorImpl(database, bnodes, itr) + .start(database.getExecutorService()); + + final BigdataStatement[] stmts = + new BigdataStatement[database.getChunkCapacity()]; + + int i = 0; + while ((i = nextChunk(itr2, stmts)) > 0) { + n += removeStatements(database, stmts, i, + computeClosureForStatementIdentifiers, changeLog); + } + + } + + return n; + + } + + private static long removeStatements(final AbstractTripleStore database, + final BigdataStatement[] stmts, + final int numStmts, + final boolean computeClosureForStatementIdentifiers, + final IChangeLog changeLog) { + + final SPO[] tmp = allocateSPOs(stmts, numStmts); + + final long n = database.removeStatements( + new ChunkedArrayIterator<ISPO>(numStmts, tmp.clone(), + null/* keyOrder */), + computeClosureForStatementIdentifiers); + + // Copy the state of the isModified() flag and notify changeLog + for (int i = 0; i < numStmts; i++) { + + if (tmp[i].isModified()) { + + stmts[i].setModified(true); + + changeLog.changeEvent( + new ChangeRecord(stmts[i], ChangeAction.REMOVED)); + + } + + } + + return n; + + } + + private static int nextChunk(final Iterator<BigdataStatement> itr, + final BigdataStatement[] stmts) { + + assert stmts != null && stmts.length > 0; + + int i = 0; + while (itr.hasNext()) { + stmts[i++] = itr.next(); + if (i == stmts.length) { + // stmts[] is full + return i; + } + } + + /* + * stmts[] is empty (i = 0) or partially + * full (i > 0 && i < stmts.length) + */ + return i; + + } + + private static SPO[] allocateSPOs(final BigdataStatement[] stmts, + final int numStmts) { + + final SPO[] tmp = new SPO[numStmts]; + + for (int i = 0; i < tmp.length; i++) { + + final BigdataStatement stmt = stmts[i]; + + final SPO spo = new SPO(stmt); + + if (log.isDebugEnabled()) + log.debug("writing: " + stmt.toString() + " (" + spo + ")"); + + if(!spo.isFullyBound()) { + + throw new AssertionError("Not fully bound? : " + spo); + + } + + tmp[i] = spo; + + } + + return tmp; + + + } + +} Modified: branches/CHANGE_SET_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestChangeSets.java =================================================================== --- branches/CHANGE_SET_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestChangeSets.java 2010-10-05 20:30:37 UTC (rev 3735) +++ branches/CHANGE_SET_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestChangeSets.java 2010-10-06 02:34:14 UTC (rev 3736) @@ -292,7 +292,7 @@ } - public void testTruthMaintenance() throws Exception { + public void testTMAdd() throws Exception { final BigdataSail sail = getSail(getTriplesWithInference()); sail.initialize(); @@ -363,6 +363,113 @@ } + public void testTMRetract() throws Exception { + + final BigdataSail sail = getSail(getTriplesWithInference()); + sail.initialize(); + final BigdataSailRepository repo = new BigdataSailRepository(sail); + final BigdataSailRepositoryConnection cxn = + (BigdataSailRepositoryConnection) repo.getConnection(); + cxn.setAutoCommit(false); + + final TestChangeLog changeLog = new TestChangeLog(); + cxn.setChangeLog(changeLog); + + try { + + final BigdataValueFactory vf = (BigdataValueFactory) sail.getValueFactory(); + + final String ns = BD.NAMESPACE; + + final URI a = vf.createURI(ns+"A"); + final URI b = vf.createURI(ns+"B"); + final URI c = vf.createURI(ns+"C"); + + final BigdataStatement[] explicitAdd = new BigdataStatement[] { + vf.createStatement(a, RDFS.SUBCLASSOF, b), + vf.createStatement(b, RDFS.SUBCLASSOF, c), + }; + + final BigdataStatement[] inferredAdd = new BigdataStatement[] { + vf.createStatement(a, RDF.TYPE, RDFS.CLASS), + vf.createStatement(a, RDFS.SUBCLASSOF, RDFS.RESOURCE), + vf.createStatement(a, RDFS.SUBCLASSOF, a), + vf.createStatement(a, RDFS.SUBCLASSOF, c), + vf.createStatement(b, RDF.TYPE, RDFS.CLASS), + vf.createStatement(b, RDFS.SUBCLASSOF, RDFS.RESOURCE), + vf.createStatement(b, RDFS.SUBCLASSOF, b), + vf.createStatement(c, RDF.TYPE, RDFS.CLASS), + vf.createStatement(c, RDFS.SUBCLASSOF, RDFS.RESOURCE), + vf.createStatement(c, RDFS.SUBCLASSOF, c), + }; + + final BigdataStatement[] explicitRemove = new BigdataStatement[] { + vf.createStatement(b, RDFS.SUBCLASSOF, c), + }; + + final BigdataStatement[] inferredRemove = new BigdataStatement[] { + vf.createStatement(a, RDFS.SUBCLASSOF, c), + vf.createStatement(c, RDF.TYPE, RDFS.CLASS), + vf.createStatement(c, RDFS.SUBCLASSOF, RDFS.RESOURCE), + vf.createStatement(c, RDFS.SUBCLASSOF, c), + }; + +/**/ + cxn.setNamespace("ns", ns); + + for (BigdataStatement stmt : explicitAdd) { + cxn.add(stmt); + } + + cxn.commit();// + + { + + final Collection<IChangeRecord> expected = + new LinkedList<IChangeRecord>(); + for (BigdataStatement stmt : explicitAdd) { + expected.add(new ChangeRecord(stmt, ChangeAction.ADDED)); + } + for (BigdataStatement stmt : inferredAdd) { + expected.add(new ChangeRecord(stmt, ChangeAction.ADDED)); + } + + compare(expected, changeLog.getChangeSet()); + + } + + for (BigdataStatement stmt : explicitRemove) { + cxn.remove(stmt); + } + + cxn.commit();// + + { + + final Collection<IChangeRecord> expected = + new LinkedList<IChangeRecord>(); + for (BigdataStatement stmt : explicitRemove) { + expected.add(new ChangeRecord(stmt, ChangeAction.REMOVED)); + } + for (BigdataStatement stmt : inferredRemove) { + expected.add(new ChangeRecord(stmt, ChangeAction.REMOVED)); + } + + compare(expected, changeLog.getChangeSet()); + + } + + if (log.isDebugEnabled()) { + log.debug("\n" + sail.getDatabase().dumpStore(true, true, false)); + } + + } finally { + cxn.close(); + sail.__tearDownUnitTest(); + } + + } + private void compare(final Collection<IChangeRecord> expected, final Collection<IChangeRecord> actual) { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |