From: <tho...@us...> - 2011-04-14 13:55:42
Revision: 4398
          http://bigdata.svn.sourceforge.net/bigdata/?rev=4398&view=rev
Author:   thompsonbry
Date:     2011-04-14 13:55:29 +0000 (Thu, 14 Apr 2011)

Log Message:
-----------
Removed the old NanoSparqlServer class from the 'bench' package and its test suite. Restored the thread pool for query processing. Restored the pipe used with DELETE with QUERY to avoid materializing the entire query result before deleting the data. Fixed the CONSTRUCT test. Modified the response to include the elapsed time as well as the number of statements modified for update operations.

Modified Paths:
--------------
    branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFContext.java
    branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFServlet.java
    branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFServletContextListener.java
    branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataServlet.java
    branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/CountersServlet.java
    branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/DeleteServlet.java
    branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/NanoSparqlServer.java
    branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/QueryServlet.java
    branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/RESTServlet.java
    branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java
    branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/UpdateServlet.java
    branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/XMLBuilder.java
    branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/TestAll.java
    branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/webapp/TestNanoSparqlServer.java

Added Paths:
-----------
    branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/HTMLBuilder.java
    branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/QueryType.java

Removed Paths:
-------------
    branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/NanoSparqlServer.java
    branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/bench/TestAll.java
    branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/bench/TestNanoSparqlServer.java
    branches/QUADS_QUERY_BRANCH/bigdata-sails/src/test/com/bigdata/rdf/sail/bench/TestNanoSparqlServer_StartStop.java

Deleted: branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/NanoSparqlServer.java
===================================================================
--- branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/NanoSparqlServer.java	2011-04-13 20:33:10 UTC (rev 4397)
+++ branches/QUADS_QUERY_BRANCH/bigdata-sails/src/java/com/bigdata/rdf/sail/bench/NanoSparqlServer.java	2011-04-14 13:55:29 UTC (rev 4398)
@@ -1,2801 +0,0 @@
-/*
-
-Copyright (C) SYSTAP, LLC 2006-2010.  All rights reserved.
-
-Contact:
-     SYSTAP, LLC
-     4501 Tower Road
-     Greensboro, NC 27410
-     lic...@bi...
-
-This program is free software; you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation; version 2 of the License.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program; if not, write to the Free Software
-Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-
-*/
-/*
- * Created on May 29, 2010
- */
-package com.bigdata.rdf.sail.bench;
-
-import info.aduna.xml.XMLWriter;
-
-import java.io.BufferedInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
-import java.io.PrintWriter;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.Vector;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.log4j.Logger;
-import org.openrdf.model.Resource;
-import org.openrdf.model.Statement;
-import org.openrdf.query.MalformedQueryException;
-import org.openrdf.query.QueryLanguage;
-import org.openrdf.query.parser.ParsedQuery;
-import org.openrdf.query.parser.QueryParser;
-import org.openrdf.query.parser.sparql.SPARQLParserFactory;
-import org.openrdf.query.resultio.sparqlxml.SPARQLResultsXMLWriter;
-import org.openrdf.repository.RepositoryException;
-import org.openrdf.rio.RDFFormat;
-import org.openrdf.rio.RDFHandlerException;
-import org.openrdf.rio.RDFParser;
-import org.openrdf.rio.RDFParserFactory;
-import org.openrdf.rio.RDFParserRegistry;
-import org.openrdf.rio.helpers.RDFHandlerBase;
-import org.openrdf.rio.rdfxml.RDFXMLParser;
-import org.openrdf.rio.rdfxml.RDFXMLWriter;
-import org.openrdf.sail.SailException;
-
-import com.bigdata.bop.BOpUtility;
-import com.bigdata.bop.BufferAnnotations;
-import com.bigdata.bop.IPredicate;
-import com.bigdata.bop.engine.IRunningQuery;
-import com.bigdata.bop.engine.QueryEngine;
-import com.bigdata.bop.fed.QueryEngineFactory;
-import com.bigdata.bop.join.PipelineJoin;
-import com.bigdata.btree.BytesUtil;
-import com.bigdata.btree.IndexMetadata;
-import com.bigdata.counters.httpd.CounterSetHTTPD;
-import com.bigdata.journal.IAtomicStore;
-import com.bigdata.journal.IBufferStrategy;
-import com.bigdata.journal.IIndexManager;
-import com.bigdata.journal.ITransactionService;
-import com.bigdata.journal.ITx;
-import com.bigdata.journal.Journal;
-import com.bigdata.journal.RWStrategy;
-import com.bigdata.journal.TimestampUtility;
-import com.bigdata.rawstore.Bytes;
-import com.bigdata.rdf.sail.BigdataSail;
-import com.bigdata.rdf.sail.BigdataSailGraphQuery;
-import com.bigdata.rdf.sail.BigdataSailRepository;
-import com.bigdata.rdf.sail.BigdataSailRepositoryConnection;
-import com.bigdata.rdf.sail.BigdataSailTupleQuery; -import com.bigdata.rdf.sail.BigdataSail.BigdataSailConnection; -import com.bigdata.rdf.sail.bench.NanoSparqlClient.QueryType; -import com.bigdata.rdf.store.AbstractTripleStore; -import com.bigdata.rdf.store.DataLoader; -import com.bigdata.relation.AbstractResource; -import com.bigdata.relation.RelationSchema; -import com.bigdata.rwstore.RWStore; -import com.bigdata.service.AbstractDistributedFederation; -import com.bigdata.service.IBigdataFederation; -import com.bigdata.service.jini.JiniClient; -import com.bigdata.sparse.ITPS; -import com.bigdata.util.concurrent.DaemonThreadFactory; -import com.bigdata.util.concurrent.ThreadPoolExecutorBaseStatisticsTask; -import com.bigdata.util.httpd.AbstractHTTPD; -import com.bigdata.util.httpd.NanoHTTPD; - -/** - * A flyweight SPARQL endpoint using HTTP. - * - * @author tho...@us... - * - * @todo Allow configuration options for the sparql endpoint either as URI - * parameters, in the property file, as request headers, or as query hints - * using the PREFIX mechanism. - * - * @todo Isn't there a baseURI option for SPARQL end points? - * - * @todo Add an "?explain" URL query parameter and show the execution plan and - * costs (or make this a navigable option from the set of running queries - * to drill into their running costs and offer an opportunity to kill them - * as well). - * - * @todo Add command to kill a running query, e.g., from the view of the long - * running queries. - * - * @todo Report other performance counters using {@link CounterSetHTTPD} - * - * @todo Simple update protocol. - * - * @todo If the addressed instance uses full transactions, then mutation should - * also use a full transaction. - * - * @todo Remote command to bulk load data from a remote or local resource (it's - * pretty much up to people handling deployment to secure access to - * queries, update requests, and bulk load requests). - * - * @todo Remote command to advance the read-behind point. This will let people - * bulk load a bunch of stuff before advancing queries to read from the - * new consistent commit point. - * - * @todo Review the settings for the {@link RDFParser} instances, e.g., - * verifyData, preserveBNodeIds, etc. Perhaps we should use the same - * defaults as the {@link DataLoader}? - * - * @todo It is possible that we could have concurrent requests which each get - * the unisolated connection. This could cause two problems: (1) we could - * exhaust our request pool, which would cause the server to block; and - * (2) I need to verify that the exclusive semaphore logic for the - * unisolated sail connection works with cross thread access. Someone had - * pointed out a bizarre hole in this.... - * - * @deprecated This has been replaced by the class of the same name in the - * <code>com.bigdata.sail.webapp</code> package. - */ -public class NanoSparqlServer extends AbstractHTTPD { - - /** - * The logger for the concrete {@link NanoSparqlServer} class. The {@link NanoHTTPD} - * class has its own logger. - */ - static private final Logger log = Logger.getLogger(NanoSparqlServer.class); - - /** - * A SPARQL results set in XML. - */ - static public final String MIME_SPARQL_RESULTS_XML = "application/sparql-results+xml"; - - /** - * RDF/XML. - */ - static public final String MIME_RDF_XML = "application/rdf+xml"; - - /** - * The character set used for the response (not negotiated). - */ - static private final String charset = "UTF-8"; - - /** - * The configuration object. 
- */ - private final Config config; - - /** - * Provides access to the bigdata database. - */ - private final IIndexManager indexManager; - - /** - * @todo use to decide ASK, DESCRIBE, CONSTRUCT, SELECT, EXPLAIN, etc. - */ - private final QueryParser engine; - - /** - * Runs a pool of threads for handling requests. - */ - private final ExecutorService queryService; - - private final LinkedBlockingQueue<byte[]> pipeBufferPool; - - /** - * Metadata about running queries. - */ - private static class RunningQuery { - - /** - * The unique identifier for this query for the {@link NanoSparqlServer}. - */ - final long queryId; - - /** - * The unique identifier for this query for the {@link QueryEngine}. - * - * @see QueryEngine#getRunningQuery(UUID) - */ - final UUID queryId2; - - /** The query. */ - final String query; - - /** The timestamp when the query was accepted (ns). */ - final long begin; - - public RunningQuery(final long queryId, final UUID queryId2, - final String query, final long begin) { - - this.queryId = queryId; - - this.queryId2 = queryId2; - - this.query = query; - - this.begin = begin; - - } - - } - - /** - * The currently executing queries (does not include queries where a client - * has established a connection but the query is not running because the - * {@link #queryService} is blocking). - */ - private final ConcurrentHashMap<Long/* queryId */, RunningQuery> queries = new ConcurrentHashMap<Long, RunningQuery>(); - - /** - * Factory for the query identifiers. - */ - private final AtomicLong queryIdFactory = new AtomicLong(); - - /** - * - * @param config - * The configuration for the server. - * @param indexManager - * The database instance that the server will operate against. - * - * @throws IOException - * @throws SailException - * @throws RepositoryException - */ - public NanoSparqlServer(final Config config, - final IIndexManager indexManager) throws IOException, - SailException, RepositoryException { - - super(config.port); - - if (config.namespace == null) - throw new IllegalArgumentException(); - - if(indexManager == null) - throw new IllegalArgumentException(); - - this.config = config; - - this.indexManager = indexManager; - - // used to parse qeries. - engine = new SPARQLParserFactory().getParser(); - - if (config.queryThreadPoolSize == 0) { - - queryService = (ThreadPoolExecutor) Executors - .newCachedThreadPool(new DaemonThreadFactory - (getClass().getName()+".queryService")); - - // no buffer pool since the #of requests is unbounded. - pipeBufferPool = null; - - } else { - - queryService = (ThreadPoolExecutor) Executors.newFixedThreadPool( - config.queryThreadPoolSize, new DaemonThreadFactory( - getClass().getName() + ".queryService")); - - // create a buffer pool which is reused for each request. - pipeBufferPool = new LinkedBlockingQueue<byte[]>( - config.queryThreadPoolSize); - - for (int i = 0; i < config.queryThreadPoolSize; i++) { - - pipeBufferPool.add(new byte[config.bufferCapacity]); - - } - - } - - if (indexManager.getCollectQueueStatistics()) { - - final long initialDelay = 0; // initial delay in ms. - final long delay = 1000; // delay in ms. 
- final TimeUnit unit = TimeUnit.MILLISECONDS; - - queueSampleTask = new ThreadPoolExecutorBaseStatisticsTask( - (ThreadPoolExecutor) queryService); - - queueStatsFuture = indexManager.addScheduledTask(queueSampleTask, - initialDelay, delay, unit); - - } else { - - queueSampleTask = null; - - queueStatsFuture = null; - - } - - } - private final ScheduledFuture<?> queueStatsFuture; - private final ThreadPoolExecutorBaseStatisticsTask queueSampleTask; - - /** - * Return a list of the registered {@link AbstractTripleStore}s. - */ - protected List<String> getNamespaces() { - - // the triple store namespaces. - final List<String> namespaces = new LinkedList<String>(); - - // scan the relation schema in the global row store. - final Iterator<ITPS> itr = (Iterator<ITPS>) indexManager - .getGlobalRowStore().rangeIterator(RelationSchema.INSTANCE); - - while (itr.hasNext()) { - - // A timestamped property value set is a logical row with - // timestamped property values. - final ITPS tps = itr.next(); - - // If you want to see what is in the TPS, uncomment this. -// System.err.println(tps.toString()); - - // The namespace is the primary key of the logical row for the - // relation schema. - final String namespace = (String) tps.getPrimaryKey(); - - // Get the name of the implementation class - // (AbstractTripleStore, SPORelation, LexiconRelation, etc.) - final String className = (String) tps.get(RelationSchema.CLASS) - .getValue(); - - try { - final Class<?> cls = Class.forName(className); - if (AbstractTripleStore.class.isAssignableFrom(cls)) { - // this is a triple store (vs something else). - namespaces.add(namespace); - } - } catch (ClassNotFoundException e) { - log.error(e,e); - } - - } - - return namespaces; - - } - - /** - * Return various interesting metadata about the KB state. - * - * @todo The range counts can take some time if the cluster is heavily - * loaded since they must query each shard for the primary statement - * index and the TERM2ID index. - */ - protected StringBuilder getKBInfo(final String namespace, - final long timestamp) { - - final StringBuilder sb = new StringBuilder(); - - BigdataSailRepositoryConnection conn = null; - - try { - - conn = getQueryConnection(namespace, timestamp); - - final AbstractTripleStore tripleStore = conn.getTripleStore(); - - sb.append("class\t = " + tripleStore.getClass().getName() + "\n"); - - sb - .append("indexManager\t = " - + tripleStore.getIndexManager().getClass() - .getName() + "\n"); - - sb.append("namespace\t = " + tripleStore.getNamespace() + "\n"); - - sb.append("timestamp\t = " - + TimestampUtility.toString(tripleStore.getTimestamp()) - + "\n"); - - sb.append("statementCount\t = " + tripleStore.getStatementCount() - + "\n"); - - sb.append("termCount\t = " + tripleStore.getTermCount() + "\n"); - - sb.append("uriCount\t = " + tripleStore.getURICount() + "\n"); - - sb.append("literalCount\t = " + tripleStore.getLiteralCount() + "\n"); - - /* - * Note: The blank node count is only available when using the told - * bnodes mode. - */ - sb - .append("bnodeCount\t = " - + (tripleStore.getLexiconRelation() - .isStoreBlankNodes() ? 
"" - + tripleStore.getBNodeCount() : "N/A") - + "\n"); - - sb.append(IndexMetadata.Options.BTREE_BRANCHING_FACTOR - + "=" - + tripleStore.getSPORelation().getPrimaryIndex() - .getIndexMetadata().getBranchingFactor() + "\n"); - - sb.append(IndexMetadata.Options.WRITE_RETENTION_QUEUE_CAPACITY - + "=" - + tripleStore.getSPORelation().getPrimaryIndex() - .getIndexMetadata() - .getWriteRetentionQueueCapacity() + "\n"); - - sb.append(BigdataSail.Options.STAR_JOINS + "=" - + conn.getRepository().getSail().isStarJoins() + "\n"); - - sb.append("-- All properties.--\n"); - - // get the triple store's properties from the global row store. - final Map<String, Object> properties = indexManager - .getGlobalRowStore().read(RelationSchema.INSTANCE, - namespace); - - // write them out, - for (String key : properties.keySet()) { - sb.append(key + "=" + properties.get(key)+"\n"); - } - - /* - * And show some properties which can be inherited from - * AbstractResource. These have been mainly phased out in favor of - * BOP annotations, but there are a few places where they are still - * in use. - */ - - sb.append("-- Interesting AbstractResource effective properties --\n"); - - sb.append(AbstractResource.Options.CHUNK_CAPACITY + "=" - + tripleStore.getChunkCapacity() + "\n"); - - sb.append(AbstractResource.Options.CHUNK_OF_CHUNKS_CAPACITY + "=" - + tripleStore.getChunkOfChunksCapacity() + "\n"); - - sb.append(AbstractResource.Options.CHUNK_TIMEOUT + "=" - + tripleStore.getChunkTimeout() + "\n"); - - sb.append(AbstractResource.Options.FULLY_BUFFERED_READ_THRESHOLD + "=" - + tripleStore.getFullyBufferedReadThreshold() + "\n"); - - sb.append(AbstractResource.Options.MAX_PARALLEL_SUBQUERIES + "=" - + tripleStore.getMaxParallelSubqueries() + "\n"); - - /* - * And show some interesting effective properties for the KB, SPO - * relation, and lexicon relation. - */ - sb.append("-- Interesting KB effective properties --\n"); - - sb - .append(AbstractTripleStore.Options.TERM_CACHE_CAPACITY - + "=" - + tripleStore - .getLexiconRelation() - .getProperties() - .getProperty( - AbstractTripleStore.Options.TERM_CACHE_CAPACITY, - AbstractTripleStore.Options.DEFAULT_TERM_CACHE_CAPACITY) + "\n"); - - /* - * And show several interesting properties with their effective - * defaults. 
- */ - - sb.append("-- Interesting Effective BOP Annotations --\n"); - - sb.append(BufferAnnotations.CHUNK_CAPACITY - + "=" - + tripleStore.getProperties().getProperty( - BufferAnnotations.CHUNK_CAPACITY, - "" + BufferAnnotations.DEFAULT_CHUNK_CAPACITY) - + "\n"); - - sb - .append(BufferAnnotations.CHUNK_OF_CHUNKS_CAPACITY - + "=" - + tripleStore - .getProperties() - .getProperty( - BufferAnnotations.CHUNK_OF_CHUNKS_CAPACITY, - "" - + BufferAnnotations.DEFAULT_CHUNK_OF_CHUNKS_CAPACITY) - + "\n"); - - sb.append(BufferAnnotations.CHUNK_TIMEOUT - + "=" - + tripleStore.getProperties().getProperty( - BufferAnnotations.CHUNK_TIMEOUT, - "" + BufferAnnotations.DEFAULT_CHUNK_TIMEOUT) - + "\n"); - - sb.append(PipelineJoin.Annotations.MAX_PARALLEL_CHUNKS - + "=" - + tripleStore.getProperties().getProperty( - PipelineJoin.Annotations.MAX_PARALLEL_CHUNKS, - "" + PipelineJoin.Annotations.DEFAULT_MAX_PARALLEL_CHUNKS) + "\n"); - - sb - .append(IPredicate.Annotations.FULLY_BUFFERED_READ_THRESHOLD - + "=" - + tripleStore - .getProperties() - .getProperty( - IPredicate.Annotations.FULLY_BUFFERED_READ_THRESHOLD, - "" - + IPredicate.Annotations.DEFAULT_FULLY_BUFFERED_READ_THRESHOLD) - + "\n"); - - // sb.append(tripleStore.predicateUsage()); - - if (tripleStore.getIndexManager() instanceof Journal) { - - final Journal journal = (Journal) tripleStore.getIndexManager(); - - final IBufferStrategy strategy = journal.getBufferStrategy(); - - if (strategy instanceof RWStrategy) { - - final RWStore store = ((RWStrategy) strategy).getRWStore(); - - store.showAllocators(sb); - - } - - } - - } catch (Throwable t) { - - log.warn(t.getMessage(), t); - - } finally { - - if(conn != null) { - try { - conn.close(); - } catch (RepositoryException e) { - log.error(e, e); - } - - } - - } - - return sb; - - } - - /** - * {@inheritDoc} - * <p> - * Overridden to wait until all running queries are complete before - */ - @Override - public void shutdown() { - if(log.isInfoEnabled()) - log.info("Normal shutdown."); - // Stop collecting queue statistics. - if (queueStatsFuture != null) - queueStatsFuture.cancel(true/* mayInterruptIfRunning */); - // Tell NanoHTTP to stop accepting new requests. - super.shutdown(); - // Stop servicing new requests. - queryService.shutdown(); - try { - queryService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); - } catch (InterruptedException ex) { - throw new RuntimeException(ex); - } - /* - * Note: This is using the atomic boolean as a lock in addition to - * relying on its visibility guarantee. - */ - synchronized (alive) { - alive.set(false); - alive.notifyAll(); - } - } - - /** - * {@inheritDoc} - * <p> - * Overridden to interrupt all running requests. - * - * FIXME Must abort any open transactions. This does not matter for the - * standalone database, but it will make a difference in scale-out. The - * transaction identifiers could be obtained from the {@link #queries} - * map. - */ - @Override - public void shutdownNow() { - if(log.isInfoEnabled()) - log.info("Normal shutdown."); - // Stop collecting queue statistics. - if (queueStatsFuture != null) - queueStatsFuture.cancel(true/* mayInterruptIfRunning */); - // Immediately stop accepting connections and interrupt open requests. - super.shutdownNow(); - // Interrupt all running queries. - queryService.shutdownNow(); - /* - * Note: This is using the atomic boolean as a lock in addition to - * relying on its visibility guarantee. 
- */ - synchronized (alive) { - alive.set(false); - alive.notifyAll(); - } - } - - /** - * Note: This uses an atomic boolean in order to give us a synchronization - * object whose state also serves as a condition variable. findbugs objects, - * but this is a deliberate usage. - */ - private final AtomicBoolean alive = new AtomicBoolean(true); - - /** - * <p> - * Perform an HTTP-POST, which corresponds to the basic CRUD operation - * "create" according to the generic interaction semantics of HTTP REST. The - * operation will be executed against the target namespace per the URI. - * </p> - * - * <pre> - * POST [/namespace/NAMESPACE] - * ... - * Content-Type: - * ... - * - * BODY - * </pre> - * <p> - * Where <code>BODY</code> is the new RDF content using the representation - * indicated by the <code>Content-Type</code>. - * </p> - * <p> - * -OR- - * </p> - * - * <pre> - * POST [/namespace/NAMESPACE] ?uri=URL - * </pre> - * <p> - * Where <code>URI</code> identifies a resource whose RDF content will be - * inserted into the database. The <code>uri</code> query parameter may - * occur multiple times. All identified resources will be loaded within a - * single native transaction. Bigdata provides snapshot isolation so you can - * continue to execute queries against the last commit point while this - * operation is executed. - * </p> - * - * <p> - * You can shutdown the server using: - * </p> - * - * <pre> - * POST /stop - * </pre> - * - * <p> - * A status page is available: - * </p> - * - * <pre> - * POST /status - * </pre> - */ - @Override - public Response doPost(final Request req) throws Exception { - - final String uri = req.uri; - - if("/stop".equals(uri)) { - - return doStop(req); - - } - - if("/status".equals(uri)) { - - return doStatus(req); - - } - - final String queryStr = getQueryStr(req.params); - - if (queryStr != null) { - - return doQuery(req); - - } - - final String contentType = req.getContentType(); - - if (contentType != null) { - - return doPostWithBody(req); - - } - - if (req.params.get("uri") != null) { - - return doPostWithURIs(req); - - } - - return new Response(HTTP_BADREQUEST, MIME_TEXT_PLAIN, uri); - - } - - /** - * POST with request body containing statements to be inserted. - * - * @param req - * The request. - * - * @return The response. - * - * @throws Exception - */ - private Response doPostWithBody(final Request req) throws Exception { - - final String baseURI = "";// @todo baseURI query parameter? - - final String namespace = getNamespace(req.uri); - - final String contentType = req.getContentType(); - - if (contentType == null) - throw new UnsupportedOperationException(); - - if (log.isInfoEnabled()) - log.info("Request body: " + contentType); - - final RDFFormat format = RDFFormat.forMIMEType(contentType); - - if (format == null) { - return new Response(HTTP_BADREQUEST, MIME_TEXT_PLAIN, - "Content-Type not recognized as RDF: " - + contentType); - } - - if (log.isInfoEnabled()) - log.info("RDFFormat=" + format); - - final RDFParserFactory rdfParserFactory = RDFParserRegistry - .getInstance().get(format); - - if (rdfParserFactory == null) { - return new Response(HTTP_INTERNALERROR, MIME_TEXT_PLAIN, - "Parser not found: Content-Type=" + contentType); - } - - try { - - // resolve the default namespace. 
- final AbstractTripleStore tripleStore = (AbstractTripleStore) indexManager - .getResourceLocator().locate(namespace, ITx.UNISOLATED); - - if (tripleStore == null) - return new Response(HTTP_BADREQUEST, MIME_TEXT_PLAIN, - "Not found: namespace=" + namespace); - - final AtomicLong nmodified = new AtomicLong(0L); - - // Wrap with SAIL. - final BigdataSail sail = new BigdataSail(tripleStore); - BigdataSailConnection conn = null; - try { - - sail.initialize(); - conn = sail.getConnection(); - - /* - * There is a request body, so let's try and parse it. - */ - - final RDFParser rdfParser = rdfParserFactory.getParser(); - - rdfParser.setValueFactory(tripleStore.getValueFactory()); - - rdfParser.setVerifyData(true); - - rdfParser.setStopAtFirstError(true); - - rdfParser - .setDatatypeHandling(RDFParser.DatatypeHandling.IGNORE); - - rdfParser.setRDFHandler(new AddStatementHandler(conn,nmodified)); - - /* - * Run the parser, which will cause statements to be inserted. - */ - rdfParser.parse(req.getInputStream(), baseURI); - - // Commit the mutation. - conn.commit(); - - return new Response(HTTP_OK, MIME_TEXT_PLAIN, nmodified.get() - + " statements modified."); - - } finally { - - if (conn != null) - conn.close(); - -// sail.shutDown(); - - } - - } catch (Exception ex) { - - // Will be rendered as an INTERNAL_ERROR. - throw new RuntimeException(ex); - - } - - } - - /** - * Helper class adds statements to the sail as they are visited by a parser. - */ - private static class AddStatementHandler extends RDFHandlerBase { - - private final BigdataSailConnection conn; - private final AtomicLong nmodified; - - public AddStatementHandler(final BigdataSailConnection conn, - final AtomicLong nmodified) { - this.conn = conn; - this.nmodified = nmodified; - } - - public void handleStatement(Statement stmt) throws RDFHandlerException { - - try { - - conn.addStatement(// - stmt.getSubject(), // - stmt.getPredicate(), // - stmt.getObject(), // - (Resource[]) (stmt.getContext() == null ? null - : new Resource[] { stmt.getContext() })// - ); - - } catch (SailException e) { - - throw new RDFHandlerException(e); - - } - - nmodified.incrementAndGet(); - - } - - } - - /** - * Helper class removes statements from the sail as they are visited by a parser. - */ - private static class RemoveStatementHandler extends RDFHandlerBase { - - private final BigdataSailConnection conn; - private final AtomicLong nmodified; - - public RemoveStatementHandler(final BigdataSailConnection conn, - final AtomicLong nmodified) { - this.conn = conn; - this.nmodified = nmodified; - } - - public void handleStatement(Statement stmt) throws RDFHandlerException { - - try { - - conn.removeStatements(// - stmt.getSubject(), // - stmt.getPredicate(), // - stmt.getObject(), // - (Resource[]) (stmt.getContext() == null ? null - : new Resource[] { stmt.getContext() })// - ); - - } catch (SailException e) { - - throw new RDFHandlerException(e); - - } - - nmodified.incrementAndGet(); - - } - - } - - /** - * POST with URIs of resources to be inserted. - * - * @param req - * The request. - * - * @return The response. 
- * - * @throws Exception - */ - private Response doPostWithURIs(final Request req) throws Exception { - - final String namespace = getNamespace(req.uri); - - final String contentType = req.getContentType(); - - final Vector<String> uris = req.params.get("uri"); - - if (uris == null) - throw new UnsupportedOperationException(); - - if (uris.isEmpty()) - return new Response(HTTP_OK, MIME_TEXT_PLAIN, - "0 statements modified"); - - if (log.isInfoEnabled()) - log.info("URIs: " + uris); - - // Before we do anything, make sure we have valid URLs. - final Vector<URL> urls = new Vector<URL>(uris.size()); - for (String uri : uris) { - urls.add(new URL(uri)); - } - - try { - - // resolve the default namespace. - final AbstractTripleStore tripleStore = (AbstractTripleStore) indexManager - .getResourceLocator().locate(namespace, ITx.UNISOLATED); - - if (tripleStore == null) - return new Response(HTTP_BADREQUEST, MIME_TEXT_PLAIN, - "Not found: namespace=" + namespace); - - final AtomicLong nmodified = new AtomicLong(0L); - - // Wrap with SAIL. - final BigdataSail sail = new BigdataSail(tripleStore); - BigdataSailConnection conn = null; - try { - - conn = sail.getConnection(); - - for (URL url : urls) { - - HttpURLConnection hconn = null; - try { - - hconn = (HttpURLConnection) url.openConnection(); - hconn.setRequestMethod(NanoHTTPD.GET); - hconn.setReadTimeout(0);// no timeout? http param? - - /* - * There is a request body, so let's try and parse it. - */ - - final RDFFormat format = RDFFormat - .forMIMEType(contentType); - - if (format == null) { - return new Response(HTTP_BADREQUEST, - MIME_TEXT_PLAIN, - "Content-Type not recognized as RDF: " - + contentType); - } - - final RDFParserFactory rdfParserFactory = RDFParserRegistry - .getInstance().get(format); - - if (rdfParserFactory == null) { - return new Response(HTTP_INTERNALERROR, - MIME_TEXT_PLAIN, - "Parser not found: Content-Type=" - + contentType); - } - - final RDFParser rdfParser = rdfParserFactory - .getParser(); - - rdfParser - .setValueFactory(tripleStore.getValueFactory()); - - rdfParser.setVerifyData(true); - - rdfParser.setStopAtFirstError(true); - - rdfParser - .setDatatypeHandling(RDFParser.DatatypeHandling.IGNORE); - - rdfParser.setRDFHandler(new AddStatementHandler(conn, nmodified)); - - /* - * Run the parser, which will cause statements to be - * inserted. - */ - - rdfParser.parse(req.getInputStream(), url - .toExternalForm()/* baseURL */); - - } finally { - - if (hconn != null) - hconn.disconnect(); - - } // next URI. - - } - - // Commit the mutation. - conn.commit(); - - return new Response(HTTP_OK, MIME_TEXT_PLAIN, nmodified.get() - + " statements modified."); - - } finally { - - if (conn != null) - conn.close(); - -// sail.shutDown(); - - } - - } catch (Exception ex) { - - // Will be rendered as an INTERNAL_ERROR. - throw new RuntimeException(ex); - - } - - } - - /** - * Halt the server. - * - * @param req - * The request. - * - * @return The response. - * - * @throws Exception - */ - private Response doStop(final Request req) throws Exception { - - /* - * Create a new thread to run shutdown since we do not want this to - * block on the queryService. - */ - final Thread t = new Thread(new Runnable() { - - public void run() { - - log.warn("Will shutdown."); - - try { - - /* - * Sleep for a bit so the Response will be delivered - * before we shutdown the server. - */ - - Thread.sleep(100/* ms */); - - } catch (InterruptedException ex) { - - // ignore - - } - - // Shutdown the server. 
- shutdown(); - - } - - }); - - t.setDaemon(true); - - // Start the shutdown thread. - t.start(); - -// // Shutdown. -// shutdown(); - - /* - * Note: Client might not see this response since the shutdown thread - * may have already terminated the httpd service. - */ - return new Response(HTTP_OK, MIME_TEXT_PLAIN, "Shutting down."); - - } - - /** - * Accepts SPARQL queries. - * - * <pre> - * GET [/namespace/NAMESPACE] ?query=QUERY - * </pre> - * - * Where <code>QUERY</code> is the SPARQL query. - */ - @Override - public Response doGet(final Request req) throws Exception { - - final String uri = req.uri; - - if("/status".equals(uri)) { - - return doStatus(req); - - } - - final String queryStr = getQueryStr(req.params); - - if (queryStr != null) { - - return doQuery(req); - - } - - return new Response(HTTP_NOTFOUND, MIME_TEXT_PLAIN, uri); - - } - -// /** -// * TODO Perform an HTTP-PUT, which corresponds to the basic CRUD operation -// * "update" according to the generic interaction semantics of HTTP REST. -// * -// */ -// @Override -// public Response doPut(final Request req) { -// -// return new Response(HTTP_NOTFOUND, MIME_TEXT_PLAIN, req.uri); -// -// } - - /** - * REST DELETE. There are two forms for this operation. - * - * <pre> - * DELETE [/namespace/NAMESPACE] - * ... - * Content-Type - * ... - * - * BODY - * - * </pre> - * <p> - * BODY contains RDF statements according to the specified Content-Type. - * Statements parsed from the BODY are deleted from the addressed namespace. - * </p> - * <p> - * -OR- - * </p> - * - * <pre> - * DELETE [/namespace/NAMESPACE] ?query=... - * </pre> - * <p> - * Where <code>query</code> is a CONSTRUCT or DESCRIBE query. Statements are - * materialized using the query from the addressed namespace are deleted - * from that namespace. - * </p> - */ - @Override - public Response doDelete(final Request req) { - - final String contentType = req.getContentType(); - - final String queryStr = getQueryStr(req.params); - - if(contentType != null) { - - return doDeleteWithBody(req); - - } else if (queryStr != null) { - - return doDeleteWithQuery(req); - - } - - return new Response(HTTP_BADREQUEST, MIME_TEXT_PLAIN, ""); - - } - - /** - * Delete all statements materialized by a DESCRIBE or CONSTRUCT query. - * <p> - * Note: To avoid materializing the statements, this runs the query against - * the last commit time. This is done while it is holding the unisolated - * connection which prevents concurrent modifications. Therefore the entire - * SELECT + DELETE operation is ACID. - */ - private Response doDeleteWithQuery(final Request req) { - - final String baseURI = "";// @todo baseURI query parameter? - - final String namespace = getNamespace(req.uri); - - final String queryStr = getQueryStr(req.params); - - if(queryStr == null) - throw new UnsupportedOperationException(); - - if (log.isInfoEnabled()) - log.info("delete with query: "+queryStr); - - try { - - // resolve the default namespace. - final AbstractTripleStore tripleStore = (AbstractTripleStore) indexManager - .getResourceLocator().locate(namespace, ITx.UNISOLATED); - - if (tripleStore == null) - return new Response(HTTP_BADREQUEST, MIME_TEXT_PLAIN, - "Not found: namespace=" + namespace); - - /* - * Note: pipe is drained by this thread to consume the query - * results, which are the statements to be deleted. 
- */ - final PipedOutputStream os = new PipedOutputStream(); - final InputStream is = newPipedInputStream(os); - try { - - final AbstractQueryTask queryTask = getQueryTask(namespace, - ITx.READ_COMMITTED, queryStr, req.params, req.headers, - os); - - switch (queryTask.queryType) { - case DESCRIBE: - case CONSTRUCT: - break; - default: - return new Response(HTTP_BADREQUEST, MIME_TEXT_PLAIN, - "Must be DESCRIBE or CONSTRUCT query."); - } - - final AtomicLong nmodified = new AtomicLong(0L); - - // Wrap with SAIL. - final BigdataSail sail = new BigdataSail(tripleStore); - BigdataSailConnection conn = null; - try { - - sail.initialize(); - - // get the unisolated connection. - conn = sail.getConnection(); - - final RDFXMLParser rdfParser = new RDFXMLParser( - tripleStore.getValueFactory()); - - rdfParser.setVerifyData(false); - - rdfParser.setStopAtFirstError(true); - - rdfParser - .setDatatypeHandling(RDFParser.DatatypeHandling.IGNORE); - - rdfParser.setRDFHandler(new RemoveStatementHandler(conn, nmodified)); - - /* - * Run the parser, which will cause statements to be - * deleted. - */ - rdfParser.parse(is, baseURI); - - // Commit the mutation. - conn.commit(); - - } finally { - - if (conn != null) - conn.close(); - -// sail.shutDown(); - - } - - return new Response(HTTP_OK, MIME_TEXT_PLAIN, nmodified.get() - + " statements modified."); - - } catch (Throwable t) { - - throw launderThrowable(t, os, queryStr); - - } - - } catch (Exception ex) { - - // Will be rendered as an INTERNAL_ERROR. - throw new RuntimeException(ex); - - } - - } - - /** - * DELETE request with a request body containing the statements to be - * removed. - */ - private Response doDeleteWithBody(final Request req) { - - final String baseURI = "";// @todo baseURI query parameter? - - final String namespace = getNamespace(req.uri); - - final String contentType = req.getContentType(); - - if (contentType == null) - throw new UnsupportedOperationException(); - - if (log.isInfoEnabled()) - log.info("Request body: " + contentType); - - try { - - // resolve the default namespace. - final AbstractTripleStore tripleStore = (AbstractTripleStore) indexManager - .getResourceLocator().locate(namespace, ITx.UNISOLATED); - - if (tripleStore == null) - return new Response(HTTP_BADREQUEST, MIME_TEXT_PLAIN, - "Not found: namespace=" + namespace); - - final AtomicLong nmodified = new AtomicLong(0L); - - // Wrap with SAIL. - final BigdataSail sail = new BigdataSail(tripleStore); - BigdataSailConnection conn = null; - try { - - sail.initialize(); - conn = sail.getConnection(); - - /* - * There is a request body, so let's try and parse it. - */ - - final RDFFormat format = RDFFormat.forMIMEType(contentType); - - if (format == null) { - return new Response(HTTP_BADREQUEST, MIME_TEXT_PLAIN, - "Content-Type not recognized as RDF: " - + contentType); - } - - final RDFParserFactory rdfParserFactory = RDFParserRegistry - .getInstance().get(format); - - if (rdfParserFactory == null) { - return new Response(HTTP_INTERNALERROR, MIME_TEXT_PLAIN, - "Parser not found: Content-Type=" + contentType); - } - - final RDFParser rdfParser = rdfParserFactory.getParser(); - - rdfParser.setValueFactory(tripleStore.getValueFactory()); - - rdfParser.setVerifyData(true); - - rdfParser.setStopAtFirstError(true); - - rdfParser - .setDatatypeHandling(RDFParser.DatatypeHandling.IGNORE); - - rdfParser.setRDFHandler(new RemoveStatementHandler(conn, - nmodified)); - - /* - * Run the parser, which will cause statements to be deleted. 
- */ - rdfParser.parse(req.getInputStream(), baseURI); - - // Commit the mutation. - conn.commit(); - - return new Response(HTTP_OK, MIME_TEXT_PLAIN, nmodified.get() - + " statements modified."); - - } finally { - - if (conn != null) - conn.close(); - -// sail.shutDown(); - - } - - } catch (Exception ex) { - - // Will be rendered as an INTERNAL_ERROR. - throw new RuntimeException(ex); - - } - - } - - /** - * Return the namespace which will be used to execute the query. The - * namespace is represented by the first component of the URI. If there is - * no namespace, then return the configured default namespace. - * - * @param uri - * The URI path string. - * - * @return The namespace. - */ - private String getNamespace(final String uri) { - -// // locate the "//" after the protocol. -// final int index = uri.indexOf("//"); - - if(!uri.startsWith("/namespace/")) { - // use the default namespace. - return config.namespace; - } - - // locate the next "/" in the URI path. - final int beginIndex = uri.indexOf('/', 1/* fromIndex */); - - // locate the next "/" in the URI path. - int endIndex = uri.indexOf('/', beginIndex + 1/* fromIndex */); - - if (endIndex == -1) { - // use the rest of the URI. - endIndex = uri.length(); - } - - // return the namespace. - return uri.substring(beginIndex + 1, endIndex); - - } - - /** - * Return the timestamp which will be used to execute the query. The uri - * query parameter <code>timestamp</code> may be used to communicate the - * desired commit time against which the query will be issued. If that uri - * query parameter is not given then the default configured commit time will - * be used. Applications may create protocols for sharing interesting commit - * times as reported by {@link IAtomicStore#commit()} or by a distributed - * data loader (for scale-out). - * - * @todo the configured timestamp should only be used for the default - * namespace (or it should be configured for each graph explicitly, or - * we should bundle the (namespace,timestamp) together as a single - * object). - */ - private long getTimestamp(final String uri, - final LinkedHashMap<String, Vector<String>> params) { - - final Vector<String> tmp = params.get("timestamp"); - - if (tmp == null || tmp.size() == 0 || tmp.get(0) == null) { - - return config.timestamp; - - } - - final String val = tmp.get(0); - - return Long.valueOf(val); - - } - - /** - * Respond to a status request. - * - * @param uri - * @param method - * @param header - * @param params - * @return - * @throws Exception - * - * @todo add statistics for top-N queries based on query template - * identifiers, which can be communicated using query hints. See // - * wait for the subquery. - */ - private Response doStatus(final Request req) throws Exception { - - // SPARQL queries accepted by the SPARQL end point. - final boolean showQueries = req.params.get("showQueries") != null; - - // IRunningQuery objects currently running on the query controller. - final boolean showRunningQueries = req.params.get("showRunningQueries") != null; - - // Information about the KB (stats, properties). - final boolean showKBInfo = req.params.get("showKBInfo") != null; - - // bigdata namespaces known to the index manager. 
- final boolean showNamespaces = req.params.get("showNamespaces") != null; - - final StringBuilder sb = new StringBuilder(); - - sb.append("Accepted query count=" + queryIdFactory.get() + "\n"); - - sb.append("Running query count=" + queries.size() + "\n"); - - if (showNamespaces) { - - final List<String> namespaces = getNamespaces(); - - sb.append("Namespaces: "); - - for (String s : namespaces) { - - sb.append(s); - - } - - sb.append("\n"); - - } - - if (showKBInfo) { - - // General information on the connected kb. - sb.append(getKBInfo(getNamespace(req.uri), getTimestamp(req.uri, - req.params))); - - } - - if(queueSampleTask != null) { - - // Performance counters for the NSS queries. - sb.append(queueSampleTask.getCounters().toString()); - - } - -// if (indexManager instanceof IJournal) { -// -// /* -// * Stuff which is specific to a local/embedded database. -// */ -// -// final AbstractJournal jnl = (AbstractJournal) indexManager; -// -// sb.append("file\t= " + jnl.getFile() + "\n"); -// -// sb.append("BufferMode\t= " -// + jnl.getBufferStrategy().getBufferMode() + "\n"); -// -// sb.append("nextOffset\t= " + jnl.getRootBlockView().getNextOffset() -// + "\n"); -// -// if (LRUNexus.INSTANCE != null) { -// -// sb.append(LRUNexus.Options.CLASS + "=" -// + LRUNexus.INSTANCE.toString().getClass() + "\n"); -// -// sb.append(LRUNexus.INSTANCE.toString() + "\n"); -// -// } else { -// -// sb.append("LRUNexus is disabled."); -// -// } -// -// // show the disk access details. -// sb.append(jnl.getBufferStrategy().getCounters().toString()+"\n"); -// -// } - - if(showQueries) { - - /* - * Show the queries which are currently executing (accepted by the NanoSparqlServer). - */ - - sb.append("\n"); - - final long now = System.nanoTime(); - - final TreeMap<Long, RunningQuery> ages = new TreeMap<Long, RunningQuery>(new Comparator<Long>() { - /** - * Comparator puts the entries into descending order by the query - * execution time (longest running queries are first). - */ - public int compare(final Long o1, final Long o2) { - if(o1.longValue()<o2.longValue()) return 1; - if(o1.longValue()>o2.longValue()) return -1; - return 0; - } - }); - - { - - final Iterator<RunningQuery> itr = queries.values().iterator(); - - while (itr.hasNext()) { - - final RunningQuery query = itr.next(); - - final long age = now - query.begin; - - ages.put(age, query); - - } - - } - - { - - final Iterator<RunningQuery> itr = ages.values().iterator(); - - while (itr.hasNext()) { - - final RunningQuery query = itr.next(); - - final long age = now - query.begin; - - sb.append("age=" - + java.util.concurrent.TimeUnit.NANOSECONDS - .toMillis(age) + "ms, queryId=" - + query.queryId + "\n"); - sb.append(query.query + "\n"); - - } - - } - - } - - if(showRunningQueries) { - - /* - * Show the queries which are currently executing (actually running - * on the QueryEngine). - */ - - sb.append("\n"); - - final QueryEngine queryEngine = (QueryEngine) QueryEngineFactory - .getQueryController(indexManager); - - final UUID[] queryIds = queryEngine.getRunningQueries(); - -// final long now = System.nanoTime(); - - final TreeMap<Long, IRunningQuery> ages = new TreeMap<Long, IRunningQuery>(new Comparator<Long>() { - /** - * Comparator puts the entries into descending order by the query - * execution time (longest running queries are first). 
- */ - public int compare(final Long o1, final Long o2) { - if(o1.longValue()<o2.longValue()) return 1; - if(o1.longValue()>o2.longValue()) return -1; - return 0; - } - }); - - for(UUID queryId : queryIds) { - - final IRunningQuery query = queryEngine - .getRunningQuery(queryId); - - if (query == null) { - // Already terminated. - continue; - } - - ages.put(query.getElapsed(), query); - - } - - { - - final Iterator<IRunningQuery> itr = ages.values().iterator(); - - while (itr.hasNext()) { - - final IRunningQuery query = itr.next(); - - if (query.isDone() && query.getCause() != null) { - // Already terminated (normal completion). - continue; - } - - /* - * @todo The runstate and stats could be formatted into an - * HTML table ala QueryLog or RunState. - */ - sb.append("age=" + query.getElapsed() + "ms\n"); - sb.append("queryId=" + query.getQueryId() + "\n"); - sb.append(query.toString()); - sb.append("\n"); - sb.append(BOpUtility.toString(query.getQuery())); - sb.append("\n"); - sb.append("\n"); - -// final long age = query.getElapsed(); -// sb.append("age=" -// + java.util.concurrent.TimeUnit.NANOSECONDS -// .toMillis(age) + "ms, queryId=" -// + query.getQueryId() + "\nquery=" -// + BOpUtility.toString(query.getQuery()) + "\n"); - - } - - } - - } - - return new Response(HTTP_OK, MIME_TEXT_PLAIN, sb.toString()); - - } - - /** - * Answer a SPARQL query. - * - * @param uri - * @param method - * @param header - * @param params - * @return - * @throws Exception - */ - public Response doQuery(final Request req) throws Exception { - - final String namespace = getNamespace(req.uri); - - final long timestamp = getTimestamp(req.uri, req.params); - - final String queryStr = getQueryStr(req.params); - - if (queryStr == null) - return new Response(HTTP_BADREQUEST, MIME_TEXT_PLAIN, - "Specify query using ?query=...."); - - /* - * Setup pipes. The [os] will be passed into the task that executes the - * query. The [is] will be passed into the Response. The task is - * executed on a thread pool. - * - * Note: If the client closes the connection, then the InputStream - * passed into the Response will be closed and the task will terminate - * rather than running on in the background with a disconnected client. - */ - final PipedOutputStream os = new PipedOutputStream(); - final InputStream is = newPipedInputStream(os); - try { - - final AbstractQueryTask queryTask = getQueryTask(namespace, timestamp, - queryStr, req.params, req.headers, os); - - final FutureTask<Void> ft = new FutureTask<Void>(queryTask); - - // Setup the response. - // TODO Move charset choice into conneg logic. - final Response r = new Response(HTTP_OK, queryTask.mimeType - + "; charset='" + charset + "'", is); - - if (log.isTraceEnabled()) - log.trace("Will run query: " + queryStr); - - // Begin executing the query (asynchronous). - queryService.execute(ft); - - /* - * Sets the cache behavior. - */ - // r.addHeader("Cache-Control", - // "max-age=60, must-revalidate, public"); - // to disable caching. - // r.addHeader("Cache-Control", "no-cache"); - - return r; - - } catch (Throwable e) { - - throw launderThrowable(e, os, queryStr); - - } - - } - - /** - * Return the query string. - * - * @param params - * - * @return The query string -or- <code>null</code> if none was specified. 
- */ - private String getQueryStr(final Map<String, Vector<String>> params) { - - final String queryStr; - - final Vector<String> tmp = params.get("query"); - - if (tmp == null || tmp.isEmpty() || tmp.get(0) == null) { - queryStr = null; - } else { - queryStr = tmp.get(0); - - if (log.isDebugEnabled()) - log.debug("query: " + queryStr); - - } - - return queryStr; - - } - - /** - * Class reuses the a pool of buffers for each pipe. This is a significant - * performance win. - * - * @see NanoSparqlServer#pipeBufferPool - */ - private class MyPipedInputStream extends PipedInputStream { - - MyPipedInputStream(final PipedOutputStream os) throws IOException, - InterruptedException { - - super(os, 1/* size */); - - // override the buffer. - this.buffer = pipeBufferPool.take(); - - } - - public void close() throws IOException { - - super.close(); - - // return the buffer to the pool. - pipeBufferPoo... [truncated message content] |
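The DELETE-with-QUERY path mentioned in the log message turns on one idea, visible in doDeleteWithQuery() above: the statements to be removed are streamed through a pipe instead of being collected in memory. A producer task serializes the CONSTRUCT or DESCRIBE results into a PipedOutputStream while the request thread parses the matching PipedInputStream and deletes each statement as it arrives. Below is a minimal, self-contained sketch of that producer/consumer shape using only JDK types; the class name and the stand-in producer/consumer bodies are illustrative, not the servlet's actual code (there the producer is the query task writing RDF/XML and the consumer is an RDFParser wired to a RemoveStatementHandler).

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PipedDeleteSketch {

    public static void main(final String[] args) throws Exception {

        final ExecutorService queryService = Executors.newFixedThreadPool(1);

        // The producer writes into [os]; the consumer reads from [is].
        final PipedOutputStream os = new PipedOutputStream();
        final PipedInputStream is = new PipedInputStream(os);

        // Producer: stands in for the query task serializing statements.
        final Future<?> producer = queryService.submit(() -> {
            try (OutputStream out = os) { // closing signals EOF to the reader
                for (int i = 0; i < 3; i++) {
                    out.write(("stmt-" + i + "\n").getBytes(StandardCharsets.UTF_8));
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });

        // Consumer: stands in for the parser driving removeStatements().
        long nmodified = 0;
        try (BufferedReader r = new BufferedReader(
                new InputStreamReader(is, StandardCharsets.UTF_8))) {
            while (r.readLine() != null) {
                nmodified++; // one "delete" per statement, never materialized
            }
        }

        producer.get(); // surface any producer-side error
        queryService.shutdown();

        System.out.println(nmodified + " statements modified.");
    }
}

Because the unisolated connection is held for the whole parse-and-delete loop, the query (which reads against the last commit point) cannot race with other writers, which is why the comments in doDeleteWithQuery() describe the combined operation as ACID.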
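The bounded configuration of the thread pool also recycles the pipe buffers themselves: with a fixed queryThreadPoolSize, the constructor fills a LinkedBlockingQueue<byte[]> with one buffer per worker, and MyPipedInputStream (shown just before the truncation point) swaps the pooled array into PipedInputStream's protected buffer field, returning it on close(). A sketch of that trick, assuming the pool is passed in explicitly rather than read from the enclosing server instance, with an illustrative class name:

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.LinkedBlockingQueue;

public class PooledPipedInputStream extends PipedInputStream {

    private final LinkedBlockingQueue<byte[]> pool;

    private byte[] borrowed;

    public PooledPipedInputStream(final PipedOutputStream os,
            final LinkedBlockingQueue<byte[]> pool) throws IOException,
            InterruptedException {

        super(os, 1/* size; replaced below */);

        this.pool = pool;

        // Blocks until a buffer is free, which also throttles admission.
        this.borrowed = pool.take();

        // Swap the pooled array in for the 1-byte buffer allocated above.
        this.buffer = borrowed;

    }

    @Override
    public void close() throws IOException {

        super.close();

        final byte[] b = borrowed;

        if (b != null) {

            borrowed = null;

            // Return the buffer to the pool for the next request.
            pool.add(b);

        }

    }

}

Allocating the buffers once per worker instead of once per request avoids churning large byte arrays under load, which is the "significant performance win" the javadoc on MyPipedInputStream claims.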