From: <mrp...@us...> - 2013-09-28 18:23:33
|
Revision: 7419 http://bigdata.svn.sourceforge.net/bigdata/?rev=7419&view=rev Author: mrpersonick Date: 2013-09-28 18:23:26 +0000 (Sat, 28 Sep 2013) Log Message: ----------- remote query cancellation - intermediate commit 1 Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/client/IPreparedSparqlUpdate.java branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/client/RemoteRepository.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java 2013-09-26 14:52:49 UTC (rev 7418) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java 2013-09-28 18:23:26 UTC (rev 7419) @@ -151,7 +151,7 @@ * The name of a request parameter whose value is the {@link UUID} of a * top-level query. */ - private static final String QUERY_ID = "queryId"; + public static final String QUERY_ID = "queryId"; /** * The name of a request parameter used to cancel a running query. At least @@ -160,7 +160,7 @@ * * @see #QUERY_ID */ - static final String CANCEL_QUERY = "cancelQuery"; + public static final String CANCEL_QUERY = "cancelQuery"; /** * Request a snapshot of the journal (HA only). The snapshot will be written Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/client/IPreparedSparqlUpdate.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/client/IPreparedSparqlUpdate.java 2013-09-26 14:52:49 UTC (rev 7418) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/client/IPreparedSparqlUpdate.java 2013-09-28 18:23:26 UTC (rev 7419) @@ -27,6 +27,8 @@ package com.bigdata.rdf.sail.webapp.client; +import java.util.UUID; + /** * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ @@ -34,5 +36,7 @@ public interface IPreparedSparqlUpdate { void evaluate() throws Exception; + + UUID getQueryId(); } \ No newline at end of file Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/client/RemoteRepository.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/client/RemoteRepository.java 2013-09-26 14:52:49 UTC (rev 7418) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/client/RemoteRepository.java 2013-09-28 18:23:26 UTC (rev 7419) @@ -74,6 +74,7 @@ import org.openrdf.model.impl.ValueFactoryImpl; import org.openrdf.query.BindingSet; import org.openrdf.query.GraphQueryResult; +import org.openrdf.query.QueryEvaluationException; import org.openrdf.query.TupleQueryResult; import org.openrdf.query.TupleQueryResultHandlerBase; import org.openrdf.query.impl.MapBindingSet; @@ -97,6 +98,8 @@ import org.xml.sax.Attributes; import org.xml.sax.ext.DefaultHandler2; +import com.bigdata.rdf.sail.webapp.StatusServlet; + // Note: Do not import. Not part of the bigdata-client.jar // //import com.bigdata.rdf.sparql.ast.service.RemoteServiceOptions; @@ -529,9 +532,9 @@ final ConnectOptions opts = newUpdateConnectOptions(); - opts.addRequestParam("cancelQuery"); + opts.addRequestParam(StatusServlet.CANCEL_QUERY); - opts.addRequestParam("queryId", queryId.toString()); + opts.addRequestParam(StatusServlet.QUERY_ID, queryId.toString()); checkResponseCode(doConnect(opts)); @@ -1000,32 +1003,13 @@ } + @Override public TupleQueryResult evaluate() throws Exception { - HttpResponse response = null; -// try { - setupConnectOptions(); - checkResponseCode(response = doConnect(opts)); + return tupleResults(opts, getQueryId()); - return tupleResults(response); - -// } finally { -// -// try { -// -// if (response != null) -// EntityUtils.consume(response.getEntity()); -// -// } catch (Exception ex) { -// -// log.warn(ex); -// -// } -// -// } - } } @@ -1556,7 +1540,7 @@ HttpEntity entity = null; BackgroundTupleResult result = null; try { - + entity = response.getEntity(); final String contentType = entity.getContentType().getValue(); @@ -1624,6 +1608,151 @@ } /** + * Extracts the solutions from a SPARQL query. + * + * @param response + * The connection from which to read the results. + * + * @return The results. + * + * @throws Exception + * If anything goes wrong. + */ + public TupleQueryResult tupleResults(final ConnectOptions opts, final UUID queryId) + throws Exception { + + HttpEntity entity = null; + BackgroundTupleResult result = null; + boolean needToCancel = false; + try { + + /* + * If we put this after the doConnect we risk an interruption in + * between the doConnect and the setting the flag to true, which + * would result in a query running on the server that should have + * been canceled. + */ + needToCancel = true; + + final HttpResponse response = doConnect(opts); + + checkResponseCode(response); + + entity = response.getEntity(); + + final String contentType = entity.getContentType().getValue(); + + final MiniMime mimeType = new MiniMime(contentType); + + final TupleQueryResultFormat format = TupleQueryResultFormat + .forMIMEType(mimeType.getMimeType()); + + if (format == null) + throw new IOException( + "Could not identify format for service response: serviceURI=" + + sparqlEndpointURL + ", contentType=" + contentType + + " : response=" + getResponseBody(response)); + + final TupleQueryResultParserFactory parserFactory = TupleQueryResultParserRegistry + .getInstance().get(format); + + if (parserFactory == null) + throw new IOException( + "No parser for format for service response: serviceURI=" + + sparqlEndpointURL + ", contentType=" + contentType + + ", format=" + format + " : response=" + + getResponseBody(response)); + + final TupleQueryResultParser parser = parserFactory.getParser(); + + final InputStream in = entity.getContent(); + + result = new BackgroundTupleResult(parser, in, entity); + + executor.execute(result); + + final MapBindingSet bindings = new MapBindingSet(); + + final InsertBindingSetCursor cursor = + new InsertBindingSetCursor(result, bindings); + + final List<String> list = new ArrayList<String>( + result.getBindingNames()); + + final TupleQueryResultImpl tqrImpl = new TupleQueryResultImpl(list, cursor) { + + transient boolean done = false; + + @Override + public boolean hasNext() throws QueryEvaluationException { + + final boolean hasNext = super.hasNext(); + + if (hasNext == false) { + + done = true; + + } + + return hasNext; + + } + + @Override + public synchronized void close() throws QueryEvaluationException { + + super.close(); + + if (!done) { + + try { + cancel(queryId); + } catch (Exception ex) { + throw new QueryEvaluationException(ex); + } + + done = true; + + } + + }; + + }; + + needToCancel = false; + + return tqrImpl; + +// final TupleQueryResultBuilder handler = new TupleQueryResultBuilder(); +// +// parser.setTupleQueryResultHandler(handler); +// +// parser.parse(entity.getContent()); +// +// // done. +// return handler.getQueryResult(); + + } finally { + +// // terminate the http connection. +// response.disconnect(); + if (result == null) { + try { + EntityUtils.consume(entity); + } catch (IOException ex) { } + } + + if (needToCancel) { + try { + cancel(queryId); + } catch(Exception ex) { } + } + + } + + } + + /** * Builds a graph from an RDF result set (statements, not binding sets). * * @param response This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |