From: <mrp...@us...> - 2013-09-28 19:19:07
|
Revision: 7422 http://bigdata.svn.sourceforge.net/bigdata/?rev=7422&view=rev Author: mrpersonick Date: 2013-09-28 19:18:59 +0000 (Sat, 28 Sep 2013) Log Message: ----------- remote query cancellation Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/service/RemoteServiceCallImpl.java branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/client/RemoteRepository.java branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/client/RemoteRepositoryManager.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/service/RemoteServiceCallImpl.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/service/RemoteServiceCallImpl.java 2013-09-28 18:35:26 UTC (rev 7421) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/service/RemoteServiceCallImpl.java 2013-09-28 19:18:59 UTC (rev 7422) @@ -27,6 +27,8 @@ package com.bigdata.rdf.sparql.ast.service; +import java.util.UUID; + import org.apache.http.HttpResponse; import org.apache.http.impl.client.DefaultHttpClient; import org.openrdf.query.BindingSet; @@ -134,7 +136,11 @@ // opts.queryStr = queryStr; + final UUID queryId = UUID.randomUUID(); + o.addRequestParam("query", queryStr); + + o.addRequestParam("queryId", queryId.toString()); final RemoteRepository repo = new RemoteRepository(uriStr, new DefaultHttpClient(params.getClientConnectionManager()), @@ -152,14 +158,16 @@ try { - final HttpResponse resp = repo.doConnect(o); +// final HttpResponse resp = repo.doConnect(o); +// +// RemoteRepository.checkResponseCode(resp); +// +// queryResult = repo.tupleResults(resp); +// +//// queryResult = parseResults(checkResponseCode(doSparqlQuery(opts))); + + queryResult = repo.tupleResults(o, queryId); - RemoteRepository.checkResponseCode(resp); - - queryResult = repo.tupleResults(resp); - -// queryResult = parseResults(checkResponseCode(doSparqlQuery(opts))); - } finally { /* Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java 2013-09-28 18:35:26 UTC (rev 7421) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java 2013-09-28 19:18:59 UTC (rev 7422) @@ -151,7 +151,7 @@ * The name of a request parameter whose value is the {@link UUID} of a * top-level query. */ - public static final String QUERY_ID = "queryId"; + private static final String QUERY_ID = "queryId"; /** * The name of a request parameter used to cancel a running query. At least @@ -160,7 +160,7 @@ * * @see #QUERY_ID */ - public static final String CANCEL_QUERY = "cancelQuery"; + protected static final String CANCEL_QUERY = "cancelQuery"; /** * Request a snapshot of the journal (HA only). The snapshot will be written Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/client/RemoteRepository.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/client/RemoteRepository.java 2013-09-28 18:35:26 UTC (rev 7421) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/client/RemoteRepository.java 2013-09-28 19:18:59 UTC (rev 7422) @@ -99,8 +99,6 @@ import org.xml.sax.Attributes; import org.xml.sax.ext.DefaultHandler2; -import com.bigdata.rdf.sail.webapp.StatusServlet; - // Note: Do not import. Not part of the bigdata-client.jar // //import com.bigdata.rdf.sparql.ast.service.RemoteServiceOptions; @@ -329,13 +327,13 @@ opts.method = "GET"; - HttpResponse response = null; +// HttpResponse response = null; opts.setAcceptHeader(ConnectOptions.DEFAULT_GRAPH_ACCEPT_HEADER); - checkResponseCode(response = doConnect(opts)); +// checkResponseCode(response = doConnect(opts)); - return graphResults(response); + return graphResults(opts, null); } @@ -531,11 +529,14 @@ */ public void cancel(final UUID queryId) throws Exception { + if (queryId == null) + return; + final ConnectOptions opts = newUpdateConnectOptions(); - opts.addRequestParam(StatusServlet.CANCEL_QUERY); + opts.addRequestParam("cancelQuery"); - opts.addRequestParam(StatusServlet.QUERY_ID, queryId.toString()); + opts.addRequestParam("queryId", queryId.toString()); checkResponseCode(doConnect(opts)); @@ -1037,14 +1038,10 @@ @Override public GraphQueryResult evaluate() throws Exception { - HttpResponse response = null; - setupConnectOptions(); - checkResponseCode(response = doConnect(opts)); + return graphResults(opts, getQueryId()); - return graphResults(response); - } } @@ -1072,30 +1069,34 @@ @Override public boolean evaluate() throws Exception { + + setupConnectOptions(); - HttpResponse response = null; - try { + return booleanResults(opts, getQueryId()); - setupConnectOptions(); - - checkResponseCode(response = doConnect(opts)); - - return booleanResults(response); - - } finally { - - try { - - if (response != null) - EntityUtils.consume(response.getEntity()); - - } catch (Exception ex) { - - log.warn(ex); - - } - - } +// HttpResponse response = null; +// try { +// +// setupConnectOptions(); +// +// checkResponseCode(response = doConnect(opts)); +// +// return booleanResults(response); +// +// } finally { +// +// try { +// +// if (response != null) +// EntityUtils.consume(response.getEntity()); +// +// } catch (Exception ex) { +// +// log.warn(ex); +// +// } +// +// } } @@ -1535,90 +1536,6 @@ * @throws Exception * If anything goes wrong. */ - public TupleQueryResult tupleResults(final HttpResponse response) - throws Exception { - - HttpEntity entity = null; - BackgroundTupleResult result = null; - try { - - entity = response.getEntity(); - - final String contentType = entity.getContentType().getValue(); - - final MiniMime mimeType = new MiniMime(contentType); - - final TupleQueryResultFormat format = TupleQueryResultFormat - .forMIMEType(mimeType.getMimeType()); - - if (format == null) - throw new IOException( - "Could not identify format for service response: serviceURI=" - + sparqlEndpointURL + ", contentType=" + contentType - + " : response=" + getResponseBody(response)); - - final TupleQueryResultParserFactory parserFactory = TupleQueryResultParserRegistry - .getInstance().get(format); - - if (parserFactory == null) - throw new IOException( - "No parser for format for service response: serviceURI=" - + sparqlEndpointURL + ", contentType=" + contentType - + ", format=" + format + " : response=" - + getResponseBody(response)); - - final TupleQueryResultParser parser = parserFactory.getParser(); - - final InputStream in = entity.getContent(); - - result = new BackgroundTupleResult(parser, in, entity); - - executor.execute(result); - - final MapBindingSet bindings = new MapBindingSet(); - - final InsertBindingSetCursor cursor = - new InsertBindingSetCursor(result, bindings); - - final List<String> list = new ArrayList<String>( - result.getBindingNames()); - - return new TupleQueryResultImpl(list, cursor); - -// final TupleQueryResultBuilder handler = new TupleQueryResultBuilder(); -// -// parser.setTupleQueryResultHandler(handler); -// -// parser.parse(entity.getContent()); -// -// // done. -// return handler.getQueryResult(); - - } finally { - -// // terminate the http connection. -// response.disconnect(); - if (result == null) { - try { - EntityUtils.consume(entity); - } catch (IOException ex) { } - } - - } - - } - - /** - * Extracts the solutions from a SPARQL query. - * - * @param response - * The connection from which to read the results. - * - * @return The results. - * - * @throws Exception - * If anything goes wrong. - */ public TupleQueryResult tupleResults(final ConnectOptions opts, final UUID queryId) throws Exception { @@ -1693,20 +1610,24 @@ } @Override - public synchronized void close() throws QueryEvaluationException { + public void close() throws QueryEvaluationException { - super.close(); + try { - if (notDone.compareAndSet(true, false)) { - - try { - cancel(queryId); - } catch (Exception ex) { - throw new QueryEvaluationException(ex); - } - - } + super.close(); + } finally { + + if (notDone.compareAndSet(true, false)) { + + try { + cancel(queryId); + } catch (Exception ex) { } + + } + + } + }; }; @@ -1726,7 +1647,7 @@ // // terminate the http connection. // response.disconnect(); - if (result == null) { + if (entity != null && result == null) { try { EntityUtils.consume(entity); } catch (IOException ex) { } @@ -1753,12 +1674,18 @@ * @throws Exception * If anything goes wrong. */ - public GraphQueryResult graphResults(final HttpResponse response) throws Exception { + public GraphQueryResult graphResults(final ConnectOptions opts, final UUID queryId) + throws Exception { + HttpResponse response = null; HttpEntity entity = null; BackgroundGraphResult result = null; try { + response = doConnect(opts); + + checkResponseCode(response); + entity = response.getEntity(); final String baseURI = ""; @@ -1808,11 +1735,53 @@ // charset=application/rdf+xml } - result = new BackgroundGraphResult( - parser, entity.getContent(), charset, baseURI, entity); + final BackgroundGraphResult tmp = new BackgroundGraphResult( + parser, entity.getContent(), charset, baseURI, entity) { + + final AtomicBoolean notDone = new AtomicBoolean(true); + + @Override + public boolean hasNext() throws QueryEvaluationException { + + final boolean hasNext = super.hasNext(); + + if (hasNext == false) { + + notDone.set(false); + + } + + return hasNext; + + } + + @Override + public void close() throws QueryEvaluationException { + + try { + + super.close(); + + } finally { + + if (notDone.compareAndSet(true, false)) { + + try { + cancel(queryId); + } catch (Exception ex) { } + + } + + } + + }; + + }; - executor.execute(result); + executor.execute(tmp); + result = tmp; + return result; // final Graph g = new GraphImpl(); @@ -1828,10 +1797,14 @@ // // terminate the http connection. // response.disconnect(); - if (result == null) { + if (response != null && result == null) { try { EntityUtils.consume(entity); } catch (IOException ex) { } + + try { + cancel(queryId); + } catch (Exception ex) { } } } @@ -1851,11 +1824,17 @@ * If anything goes wrong, including if the result set does not * encode a single boolean value. */ - protected boolean booleanResults(final HttpResponse response) throws Exception { + protected boolean booleanResults(final ConnectOptions opts, final UUID queryId) throws Exception { + HttpResponse response = null; HttpEntity entity = null; + Boolean result = null; try { + response = doConnect(opts); + + checkResponseCode(response); + entity = response.getEntity(); final String contentType = entity.getContentType().getValue(); @@ -1879,7 +1858,7 @@ final BooleanQueryResultParser parser = factory.getParser(); - final boolean result = parser.parse(entity.getContent()); + result = parser.parse(entity.getContent()); return result; @@ -1887,9 +1866,15 @@ // // terminate the http connection. // response.disconnect(); - try { - EntityUtils.consume(entity); - } catch (IOException ex) { } + if (result == null) { + try { + EntityUtils.consume(entity); + } catch (IOException ex) { } + + try { + cancel(queryId); + } catch (Exception ex) { } + } } Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/client/RemoteRepositoryManager.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/client/RemoteRepositoryManager.java 2013-09-28 18:35:26 UTC (rev 7421) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/client/RemoteRepositoryManager.java 2013-09-28 19:18:59 UTC (rev 7422) @@ -171,27 +171,29 @@ opts.method = "GET"; - HttpResponse response = null; - GraphQueryResult result = null; +// HttpResponse response = null; +// GraphQueryResult result = null; opts.setAcceptHeader(ConnectOptions.DEFAULT_GRAPH_ACCEPT_HEADER); - try { - // check response in try. - checkResponseCode(response = doConnect(opts)); - - // return asynchronous parse of result. - return result = graphResults(response); - - } finally { - if (result == null) { - // Consume entity if bad response. - try { - EntityUtils.consume(response.getEntity()); - } catch (IOException ex) { - } - } - } + return graphResults(opts, null); + +// try { +// // check response in try. +// checkResponseCode(response = doConnect(opts)); +// +// // return asynchronous parse of result. +// return result = graphResults(response); +// +// } finally { +// if (result == null) { +// // Consume entity if bad response. +// try { +// EntityUtils.consume(response.getEntity()); +// } catch (IOException ex) { +// } +// } +// } } /** This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |