|
From: <mrp...@us...> - 2010-10-08 14:40:54
|
Revision: 3754
http://bigdata.svn.sourceforge.net/bigdata/?rev=3754&view=rev
Author: mrpersonick
Date: 2010-10-08 14:40:48 +0000 (Fri, 08 Oct 2010)
Log Message:
-----------
finishing up the change sets implementation
Added Paths:
-----------
branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/changesets/InMemChangeLog.java
Added: branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/changesets/InMemChangeLog.java
===================================================================
--- branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/changesets/InMemChangeLog.java (rev 0)
+++ branches/CHANGE_SET_BRANCH/bigdata-rdf/src/java/com/bigdata/rdf/changesets/InMemChangeLog.java 2010-10-08 14:40:48 UTC (rev 3754)
@@ -0,0 +1,163 @@
+package com.bigdata.rdf.changesets;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import org.apache.log4j.Logger;
+import com.bigdata.rdf.model.BigdataStatement;
+import com.bigdata.rdf.spo.ISPO;
+import com.bigdata.rdf.store.AbstractTripleStore;
+import com.bigdata.rdf.store.BigdataStatementIterator;
+import com.bigdata.striterator.ChunkedArrayIterator;
+
+/**
+ * This is a very simple implementation of a change log. NOTE: This is not
+ * a particularly great implementation. First of all it ends up storing
+ * two copies of the change set. Secondly it needs to be smarter about
+ * concurrency, or maybe we can be smart about it when we do the
+ * implementation on the other side (the SAIL connection can just write
+ * change events to a buffer and then the buffer can be drained by
+ * another thread that doesn't block the actual read/write operations,
+ * although then we need to be careful not to issue the committed()
+ * notification before the buffer is drained).
+ *
+ * @author mike
+ *
+ */
+public class InMemChangeLog implements IChangeLog {
+
+ protected static final Logger log = Logger.getLogger(InMemChangeLog.class);
+
+ /**
+ * Running tally of new changes since the last commit notification.
+ */
+ private final Map<ISPO,IChangeRecord> changeSet =
+ new HashMap<ISPO, IChangeRecord>();
+
+ /**
+ * Keep a record of the change set as of the last commit.
+ */
+ private final Map<ISPO,IChangeRecord> committed =
+ new HashMap<ISPO, IChangeRecord>();
+
+ /**
+ * See {@link IChangeLog#changeEvent(IChangeRecord)}.
+ */
+ public synchronized void changeEvent(final IChangeRecord record) {
+
+ if (log.isInfoEnabled())
+ log.info(record);
+
+ changeSet.put(record.getStatement(), record);
+
+ }
+
+ /**
+ * See {@link IChangeLog#transactionCommited()}.
+ */
+ public synchronized void transactionCommited() {
+
+ if (log.isInfoEnabled())
+ log.info("transaction committed");
+
+ committed.clear();
+
+ committed.putAll(changeSet);
+
+ changeSet.clear();
+
+ }
+
+ /**
+ * See {@link IChangeLog#transactionAborted()}.
+ */
+ public synchronized void transactionAborted() {
+
+ if (log.isInfoEnabled())
+ log.info("transaction aborted");
+
+ changeSet.clear();
+
+ }
+
+ /**
+ * Return the change set as of the last commmit point.
+ *
+ * @return
+ * a collection of {@link IChangeRecord}s as of the last commit
+ * point
+ */
+ public Collection<IChangeRecord> getLastCommit() {
+
+ return committed.values();
+
+ }
+
+ /**
+ * Return the change set as of the last commmit point, using the supplied
+ * database to resolve ISPOs to BigdataStatements.
+ *
+ * @return
+ * a collection of {@link IChangeRecord}s as of the last commit
+ * point
+ */
+ public Collection<IChangeRecord> getLastCommit(final AbstractTripleStore db) {
+
+ return resolve(db, committed.values());
+
+ }
+
+ /**
+ * Use the supplied database to turn a set of ISPO change records into
+ * BigdataStatement change records. BigdataStatements also implement
+ * ISPO, the difference being that BigdataStatements also contain
+ * materialized RDF terms for the 3 (or 4) positions, in addition to just
+ * the internal identifiers (IVs) for those terms.
+ *
+ * @param db
+ * the database containing the lexicon needed to materialize
+ * the BigdataStatement objects
+ * @param unresolved
+ * the ISPO change records that came from IChangeLog notification
+ * events
+ * @return
+ * the fully resolves BigdataStatement change records
+ */
+ private Collection<IChangeRecord> resolve(final AbstractTripleStore db,
+ final Collection<IChangeRecord> unresolved) {
+
+ final Collection<IChangeRecord> resolved =
+ new LinkedList<IChangeRecord>();
+
+ // collect up the ISPOs out of the unresolved change records
+ final ISPO[] spos = new ISPO[unresolved.size()];
+ int i = 0;
+ for (IChangeRecord rec : unresolved) {
+ spos[i++] = rec.getStatement();
+ }
+
+ // use the database to resolve them into BigdataStatements
+ final BigdataStatementIterator it =
+ db.asStatementIterator(
+ new ChunkedArrayIterator<ISPO>(i, spos, null/* keyOrder */));
+
+ /*
+ * the BigdataStatementIterator will produce BigdataStatement objects
+ * in the same order as the original ISPO array
+ */
+ for (IChangeRecord rec : unresolved) {
+
+ final BigdataStatement stmt = it.next();
+
+ resolved.add(new ChangeRecord(stmt, rec.getChangeAction()));
+
+ }
+
+ return resolved;
+
+ }
+
+
+
+}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|