|
From: <mrp...@us...> - 2010-10-06 02:34:21
|
Revision: 3736
http://bigdata.svn.sourceforge.net/bigdata/?rev=3736&view=rev
Author: mrpersonick
Date: 2010-10-06 02:34:14 +0000 (Wed, 06 Oct 2010)
Log Message:
-----------
change set notification for truth maintenance add and remove
Modified Paths:
--------------
branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/SPOAssertionBuffer.java
branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/SPORetractionBuffer.java
branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/TruthMaintenance.java
branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/StatementWriter.java
branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java
branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java
branches/CHANGE_SET_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestChangeSets.java
Added Paths:
-----------
branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/changesets/StatementWriter.java
Modified: branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/SPOAssertionBuffer.java
===================================================================
--- branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/SPOAssertionBuffer.java 2010-10-05 20:30:37 UTC (rev 3735)
+++ branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/SPOAssertionBuffer.java 2010-10-06 02:34:14 UTC (rev 3736)
@@ -29,11 +29,15 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
+import com.bigdata.rdf.internal.IV;
+import com.bigdata.rdf.model.BigdataBNode;
+import com.bigdata.rdf.sail.changesets.IChangeLog;
import com.bigdata.rdf.spo.ISPO;
import com.bigdata.rdf.spo.ISPOAssertionBuffer;
import com.bigdata.rdf.spo.JustificationWriter;
@@ -101,6 +105,10 @@
* {@link Justification}s for entailments.
*/
protected final boolean justify;
+
+ protected final IChangeLog changeLog;
+
+ protected final Map<IV, BigdataBNode> bnodes;
/**
* Create a buffer.
@@ -126,6 +134,17 @@
AbstractTripleStore db, IElementFilter<ISPO> filter, int capacity,
boolean justified) {
+ this(focusStore, db, filter, capacity, justified,
+ null/* changeLog */, null/* bnodes */);
+
+ }
+
+ public SPOAssertionBuffer(AbstractTripleStore focusStore,
+ AbstractTripleStore db, IElementFilter<ISPO> filter, int capacity,
+ boolean justified,
+ final IChangeLog changeLog, final Map<IV, BigdataBNode> bnodes
+ ) {
+
super(db, filter, capacity);
if (focusStore == null)
@@ -142,6 +161,10 @@
justifications = justified ? new Justification[capacity] : null;
+ this.changeLog = changeLog;
+
+ this.bnodes = bnodes;
+
}
/**
@@ -180,12 +203,28 @@
if (numJustifications == 0) {
- // batch insert statements into the focusStore.
- n = db.addStatements(
+ if (changeLog == null) {
+
+ // batch insert statements into the focusStore.
+ n = db.addStatements(
focusStore,
true/* copyOnly */,
new ChunkedArrayIterator<ISPO>(numStmts, stmts, null/*keyOrder*/),
null/*filter*/);
+
+ } else {
+
+ n = com.bigdata.rdf.sail.changesets.
+ StatementWriter.addStatements(
+ db,
+ focusStore,
+ true/* copyOnly */,
+ null/* filter */,
+ new ChunkedArrayIterator<ISPO>(numStmts, stmts, null/*keyOrder*/),
+ changeLog,
+ bnodes);
+
+ }
} else {
@@ -209,7 +248,8 @@
// task will write SPOs on the statement indices.
tasks.add(new StatementWriter(getTermDatabase(), focusStore,
false/* copyOnly */, new ChunkedArrayIterator<ISPO>(
- numStmts, stmts, null/*keyOrder*/), nwritten));
+ numStmts, stmts, null/*keyOrder*/), nwritten,
+ changeLog, bnodes));
// task will write justifications on the justifications index.
final AtomicLong nwrittenj = new AtomicLong();
Modified: branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/SPORetractionBuffer.java
===================================================================
--- branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/SPORetractionBuffer.java 2010-10-05 20:30:37 UTC (rev 3735)
+++ branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/SPORetractionBuffer.java 2010-10-06 02:34:14 UTC (rev 3736)
@@ -27,6 +27,11 @@
package com.bigdata.rdf.inf;
+import java.util.Map;
+import com.bigdata.rdf.internal.IV;
+import com.bigdata.rdf.model.BigdataBNode;
+import com.bigdata.rdf.sail.changesets.IChangeLog;
+import com.bigdata.rdf.sail.changesets.StatementWriter;
import com.bigdata.rdf.spo.ISPO;
import com.bigdata.rdf.spo.SPO;
import com.bigdata.rdf.store.AbstractTripleStore;
@@ -50,6 +55,10 @@
private final AbstractTripleStore store;
private final boolean computeClosureForStatementIdentifiers;
+ protected final IChangeLog changeLog;
+
+ protected final Map<IV, BigdataBNode> bnodes;
+
/**
* @param store
* The database from which the statement will be removed when the
@@ -63,6 +72,15 @@
public SPORetractionBuffer(AbstractTripleStore store, int capacity,
boolean computeClosureForStatementIdentifiers) {
+ this(store, capacity, computeClosureForStatementIdentifiers,
+ null/* changeLog */, null/* bnodes */);
+
+ }
+
+ public SPORetractionBuffer(AbstractTripleStore store, int capacity,
+ boolean computeClosureForStatementIdentifiers,
+ final IChangeLog changeLog, final Map<IV, BigdataBNode> bnodes) {
+
super(store, null/*filter*/, capacity);
if (store == null)
@@ -72,14 +90,34 @@
this.computeClosureForStatementIdentifiers = computeClosureForStatementIdentifiers;
+ this.changeLog = changeLog;
+
+ this.bnodes = bnodes;
+
}
public int flush() {
if (isEmpty()) return 0;
- long n = store.removeStatements(new ChunkedArrayIterator<ISPO>(numStmts,stmts,
+ final long n;
+
+ if (changeLog == null) {
+
+ n = store.removeStatements(new ChunkedArrayIterator<ISPO>(numStmts,stmts,
null/*keyOrder*/), computeClosureForStatementIdentifiers);
+
+ } else {
+
+ n = StatementWriter.removeStatements(
+ store,
+ new ChunkedArrayIterator<ISPO>(
+ numStmts,stmts,null/*keyOrder*/),
+ computeClosureForStatementIdentifiers,
+ changeLog,
+ bnodes);
+
+ }
// reset the counter.
numStmts = 0;
Modified: branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/TruthMaintenance.java
===================================================================
--- branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/TruthMaintenance.java 2010-10-05 20:30:37 UTC (rev 3735)
+++ branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/inf/TruthMaintenance.java 2010-10-06 02:34:14 UTC (rev 3736)
@@ -47,21 +47,27 @@
package com.bigdata.rdf.inf;
+import java.util.Map;
import java.util.Properties;
import org.apache.log4j.Logger;
import org.apache.log4j.MDC;
import com.bigdata.journal.TemporaryStore;
+import com.bigdata.rdf.internal.IV;
+import com.bigdata.rdf.model.BigdataBNode;
+import com.bigdata.rdf.model.BigdataStatement;
import com.bigdata.rdf.model.StatementEnum;
import com.bigdata.rdf.rio.IStatementBuffer;
import com.bigdata.rdf.rules.InferenceEngine;
+import com.bigdata.rdf.sail.changesets.IChangeLog;
import com.bigdata.rdf.spo.ExplicitSPOFilter;
import com.bigdata.rdf.spo.ISPO;
import com.bigdata.rdf.spo.SPO;
import com.bigdata.rdf.spo.SPOArrayIterator;
import com.bigdata.rdf.spo.SPOKeyOrder;
import com.bigdata.rdf.store.AbstractTripleStore;
+import com.bigdata.rdf.store.BigdataStatementIterator;
import com.bigdata.rdf.store.IRawTripleStore;
import com.bigdata.rdf.store.TempTripleStore;
import com.bigdata.relation.accesspath.IElementFilter;
@@ -234,8 +240,21 @@
static public int applyExistingStatements(
final AbstractTripleStore focusStore,
final AbstractTripleStore database,
- final IElementFilter<ISPO> filter) {
+ final IElementFilter<ISPO> filter
+ ) {
+
+ return applyExistingStatements(focusStore, database, filter,
+ null/* changeLog */, null/* bnodes */);
+ }
+
+ static public int applyExistingStatements(
+ final AbstractTripleStore focusStore,
+ final AbstractTripleStore database,
+ final IElementFilter<ISPO> filter,
+ final IChangeLog changeLog, final Map<IV, BigdataBNode> bnodes
+ ) {
+
if(INFO)
log.info("Filtering statements already known to the database");
@@ -248,7 +267,7 @@
final IChunkedOrderedIterator<ISPO> itr = focusStore.getAccessPath(
SPOKeyOrder.SPO, ExplicitSPOFilter.INSTANCE).iterator();
-
+
int nremoved = 0;
int nupgraded = 0;
@@ -266,7 +285,8 @@
*/
final SPOAssertionBuffer assertionBuffer = new SPOAssertionBuffer(
- database, database, filter, capacity, false/* justified */);
+ database, database, filter, capacity, false/* justified */,
+ changeLog, bnodes);
/*
* This buffer will retract statements from the tempStore that are
@@ -290,7 +310,7 @@
for(int i=0; i<chunk.length; i++) {
final SPO spo = (SPO)chunk[i];
-
+
// Lookup the statement in the database.
final ISPO tmp = database.getStatement(spo.s, spo.p, spo.o);
@@ -365,6 +385,13 @@
*/
public ClosureStats assertAll(final TempTripleStore tempStore) {
+ return assertAll(tempStore, null/* changeLog */, null/* bnodes */);
+
+ }
+
+ public ClosureStats assertAll(final TempTripleStore tempStore,
+ final IChangeLog changeLog, final Map<IV, BigdataBNode> bnodes) {
+
if (tempStore == null) {
throw new IllegalArgumentException();
@@ -409,7 +436,7 @@
* consistent if we change our mind about that practice.
*/
- applyExistingStatements(tempStore, database, inferenceEngine.doNotAddFilter);
+ applyExistingStatements(tempStore, database, inferenceEngine.doNotAddFilter, changeLog, bnodes);
final ClosureStats stats = inferenceEngine.computeClosure(tempStore);
@@ -429,7 +456,8 @@
// tempStore.dumpStore(database,true,true,false,true);
final long ncopied = tempStore.copyStatements(database,
- null/* filter */, true /* copyJustifications */);
+ null/* filter */, true /* copyJustifications */,
+ changeLog, bnodes);
// database.dumpStore(database,true,true,false,true);
@@ -478,6 +506,13 @@
*/
public ClosureStats retractAll(final TempTripleStore tempStore) {
+ return retractAll(tempStore, null/* changeLog */, null/* bnodes */);
+
+ }
+
+ public ClosureStats retractAll(final TempTripleStore tempStore,
+ final IChangeLog changeLog, final Map<IV, BigdataBNode> bnodes) {
+
final long begin = System.currentTimeMillis();
final ClosureStats stats = new ClosureStats();
@@ -512,7 +547,7 @@
}
// do truth maintenance.
- retractAll(stats, tempStore, 0);
+ retractAll(stats, tempStore, 0, changeLog, bnodes);
MDC.remove("depth");
@@ -591,7 +626,8 @@
* explicit statements to be retracted.
*/
private void retractAll(final ClosureStats stats,
- final TempTripleStore tempStore, final int depth) {
+ final TempTripleStore tempStore, final int depth,
+ final IChangeLog changeLog, final Map<IV, BigdataBNode> bnodes) {
MDC.put("depth", "depth=" + depth);
@@ -640,7 +676,9 @@
database, // the persistent db.
null, //filter @todo was inferenceEngine.doNotAddFilter,
capacity,//
- false // justify
+ false,// justify
+ changeLog,
+ bnodes
);
/*
@@ -657,7 +695,8 @@
* identifiers.
*/
final SPORetractionBuffer retractionBuffer = new SPORetractionBuffer(
- database, capacity, false/* computeClosureForStatementIdentifiers */);
+ database, capacity, false/* computeClosureForStatementIdentifiers */,
+ changeLog, bnodes);
/*
* Note: when we enter this method recursively statements in the
@@ -964,7 +1003,7 @@
* Recursive processing.
*/
- retractAll(stats, focusStore, depth + 1);
+ retractAll(stats, focusStore, depth + 1, changeLog, bnodes);
}
Modified: branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/StatementWriter.java
===================================================================
--- branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/StatementWriter.java 2010-10-05 20:30:37 UTC (rev 3735)
+++ branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/spo/StatementWriter.java 2010-10-06 02:34:14 UTC (rev 3736)
@@ -1,9 +1,17 @@
package com.bigdata.rdf.spo;
+import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
-
+import org.apache.log4j.Logger;
+import com.bigdata.rdf.internal.IV;
+import com.bigdata.rdf.model.BigdataBNode;
+import com.bigdata.rdf.model.BigdataStatement;
+import com.bigdata.rdf.sail.changesets.ChangeRecord;
+import com.bigdata.rdf.sail.changesets.IChangeLog;
+import com.bigdata.rdf.sail.changesets.IChangeRecord.ChangeAction;
import com.bigdata.rdf.store.AbstractTripleStore;
+import com.bigdata.rdf.store.BigdataStatementIteratorImpl;
import com.bigdata.relation.accesspath.IElementFilter;
import com.bigdata.striterator.IChunkedOrderedIterator;
@@ -18,6 +26,8 @@
*/
public class StatementWriter implements Callable<Long>{
+ protected static final Logger log = Logger.getLogger(StatementWriter.class);
+
private final AbstractTripleStore database;
private final AbstractTripleStore statementStore;
private final boolean copyOnly;
@@ -27,6 +37,10 @@
* Incremented by the #of statements written on the statements indices.
*/
public final AtomicLong nwritten;
+
+ private final IChangeLog changeLog;
+
+ private final Map<IV, BigdataBNode> bnodes;
/**
* @param database
@@ -51,7 +65,17 @@
public StatementWriter(AbstractTripleStore database,
AbstractTripleStore statementStore, boolean copyOnly,
IChunkedOrderedIterator<ISPO> itr, AtomicLong nwritten) {
-
+
+ this(database, statementStore, copyOnly, itr, nwritten,
+ null/* changeLog */, null/* bnodes */);
+
+ }
+
+ public StatementWriter(final AbstractTripleStore database,
+ final AbstractTripleStore statementStore, final boolean copyOnly,
+ final IChunkedOrderedIterator<ISPO> itr, final AtomicLong nwritten,
+ final IChangeLog changeLog, final Map<IV, BigdataBNode> bnodes) {
+
if (database == null)
throw new IllegalArgumentException();
@@ -73,6 +97,10 @@
this.itr = itr;
this.nwritten = nwritten;
+
+ this.changeLog = changeLog;
+
+ this.bnodes = bnodes;
}
@@ -85,11 +113,30 @@
final long begin = System.currentTimeMillis();
- nwritten.addAndGet(database.addStatements(statementStore, copyOnly,
- itr, null/* filter */));
+ final long n;
+
+ if (changeLog == null) {
+
+ n = database.addStatements(statementStore, copyOnly,
+ itr, null/* filter */);
+
+ } else {
+ n = com.bigdata.rdf.sail.changesets.StatementWriter.addStatements(
+ database,
+ statementStore,
+ copyOnly,
+ null/* filter */,
+ itr,
+ changeLog,
+ bnodes);
+
+ }
+
+ nwritten.addAndGet(n);
+
return System.currentTimeMillis() - begin;
}
-
+
}
Modified: branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java
===================================================================
--- branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java 2010-10-05 20:30:37 UTC (rev 3735)
+++ branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java 2010-10-06 02:34:14 UTC (rev 3736)
@@ -87,6 +87,7 @@
import com.bigdata.rdf.lexicon.ITermIndexCodes;
import com.bigdata.rdf.lexicon.ITextIndexer;
import com.bigdata.rdf.lexicon.LexiconRelation;
+import com.bigdata.rdf.model.BigdataBNode;
import com.bigdata.rdf.model.BigdataResource;
import com.bigdata.rdf.model.BigdataStatement;
import com.bigdata.rdf.model.BigdataURI;
@@ -102,6 +103,7 @@
import com.bigdata.rdf.rules.MatchRule;
import com.bigdata.rdf.rules.RDFJoinNexusFactory;
import com.bigdata.rdf.rules.RuleContextEnum;
+import com.bigdata.rdf.sail.changesets.IChangeLog;
import com.bigdata.rdf.spo.BulkCompleteConverter;
import com.bigdata.rdf.spo.BulkFilterConverter;
import com.bigdata.rdf.spo.ExplicitSPOFilter;
@@ -2982,6 +2984,18 @@
final AbstractTripleStore dst,//
final IElementFilter<ISPO> filter,//
final boolean copyJustifications//
+ ) {
+
+ return copyStatements(dst, filter, copyJustifications,
+ null/* changeLog */, null /* bnodes */);
+
+ }
+
+ public long copyStatements(//
+ final AbstractTripleStore dst,//
+ final IElementFilter<ISPO> filter,//
+ final boolean copyJustifications,//
+ final IChangeLog changeLog, final Map<IV, BigdataBNode> bnodes
) {
if (dst == this)
@@ -2995,9 +3009,25 @@
if (!copyJustifications) {
- // add statements to the target store.
- return dst
- .addStatements(dst, true/* copyOnly */, itr, null/* filter */);
+ if (changeLog == null) {
+
+ // add statements to the target store.
+ return dst
+ .addStatements(dst, true/* copyOnly */, itr, null/* filter */);
+
+ } else {
+
+ return com.bigdata.rdf.sail.changesets.
+ StatementWriter.addStatements(
+ dst,
+ dst,
+ true/* copyOnly */,
+ null/* filter */,
+ itr,
+ changeLog,
+ bnodes);
+
+ }
} else {
@@ -3020,8 +3050,8 @@
final AtomicLong nwritten = new AtomicLong();
// task will write SPOs on the statement indices.
- tasks.add(new StatementWriter(this, dst, true/* copyOnly */,
- itr, nwritten));
+ tasks.add(new StatementWriter(dst, dst, true/* copyOnly */,
+ itr, nwritten, changeLog, bnodes));
// task will write justifications on the justifications index.
final AtomicLong nwrittenj = new AtomicLong();
Modified: branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java
===================================================================
--- branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java 2010-10-05 20:30:37 UTC (rev 3735)
+++ branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java 2010-10-06 02:34:14 UTC (rev 3736)
@@ -132,6 +132,7 @@
import com.bigdata.rdf.sail.changesets.ChangeRecord;
import com.bigdata.rdf.sail.changesets.IChangeLog;
import com.bigdata.rdf.sail.changesets.IChangeRecord;
+import com.bigdata.rdf.sail.changesets.StatementWriter;
import com.bigdata.rdf.sail.changesets.IChangeRecord.ChangeAction;
import com.bigdata.rdf.spo.ExplicitSPOFilter;
import com.bigdata.rdf.spo.ISPO;
@@ -2329,35 +2330,42 @@
} else {
- final IAccessPath<ISPO> ap =
- database.getAccessPath(s, p, o, c);
-
- final IChunkedOrderedIterator<ISPO> itr = ap.iterator();
+ final IChunkedOrderedIterator<ISPO> itr =
+ database.getAccessPath(s, p, o, c).iterator();
- if (itr.hasNext()) {
-
- final BigdataStatementIteratorImpl itr2 =
- new BigdataStatementIteratorImpl(database, bnodes2, itr)
- .start(database.getExecutorService());
-
- final BigdataStatement[] stmts =
- new BigdataStatement[database.getChunkCapacity()];
-
- int i = 0;
- while (i < stmts.length && itr2.hasNext()) {
- stmts[i++] = itr2.next();
- if (i == stmts.length) {
- // process stmts[]
- n += removeAndNotify(stmts, i);
- i = 0;
- }
- }
- if (i > 0) {
- n += removeAndNotify(stmts, i);
- }
-
- }
+ n = StatementWriter.removeStatements(database, itr,
+ true/* computeClosureForStatementIdentifiers */,
+ changeLog, bnodes2);
+// final IAccessPath<ISPO> ap =
+// database.getAccessPath(s, p, o, c);
+//
+// final IChunkedOrderedIterator<ISPO> itr = ap.iterator();
+//
+// if (itr.hasNext()) {
+//
+// final BigdataStatementIteratorImpl itr2 =
+// new BigdataStatementIteratorImpl(database, bnodes2, itr)
+// .start(database.getExecutorService());
+//
+// final BigdataStatement[] stmts =
+// new BigdataStatement[database.getChunkCapacity()];
+//
+// int i = 0;
+// while (i < stmts.length && itr2.hasNext()) {
+// stmts[i++] = itr2.next();
+// if (i == stmts.length) {
+// // process stmts[]
+// n += removeAndNotify(stmts, i);
+// i = 0;
+// }
+// }
+// if (i > 0) {
+// n += removeAndNotify(stmts, i);
+// }
+//
+// }
+
}
}
@@ -2367,69 +2375,69 @@
}
- private long removeAndNotify(final BigdataStatement[] stmts, final int numStmts) {
-
- final SPO[] tmp = new SPO[numStmts];
+// private long removeAndNotify(final BigdataStatement[] stmts, final int numStmts) {
+//
+// final SPO[] tmp = new SPO[numStmts];
+//
+// for (int i = 0; i < tmp.length; i++) {
+//
+// final BigdataStatement stmt = stmts[i];
+//
+// /*
+// * Note: context position is not passed when statement identifiers
+// * are in use since the statement identifier is assigned based on
+// * the {s,p,o} triple.
+// */
+//
+// final SPO spo = new SPO(stmt);
+//
+// if (log.isDebugEnabled())
+// log.debug("adding: " + stmt.toString() + " (" + spo + ")");
+//
+// if(!spo.isFullyBound()) {
+//
+// throw new AssertionError("Not fully bound? : " + spo);
+//
+// }
+//
+// tmp[i] = spo;
+//
+// }
+//
+// /*
+// * Note: When handling statement identifiers, we clone tmp[] to avoid a
+// * side-effect on its order so that we can unify the assigned statement
+// * identifiers below.
+// *
+// * Note: In order to report back the [ISPO#isModified()] flag, we also
+// * need to clone tmp[] to avoid a side effect on its order. Therefore we
+// * now always clone tmp[].
+// */
+//// final long nwritten = writeSPOs(sids ? tmp.clone() : tmp, numStmts);
+// final long nwritten = database.removeStatements(tmp.clone(), numStmts);
+//
+// // Copy the state of the isModified() flag
+// {
+//
+// for (int i = 0; i < numStmts; i++) {
+//
+// if (tmp[i].isModified()) {
+//
+// stmts[i].setModified(true);
+//
+// changeLog.changeEvent(
+// new ChangeRecord(stmts[i], ChangeAction.REMOVED));
+//
+// }
+//
+// }
+//
+// }
+//
+// return nwritten;
+//
+// }
- for (int i = 0; i < tmp.length; i++) {
-
- final BigdataStatement stmt = stmts[i];
-
- /*
- * Note: context position is not passed when statement identifiers
- * are in use since the statement identifier is assigned based on
- * the {s,p,o} triple.
- */
-
- final SPO spo = new SPO(stmt);
-
- if (log.isDebugEnabled())
- log.debug("adding: " + stmt.toString() + " (" + spo + ")");
-
- if(!spo.isFullyBound()) {
-
- throw new AssertionError("Not fully bound? : " + spo);
-
- }
-
- tmp[i] = spo;
-
- }
-
- /*
- * Note: When handling statement identifiers, we clone tmp[] to avoid a
- * side-effect on its order so that we can unify the assigned statement
- * identifiers below.
- *
- * Note: In order to report back the [ISPO#isModified()] flag, we also
- * need to clone tmp[] to avoid a side effect on its order. Therefore we
- * now always clone tmp[].
- */
-// final long nwritten = writeSPOs(sids ? tmp.clone() : tmp, numStmts);
- final long nwritten = database.removeStatements(tmp.clone(), numStmts);
-
- // Copy the state of the isModified() flag
- {
-
- for (int i = 0; i < numStmts; i++) {
-
- if (tmp[i].isModified()) {
-
- stmts[i].setModified(true);
-
- changeLog.changeEvent(
- new ChangeRecord(stmts[i], ChangeAction.REMOVED));
-
- }
-
- }
-
- }
-
- return nwritten;
-
- }
-
public synchronized CloseableIteration<? extends Resource, SailException> getContextIDs()
throws SailException {
@@ -2695,7 +2703,9 @@
if(getTruthMaintenance()) {
// do TM, writing on the database.
- tm.assertAll((TempTripleStore)assertBuffer.getStatementStore());
+ tm.assertAll(
+ (TempTripleStore)assertBuffer.getStatementStore(),
+ changeLog, bnodes2);
// must be reallocated on demand.
assertBuffer = null;
@@ -2712,7 +2722,8 @@
if(getTruthMaintenance()) {
// do TM, writing on the database.
- tm.retractAll((TempTripleStore)retractBuffer.getStatementStore());
+ tm.retractAll((TempTripleStore)retractBuffer.getStatementStore(),
+ changeLog, bnodes2);
// must be re-allocated on demand.
retractBuffer = null;
Added: branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/changesets/StatementWriter.java
===================================================================
--- branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/changesets/StatementWriter.java (rev 0)
+++ branches/CHANGE_SET_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/changesets/StatementWriter.java 2010-10-06 02:34:14 UTC (rev 3736)
@@ -0,0 +1,196 @@
+package com.bigdata.rdf.sail.changesets;
+
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.log4j.Logger;
+import com.bigdata.rdf.internal.IV;
+import com.bigdata.rdf.model.BigdataBNode;
+import com.bigdata.rdf.model.BigdataStatement;
+import com.bigdata.rdf.sail.changesets.IChangeRecord.ChangeAction;
+import com.bigdata.rdf.spo.ISPO;
+import com.bigdata.rdf.spo.SPO;
+import com.bigdata.rdf.store.AbstractTripleStore;
+import com.bigdata.rdf.store.BigdataStatementIteratorImpl;
+import com.bigdata.relation.accesspath.IElementFilter;
+import com.bigdata.striterator.ChunkedArrayIterator;
+import com.bigdata.striterator.IChunkedOrderedIterator;
+
+public class StatementWriter {
+
+ protected static final Logger log = Logger.getLogger(StatementWriter.class);
+
+ public static long addStatements(final AbstractTripleStore database,
+ final AbstractTripleStore statementStore,
+ final boolean copyOnly,
+ final IElementFilter<ISPO> filter,
+ final IChunkedOrderedIterator<ISPO> itr,
+ final IChangeLog changeLog,
+ final Map<IV, BigdataBNode> bnodes) {
+
+ long n = 0;
+
+ if (itr.hasNext()) {
+
+ final BigdataStatementIteratorImpl itr2 =
+ new BigdataStatementIteratorImpl(database, bnodes, itr)
+ .start(database.getExecutorService());
+
+ final BigdataStatement[] stmts =
+ new BigdataStatement[database.getChunkCapacity()];
+
+ int i = 0;
+ while ((i = nextChunk(itr2, stmts)) > 0) {
+ n += addStatements(database, statementStore, copyOnly, filter,
+ stmts, i, changeLog);
+ }
+
+ }
+
+ return n;
+
+ }
+
+ private static long addStatements(final AbstractTripleStore database,
+ final AbstractTripleStore statementStore,
+ final boolean copyOnly,
+ final IElementFilter<ISPO> filter,
+ final BigdataStatement[] stmts,
+ final int numStmts,
+ final IChangeLog changeLog) {
+
+ final SPO[] tmp = allocateSPOs(stmts, numStmts);
+
+ final long n = database.addStatements(statementStore, copyOnly,
+ new ChunkedArrayIterator<ISPO>(numStmts, tmp.clone(),
+ null/* keyOrder */), filter);
+
+ // Copy the state of the isModified() flag and notify changeLog
+ for (int i = 0; i < numStmts; i++) {
+
+ if (tmp[i].isModified()) {
+
+ stmts[i].setModified(true);
+
+ changeLog.changeEvent(
+ new ChangeRecord(stmts[i], ChangeAction.ADDED));
+
+ }
+
+ }
+
+ return n;
+
+ }
+
+ public static long removeStatements(final AbstractTripleStore database,
+ final IChunkedOrderedIterator<ISPO> itr,
+ final boolean computeClosureForStatementIdentifiers,
+ final IChangeLog changeLog,
+ final Map<IV, BigdataBNode> bnodes) {
+
+ long n = 0;
+
+ if (itr.hasNext()) {
+
+ final BigdataStatementIteratorImpl itr2 =
+ new BigdataStatementIteratorImpl(database, bnodes, itr)
+ .start(database.getExecutorService());
+
+ final BigdataStatement[] stmts =
+ new BigdataStatement[database.getChunkCapacity()];
+
+ int i = 0;
+ while ((i = nextChunk(itr2, stmts)) > 0) {
+ n += removeStatements(database, stmts, i,
+ computeClosureForStatementIdentifiers, changeLog);
+ }
+
+ }
+
+ return n;
+
+ }
+
+ private static long removeStatements(final AbstractTripleStore database,
+ final BigdataStatement[] stmts,
+ final int numStmts,
+ final boolean computeClosureForStatementIdentifiers,
+ final IChangeLog changeLog) {
+
+ final SPO[] tmp = allocateSPOs(stmts, numStmts);
+
+ final long n = database.removeStatements(
+ new ChunkedArrayIterator<ISPO>(numStmts, tmp.clone(),
+ null/* keyOrder */),
+ computeClosureForStatementIdentifiers);
+
+ // Copy the state of the isModified() flag and notify changeLog
+ for (int i = 0; i < numStmts; i++) {
+
+ if (tmp[i].isModified()) {
+
+ stmts[i].setModified(true);
+
+ changeLog.changeEvent(
+ new ChangeRecord(stmts[i], ChangeAction.REMOVED));
+
+ }
+
+ }
+
+ return n;
+
+ }
+
+ private static int nextChunk(final Iterator<BigdataStatement> itr,
+ final BigdataStatement[] stmts) {
+
+ assert stmts != null && stmts.length > 0;
+
+ int i = 0;
+ while (itr.hasNext()) {
+ stmts[i++] = itr.next();
+ if (i == stmts.length) {
+ // stmts[] is full
+ return i;
+ }
+ }
+
+ /*
+ * stmts[] is empty (i = 0) or partially
+ * full (i > 0 && i < stmts.length)
+ */
+ return i;
+
+ }
+
+ private static SPO[] allocateSPOs(final BigdataStatement[] stmts,
+ final int numStmts) {
+
+ final SPO[] tmp = new SPO[numStmts];
+
+ for (int i = 0; i < tmp.length; i++) {
+
+ final BigdataStatement stmt = stmts[i];
+
+ final SPO spo = new SPO(stmt);
+
+ if (log.isDebugEnabled())
+ log.debug("writing: " + stmt.toString() + " (" + spo + ")");
+
+ if(!spo.isFullyBound()) {
+
+ throw new AssertionError("Not fully bound? : " + spo);
+
+ }
+
+ tmp[i] = spo;
+
+ }
+
+ return tmp;
+
+
+ }
+
+}
Modified: branches/CHANGE_SET_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestChangeSets.java
===================================================================
--- branches/CHANGE_SET_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestChangeSets.java 2010-10-05 20:30:37 UTC (rev 3735)
+++ branches/CHANGE_SET_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestChangeSets.java 2010-10-06 02:34:14 UTC (rev 3736)
@@ -292,7 +292,7 @@
}
- public void testTruthMaintenance() throws Exception {
+ public void testTMAdd() throws Exception {
final BigdataSail sail = getSail(getTriplesWithInference());
sail.initialize();
@@ -363,6 +363,113 @@
}
+ public void testTMRetract() throws Exception {
+
+ final BigdataSail sail = getSail(getTriplesWithInference());
+ sail.initialize();
+ final BigdataSailRepository repo = new BigdataSailRepository(sail);
+ final BigdataSailRepositoryConnection cxn =
+ (BigdataSailRepositoryConnection) repo.getConnection();
+ cxn.setAutoCommit(false);
+
+ final TestChangeLog changeLog = new TestChangeLog();
+ cxn.setChangeLog(changeLog);
+
+ try {
+
+ final BigdataValueFactory vf = (BigdataValueFactory) sail.getValueFactory();
+
+ final String ns = BD.NAMESPACE;
+
+ final URI a = vf.createURI(ns+"A");
+ final URI b = vf.createURI(ns+"B");
+ final URI c = vf.createURI(ns+"C");
+
+ final BigdataStatement[] explicitAdd = new BigdataStatement[] {
+ vf.createStatement(a, RDFS.SUBCLASSOF, b),
+ vf.createStatement(b, RDFS.SUBCLASSOF, c),
+ };
+
+ final BigdataStatement[] inferredAdd = new BigdataStatement[] {
+ vf.createStatement(a, RDF.TYPE, RDFS.CLASS),
+ vf.createStatement(a, RDFS.SUBCLASSOF, RDFS.RESOURCE),
+ vf.createStatement(a, RDFS.SUBCLASSOF, a),
+ vf.createStatement(a, RDFS.SUBCLASSOF, c),
+ vf.createStatement(b, RDF.TYPE, RDFS.CLASS),
+ vf.createStatement(b, RDFS.SUBCLASSOF, RDFS.RESOURCE),
+ vf.createStatement(b, RDFS.SUBCLASSOF, b),
+ vf.createStatement(c, RDF.TYPE, RDFS.CLASS),
+ vf.createStatement(c, RDFS.SUBCLASSOF, RDFS.RESOURCE),
+ vf.createStatement(c, RDFS.SUBCLASSOF, c),
+ };
+
+ final BigdataStatement[] explicitRemove = new BigdataStatement[] {
+ vf.createStatement(b, RDFS.SUBCLASSOF, c),
+ };
+
+ final BigdataStatement[] inferredRemove = new BigdataStatement[] {
+ vf.createStatement(a, RDFS.SUBCLASSOF, c),
+ vf.createStatement(c, RDF.TYPE, RDFS.CLASS),
+ vf.createStatement(c, RDFS.SUBCLASSOF, RDFS.RESOURCE),
+ vf.createStatement(c, RDFS.SUBCLASSOF, c),
+ };
+
+/**/
+ cxn.setNamespace("ns", ns);
+
+ for (BigdataStatement stmt : explicitAdd) {
+ cxn.add(stmt);
+ }
+
+ cxn.commit();//
+
+ {
+
+ final Collection<IChangeRecord> expected =
+ new LinkedList<IChangeRecord>();
+ for (BigdataStatement stmt : explicitAdd) {
+ expected.add(new ChangeRecord(stmt, ChangeAction.ADDED));
+ }
+ for (BigdataStatement stmt : inferredAdd) {
+ expected.add(new ChangeRecord(stmt, ChangeAction.ADDED));
+ }
+
+ compare(expected, changeLog.getChangeSet());
+
+ }
+
+ for (BigdataStatement stmt : explicitRemove) {
+ cxn.remove(stmt);
+ }
+
+ cxn.commit();//
+
+ {
+
+ final Collection<IChangeRecord> expected =
+ new LinkedList<IChangeRecord>();
+ for (BigdataStatement stmt : explicitRemove) {
+ expected.add(new ChangeRecord(stmt, ChangeAction.REMOVED));
+ }
+ for (BigdataStatement stmt : inferredRemove) {
+ expected.add(new ChangeRecord(stmt, ChangeAction.REMOVED));
+ }
+
+ compare(expected, changeLog.getChangeSet());
+
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("\n" + sail.getDatabase().dumpStore(true, true, false));
+ }
+
+ } finally {
+ cxn.close();
+ sail.__tearDownUnitTest();
+ }
+
+ }
+
private void compare(final Collection<IChangeRecord> expected,
final Collection<IChangeRecord> actual) {
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|