|
From: <mrp...@us...> - 2013-09-28 19:19:07
|
Revision: 7422
http://bigdata.svn.sourceforge.net/bigdata/?rev=7422&view=rev
Author: mrpersonick
Date: 2013-09-28 19:18:59 +0000 (Sat, 28 Sep 2013)
Log Message:
-----------
remote query cancellation
Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/service/RemoteServiceCallImpl.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/client/RemoteRepository.java
branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/client/RemoteRepositoryManager.java
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/service/RemoteServiceCallImpl.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/service/RemoteServiceCallImpl.java 2013-09-28 18:35:26 UTC (rev 7421)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/service/RemoteServiceCallImpl.java 2013-09-28 19:18:59 UTC (rev 7422)
@@ -27,6 +27,8 @@
package com.bigdata.rdf.sparql.ast.service;
+import java.util.UUID;
+
import org.apache.http.HttpResponse;
import org.apache.http.impl.client.DefaultHttpClient;
import org.openrdf.query.BindingSet;
@@ -134,7 +136,11 @@
// opts.queryStr = queryStr;
+ final UUID queryId = UUID.randomUUID();
+
o.addRequestParam("query", queryStr);
+
+ o.addRequestParam("queryId", queryId.toString());
final RemoteRepository repo = new RemoteRepository(uriStr,
new DefaultHttpClient(params.getClientConnectionManager()),
@@ -152,14 +158,16 @@
try {
- final HttpResponse resp = repo.doConnect(o);
+// final HttpResponse resp = repo.doConnect(o);
+//
+// RemoteRepository.checkResponseCode(resp);
+//
+// queryResult = repo.tupleResults(resp);
+//
+//// queryResult = parseResults(checkResponseCode(doSparqlQuery(opts)));
+
+ queryResult = repo.tupleResults(o, queryId);
- RemoteRepository.checkResponseCode(resp);
-
- queryResult = repo.tupleResults(resp);
-
-// queryResult = parseResults(checkResponseCode(doSparqlQuery(opts)));
-
} finally {
/*
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java 2013-09-28 18:35:26 UTC (rev 7421)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java 2013-09-28 19:18:59 UTC (rev 7422)
@@ -151,7 +151,7 @@
* The name of a request parameter whose value is the {@link UUID} of a
* top-level query.
*/
- public static final String QUERY_ID = "queryId";
+ private static final String QUERY_ID = "queryId";
/**
* The name of a request parameter used to cancel a running query. At least
@@ -160,7 +160,7 @@
*
* @see #QUERY_ID
*/
- public static final String CANCEL_QUERY = "cancelQuery";
+ protected static final String CANCEL_QUERY = "cancelQuery";
/**
* Request a snapshot of the journal (HA only). The snapshot will be written
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/client/RemoteRepository.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/client/RemoteRepository.java 2013-09-28 18:35:26 UTC (rev 7421)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/client/RemoteRepository.java 2013-09-28 19:18:59 UTC (rev 7422)
@@ -99,8 +99,6 @@
import org.xml.sax.Attributes;
import org.xml.sax.ext.DefaultHandler2;
-import com.bigdata.rdf.sail.webapp.StatusServlet;
-
// Note: Do not import. Not part of the bigdata-client.jar
//
//import com.bigdata.rdf.sparql.ast.service.RemoteServiceOptions;
@@ -329,13 +327,13 @@
opts.method = "GET";
- HttpResponse response = null;
+// HttpResponse response = null;
opts.setAcceptHeader(ConnectOptions.DEFAULT_GRAPH_ACCEPT_HEADER);
- checkResponseCode(response = doConnect(opts));
+// checkResponseCode(response = doConnect(opts));
- return graphResults(response);
+ return graphResults(opts, null);
}
@@ -531,11 +529,14 @@
*/
public void cancel(final UUID queryId) throws Exception {
+ if (queryId == null)
+ return;
+
final ConnectOptions opts = newUpdateConnectOptions();
- opts.addRequestParam(StatusServlet.CANCEL_QUERY);
+ opts.addRequestParam("cancelQuery");
- opts.addRequestParam(StatusServlet.QUERY_ID, queryId.toString());
+ opts.addRequestParam("queryId", queryId.toString());
checkResponseCode(doConnect(opts));
@@ -1037,14 +1038,10 @@
@Override
public GraphQueryResult evaluate() throws Exception {
- HttpResponse response = null;
-
setupConnectOptions();
- checkResponseCode(response = doConnect(opts));
+ return graphResults(opts, getQueryId());
- return graphResults(response);
-
}
}
@@ -1072,30 +1069,34 @@
@Override
public boolean evaluate() throws Exception {
+
+ setupConnectOptions();
- HttpResponse response = null;
- try {
+ return booleanResults(opts, getQueryId());
- setupConnectOptions();
-
- checkResponseCode(response = doConnect(opts));
-
- return booleanResults(response);
-
- } finally {
-
- try {
-
- if (response != null)
- EntityUtils.consume(response.getEntity());
-
- } catch (Exception ex) {
-
- log.warn(ex);
-
- }
-
- }
+// HttpResponse response = null;
+// try {
+//
+// setupConnectOptions();
+//
+// checkResponseCode(response = doConnect(opts));
+//
+// return booleanResults(response);
+//
+// } finally {
+//
+// try {
+//
+// if (response != null)
+// EntityUtils.consume(response.getEntity());
+//
+// } catch (Exception ex) {
+//
+// log.warn(ex);
+//
+// }
+//
+// }
}
@@ -1535,90 +1536,6 @@
* @throws Exception
* If anything goes wrong.
*/
- public TupleQueryResult tupleResults(final HttpResponse response)
- throws Exception {
-
- HttpEntity entity = null;
- BackgroundTupleResult result = null;
- try {
-
- entity = response.getEntity();
-
- final String contentType = entity.getContentType().getValue();
-
- final MiniMime mimeType = new MiniMime(contentType);
-
- final TupleQueryResultFormat format = TupleQueryResultFormat
- .forMIMEType(mimeType.getMimeType());
-
- if (format == null)
- throw new IOException(
- "Could not identify format for service response: serviceURI="
- + sparqlEndpointURL + ", contentType=" + contentType
- + " : response=" + getResponseBody(response));
-
- final TupleQueryResultParserFactory parserFactory = TupleQueryResultParserRegistry
- .getInstance().get(format);
-
- if (parserFactory == null)
- throw new IOException(
- "No parser for format for service response: serviceURI="
- + sparqlEndpointURL + ", contentType=" + contentType
- + ", format=" + format + " : response="
- + getResponseBody(response));
-
- final TupleQueryResultParser parser = parserFactory.getParser();
-
- final InputStream in = entity.getContent();
-
- result = new BackgroundTupleResult(parser, in, entity);
-
- executor.execute(result);
-
- final MapBindingSet bindings = new MapBindingSet();
-
- final InsertBindingSetCursor cursor =
- new InsertBindingSetCursor(result, bindings);
-
- final List<String> list = new ArrayList<String>(
- result.getBindingNames());
-
- return new TupleQueryResultImpl(list, cursor);
-
-// final TupleQueryResultBuilder handler = new TupleQueryResultBuilder();
-//
-// parser.setTupleQueryResultHandler(handler);
-//
-// parser.parse(entity.getContent());
-//
-// // done.
-// return handler.getQueryResult();
-
- } finally {
-
-// // terminate the http connection.
-// response.disconnect();
- if (result == null) {
- try {
- EntityUtils.consume(entity);
- } catch (IOException ex) { }
- }
-
- }
-
- }
-
- /**
- * Extracts the solutions from a SPARQL query.
- *
- * @param response
- * The connection from which to read the results.
- *
- * @return The results.
- *
- * @throws Exception
- * If anything goes wrong.
- */
public TupleQueryResult tupleResults(final ConnectOptions opts, final UUID queryId)
throws Exception {
@@ -1693,20 +1610,24 @@
}
@Override
- public synchronized void close() throws QueryEvaluationException {
+ public void close() throws QueryEvaluationException {
- super.close();
+ try {
- if (notDone.compareAndSet(true, false)) {
-
- try {
- cancel(queryId);
- } catch (Exception ex) {
- throw new QueryEvaluationException(ex);
- }
-
- }
+ super.close();
+ } finally {
+
+ if (notDone.compareAndSet(true, false)) {
+
+ try {
+ cancel(queryId);
+ } catch (Exception ex) { }
+
+ }
+
+ }
+
};
};
@@ -1726,7 +1647,7 @@
// // terminate the http connection.
// response.disconnect();
- if (result == null) {
+ if (entity != null && result == null) {
try {
EntityUtils.consume(entity);
} catch (IOException ex) { }
@@ -1753,12 +1674,18 @@
* @throws Exception
* If anything goes wrong.
*/
- public GraphQueryResult graphResults(final HttpResponse response) throws Exception {
+ public GraphQueryResult graphResults(final ConnectOptions opts, final UUID queryId)
+ throws Exception {
+ HttpResponse response = null;
HttpEntity entity = null;
BackgroundGraphResult result = null;
try {
+ response = doConnect(opts);
+
+ checkResponseCode(response);
+
entity = response.getEntity();
final String baseURI = "";
@@ -1808,11 +1735,53 @@
// charset=application/rdf+xml
}
- result = new BackgroundGraphResult(
- parser, entity.getContent(), charset, baseURI, entity);
+ final BackgroundGraphResult tmp = new BackgroundGraphResult(
+ parser, entity.getContent(), charset, baseURI, entity) {
+
+ final AtomicBoolean notDone = new AtomicBoolean(true);
+
+ @Override
+ public boolean hasNext() throws QueryEvaluationException {
+
+ final boolean hasNext = super.hasNext();
+
+ if (hasNext == false) {
+
+ notDone.set(false);
+
+ }
+
+ return hasNext;
+
+ }
+
+ @Override
+ public void close() throws QueryEvaluationException {
+
+ try {
+
+ super.close();
+
+ } finally {
+
+ if (notDone.compareAndSet(true, false)) {
+
+ try {
+ cancel(queryId);
+ } catch (Exception ex) { }
+
+ }
+
+ }
+
+ };
+
+ };
- executor.execute(result);
+ executor.execute(tmp);
+ result = tmp;
+
return result;
// final Graph g = new GraphImpl();
@@ -1828,10 +1797,14 @@
// // terminate the http connection.
// response.disconnect();
- if (result == null) {
+ if (response != null && result == null) {
try {
EntityUtils.consume(entity);
} catch (IOException ex) { }
+
+ try {
+ cancel(queryId);
+ } catch (Exception ex) { }
}
}
@@ -1851,11 +1824,17 @@
* If anything goes wrong, including if the result set does not
* encode a single boolean value.
*/
- protected boolean booleanResults(final HttpResponse response) throws Exception {
+ protected boolean booleanResults(final ConnectOptions opts, final UUID queryId) throws Exception {
+ HttpResponse response = null;
HttpEntity entity = null;
+ Boolean result = null;
try {
+ response = doConnect(opts);
+
+ checkResponseCode(response);
+
entity = response.getEntity();
final String contentType = entity.getContentType().getValue();
@@ -1879,7 +1858,7 @@
final BooleanQueryResultParser parser = factory.getParser();
- final boolean result = parser.parse(entity.getContent());
+ result = parser.parse(entity.getContent());
return result;
@@ -1887,9 +1866,15 @@
// // terminate the http connection.
// response.disconnect();
- try {
- EntityUtils.consume(entity);
- } catch (IOException ex) { }
+ if (result == null) {
+ try {
+ EntityUtils.consume(entity);
+ } catch (IOException ex) { }
+
+ try {
+ cancel(queryId);
+ } catch (Exception ex) { }
+ }
}
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/client/RemoteRepositoryManager.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/client/RemoteRepositoryManager.java 2013-09-28 18:35:26 UTC (rev 7421)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/client/RemoteRepositoryManager.java 2013-09-28 19:18:59 UTC (rev 7422)
@@ -171,27 +171,29 @@
opts.method = "GET";
- HttpResponse response = null;
- GraphQueryResult result = null;
+// HttpResponse response = null;
+// GraphQueryResult result = null;
opts.setAcceptHeader(ConnectOptions.DEFAULT_GRAPH_ACCEPT_HEADER);
- try {
- // check response in try.
- checkResponseCode(response = doConnect(opts));
-
- // return asynchronous parse of result.
- return result = graphResults(response);
-
- } finally {
- if (result == null) {
- // Consume entity if bad response.
- try {
- EntityUtils.consume(response.getEntity());
- } catch (IOException ex) {
- }
- }
- }
+ return graphResults(opts, null);
+
+// try {
+// // check response in try.
+// checkResponseCode(response = doConnect(opts));
+//
+// // return asynchronous parse of result.
+// return result = graphResults(response);
+//
+// } finally {
+// if (result == null) {
+// // Consume entity if bad response.
+// try {
+// EntityUtils.consume(response.getEntity());
+// } catch (IOException ex) {
+// }
+// }
+// }
}
/**
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|