From: <tho...@us...> - 2014-06-17 21:14:40
Revision: 8501
          http://sourceforge.net/p/bigdata/code/8501
Author:   thompsonbry
Date:     2014-06-17 21:14:25 +0000 (Tue, 17 Jun 2014)

Log Message:
-----------
Continued progress in support of #566 and #753. I am iterating over the REST API method implementations and restructuring them to support both #566 and, by extension, #753. It is not necessary to change over to group commit in order to benefit from this refactoring, but when we do change over it will be with a single boolean switch from the existing operational mode to the group commit operational mode.

I have modified the code to execute the RestApiTask in the caller's thread for the non-group-commit code path. This avoids introducing another thread per request under heavy query workloads.

I have cleaned up some of the launderThrowable() invocations to provide better information about the REST API request that failed. This is not yet systematic.

SPARQL QUERY and SPARQL UPDATE now go through the RestApiTask pattern. I am continuing to identify, document, and work through potential problems in the REST API that would conflict with group commit semantics.

All tests are passing. Group commit is still disabled.

Modified Paths:
--------------
    branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/BigdataStatics.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFContext.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFServlet.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataServlet.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BlueprintsServlet.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/DeleteServlet.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/DescribeCacheServlet.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/InsertServlet.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/MultiTenancyServlet.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/QueryServlet.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/RESTServlet.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/RestApiTask.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/UpdateServlet.java

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/BigdataStatics.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/BigdataStatics.java	2014-06-17 20:51:10 UTC (rev 8500)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/BigdataStatics.java	2014-06-17 21:14:25 UTC (rev 8501)
@@ -125,7 +125,6 @@
      * @see <a href="http://sourceforge.net/apps/trac/bigdata/ticket/566" >
      *      Concurrent unisolated operations against multiple KBs </a>
      */
-    public static final boolean NSS_GROUP_COMMIT = Boolean
-            .getBoolean("com.bigdata.nssGroupCommit");
+    public static final boolean NSS_GROUP_COMMIT = Boolean.getBoolean("com.bigdata.nssGroupCommit");
 
 }
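The NSS_GROUP_COMMIT flag above is read once from a JVM system property, so the "single boolean switch" described in the log message is a start-up option rather than a code change. A minimal sketch of how the flag behaves (the property name is taken from the diff; the demo class itself is illustrative):

    // Illustrative sketch only. Boolean.getBoolean(name) returns true iff
    // the named system property exists and equals "true", so group commit
    // stays disabled unless the JVM is launched with
    //   -Dcom.bigdata.nssGroupCommit=true
    public class GroupCommitSwitchDemo {

        public static void main(final String[] args) {

            final boolean nssGroupCommit = Boolean
                    .getBoolean("com.bigdata.nssGroupCommit");

            System.out.println("NSS group commit enabled: " + nssGroupCommit);

        }

    }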
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFContext.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFContext.java	2014-06-17 20:51:10 UTC (rev 8500)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFContext.java	2014-06-17 21:14:25 UTC (rev 8501)
@@ -95,6 +95,7 @@
 import com.bigdata.rdf.sail.ISPARQLUpdateListener;
 import com.bigdata.rdf.sail.SPARQLUpdateEvent;
 import com.bigdata.rdf.sail.sparql.Bigdata2ASTSPARQLParser;
+import com.bigdata.rdf.sail.webapp.RestApiTask.RestApiMutationTask;
 import com.bigdata.rdf.sail.webapp.client.StringUtil;
 import com.bigdata.rdf.sparql.ast.ASTContainer;
 import com.bigdata.rdf.sparql.ast.QueryHints;
@@ -365,13 +366,10 @@
     /**
      * Immediate shutdown interrupts any running queries.
      *
-     * FIXME Must abort any open transactions. This does not matter for the
-     * standalone database, but it will make a difference in scale-out. The
-     * transaction identifiers could be obtained from the {@link #queries} map.
-     *
-     * FIXME This must also abort any running updates. Those are currently
-     * running in the thread handling the {@link HttpServletRequest}, however
-     * it probably makes sense to execute them on a bounded thread pool as well.
+     * FIXME GROUP COMMIT: Shutdown should abort open transactions (including
+     * queries and updates). This should be addressed when we handle group
+     * commit since that provides us with a means to recognize and interrupt
+     * each running {@link RestApiTask}.
      */
     void shutdownNow() {
 
@@ -1135,83 +1133,125 @@
     abstract protected void doQuery(BigdataSailRepositoryConnection cxn,
             OutputStream os) throws Exception;
 
-    @Override
-    final public Void call() throws Exception {
-        BigdataSailRepositoryConnection cxn = null;
-        boolean success = false;
-        try {
-            // Note: Will be UPDATE connection if UPDATE request!!!
-            cxn = getQueryConnection(namespace, timestamp);
-            if(log.isTraceEnabled())
-                log.trace("Query running...");
-            beginNanos = System.nanoTime();
-            if (explain && !update) {
-                /*
-                 * The data goes to a bit bucket and we send an
-                 * "explanation" of the query evaluation back to the caller.
-                 *
-                 * Note: The trick is how to get hold of the IRunningQuery
-                 * object. It is created deep within the Sail when we
-                 * finally submit a query plan to the query engine. We have
-                 * the queryId (on queryId2), so we can look up the
-                 * IRunningQuery in [m_queries] while it is running, but
-                 * once it is terminated the IRunningQuery will have been
-                 * cleared from the internal map maintained by the
-                 * QueryEngine, at which point we can no longer find it.
-                 *
-                 * Note: We can't do this for UPDATE since it would have a
-                 * side-effect anyway. The way to "EXPLAIN" an UPDATE is to
-                 * break it down into the component QUERY bits and execute
-                 * those.
-                 */
-                doQuery(cxn, new NullOutputStream());
-                success = true;
-            } else {
-                doQuery(cxn, os);
-                success = true;
-                os.flush();
-                os.close();
-            }
-            if (log.isTraceEnabled())
-                log.trace("Query done.");
-            return null;
-        } finally {
-            endNanos = System.nanoTime();
-            m_queries.remove(queryId);
-            if (queryId2 != null) m_queries2.remove(queryId2);
-//            if (os != null) {
-//                try {
-//                    os.close();
-//                } catch (Throwable t) {
-//                    log.error(t, t);
-//                }
-//            }
-            if (cxn != null) {
-                if (!success && !cxn.isReadOnly()) {
+    /**
+     * Task for executing a SPARQL QUERY or SPARQL UPDATE.
+     * <p>
+     * See {@link AbstractQueryTask#update} to decide whether this task is a
+     * QUERY or an UPDATE.
+     *
+     * @author <a href="mailto:tho...@us...">Bryan
+     *         Thompson</a>
+     */
+    private class SparqlRestApiTask extends RestApiTask<Void> {
+
+        public SparqlRestApiTask(final HttpServletRequest req,
+                final HttpServletResponse resp, final String namespace,
+                final long timestamp) {
+
+            super(req, resp, namespace, timestamp);
+
+        }
+
+        @Override
+        public Void call() throws Exception {
+            BigdataSailRepositoryConnection cxn = null;
+            boolean success = false;
+            try {
+                // Note: Will be UPDATE connection if UPDATE request!!!
+                cxn = getQueryConnection();//namespace, timestamp);
+                if(log.isTraceEnabled())
+                    log.trace("Query running...");
+                beginNanos = System.nanoTime();
+                if (explain && !update) {
                     /*
-                     * Force rollback of the connection.
+                     * The data goes to a bit bucket and we send an
+                     * "explanation" of the query evaluation back to the caller.
                      *
-                     * Note: It is possible that the commit has already been
-                     * processed, in which case this rollback() will be a
-                     * NOP. This can happen when there is an IO error when
-                     * communicating with the client, but the database has
-                     * already gone through a commit.
+                     * Note: The trick is how to get hold of the IRunningQuery
+                     * object. It is created deep within the Sail when we
+                     * finally submit a query plan to the query engine. We have
+                     * the queryId (on queryId2), so we can look up the
+                     * IRunningQuery in [m_queries] while it is running, but
+                     * once it is terminated the IRunningQuery will have been
+                     * cleared from the internal map maintained by the
+                     * QueryEngine, at which point we can no longer find it.
+                     *
+                     * Note: We can't do this for UPDATE since it would have a
+                     * side-effect anyway. The way to "EXPLAIN" an UPDATE is to
+                     * break it down into the component QUERY bits and execute
+                     * those.
                      */
+                    doQuery(cxn, new NullOutputStream());
+                    success = true;
+                } else {
+                    doQuery(cxn, os);
+                    success = true;
+                    os.flush();
+                    os.close();
+                }
+                if (log.isTraceEnabled())
+                    log.trace("Query done.");
+                return null;
+            } finally {
+                endNanos = System.nanoTime();
+                m_queries.remove(queryId);
+                if (queryId2 != null) m_queries2.remove(queryId2);
+//                if (os != null) {
+//                    try {
+//                        os.close();
+//                    } catch (Throwable t) {
+//                        log.error(t, t);
+//                    }
+//                }
+                if (cxn != null) {
+                    if (!success && !cxn.isReadOnly()) {
+                        /*
+                         * Force rollback of the connection.
+                         *
+                         * Note: It is possible that the commit has already been
+                         * processed, in which case this rollback() will be a
+                         * NOP. This can happen when there is an IO error when
+                         * communicating with the client, but the database has
+                         * already gone through a commit.
+                         */
                         try {
-                            // Force rollback of the connection.
-                            cxn.rollback();
+                            // Force rollback of the connection.
+                            cxn.rollback();
                         } catch (Throwable t) {
                             log.error(t, t);
                         }
                     }
                     try {
-                        // Force close of the connection.
-                        cxn.close();
+                        // Force close of the connection.
+                        cxn.close();
                     } catch (Throwable t) {
                         log.error(t, t);
                     }
                 }
             }
         }
+
+    }
+
+    @Override
+    final public Void call() throws Exception {
+
+        final String queryOrUpdateStr = astContainer.getQueryString();
+
+        try {
+
+            return BigdataServlet.submitApiTask(getIndexManager(),
+                    new SparqlRestApiTask(req, resp, namespace, timestamp))
+                    .get();
+
+        } catch (Throwable t) {
+
+            // FIXME GROUP_COMMIT: check calling stack for existing launderThrowable.
+            throw BigdataRDFServlet.launderThrowable(t, resp,
+                    queryOrUpdateStr);
+
+        }
+
+    } // call()
 
 } // class AbstractQueryTask
@@ -1234,6 +1274,7 @@
 
     }
 
+    @Override
     protected void doQuery(final BigdataSailRepositoryConnection cxn,
             final OutputStream os) throws Exception {
 
@@ -2111,64 +2152,64 @@
 
     }
 
-    /**
-     * Return a connection transaction, which may be read-only or support
-     * update. When the timestamp is associated with a historical commit point,
-     * this will be a read-only connection. When it is associated with the
-     * {@link ITx#UNISOLATED} view or a read-write transaction, this will be a
-     * mutable connection.
-     *
-     * @param namespace
-     *            The namespace.
-     * @param timestamp
-     *            The timestamp.
-     *
-     * @throws RepositoryException
-     */
-    public BigdataSailRepositoryConnection getQueryConnection(
-            final String namespace, final long timestamp)
-            throws RepositoryException {
+//    /**
+//     * Return a connection transaction, which may be read-only or support
+//     * update. When the timestamp is associated with a historical commit point,
+//     * this will be a read-only connection. When it is associated with the
+//     * {@link ITx#UNISOLATED} view or a read-write transaction, this will be a
+//     * mutable connection.
+//     *
+//     * @param namespace
+//     *            The namespace.
+//     * @param timestamp
+//     *            The timestamp.
+//     *
+//     * @throws RepositoryException
+//     */
+//    public BigdataSailRepositoryConnection getQueryConnection(
+//            final String namespace, final long timestamp)
+//            throws RepositoryException {
+//
+//        /*
+//         * Note: [timestamp] will be a read-only tx view of the triple store if
+//         * a READ_LOCK was specified when the NanoSparqlServer was started
+//         * (unless the query explicitly overrides the timestamp of the view on
+//         * which it will operate).
+//         */
+//        final AbstractTripleStore tripleStore = getTripleStore(namespace,
+//                timestamp);
+//
+//        if (tripleStore == null) {
+//
+//            throw new DatasetNotFoundException("Not found: namespace="
+//                    + namespace + ", timestamp="
+//                    + TimestampUtility.toString(timestamp));
+//
+//        }
+//
+//        // Wrap with SAIL.
+//        final BigdataSail sail = new BigdataSail(tripleStore);
+//
+//        final BigdataSailRepository repo = new BigdataSailRepository(sail);
+//
+//        repo.initialize();
+//
+//        if (TimestampUtility.isReadOnly(timestamp)) {
+//
+//            return (BigdataSailRepositoryConnection) repo
+//                    .getReadOnlyConnection(timestamp);
+//
+//        }
+//
+//        // Read-write connection.
+//        final BigdataSailRepositoryConnection conn = repo.getConnection();
+//
+//        conn.setAutoCommit(false);
+//
+//        return conn;
+//
+//    }
 
-        /*
-         * Note: [timestamp] will be a read-only tx view of the triple store if
-         * a READ_LOCK was specified when the NanoSparqlServer was started
-         * (unless the query explicitly overrides the timestamp of the view on
-         * which it will operate).
-         */
-        final AbstractTripleStore tripleStore = getTripleStore(namespace,
-                timestamp);
-
-        if (tripleStore == null) {
-
-            throw new DatasetNotFoundException("Not found: namespace="
-                    + namespace + ", timestamp="
-                    + TimestampUtility.toString(timestamp));
-
-        }
-
-        // Wrap with SAIL.
-        final BigdataSail sail = new BigdataSail(tripleStore);
-
-        final BigdataSailRepository repo = new BigdataSailRepository(sail);
-
-        repo.initialize();
-
-        if (TimestampUtility.isReadOnly(timestamp)) {
-
-            return (BigdataSailRepositoryConnection) repo
-                    .getReadOnlyConnection(timestamp);
-
-        }
-
-        // Read-write connection.
-        final BigdataSailRepositoryConnection conn = repo.getConnection();
-
-        conn.setAutoCommit(false);
-
-        return conn;
-
-    }
-
     /**
      * Return a read-only view of the {@link AbstractTripleStore} for the given
      * namespace that will read from the commit point associated with the given
@@ -2182,12 +2223,17 @@
      * @return The {@link AbstractTripleStore} -or- <code>null</code> if none is
      *         found for that namespace and timestamp.
      *
-     * @todo enforce historical query by making sure timestamps conform (we do
-     *       not want to allow read/write tx queries unless update semantics
-     *       are introduced ala SPARQL 1.1).
-     *
-     * @todo Use a distributed read-only tx for queries (it would be nice if a
-     *       tx used 2PL to specify which namespaces it could touch).
+     * FIXME GROUP_COMMIT: Review all callers. They are suspect. The
+     * code will sometimes resolve the KB as of the timestamp, but,
+     * given that the default is to read against the lastCommitTime,
+     * that does NOT prevent a concurrent destroy or create of a KB that
+     * invalidates such a pre-condition test. The main reason for such
+     * pre-condition tests is to provide nice HTTP status code responses
+     * when an identified namespace does (or does not) exist. The better
+     * way to handle this is by pushing the pre-condition test down into
+     * the {@link RestApiTask} and then throwing an appropriately
+     * marked exception that gets correctly converted into an HTTP
+     * BAD_REQUEST message rather than sending back a stack trace.
      */
     public AbstractTripleStore getTripleStore(final String namespace,
             final long timestamp) {
 
@@ -2214,8 +2260,12 @@
      * @throws SailException
      *
      * @throws RepositoryException
+     *
+     * FIXME GROUP COMMIT: This is deprecated by the support for
+     * {@link RestApiMutationTask}s.
      */
-    public BigdataSailRepositoryConnection getUnisolatedConnection( // FIXME REVIEW CALLERS
+    @Deprecated // deprecated by the RestApiMutationTask pattern.
+    BigdataSailRepositoryConnection getUnisolatedConnection(
             final String namespace) throws SailException, RepositoryException {
 
         // resolve the default namespace.
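The FIXME on getTripleStore() above proposes pushing pre-condition tests (for example, "does the namespace exist?") into the RestApiTask and signaling failures with a marked exception that is converted into a clean HTTP status code at the servlet boundary. A minimal sketch of that idea, assuming a hypothetical exception type and mapper (neither is part of this commit):

    import java.io.IOException;
    import javax.servlet.http.HttpServletResponse;

    // Hypothetical marked exception thrown from inside a task when a
    // pre-condition such as "the namespace exists" fails.
    class HypotheticalNamespaceNotFoundException extends RuntimeException {
        HypotheticalNamespaceNotFoundException(final String namespace) {
            super("Not found: namespace=" + namespace);
        }
    }

    class HypotheticalExceptionMapper {
        // Convert a marked exception into a clean client-error response
        // rather than letting it surface as a stack trace (500).
        static void handle(final Throwable t, final HttpServletResponse resp)
                throws IOException {
            if (t instanceof HypotheticalNamespaceNotFoundException) {
                resp.sendError(HttpServletResponse.SC_BAD_REQUEST,
                        t.getMessage());
                return;
            }
            // Anything unrecognized remains an internal error.
            resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
                    t.getMessage());
        }
    }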
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFServlet.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFServlet.java	2014-06-17 20:51:10 UTC (rev 8500)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFServlet.java	2014-06-17 21:14:25 UTC (rev 8501)
@@ -47,8 +47,6 @@
 import org.openrdf.model.Resource;
 import org.openrdf.model.Statement;
 import org.openrdf.model.impl.URIImpl;
-import org.openrdf.repository.RepositoryException;
-import org.openrdf.repository.RepositoryResult;
 import org.openrdf.rio.RDFFormat;
 import org.openrdf.rio.RDFHandlerException;
 import org.openrdf.rio.RDFWriter;
@@ -59,7 +57,6 @@
 import com.bigdata.rdf.properties.PropertiesWriter;
 import com.bigdata.rdf.properties.PropertiesWriterRegistry;
 import com.bigdata.rdf.rules.ConstraintViolationException;
-import com.bigdata.rdf.sail.webapp.XMLBuilder.Node;
 import com.bigdata.util.InnerCause;
 
 /**
@@ -398,41 +395,7 @@
 
         buildResponse(resp, HTTP_OK, MIME_APPLICATION_XML, w.toString());
 
     }
-
-    /**
-     * Report the contexts back to the user agent.
-     *
-     * @param resp
-     *            The response.
-     * @param it
-     *            The iteration of contexts.
-     * @param elapsed
-     *            The elapsed time (milliseconds).
-     *
-     * @throws IOException
-     */
-    static protected void reportContexts(final HttpServletResponse resp,
-            final RepositoryResult<Resource> contexts, final long elapsed)
-            throws IOException, RepositoryException {
-
-        final StringWriter w = new StringWriter();
-
-        final XMLBuilder t = new XMLBuilder(w);
-
-        final Node root = t.root("contexts");
-
-        while (contexts.hasNext()) {
-
-            root.node("context").attr("uri", contexts.next()).close();
-
-        }
-
-        root.close();
-
-        buildResponse(resp, HTTP_OK, MIME_APPLICATION_XML, w.toString());
-
-    }
-
     /**
      * Send an RDF Graph as a response using content negotiation.
     *

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataServlet.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataServlet.java	2014-06-17 20:51:10 UTC (rev 8500)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataServlet.java	2014-06-17 21:14:25 UTC (rev 8501)
@@ -34,6 +34,7 @@
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
 
 import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServlet;
@@ -53,6 +54,7 @@
 import com.bigdata.rdf.sail.webapp.client.IMimeTypes;
 import com.bigdata.rdf.sail.webapp.lbs.IHALoadBalancerPolicy;
 import com.bigdata.rdf.store.AbstractTripleStore;
+import com.bigdata.resources.IndexManager;
 import com.bigdata.service.IBigdataFederation;
 
 /**
@@ -216,22 +218,49 @@
      * @see <a href="http://sourceforge.net/apps/trac/bigdata/ticket/566" >
      *      Concurrent unisolated operations against multiple KBs </a>
      */
-    @SuppressWarnings({ "unchecked", "rawtypes" })
     protected <T> Future<T> submitApiTask(final RestApiTask<T> task)
             throws DatasetNotFoundException {
 
+        final IIndexManager indexManager = getIndexManager();
+
+        return submitApiTask(indexManager, task);
+
+    }
+
+    /**
+     * Submit a task and return a {@link Future} for that task. The task will
+     * be run on the appropriate executor service depending on the nature of
+     * the backing database and the view required by the task.
+     *
+     * @param indexManager
+     *            The {@link IndexManager}.
+     * @param task
+     *            The task.
+     *
+     * @return The {@link Future} for that task.
+     *
+     * @throws DatasetNotFoundException
+     *
+     * @see <a href="http://sourceforge.net/apps/trac/bigdata/ticket/753" > HA
+     *      doLocalAbort() should interrupt NSS requests and AbstractTasks </a>
+     * @see <a href="http://sourceforge.net/apps/trac/bigdata/ticket/566" >
+     *      Concurrent unisolated operations against multiple KBs </a>
+     */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    static protected <T> Future<T> submitApiTask(
+            final IIndexManager indexManager, final RestApiTask<T> task)
+            throws DatasetNotFoundException {
+
         final String namespace = task.getNamespace();
 
         final long timestamp = task.getTimestamp();
 
-        final IIndexManager indexManager = getIndexManager();
-
         if (!BigdataStatics.NSS_GROUP_COMMIT
                 || indexManager instanceof IBigdataFederation
                 || TimestampUtility.isReadOnly(timestamp)
                 ) {
 
             /*
-             * Run on a normal executor service.
+             * Execute the REST API task.
              *
             * Note: For scale-out, the operation will be applied using
             * client-side global views of the indices.
@@ -240,10 +269,31 @@
             * a Journal). This is helpful since we can avoid some overhead
             * associated with the AbstractTask lock declarations.
             */
-
-            return indexManager.getExecutorService().submit(
+            // Wrap Callable.
+            final FutureTask<T> ft = new FutureTask<T>(
                     new RestApiTaskForIndexManager(indexManager, task));
 
+            if (true) {
+
+                /*
+                 * Caller runs (synchronous execution)
+                 *
+                 * Note: By having the caller run the task here we avoid
+                 * consuming another thread.
+                 */
+                ft.run();
+
+            } else {
+
+                /*
+                 * Run on a normal executor service.
+                 */
+                indexManager.getExecutorService().submit(ft);
+
+            }
+
+            return ft;
+
         } else {
 
             /**
@@ -282,7 +332,7 @@
 
         }
 
     }
-    
+
     /**
     * Acquire the locks for the named indices associated with the specified KB.
     *
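The if (true) branch above is the heart of the non-group-commit path described in the log message: the task is wrapped in a FutureTask and run on the caller's thread, so Future.get() behaves normally but no additional worker thread is consumed. A self-contained sketch of that caller-runs idiom (the names here are illustrative, not bigdata APIs):

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.FutureTask;

    public class CallerRunsDemo {

        // Wrap a Callable so the caller can either run it synchronously or
        // hand it to an executor; the Future API is the same either way.
        static <T> Future<T> submitInCallerThread(final Callable<T> task) {
            final FutureTask<T> ft = new FutureTask<T>(task);
            // Synchronous execution: the result (or exception) is captured
            // inside the FutureTask, and no new thread is consumed.
            ft.run();
            return ft;
        }

        public static void main(final String[] args)
                throws InterruptedException, ExecutionException {
            final Future<String> f = submitInCallerThread(
                    new Callable<String>() {
                        @Override
                        public String call() {
                            return "ran in " + Thread.currentThread().getName();
                        }
                    });
            // get() returns immediately because the task has already run.
            System.out.println(f.get());
        }

    }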
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BlueprintsServlet.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BlueprintsServlet.java	2014-06-17 20:51:10 UTC (rev 8500)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BlueprintsServlet.java	2014-06-17 21:14:25 UTC (rev 8501)
@@ -33,8 +33,8 @@
 
 import com.bigdata.blueprints.BigdataGraphBulkLoad;
 import com.bigdata.rdf.sail.BigdataSailRepositoryConnection;
+import com.bigdata.rdf.sail.webapp.RestApiTask.RestApiMutationTask;
 import com.bigdata.rdf.sail.webapp.client.MiniMime;
-import com.bigdata.rdf.store.AbstractTripleStore;
 import com.tinkerpop.blueprints.util.io.graphml.GraphMLReader;
 
 /**
@@ -69,16 +69,8 @@
     protected void doPost(final HttpServletRequest req,
             final HttpServletResponse resp) throws IOException {
 
-        final long begin = System.currentTimeMillis();
-
-        final String namespace = getNamespace(req);
-
-        final long timestamp = getTimestamp(req);
-
-        final AbstractTripleStore tripleStore = getBigdataRDFContext()
-                .getTripleStore(namespace, timestamp);
-
-        if (tripleStore == null) {
+        if (getBigdataRDFContext().getTripleStore(getNamespace(req),
+                getTimestamp(req)) == null) {
 
             /*
             * There is no such triple/quad store instance.
             */
@@ -104,12 +96,37 @@
 
         try {
 
+            submitApiTask(
+                    new BlueprintsPostTask(req, resp, getNamespace(req),
+                            getTimestamp(req))).get();
+
+        } catch (Throwable t) {
+
+            throw BigdataRDFServlet.launderThrowable(t, resp, "");
+
+        }
+
+    }
+
+    private static class BlueprintsPostTask extends RestApiMutationTask<Void> {
+
+        public BlueprintsPostTask(HttpServletRequest req,
+                HttpServletResponse resp, String namespace, long timestamp) {
+
+            super(req, resp, namespace, timestamp);
+
+        }
+
+        @Override
+        public Void call() throws Exception {
+
+            final long begin = System.currentTimeMillis();
+
             BigdataSailRepositoryConnection conn = null;
             boolean success = false;
             try {
 
-                conn = getBigdataRDFContext()
-                        .getUnisolatedConnection(namespace);
+                conn = getUnisolatedConnection();
 
                 final BigdataGraphBulkLoad graph = new BigdataGraphBulkLoad(conn);
 
@@ -123,10 +140,11 @@
 
                 final long elapsed = System.currentTimeMillis() - begin;
 
-                reportModifiedCount(resp, nmodified, elapsed);
-
-                return;
+                reportModifiedCount(nmodified, elapsed);
 
+                // Done.
+                return null;
+
             } finally {
 
                 if (conn != null) {
 
@@ -137,15 +155,11 @@
 
                     conn.close();
 
                 }
-
+
             }
 
-        } catch (Throwable t) {
+        }
 
-            throw BigdataRDFServlet.launderThrowable(t, resp, "");
-
-        }
-
     }
 
 }
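BlueprintsPostTask above is representative of how each mutation endpoint is being restructured: the servlet body moves into a RestApiMutationTask subclass, the connection comes from the task, and a failure forces a rollback before the connection is closed. The rollback-unless-success discipline itself is general; here is a self-contained sketch of the same shape against a plain JDBC connection (bigdata's tasks apply it to a BigdataSailRepositoryConnection):

    import java.sql.Connection;
    import java.sql.SQLException;

    public class MutationDiscipline {

        // Apply a mutation with the same rollback-unless-success /
        // always-close discipline used by the REST API mutation tasks.
        public static void applyMutation(final Connection conn,
                final Runnable mutation) throws SQLException {
            boolean success = false;
            try {
                mutation.run(); // apply the change
                conn.commit();  // make it durable
                success = true;
            } finally {
                try {
                    if (!success) {
                        // If the commit already went through, rollback()
                        // is a NOP (mirrors the note in the diff).
                        conn.rollback();
                    }
                } finally {
                    conn.close(); // always release the connection
                }
            }
        }

    }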
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/DeleteServlet.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/DeleteServlet.java	2014-06-17 20:51:10 UTC (rev 8500)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/DeleteServlet.java	2014-06-17 21:14:25 UTC (rev 8501)
@@ -25,6 +25,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.PipedOutputStream;
+import java.util.Arrays;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -270,8 +271,6 @@
             final HttpServletResponse resp) throws IOException {
 
         final String baseURI = req.getRequestURL().toString();
-
-        final String namespace = getNamespace(req);
 
         final String contentType = req.getContentType();
 
@@ -281,68 +280,70 @@
         if (log.isInfoEnabled())
             log.info("Request body: " + contentType);
 
-        try {
+        /**
+         * There is a request body, so let's try and parse it.
+         *
+         * <a href="https://sourceforge.net/apps/trac/bigdata/ticket/620">
+         * UpdateServlet fails to parse MIMEType when doing conneg. </a>
+         */
 
-            /**
-             * There is a request body, so let's try and parse it.
-             *
-             * <a href="https://sourceforge.net/apps/trac/bigdata/ticket/620">
-             * UpdateServlet fails to parse MIMEType when doing conneg. </a>
-             */
+        final RDFFormat format = RDFFormat.forMIMEType(new MiniMime(
+                contentType).getMimeType());
 
-            final RDFFormat format = RDFFormat.forMIMEType(new MiniMime(
-                    contentType).getMimeType());
+        if (format == null) {
 
-            if (format == null) {
+            buildResponse(resp, HTTP_BADREQUEST, MIME_TEXT_PLAIN,
+                    "Content-Type not recognized as RDF: " + contentType);
 
-                buildResponse(resp, HTTP_BADREQUEST, MIME_TEXT_PLAIN,
-                        "Content-Type not recognized as RDF: " + contentType);
+            return;
 
-                return;
+        }
 
-            }
+        final RDFParserFactory rdfParserFactory = RDFParserRegistry
+                .getInstance().get(format);
 
-            final RDFParserFactory rdfParserFactory = RDFParserRegistry
-                    .getInstance().get(format);
+        if (rdfParserFactory == null) {
 
-            if (rdfParserFactory == null) {
+            buildResponse(resp, HTTP_INTERNALERROR, MIME_TEXT_PLAIN,
+                    "Parser factory not found: Content-Type=" + contentType
+                            + ", format=" + format);
 
-                buildResponse(resp, HTTP_INTERNALERROR, MIME_TEXT_PLAIN,
-                        "Parser factory not found: Content-Type=" + contentType
-                                + ", format=" + format);
+            return;
 
-                return;
+        }
 
-            }
+        /*
+         * Allow the caller to specify the default contexts.
+         */
+        final Resource[] defaultContext;
+        {
+            final String[] s = req.getParameterValues("context-uri");
+            if (s != null && s.length > 0) {
+                try {
+                    defaultContext = toURIs(s);
+                } catch (IllegalArgumentException ex) {
+                    buildResponse(resp, HTTP_INTERNALERROR, MIME_TEXT_PLAIN,
+                            ex.getLocalizedMessage());
+                    return;
+                }
+            } else {
+                defaultContext = new Resource[0];
+            }
+        }
 
-            /*
-             * Allow the caller to specify the default contexts.
-             */
-            final Resource[] defaultContext;
-            {
-                final String[] s = req.getParameterValues("context-uri");
-                if (s != null && s.length > 0) {
-                    try {
-                        defaultContext = toURIs(s);
-                    } catch (IllegalArgumentException ex) {
-                        buildResponse(resp, HTTP_INTERNALERROR, MIME_TEXT_PLAIN,
-                                ex.getLocalizedMessage());
-                        return;
-                    }
-                } else {
-                    defaultContext = new Resource[0];
-                }
-            }
-
+        try {
+
             submitApiTask(
-                    new DeleteWithBodyTask(req, resp, namespace,
+                    new DeleteWithBodyTask(req, resp, getNamespace(req),
                             ITx.UNISOLATED, baseURI, defaultContext,
                             rdfParserFactory)).get();
 
         } catch (Throwable t) {
 
-            throw BigdataRDFServlet.launderThrowable(t, resp, "");
+            throw BigdataRDFServlet.launderThrowable(t, resp,
+                    "DELETE-WITH-BODY: baseURI=" + baseURI + ", context-uri="
+                            + Arrays.toString(defaultContext));
 
         }
 
     }
@@ -523,7 +524,7 @@
         }
 
         if (log.isInfoEnabled())
-            log.info("delete with access path: (s=" + s + ", p=" + p + ", o="
+            log.info("DELETE-WITH-ACCESS-PATH: (s=" + s + ", p=" + p + ", o="
                     + o + ", c=" + c + ")");
 
         try {
@@ -534,8 +535,9 @@
 
         } catch (Throwable t) {
 
-            throw BigdataRDFServlet.launderThrowable(t, resp, "s=" + s + ",p="
-                    + p + ",o=" + o + ",c=" + c);
+            throw BigdataRDFServlet.launderThrowable(t, resp,
+                    "DELETE-WITH-ACCESS-PATH: (s=" + s + ",p=" + p + ",o=" + o
+                            + ",c=" + c + ")");
 
         }

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/DescribeCacheServlet.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/DescribeCacheServlet.java	2014-06-17 20:51:10 UTC (rev 8500)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/DescribeCacheServlet.java	2014-06-17 21:14:25 UTC (rev 8501)
@@ -129,6 +129,10 @@
 
     /**
      * GET returns the DESCRIBE of the resource.
+     *
+     * FIXME DESCRIBE: TX ISOLATION for the request, but ensure that the cache
+     * is not negatively affected by that isolation (i.e., how does the cache
+     * index based on the time/tx of the view?).
      */
     @Override
     protected void doGet(final HttpServletRequest req,
@@ -369,14 +373,10 @@
                 os.flush();
 
             } catch (Throwable e) {
-//                try {
-                    throw BigdataRDFServlet.launderThrowable(e, resp,
-                            "DESCRIBE"
-                            // queryStr // TODO Report as "DESCRIBE uri(s)".
-                    );
-//                } catch (Exception e1) {
-//                    throw new RuntimeException(e);
-//                }
+
+                throw BigdataRDFServlet.launderThrowable(e, resp,
+                        "DESCRIBE: uris=" + internalURIs);
+
             }
 
         }

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/InsertServlet.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/InsertServlet.java	2014-06-17 20:51:10 UTC (rev 8500)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/InsertServlet.java	2014-06-17 21:14:25 UTC (rev 8501)
@@ -138,8 +138,6 @@
 
         final String baseURI = req.getRequestURL().toString();
 
-        final String namespace = getNamespace(req);
-
         final String contentType = req.getContentType();
 
         if (contentType == null)
@@ -175,32 +173,13 @@
         if (rdfParserFactory == null) {
 
             buildResponse(resp, HTTP_INTERNALERROR, MIME_TEXT_PLAIN,
-                    "Parser factory not found: Content-Type="
-                            + contentType + ", format=" + format);
-
-            return;
+                    "Parser factory not found: Content-Type=" + contentType
+                            + ", format=" + format);
 
+            return;
+
         }
 
-//        /*
-//         * Allow the caller to specify the default context.
-//         */
-//        final Resource defaultContext;
-//        {
-//            final String s = req.getParameter("context-uri");
-//            if (s != null) {
-//                try {
-//                    defaultContext = new URIImpl(s);
-//                } catch (IllegalArgumentException ex) {
-//                    buildResponse(resp, HTTP_INTERNALERROR, MIME_TEXT_PLAIN,
-//                            ex.getLocalizedMessage());
-//                    return;
-//                }
-//            } else {
-//                defaultContext = null;
-//            }
-//        }
-
         /*
         * Allow the caller to specify the default contexts.
         */
@@ -223,13 +202,16 @@
         try {
 
             submitApiTask(
-                    new InsertWithBodyTask(req, resp, namespace, ITx.UNISOLATED,
-                            baseURI, defaultContext, rdfParserFactory)).get();
+                    new InsertWithBodyTask(req, resp, getNamespace(req),
+                            ITx.UNISOLATED, baseURI, defaultContext,
+                            rdfParserFactory)).get();
 
         } catch (Throwable t) {
 
-            throw BigdataRDFServlet.launderThrowable(t, resp, "");
-
+            throw BigdataRDFServlet.launderThrowable(t, resp,
+                    "INSERT-WITH-BODY: baseURI=" + baseURI + ", context-uri="
+                            + Arrays.toString(defaultContext));
+
         }
 
     }
@@ -385,25 +367,6 @@
 
         }
 
-//        /*
-//         * Allow the caller to specify the default context.
-//         */
-//        final Resource defaultContext;
-//        {
-//            final String s = req.getParameter("context-uri");
-//            if (s != null) {
-//                try {
-//                    defaultContext = new URIImpl(s);
-//                } catch (IllegalArgumentException ex) {
-//                    buildResponse(resp, HTTP_INTERNALERROR, MIME_TEXT_PLAIN,
-//                            ex.getLocalizedMessage());
-//                    return;
-//                }
-//            } else {
-//                defaultContext = null;
-//            }
-//        }
-
         /*
         * Allow the caller to specify the default contexts.
         */
@@ -431,8 +394,9 @@
 
         } catch (Throwable t) {
 
-            throw launderThrowable(t, resp, "urls=" + urls);
-
+            throw launderThrowable(t, resp, "uri=" + urls + ", context-uri="
+                    + Arrays.toString(defaultContext));
+
         }
 
     }
@@ -688,10 +652,10 @@
 
             }
 
             if (c.length >= 2) {
-                // added to more than one context
-                nmodified.addAndGet(c.length);
+                // added to more than one context
+                nmodified.addAndGet(c.length);
             } else {
-                nmodified.incrementAndGet();
+                nmodified.incrementAndGet();
             }
 
         }

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/MultiTenancyServlet.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/MultiTenancyServlet.java	2014-06-17 20:51:10 UTC (rev 8500)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/MultiTenancyServlet.java	2014-06-17 21:14:25 UTC (rev 8501)
@@ -63,6 +63,10 @@
 *      NanoSparqlServer Admin API for Multi-tenant deployments</a>
 *
 * @author thompsonbry
+ *
+ * FIXME GROUP COMMIT: The CREATE and DESTROY operations require special
+ * attention. The other operations in this class should also use the new
+ * REST API pattern, but are not intrinsically sensitive.
 */
 public class MultiTenancyServlet extends BigdataRDFServlet {
 
@@ -540,13 +544,6 @@
 
         final long timestamp = getTimestamp(req);
 
-//        if (timestamp == ITx.READ_COMMITTED) {
-//
-//            // Use the last commit point.
-//            timestamp = getIndexManager().getLastCommitTime();
-//
-//        }
-
         final long tx = getBigdataRDFContext().newTx(timestamp);
 
         try {
@@ -582,13 +579,6 @@
             final HttpServletResponse resp) throws IOException {
 
         final long timestamp = getTimestamp(req);
 
-//        if (timestamp == ITx.READ_COMMITTED) {
-//
-//            // Use the last commit point.
-//            timestamp = getIndexManager().getLastCommitTime();
-//
-//        }
-
         final boolean describeEachNamedGraph;
         {
@@ -683,9 +673,7 @@
 
         final VoID v = new VoID(g, tripleStore, serviceURI, aDataSet);
 
-        v.describeDataSet(false/* describeStatistics */,
-//                getBigdataRDFContext().getConfig().describeEachNamedGraph);
-                describeEachNamedGraph);
+        v.describeDataSet(false/* describeStatistics */, describeEachNamedGraph);
 
     }

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/QueryServlet.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/QueryServlet.java	2014-06-17 20:51:10 UTC (rev 8500)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/QueryServlet.java	2014-06-17 21:14:25 UTC (rev 8501)
@@ -69,6 +69,8 @@
 import com.bigdata.rdf.sail.webapp.BigdataRDFContext.AbstractQueryTask;
 import com.bigdata.rdf.sail.webapp.BigdataRDFContext.RunningQuery;
 import com.bigdata.rdf.sail.webapp.BigdataRDFContext.UpdateTask;
+import com.bigdata.rdf.sail.webapp.RestApiTask.RestApiQueryTask;
+import com.bigdata.rdf.sail.webapp.XMLBuilder.Node;
 import com.bigdata.rdf.sail.webapp.client.EncodeDecodeValue;
 import com.bigdata.rdf.sparql.ast.ASTContainer;
 import com.bigdata.rdf.sparql.ast.QueryRoot;
@@ -163,11 +165,11 @@
         if (req.getParameter(ATTR_UPDATE) != null) {
 
             // SPARQL 1.1 UPDATE.
-            doUpdate(req, resp);
+            doSparqlUpdate(req, resp);
 
         } else if (RESTServlet.hasMimeType(req, MIME_SPARQL_UPDATE)) {
 
             // SPARQL 1.1 UPDATE, see trac 711 for bug report motivating this case
-            doUpdate(req, resp);
+            doSparqlUpdate(req, resp);
 
         } else if (req.getParameter(ATTR_UUID) != null) {
 
@@ -187,7 +189,7 @@
         } else {
 
             // SPARQL Query.
-            doQuery(req, resp);
+            doSparqlQuery(req, resp);
 
         }
 
@@ -202,7 +204,7 @@
 
         if (req.getParameter(ATTR_QUERY) != null) {
 
-            doQuery(req, resp);
+            doSparqlQuery(req, resp);
 
         } else if (req.getParameter(ATTR_UUID) != null) {
 
@@ -318,7 +320,7 @@
     * @param resp
     * @throws IOException
     */
-    private void doUpdate(final HttpServletRequest req,
+    private void doSparqlUpdate(final HttpServletRequest req,
             final HttpServletResponse resp) throws IOException {
 
         if (!isWritable(getServletContext(), req, resp)) {
 
@@ -409,43 +411,9 @@
 
     }
 
-    /**
-     * FIXME GROUP COMMIT: We need to refactor the code that manages the
-     * running queries in BigdataRDFServlet so we can separate out the
-     * concurrency control of the views from the control over the #of
-     * running queries and/or update requests and the metadata that we
-     * manage to track and report on those requests.
-     */
-//    private static class SparqlUpdateTask extends RestApiMutationTask<Void> {
-//
-//        /**
-//         *
-//         * @param namespace
-//         *            The namespace of the target KB instance.
-//         * @param timestamp
-//         *            The timestamp used to obtain a mutable connection.
-//         * @param baseURI
-//         *            The base URI for the operation.
-//         */
-//        public SparqlUpdateTask(final HttpServletRequest req,
-//                final HttpServletResponse resp,
-//                final String namespace, final long timestamp
-//                ) {
-//            super(req, resp, namespace, timestamp);
-//        }
-//
-//        @Override
-//        public Void call() throws Exception {
-//
-//
-//
-//        }
-//
-//    }
-
     /**
     * Run a SPARQL query.
     */
-    void doQuery(final HttpServletRequest req, final HttpServletResponse resp)
+    void doSparqlQuery(final HttpServletRequest req, final HttpServletResponse resp)
             throws IOException {
 
         if (!isReadable(getServletContext(), req, resp)) {
 
@@ -1065,10 +1033,6 @@
             return;
         }
 
-        final long begin = System.currentTimeMillis();
-
-        final String namespace = getNamespace(req);
-
         final Resource s;
         final URI p;
         final Value o;
@@ -1089,70 +1053,94 @@
                     + o + ", c=" + c + ")");
 
         try {
+
+            submitApiTask(
+                    new EstCardTask(req, resp, getNamespace(req),
+                            getTimestamp(req), //
+                            s, p, o, c)).get();
 
-            try {
+        } catch (Throwable t) {
 
-                BigdataSailRepositoryConnection conn = null;
-                try {
+            launderThrowable(t, resp, "ESTCARD: access path: (s=" + s + ", p="
+                    + p + ", o=" + o + ", c=" + c + ")");
 
-                    final long timestamp = getTimestamp(req);
+        }
+
+    }
+
+    /**
+     * Helper task for the ESTCARD query.
+     *
+     * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+     */
+    private static class EstCardTask extends RestApiQueryTask<Void> {
 
-                    conn = getBigdataRDFContext().getQueryConnection(
-                            namespace, timestamp);
+        private final Resource s;
+        private final URI p;
+        private final Value o;
+        private final Resource[] c;
+
+        public EstCardTask(final HttpServletRequest req,
+                final HttpServletResponse resp, final String namespace,
+                final long timestamp, final Resource s, final URI p,
+                final Value o, final Resource[] c) {
 
-                    // Range count all statements matching that access path.
-                    long rangeCount = 0;
-                    if (c != null && c.length > 0) {
-                        for (Resource r : c) {
-                            rangeCount += conn.getSailConnection()
-                                    .getBigdataSail().getDatabase()
-                                    .getAccessPath(s, p, o, r)
-                                    .rangeCount(false/* exact */);
-                        }
-                    } else {
-                        rangeCount += conn.getSailConnection()
-                                .getBigdataSail().getDatabase()
-                                .getAccessPath(s, p, o, (Resource) null)
+            super(req, resp, namespace, timestamp);
+
+            this.s = s;
+            this.p = p;
+            this.o = o;
+            this.c = c;
+
+        }
+
+        @Override
+        public Void call() throws Exception {
+
+            final long begin = System.currentTimeMillis();
+
+            BigdataSailRepositoryConnection conn = null;
+            try {
+
+                conn = getQueryConnection();
+
+                // Range count all statements matching that access path.
+                long rangeCount = 0;
+                if (c != null && c.length > 0) {
+                    for (Resource r : c) {
+                        rangeCount += conn.getSailConnection().getBigdataSail()
+                                .getDatabase().getAccessPath(s, p, o, r)
                                 .rangeCount(false/* exact */);
                     }
-
-                    final long elapsed = System.currentTimeMillis() - begin;
-
-                    reportRangeCount(resp, rangeCount, elapsed);
+                } else {
+                    rangeCount += conn.getSailConnection().getBigdataSail()
+                            .getDatabase()
+                            .getAccessPath(s, p, o, (Resource) null)
+                            .rangeCount(false/* exact */);
+                }
 
-                } catch(Throwable t) {
-
-                    if(conn != null)
-                        conn.rollback();
-
-                    throw new RuntimeException(t);
-
-                } finally {
+                final long elapsed = System.currentTimeMillis() - begin;
 
-                    if (conn != null)
-                        conn.close();
+                reportRangeCount(resp, rangeCount, elapsed);
 
-                }
+                return null;
 
-            } catch (Throwable t) {
+            } finally {
 
-                throw BigdataRDFServlet.launderThrowable(t, resp, "");
+                if (conn != null) {
 
-            }
+                    conn.close();
 
-        } catch (Exception ex) {
+                }
 
-            // Will be rendered as an INTERNAL_ERROR.
-            throw new RuntimeException(ex);
+            }
 
         }
 
-    }
+    } // ESTCARD task.
 
     /**
     * Report on the contexts in use in the quads database.
-     * @param req
-     * @param resp
     */
     private void doContexts(final HttpServletRequest req,
             final HttpServletResponse resp) throws IOException {
 
@@ -1161,58 +1149,86 @@
             // HA Quorum in use, but quorum is not met.
             return;
         }
 
-        final long begin = System.currentTimeMillis();
-
-        final String namespace = getNamespace(req);
-
         try {
+
+            submitApiTask(
+                    new GetContextsTask(req, resp, getNamespace(req),
+                            getTimestamp(req))).get();
 
-            try {
-
-                BigdataSailRepositoryConnection conn = null;
-                try {
-
-                    final long timestamp = getTimestamp(req);
-
-                    conn = getBigdataRDFContext().getQueryConnection(
-                            namespace, timestamp);
-
-                    final RepositoryResult<Resource> it = conn.getContextIDs();
-
-                    final long elapsed = System.currentTimeMillis() - begin;
-
-                    reportContexts(resp, it, elapsed);
-
-                } catch(Throwable t) {
-
-                    if(conn != null)
-                        conn.rollback();
-
-                    throw new RuntimeException(t);
-
-                } finally {
-
-                    if (conn != null)
-                        conn.close();
-
-                }
-
-            } catch (Throwable t) {
-
-                throw BigdataRDFServlet.launderThrowable(t, resp, "");
-
-            }
-
-        } catch (Exception ex) {
-
-            // Will be rendered as an INTERNAL_ERROR.
-            throw new RuntimeException(ex);
+        } catch (Throwable t) {
+
+            launderThrowable(t, resp, "GET-CONTEXTS");
 
         }
 
     }
 
+    /**
+     * Task to report the contexts used by a QUADS mode KB instance.
+     *
+     * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+     */
+    private static class GetContextsTask extends RestApiQueryTask<Void> {
+
+        public GetContextsTask(final HttpServletRequest req,
+                final HttpServletResponse resp, final String namespace,
+                final long timestamp) {
+
+            super(req, resp, namespace, timestamp);
+
+        }
+
+        @Override
+        public Void call() throws Exception {
+
+            BigdataSailRepositoryConnection conn = null;
+            try {
+
+                conn = getQueryConnection();
+
+                final StringWriter w = new StringWriter();
+
+                final RepositoryResult<Resource> it = conn.getContextIDs();
+
+                try {
+
+                    final XMLBuilder t = new XMLBuilder(w);
+
+                    final Node root = t.root("contexts");
+
+                    while (it.hasNext()) {
+
+                        root.node("context").attr("uri", it.next()).close();
+
+                    }
+
+                    root.close();
+
+                } finally {
+
+                    it.close();
+
+                }
+
+                buildResponse(resp, HTTP_OK, MIME_APPLICATION_XML, w.toString());
+
+                return null;
+
+            } finally {
+
+                if (conn != null) {
+
+                    conn.close();
+
+                }
+
+            }
+
+        }
+
+    }
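GetContextsTask above serializes the result with bigdata's XMLBuilder as a root "contexts" element containing one "context" element per named graph, each with a "uri" attribute. For reference, here is the same response shape built with the JDK's StAX writer so the example is self-contained (the sample URIs are stand-ins for the values returned by conn.getContextIDs()):

    import java.io.StringWriter;
    import javax.xml.stream.XMLOutputFactory;
    import javax.xml.stream.XMLStreamException;
    import javax.xml.stream.XMLStreamWriter;

    public class ContextsResponseDemo {

        public static void main(final String[] args) throws XMLStreamException {

            // Stand-ins for the context URIs reported by the connection.
            final String[] contexts = { "http://example.org/g1",
                    "http://example.org/g2" };

            final StringWriter w = new StringWriter();
            final XMLStreamWriter xml = XMLOutputFactory.newInstance()
                    .createXMLStreamWriter(w);

            xml.writeStartDocument();
            xml.writeStartElement("contexts");    // root element
            for (String uri : contexts) {
                xml.writeEmptyElement("context"); // <context uri="..."/>
                xml.writeAttribute("uri", uri);
            }
            xml.writeEndElement();
            xml.writeEndDocument();

            // => <?xml ...?><contexts><context uri="..."/>...</contexts>
            System.out.println(w);
        }

    }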
     /**
     * Private API reports the shards against which the access path would
     * read.
     *
@@ -1234,10 +1250,6 @@
             return;
         }
 
-        final long begin = System.currentTimeMillis();
-
-        final String namespace = getNamespace(req);
-
         final boolean doRangeCount = true;
         final Resource s;
         final URI p;
@@ -1259,173 +1271,201 @@
                     + o + ", c=" + c + ")");
 
         try {
+
+            submitApiTask(
+                    new ShardsTask(req, resp, getNamespace(req),
+                            getTimestamp(req), s, p, o, c, doRangeCount)).get();
 
-            try {
+        } catch (Throwable t) {
 
-                BigdataSailRepositoryConnection conn = null;
-                try {
+            launderThrowable(t, resp, "SHARDS: access path: (s=" + s + ", p="
+                    + p + ", o=" + o + ", c=" + c + ")");
 
-                    final long timestamp = getTimestamp(req);
+        }
 
-                    conn = getBigdataRDFContext().getQueryConnection(
-                            namespace, timestamp);
+    }
 
-                    final AccessPath<?> accessPath = (AccessPath<?>) conn
-                            .getSailConnection().getBigdataSail().getDatabase()
-                            .getAccessPath(s, p, o, c);
-
-                    final ClientIndexView ndx = (ClientIndexView) accessPath
-                            .getIndex();
-
-                    final String charset = "utf-8";// TODO from request.
+    /**
+     * Task to report on the SHARDS used by a scale-out deployment.
+     *
+     * @author <a href="mailto:tho...@us...">Bryan Thompson</a>
+     */
+    private static class ShardsTask extends RestApiQueryTask<Void> {
 
-            resp.setContentType(BigdataServlet.MIME_TEXT_HTML);
-            resp.setCharacterEncoding(charset);
-            final Writer w = resp.getWriter();
-            try {
+        private final Resource s;
+        private final URI p;
+        private final Value o;
+        private final Resource c;
+        private final boolean doRangeCount;
+
+        public ShardsTask(final HttpServletRequest req,
+                final HttpServletResponse resp, final String namespace,
+                final long timestamp, final Resource s, final URI p,
+                final Value o, final Resource c, final boolean doRangeCount) {
 
-                final HTMLBuilder doc = new HTMLBuilder(charset, w);
-
-                XMLBuilder.Node current = doc.root("html");
-                {
-                    current = current.node("head");
-                    current.node("meta")
-                            .attr("http-equiv", "Content-Type")
-                            .attr("content", "text/html;charset=utf-8")
-                            .close();
-                    current.node("title")
-                            .textNoEncode("bigdata&#174;").close();
-                    current = current.close();// close the head.
-                }
+            super(req, resp, namespace, timestamp);
 
-                // open the body
-                current = current.node("body");
+            this.s = s;
+            this.p = p;
+            this.o = o;
+            this.c = c;
+            this.doRangeCount = doRangeCount;
+
+        }
 
-                final IBigdataFederation<?> fed = (IBigdataFederation<?>) getBigdataRDFContext()
-                        .getIndexManager();
-
-                final Iterator<PartitionLocator> itr = ndx.locatorScan(
-                        timestamp, accessPath.getFromKey(),
-                        accessPath.getToKey(), false/* reverseScan */);
+        @Override
+        public Void call() throws Exception {
 
-                int nlocators = 0;
+            final long begin = System.currentTimeMillis();
+
+            BigdataSailRepositoryConnection conn = null;
+            try {
 
-                // The distinct hosts on which the shards are located.
-                final Map<String,AtomicInteger> hosts = new TreeMap<String,AtomicInteger>();
 
... [truncated message content]