From: <tho...@us...> - 2010-07-28 15:57:18
Revision: 3334
http://bigdata.svn.sourceforge.net/bigdata/?rev=3334&view=rev
Author: thompsonbry
Date: 2010-07-28 15:57:11 +0000 (Wed, 28 Jul 2010)
Log Message:
-----------
Modified the NanoSparqlServer to support URLs containing a non-default namespace such as http://localhost:80/namespace/LUBM_U50. This was done so we can start the NanoSparqlServer regardless of whether a KB exists at the default namespace. I might make a few more changes along these lines to further clean things up.
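(For illustration only, not part of this commit: a minimal client sketch that addresses a specific namespace through the new URL form. The "query" URL parameter matches what NanoSparqlClient sends; the class name and main() wrapper are made up.)

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;

public class NamespaceUrlExample {

    public static void main(final String[] args) throws Exception {

        // Address the LUBM_U50 KB rather than the server's default namespace.
        final String endpoint = "http://localhost:80/namespace/LUBM_U50";

        final String queryStr = "SELECT * WHERE { ?s ?p ?o } LIMIT 10";

        final URL url = new URL(endpoint + "?query="
                + URLEncoder.encode(queryStr, "UTF-8"));

        final HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");

        final int rc = conn.getResponseCode();
        if (rc < 200 || rc >= 300)
            throw new RuntimeException(rc + " : " + conn.getResponseMessage()
                    + " : " + url);

        // Dump the SPARQL results XML to stdout.
        final BufferedReader r = new BufferedReader(new InputStreamReader(conn
                .getInputStream(), "UTF-8"));
        try {
            String s;
            while ((s = r.readLine()) != null)
                System.out.println(s);
        } finally {
            r.close();
        }

    }

}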
This change moves the resolution of the AbstractTripleStore, the BigdataSail, etc. inside query execution. I have not yet determined whether this imposes any significant overhead when benchmarking.
The NanoSparqlServer now obtains a read-only transaction as of the lastCommitTime on the bigdata instance when it starts. This is not really ideal: there is little point in starting the server first if we are then going to read from an empty commit point, so I may revisit this later (perhaps always reading from the lastCommitTime unless the URL includes "/namespace/timestamp", in which case we would read from that timestamp using a full transaction). Also, it is possible that abrupt termination of the JVM process (kill -9) could fail to release the read-only transaction and thus keep history from being released.
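(Condensed sketch of the read-lock handling added to main() -- see the NanoSparqlServer diff below. The helper class and method here are hypothetical; the calls mirror the ones in the diff, and only the standalone Journal code path is shown.)

import java.io.IOException;

import com.bigdata.journal.ITransactionService;
import com.bigdata.journal.ITx;
import com.bigdata.journal.Journal;

public class ReadLockSketch {

    /**
     * Obtain a read-only tx as of the lastCommitTime, run the given task, and
     * release the tx again so history recycling is not blocked. A kill -9
     * bypasses the finally clause, which is the risk noted above.
     */
    public static void runWithReadLock(final Journal jnl, final Runnable task)
            throws IOException {

        final ITransactionService txs = jnl.getTransactionManager()
                .getTransactionService();

        // Read-only tx reading from the last commit point.
        final long tx = txs.newTx(ITx.READ_COMMITTED);

        try {

            task.run();

        } finally {

            // Release the read lock.
            txs.abort(tx);

        }

    }

}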
Modified Paths:
--------------
trunk/bigdata-perf/lubm/build.properties
trunk/bigdata-perf/lubm/src/resources/config/config.kb.sparql
trunk/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSailRepository.java
trunk/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSailRepositoryConnection.java
trunk/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/NanoSparqlClient.java
trunk/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/NanoSparqlServer.java
Property Changed:
----------------
trunk/bigdata-perf/lubm/
Property changes on: trunk/bigdata-perf/lubm
___________________________________________________________________
Modified: svn:ignore
- ant-build
*result.txt
+ ant-build
*result.txt
sample.query.sparql
Modified: trunk/bigdata-perf/lubm/build.properties
===================================================================
--- trunk/bigdata-perf/lubm/build.properties 2010-07-28 13:07:06 UTC (rev 3333)
+++ trunk/bigdata-perf/lubm/build.properties 2010-07-28 15:57:11 UTC (rev 3334)
@@ -46,7 +46,7 @@
lubm.namespace=LUBM_U${lubm.univ}
# Laptop benchmark data directory.
-#lubm.baseDir=d:/bigdata-perf-analysis/lubm/lubm_${lubm.univ}
+#lubm.baseDir=d:/bigdata-perf-analysis/lubm/U${lubm.univ}
# Server benchmark directory.
#lubm.baseDir=/nas/data/lubm/U${lubm.univ}
# Windows Server 2008 benchmark data directory.
@@ -78,7 +78,7 @@
lubm.journalPropertyFile=${journalMode}Store.properties
# The name of the file used for the journal.
-#lubm.journalFile=${lubm.baseDir}/U${lubm.univ}/bigdata-lubm.${journalMode}.jnl
+#lubm.journalFile=${lubm.baseDir}/bigdata-lubm.${journalMode}.jnl
# Note: This is on the large volume.
#lubm.journalFile=/data/lubm/U${lubm.univ}/bigdata-lubm.${journalMode}.jnl
# SSD.
Modified: trunk/bigdata-perf/lubm/src/resources/config/config.kb.sparql
===================================================================
--- trunk/bigdata-perf/lubm/src/resources/config/config.kb.sparql 2010-07-28 13:07:06 UTC (rev 3333)
+++ trunk/bigdata-perf/lubm/src/resources/config/config.kb.sparql 2010-07-28 15:57:11 UTC (rev 3334)
@@ -8,4 +8,7 @@
class=edu.lehigh.swat.bench.ubt.bigdata.SparqlRepositoryFactory
ontology=ignored
data=ignored
+# Use the default namespace specified to NanoSparqlServer
database=http://localhost:80
+# Use a specific namespace regardless of the default specified to NanoSparqlServer
+#database=http://localhost:80/namespace/LUBM_U50
Modified: trunk/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSailRepository.java
===================================================================
--- trunk/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSailRepository.java 2010-07-28 13:07:06 UTC (rev 3333)
+++ trunk/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSailRepository.java 2010-07-28 15:57:11 UTC (rev 3334)
@@ -21,12 +21,17 @@
return ((BigdataSail) getSail()).getDatabase();
}
+
+ @Override
+ public BigdataSail getSail() {
+ return (BigdataSail)super.getSail();
+ }
- private BigdataSail getBigdataSail() {
-
- return (BigdataSail) getSail();
-
- }
+// private BigdataSail getBigdataSail() {
+//
+// return (BigdataSail) getSail();
+//
+// }
@Override
public SailRepositoryConnection getConnection() throws RepositoryException {
@@ -55,7 +60,7 @@
throws RepositoryException {
return new BigdataSailRepositoryConnection(this,
- getBigdataSail().getReadOnlyConnection());
+ getSail().getReadOnlyConnection());
}
/**
@@ -69,7 +74,7 @@
throws RepositoryException {
return new BigdataSailRepositoryConnection(this,
- getBigdataSail().getReadOnlyConnection(timestamp));
+ getSail().getReadOnlyConnection(timestamp));
}
@@ -79,7 +84,7 @@
try {
return new BigdataSailRepositoryConnection(this,
- getBigdataSail().getReadWriteConnection());
+ getSail().getReadWriteConnection());
} catch (IOException e) {
@@ -95,7 +100,7 @@
try {
return new BigdataSailRepositoryConnection(this,
- getBigdataSail().getUnisolatedConnection());
+ getSail().getUnisolatedConnection());
} catch (InterruptedException e) {
Modified: trunk/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSailRepositoryConnection.java
===================================================================
--- trunk/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSailRepositoryConnection.java 2010-07-28 13:07:06 UTC (rev 3333)
+++ trunk/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSailRepositoryConnection.java 2010-07-28 15:57:11 UTC (rev 3334)
@@ -42,8 +42,15 @@
}
+ @Override
+ public BigdataSailRepository getRepository() {
+ return (BigdataSailRepository)super.getRepository();
+ }
+
/**
- * Overriden to capture query hints from SPARQL queries. Query hints are
+ * {@inheritDoc}
+ * <p>
+ * Overridden to capture query hints from SPARQL queries. Query hints are
* embedded in query strings as namespaces.
* See {@link BD#QUERY_HINTS_NAMESPACE} for more information.
*/
@@ -61,7 +68,9 @@
}
/**
- * Overriden to capture query hints from SPARQL queries. Query hints are
+ * {@inheritDoc}
+ * <p>
+ * Overridden to capture query hints from SPARQL queries. Query hints are
* embedded in query strings as namespaces.
* See {@link BD#QUERY_HINTS_NAMESPACE} for more information.
*/
@@ -76,9 +85,11 @@
}
/**
- * Overriden to capture query hints from SPARQL queries. Query hints are
- * embedded in query strings as namespaces.
- * See {@link BD#QUERY_HINTS_NAMESPACE} for more information.
+ * {@inheritDoc}
+ * <p>
+ * Overridden to capture query hints from SPARQL queries. Query hints are
+ * embedded in query strings as namespaces. See
+ * {@link BD#QUERY_HINTS_NAMESPACE} for more information.
*/
@Override
public SailBooleanQuery prepareBooleanQuery(final QueryLanguage ql,
@@ -91,7 +102,9 @@
}
/**
- * Overriden to capture query hints from SPARQL queries. Query hints are
+ * {@inheritDoc}
+ * <p>
+ * Overridden to capture query hints from SPARQL queries. Query hints are
* embedded in query strings as namespaces.
* See {@link BD#QUERY_HINTS_NAMESPACE} for more information.
*/
@@ -121,6 +134,8 @@
}
/**
+ * {@inheritDoc}
+ * <p>
* Note: auto-commit is an EXTREMELY bad idea. Performance will be terrible.
* The database will swell to an outrageous size. TURN OFF AUTO COMMIT.
*
Modified: trunk/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/NanoSparqlClient.java
===================================================================
--- trunk/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/NanoSparqlClient.java 2010-07-28 13:07:06 UTC (rev 3333)
+++ trunk/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/NanoSparqlClient.java 2010-07-28 15:57:11 UTC (rev 3334)
@@ -275,7 +275,8 @@
final int rc = conn.getResponseCode();
if(rc < 200 || rc >= 300) {
- throw new IOException(conn.getResponseMessage());
+ throw new IOException(rc + " : "
+ + conn.getResponseMessage()+" : "+url);
}
if (log.isDebugEnabled()) {
Modified: trunk/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/NanoSparqlServer.java
===================================================================
--- trunk/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/NanoSparqlServer.java 2010-07-28 13:07:06 UTC (rev 3333)
+++ trunk/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/NanoSparqlServer.java 2010-07-28 15:57:11 UTC (rev 3334)
@@ -73,15 +73,18 @@
import com.bigdata.journal.AbstractJournal;
import com.bigdata.journal.IIndexManager;
import com.bigdata.journal.IJournal;
+import com.bigdata.journal.ITransactionService;
import com.bigdata.journal.ITx;
import com.bigdata.journal.Journal;
import com.bigdata.journal.TimestampUtility;
import com.bigdata.rdf.sail.BigdataSail;
import com.bigdata.rdf.sail.BigdataSailGraphQuery;
import com.bigdata.rdf.sail.BigdataSailRepository;
+import com.bigdata.rdf.sail.BigdataSailRepositoryConnection;
import com.bigdata.rdf.sail.bench.NanoSparqlClient.QueryType;
import com.bigdata.rdf.store.AbstractTripleStore;
import com.bigdata.relation.AbstractResource;
+import com.bigdata.service.IBigdataFederation;
import com.bigdata.service.jini.JiniClient;
import com.bigdata.util.concurrent.DaemonThreadFactory;
import com.bigdata.util.httpd.AbstractHTTPD;
@@ -95,9 +98,7 @@
* @todo Allow configuration options for the sparql endpoint either as URI
* parameters, in the property file, as request headers, or as query hints
* using the PREFIX mechanism.
- *
- * @todo Make each namespace available at <code>/namespace?query=...</code>?
- *
+
* @todo Allow timestamp for server reads as protocol parameter (URL query
* parameter or header).
*
@@ -107,8 +108,6 @@
* as well).
*
* @todo Add command to kill a running query.
- *
- * @todo Add command to drop the LRUNexus cache.
*/
public class NanoSparqlServer extends AbstractHTTPD {
@@ -133,16 +132,26 @@
*/
static private final String charset = "UTF-8";
+// /**
+// * The target Sail.
+// */
+// private final BigdataSail sail;
+//
+// /**
+// * The target repository.
+// */
+// private final BigdataSailRepository repo;
+
/**
- * The target Sail.
+ * The configuration object.
*/
- private final BigdataSail sail;
-
+ private final Config config;
+
/**
- * The target repository.
- */
- private final BigdataSailRepository repo;
-
+ * Provides access to the bigdata database.
+ */
+ private final IIndexManager indexManager;
+
/**
* @todo use to decide ASK, DESCRIBE, CONSTRUCT, SELECT, EXPLAIN, etc.
*/
@@ -163,7 +172,7 @@
final String query;
/** The timestamp when the query was accepted (ns). */
final long begin;
- public RunningQuery(long queryId,String query,long begin) {
+ public RunningQuery(long queryId, String query, long begin) {
this.queryId = queryId;
this.query = query;
this.begin = begin;
@@ -188,22 +197,16 @@
super(config.port);
- // resolve the kb instance of interest.
- final AbstractTripleStore tripleStore = (AbstractTripleStore) indexManager
- .getResourceLocator().locate(config.namespace, ITx.UNISOLATED);
-
- if (tripleStore == null) {
-
- throw new RuntimeException("No such kb: " + config.namespace);
-
- }
+ if (config.namespace == null)
+ throw new IllegalArgumentException();
- // since the kb exists, wrap it as a sail.
- sail = new BigdataSail(tripleStore);
-
- repo = new BigdataSailRepository(sail);
- repo.initialize();
-
+ if(indexManager == null)
+ throw new IllegalArgumentException();
+
+ this.config = config;
+
+ this.indexManager = indexManager;
+
// used to parse qeries.
engine = new SPARQLParserFactory().getParser();
@@ -230,13 +233,18 @@
* loaded since they must query each shard for the primary statement
* index and the TERM2ID index.
*/
- protected StringBuilder getKBInfo() {
+ protected StringBuilder getKBInfo(final String namespace,
+ final long timestamp) {
final StringBuilder sb = new StringBuilder();
+ BigdataSailRepositoryConnection conn = null;
+
try {
- final AbstractTripleStore tripleStore = sail.getDatabase();
+ conn = getQueryConnection(namespace, timestamp);
+
+ final AbstractTripleStore tripleStore = conn.getTripleStore();
sb.append("class\t = " + tripleStore.getClass().getName() + "\n");
@@ -282,35 +290,29 @@
.getIndexMetadata()
.getWriteRetentionQueueCapacity() + "\n");
- sb.append(BigdataSail.Options.STAR_JOINS + "=" + sail.isStarJoins()
- + "\n");
+ sb.append(BigdataSail.Options.STAR_JOINS + "="
+ + conn.getRepository().getSail().isStarJoins() + "\n");
sb.append(AbstractResource.Options.MAX_PARALLEL_SUBQUERIES + "="
+ tripleStore.getMaxParallelSubqueries() + "\n");
- /*
- * Stuff which is specific to a local/embedded database.
- */
- if (tripleStore.getIndexManager() instanceof IJournal) {
-
- final AbstractJournal jnl = (AbstractJournal) sail.getDatabase()
- .getIndexManager();
-
- sb.append("file\t= " + jnl.getFile()+"\n");
-
- sb.append("BufferMode\t= " + jnl.getBufferStrategy().getBufferMode()+"\n");
-
- sb.append("nextOffset\t= "
- + jnl.getRootBlockView().getNextOffset() + "\n");
-
- }
-
// sb.append(tripleStore.predicateUsage());
} catch (Throwable t) {
log.warn(t.getMessage(), t);
+ } finally {
+
+ if(conn != null) {
+ try {
+ conn.close();
+ } catch (RepositoryException e) {
+ log.error(e, e);
+ }
+
+ }
+
}
return sb;
@@ -333,6 +335,9 @@
}
}
+ /**
+ * FIXME Must abort any open transactions.
+ */
synchronized public void shutdownNow() {
System.err.println("Immediate shutdown");
// interrupt all running queries.
@@ -460,14 +465,76 @@
if("/status".equals(uri)) {
+ // @todo Could list the known namespaces.
return doStatus(uri, method, header, params);
-
- }
-
+
+ }
+
+ if (uri.startsWith("/namespace/")) {
+
+ // @todo allow status query against any namespace.
+ return doQuery(uri, method, header, params);
+
+ }
+
return new Response(HTTP_NOTFOUND, MIME_TEXT_PLAIN, uri);
}
+ /**
+ * Return the namespace which will be used to execute the query. The
+ * namespace is represented by the first component of the URI. If there is
+ * no namespace, then return the configured default namespace.
+ *
+ * @param uri
+ * The URI path string.
+ *
+ * @return The namespace.
+ */
+ private String getNamespace(final String uri) {
+
+// // locate the "//" after the protocol.
+// final int index = uri.indexOf("//");
+
+ if(!uri.startsWith("/namespace/")) {
+ // use the default namespace.
+ return config.namespace;
+ }
+
+ // locate the next "/" in the URI path.
+ final int beginIndex = uri.indexOf('/', 1/* fromIndex */);
+
+ // locate the next "/" in the URI path.
+ int endIndex = uri.indexOf('/', beginIndex + 1/* fromIndex */);
+
+ if (endIndex == -1) {
+ // use the rest of the URI.
+ endIndex = uri.length();
+ }
+
+ // return the namespace.
+ return uri.substring(beginIndex + 1, endIndex);
+
+ }
+
+ /**
+ * Return the timestamp which will be used to execute the query.
+ *
+ * @todo the configured timestamp should only be used for the default
+ * namespace (or it should be configured for each graph explicitly, or
+ * we should bundle the (namespace,timestamp) together as a single
+ * object).
+ *
+ * @todo use path for the timestamp or acquire read lock when the server
+ * starts against a specific namespace?
+ */
+ private long getTimestamp(final String uri,
+ final LinkedHashMap<String, Vector<String>> params) {
+
+ return config.timestamp;
+
+ }
+
/**
* Respond to a status request.
*
@@ -495,19 +562,26 @@
if (showKBInfo) {
// General information on the connected kb.
- sb.append(getKBInfo());
+ sb.append(getKBInfo(getNamespace(uri), getTimestamp(uri, params)));
}
- if (repo.getDatabase().getIndexManager() instanceof IJournal) {
+ if (indexManager instanceof IJournal) {
/*
* Stuff which is specific to a local/embedded database.
*/
- final AbstractJournal jnl = (AbstractJournal) repo.getDatabase()
- .getIndexManager();
+ final AbstractJournal jnl = (AbstractJournal) indexManager;
+ sb.append("file\t= " + jnl.getFile() + "\n");
+
+ sb.append("BufferMode\t= "
+ + jnl.getBufferStrategy().getBufferMode() + "\n");
+
+ sb.append("nextOffset\t= " + jnl.getRootBlockView().getNextOffset()
+ + "\n");
+
if (LRUNexus.INSTANCE != null) {
sb.append(LRUNexus.Options.CLASS + "="
@@ -522,7 +596,6 @@
}
// show the disk access details.
-
sb.append(jnl.getBufferStrategy().getCounters().toString()+"\n\n");
}
@@ -592,6 +665,10 @@
final Properties header,
final LinkedHashMap<String, Vector<String>> params) throws Exception {
+ final String namespace = getNamespace(uri);
+
+ final long timestamp = getTimestamp(uri, params);
+
final String queryStr = params.get("query").get(0);
if (queryStr == null) {
@@ -615,8 +692,8 @@
*/
final PipedOutputStream os = new PipedOutputStream();
final InputStream is = new PipedInputStream(os);//Bytes.kilobyte32*8/*pipeSize*/);
- final FutureTask<Void> ft = new FutureTask<Void>(getQueryTask(queryStr,
- os));
+ final FutureTask<Void> ft = new FutureTask<Void>(getQueryTask(
+ namespace, timestamp, queryStr, os));
try {
// Choose an appropriate MIME type.
@@ -711,8 +788,9 @@
*
* @throws MalformedQueryException
*/
- private Callable<Void> getQueryTask(final String queryStr,
- final PipedOutputStream os) throws MalformedQueryException {
+ private Callable<Void> getQueryTask(final String namespace,
+ final long timestamp, final String queryStr,
+ final PipedOutputStream os) throws MalformedQueryException {
/*
* Parse the query so we can figure out how it will need to be executed.
@@ -736,142 +814,283 @@
break;
case DESCRIBE:
case CONSTRUCT:
- return new GraphQueryTask(queryStr, os);
+ return new GraphQueryTask(namespace, timestamp, queryStr, os);
case SELECT:
- return new TupleQueryTask(queryStr, os);
+ return new TupleQueryTask(namespace, timestamp, queryStr, os);
}
throw new RuntimeException("Unknown query type: " + queryType);
}
- /**
- * Note: A read-only connection from the lastCommitTime
- *
- * @throws RepositoryException
- */
- protected SailRepositoryConnection getQueryConnection()
- throws RepositoryException {
+ /**
+ * Note: A read-only connection.
+ *
+ * @param namespace
+ * @param timestamp
+ *
+ * @throws RepositoryException
+ *
+ * @todo enforce historical query by making sure timestamps conform (we do
+ * not want to allow read/write tx queries unless update semantics are
+ * introduced ala SPARQL 1.1).
+ *
+ * @todo Use a distributed read-only tx for queries (it would be nice if a
+ * tx used 2PL to specify which namespaces it could touch).
+ */
+ protected BigdataSailRepositoryConnection getQueryConnection(
+ final String namespace, final long timestamp)
+ throws RepositoryException {
+
+ // resolve the default namespace.
+ final AbstractTripleStore tripleStore = (AbstractTripleStore) indexManager
+ .getResourceLocator().locate(namespace, timestamp);
- return repo.getReadOnlyConnection();
+ if (tripleStore == null) {
+ throw new RuntimeException("Not found: namespace=" + namespace
+ + ", timestamp=" + TimestampUtility.toString(timestamp));
+
+ }
+
+ /*
+ * Since the kb exists, wrap it as a sail.
+ *
+ * @todo cache? close when not in use any more?
+ */
+ final BigdataSail sail = new BigdataSail(tripleStore);
+
+ final BigdataSailRepository repo = new BigdataSailRepository(sail);
+
+ repo.initialize();
+
+ return (BigdataSailRepositoryConnection) repo
+ .getReadOnlyConnection(timestamp);
+
}
/**
- * Executes a tuple query.
+ * Abstract base class for running queries handles the timing, pipe,
+ * reporting, obtains the connection, and provides the finally {} semantics
+ * for each type of query task.
*
- * @todo Extract a base class which handles the timing, pipe, reporting,
- * obtains the connection, and provides the finally {} semantics for
- * each type of query task.
+ * @author <a href="mailto:tho...@us...">Bryan
+ * Thompson</a>
+ * @version $Id$
*/
- private class TupleQueryTask implements Callable<Void> {
+ abstract private class AbstractQueryTask implements Callable<Void> {
+
+ /** The namespace against which the query will be run. */
+ private final String namespace;
- private final String queryStr;
- private final PipedOutputStream os;
+ /**
+ * The timestamp of the view for that namespace against which the query
+ * will be run.
+ */
+ private final long timestamp;
- public TupleQueryTask(final String queryStr, final PipedOutputStream os) {
+ /** The SPARQL query string. */
+ protected final String queryStr;
- this.queryStr = queryStr;
- this.os = os;
+ /** A pipe used to incrementally deliver the results to the client. */
+ private final PipedOutputStream os;
- }
+ /**
+ *
+ * @param namespace
+ * The namespace against which the query will be run.
+ * @param timestamp
+ * The timestamp of the view for that namespace against which
+ * the query will be run.
+ * @param queryStr
+ * The SPARQL query string.
+ * @param os
+ * A pipe used to incrementally deliver the results to the
+ * client.
+ */
+ protected AbstractQueryTask(final String namespace,
+ final long timestamp, final String queryStr,
+ final PipedOutputStream os) {
- public Void call() throws Exception {
- final Long queryId = Long.valueOf(queryIdFactory.incrementAndGet());
- final SailRepositoryConnection cxn = getQueryConnection();
- try {
- final long begin = System.nanoTime();
- queries.put(queryId, new RunningQuery(queryId.longValue(), queryStr, begin));
- final TupleQuery query = cxn.prepareTupleQuery(
- QueryLanguage.SPARQL, queryStr);
- query.evaluate(new SPARQLResultsXMLWriter(new XMLWriter(os)));
- os.close();
- return null;
- } catch (Throwable t) {
- // launder and rethrow the exception.
- throw launderThrowable(t,os);
- } finally {
- try {
- cxn.close();
- } finally {
- queries.remove(queryId);
- }
- }
+ this.namespace = namespace;
+ this.timestamp = timestamp;
+ this.queryStr = queryStr;
+ this.os = os;
+
+ }
+
+ /**
+ * Execute the query.
+ *
+ * @param cxn
+ * The connection.
+ * @param os
+ * Where the write the query results.
+ *
+ * @throws Exception
+ */
+ abstract protected void doQuery(SailRepositoryConnection cxn,
+ OutputStream os) throws Exception;
+
+ final public Void call() throws Exception {
+ final Long queryId = Long.valueOf(queryIdFactory.incrementAndGet());
+ final SailRepositoryConnection cxn = getQueryConnection(namespace,
+ timestamp);
+ final long begin = System.nanoTime();
+ try {
+ queries.put(queryId, new RunningQuery(queryId.longValue(),
+ queryStr, begin));
+ doQuery(cxn, os);
+ os.flush();
+ return null;
+ } catch (Throwable t) {
+ // launder and rethrow the exception.
+ throw launderThrowable(t, os);
+ } finally {
+ queries.remove(queryId);
+ try {
+ os.close();
+ } catch (Throwable t) {
+ log.error(t, t);
+ }
+ try {
+ cxn.close();
+ } catch (Throwable t) {
+ log.error(t, t);
+ }
+ }
+ }
+
+ }
+
+ /**
+ * Executes a tuple query.
+ */
+ private class TupleQueryTask extends AbstractQueryTask {
+
+ public TupleQueryTask(final String namespace, final long timestamp,
+ final String queryStr, final PipedOutputStream os) {
+
+ super(namespace, timestamp, queryStr, os);
+
}
+ protected void doQuery(final SailRepositoryConnection cxn,
+ final OutputStream os) throws Exception {
+
+ final TupleQuery query = cxn.prepareTupleQuery(
+ QueryLanguage.SPARQL, queryStr);
+
+ query.evaluate(new SPARQLResultsXMLWriter(new XMLWriter(os)));
+
+ }
+
+// public Void call() throws Exception {
+// final Long queryId = Long.valueOf(queryIdFactory.incrementAndGet());
+// final SailRepositoryConnection cxn = getQueryConnection();
+// try {
+// final long begin = System.nanoTime();
+// queries.put(queryId, new RunningQuery(queryId.longValue(), queryStr, begin));
+// final TupleQuery query = cxn.prepareTupleQuery(
+// QueryLanguage.SPARQL, queryStr);
+// query.evaluate(new SPARQLResultsXMLWriter(new XMLWriter(os)));
+// os.close();
+// return null;
+// } catch (Throwable t) {
+// // launder and rethrow the exception.
+// throw launderThrowable(t,os);
+// } finally {
+// try {
+// cxn.close();
+// } finally {
+// queries.remove(queryId);
+// }
+// }
+// }
+
}
/**
* Executes a graph query.
*/
- private class GraphQueryTask implements Callable<Void> {
+ private class GraphQueryTask extends AbstractQueryTask {
- private final String queryStr;
- private final PipedOutputStream os;
+ public GraphQueryTask(final String namespace, final long timestamp,
+ final String queryStr, final PipedOutputStream os) {
- public GraphQueryTask(final String queryStr, final PipedOutputStream os) {
+ super(namespace,timestamp,queryStr,os);
- this.queryStr = queryStr;
- this.os = os;
-
}
- public Void call() throws Exception {
- final Long queryId = Long.valueOf(queryIdFactory.incrementAndGet());
- final SailRepositoryConnection cxn = getQueryConnection();
- try {
- final long begin = System.nanoTime();
- queries.put(queryId, new RunningQuery(queryId.longValue(), queryStr, begin));
- final BigdataSailGraphQuery query = (BigdataSailGraphQuery) cxn
- .prepareGraphQuery(QueryLanguage.SPARQL, queryStr);
- query.evaluate(new RDFXMLWriter(os));
- os.close();
- return null;
- } catch (Throwable t) {
- throw launderThrowable(t, os);
- } finally {
- try {
- cxn.close();
- } finally {
- queries.remove(queryId);
- }
- }
- }
-
+// public Void call() throws Exception {
+// final Long queryId = Long.valueOf(queryIdFactory.incrementAndGet());
+// final SailRepositoryConnection cxn = getQueryConnection();
+// try {
+// final long begin = System.nanoTime();
+// queries.put(queryId, new RunningQuery(queryId.longValue(), queryStr, begin));
+// final BigdataSailGraphQuery query = (BigdataSailGraphQuery) cxn
+// .prepareGraphQuery(QueryLanguage.SPARQL, queryStr);
+// query.evaluate(new RDFXMLWriter(os));
+// os.close();
+// return null;
+// } catch (Throwable t) {
+// throw launderThrowable(t, os);
+// } finally {
+// try {
+// cxn.close();
+// } finally {
+// queries.remove(queryId);
+// }
+// }
+// }
+
+ @Override
+ protected void doQuery(final SailRepositoryConnection cxn,
+ final OutputStream os) throws Exception {
+
+ final BigdataSailGraphQuery query = (BigdataSailGraphQuery) cxn
+ .prepareGraphQuery(QueryLanguage.SPARQL, queryStr);
+
+ query.evaluate(new RDFXMLWriter(os));
+
+ }
+
}
- /**
- * Send a STOP message to the service
- *
- * @param port The port for that service.
- *
- * @throws IOException
- *
- * @todo This winds up warning <pre> java.net.SocketTimeoutException: Read timed out</pre>
- * even though the shutdown request was accepted and processed by the server. I'm not
- * sure why.
- */
- public static void sendStop(final int port) throws IOException {
-
- final URL url = new URL("http://localhost:" + port+"/stop");
- HttpURLConnection conn = null;
- try {
+ /**
+ * Send a STOP message to the service
+ *
+ * @param port
+ * The port for that service.
+ *
+ * @throws IOException
+ *
+ * @todo This winds up warning <code>
+ * java.net.SocketTimeoutException: Read timed out
+ * </code> even though the shutdown request was
+ * accepted and processed by the server. I'm not sure why.
+ */
+ public static void sendStop(final int port) throws IOException {
- conn = (HttpURLConnection) url.openConnection();
- conn.setRequestMethod("POST");
- conn.setDoInput(true); // true to read from the server.
- conn.setDoOutput(true); // true to write to the server.
- conn.setUseCaches(false);
- conn.setReadTimeout(2000/* ms */);
- conn.setRequestProperty("Content-Type",
- "application/x-www-form-urlencoded");
- conn.setRequestProperty("Content-Length", "" + Integer.toString(0));
- conn.setRequestProperty("Content-Language", "en-US");
+ final URL url = new URL("http://localhost:" + port + "/stop");
+ HttpURLConnection conn = null;
+ try {
- // Send request
- conn.getOutputStream().close();
+ conn = (HttpURLConnection) url.openConnection();
+ conn.setRequestMethod("POST");
+ conn.setDoInput(true); // true to read from the server.
+ conn.setDoOutput(true); // true to write to the server.
+ conn.setUseCaches(false);
+ conn.setReadTimeout(2000/* ms */);
+ conn.setRequestProperty("Content-Type",
+ "application/x-www-form-urlencoded");
+ conn.setRequestProperty("Content-Length", "" + Integer.toString(0));
+ conn.setRequestProperty("Content-Language", "en-US");
- // connect.
+ // Send request
+ conn.getOutputStream().close();
+
+ // connect.
try {
conn.connect();
@@ -913,16 +1132,21 @@
public int port;
/**
- * The namespace of the KB instance.
+ * The default namespace.
*/
public String namespace;
+ /**
+ * The default timestamp used to query the default namespace.
+ */
+ public long timestamp;
+
/**
* The #of threads to use to handle SPARQL queries -or- ZERO (0) for an
* unbounded pool.
*/
public int queryThreadPoolSize = 8;
-
+
public Config() {
}
@@ -934,47 +1158,51 @@
}
- /**
- * Run an httpd service exposing a SPARQL endpoint. The service will respond
- * at the root path for the specified port.
- *
- * @param args
- * USAGE: <code>port -stop</code> to stop the server; OR<br/>
- * <code>(options) <i>namespace</i> (propertyFile|configFile) )</code>
- * where
- * <dl>
- * <dt>port</dt>
- * <dd>The port on which the service will respond.</dd>
- * <dt>namespace</dt>
- * <dd>The namespace of the target KB instance ("kb" is the
- * default namespace).</dd>
- * <dt>propertyFile</dt>
- * <dd>A java properties file for a standalone {@link Journal}.</dd>
- * <dt>configFile</dt>
- * <dd>A jini configuration file for a bigdata federation.</dd>
- * </dl>
- * and <i>options</i> are any of:
- * <dl>
- * <dt>-nthreads</dt>
- * <dd>The #of threads which will be used to answer SPARQL queries.</dd>
- * <dt></dt>
- * <dd></dd>
- * <dt></dt>
- * <dd></dd>
- * <dt></dt>
- * <dd></dd>
- * <dt></dt>
- * <dd></dd>
- * <dt></dt>
- * <dd></dd>
- * </dl>
- */
+ /**
+ * Run an httpd service exposing a SPARQL endpoint. The service will respond
+ * to the following URL paths:
+ * <dl>
+ * <dt>http://localhost:port/</dt>
+ * <dd>The SPARQL end point for the default namespace as specified by the
+ * <code>namespace</code> command line argument.</dd>
+ * <dt>http://localhost:port/namespace/NAMESPACE</dt>
+ * <dd>where <code>NAMESPACE</code> is the namespace of some triple store or
+ * quad store, may be used to address ANY triple or quads store in the
+ * bigdata instance.</dd>
+ * <dt>http://localhost:port/status</dt>
+ * <dd>A status page.</dd>
+ * </dl>
+ *
+ * @param args
+ * USAGE: <code>port -stop</code> to stop the server; OR<br/>
+ * <code>(options) <i>namespace</i> (propertyFile|configFile) )</code>
+ * where
+ * <dl>
+ * <dt>port</dt>
+ * <dd>The port on which the service will respond.</dd>
+ * <dt>namespace</dt>
+ * <dd>The namespace of the default SPARQL endpoint (the
+ * namespace will be <code>kb</code> if none was specified when
+ * the triple/quad store was created).</dd>
+ * <dt>propertyFile</dt>
+ * <dd>A java properties file for a standalone {@link Journal}.</dd>
+ * <dt>configFile</dt>
+ * <dd>A jini configuration file for a bigdata federation.</dd>
+ * </dl>
+ * and <i>options</i> are any of:
+ * <dl>
+ * <dt>-nthreads</dt>
+ * <dd>The #of threads which will be used to answer SPARQL
+ * queries.</dd>
+ * </dl>
+ */
public static void main(final String[] args) {
final Config config = new Config();
config.port = 80;
Journal jnl = null;
JiniClient<?> jiniClient = null;
NanoSparqlServer server = null;
+ ITransactionService txs = null;
try {
/*
* `
@@ -1073,6 +1301,14 @@
}
+ txs = (indexManager instanceof Journal ? ((Journal) indexManager)
+ .getTransactionManager().getTransactionService()
+ : ((IBigdataFederation<?>) indexManager)
+ .getTransactionService());
+
+ config.timestamp = txs.newTx(ITx.READ_COMMITTED);
+ System.out.println("tx: " + config.timestamp);
+
// start the server.
server = new NanoSparqlServer(config, indexManager);
@@ -1104,9 +1340,10 @@
if (true) { // @todo if(!quiet) or if(verbose)
/*
- * Log some information about the kb (#of statements, etc).
+ * Log some information about the default kb (#of statements, etc).
*/
- System.out.println(server.getKBInfo());
+ System.out.println(server.getKBInfo(config.namespace,
+ config.timestamp));
}
/*
@@ -1126,7 +1363,15 @@
} catch (Throwable ex) {
ex.printStackTrace();
- } finally {
+ } finally {
+ if (txs != null) {
+ try {
+ txs.abort(config.timestamp);
+ } catch (IOException e) {
+ log.error("Could not release transaction: tx="
+ + config.timestamp, e);
+ }
+ }
if (server != null)
server.shutdownNow();
if (jnl != null) {
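(For illustration only: the request routing added above, reduced to a standalone sketch. The parsing mirrors getNamespace() in the diff; the class name and main() are made up.)

public class NamespaceRoutingExample {

    /**
     * Extract the namespace from the URI path, falling back to the default
     * namespace when the path does not start with "/namespace/".
     */
    static String getNamespace(final String uri, final String defaultNamespace) {

        if (!uri.startsWith("/namespace/")) {
            // use the default namespace.
            return defaultNamespace;
        }

        // locate the "/" after "/namespace".
        final int beginIndex = uri.indexOf('/', 1/* fromIndex */);

        // locate the next "/" in the URI path, if any.
        int endIndex = uri.indexOf('/', beginIndex + 1/* fromIndex */);

        if (endIndex == -1) {
            // use the rest of the URI.
            endIndex = uri.length();
        }

        return uri.substring(beginIndex + 1, endIndex);

    }

    public static void main(final String[] args) {

        // "/" (anything not under /namespace/) -> the default, e.g. "kb".
        System.out.println(getNamespace("/", "kb"));

        // "/namespace/LUBM_U50" -> "LUBM_U50".
        System.out.println(getNamespace("/namespace/LUBM_U50", "kb"));

        // A trailing path component is ignored for namespace resolution.
        System.out.println(getNamespace("/namespace/LUBM_U50/whatever", "kb"));

    }

}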