From: <tho...@us...> - 2014-09-22 14:54:42
|
Revision: 8656 http://sourceforge.net/p/bigdata/code/8656 Author: thompsonbry Date: 2014-09-22 14:54:35 +0000 (Mon, 22 Sep 2014) Log Message: ----------- I have integrated a high concurrency LRU into the QueryEngine. This is used to track the UUID of an operation that has been CANCELed through the REST API but which is neither (a) currently running; nor (b) recently done. In this case, the UUID is entered into the new "pendingCancelLRU". The pendingCancelLRU is checked before we start a new query (all code paths) and before we start a SPARQL UPDATE operation (REST API). If the operation is found in the pendingCancelLRU, then the Future of that operation is cancelled. This occurs before the Future is submitted for evaluation to avoid a race in which the operation might complete before it was cancelled. Changes are to: - QueryEngine: added pendingCancelLRU and associated access methods. The pendingCancelLRU is checked for QUERY in startEval(). - BigdataRDFContext: Handle pending CANCEL of UPDATE requests. - StatusServlet: Queue UUID in the pendingCancelLRU iff not found when CANCEL request is processed. The AST, SPARQL, and NSS test suites are good locally (but see #1015 for memory leaks in the test suite). Committed to CI. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFContext.java branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2014-09-19 12:26:56 UTC (rev 8655) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2014-09-22 14:54:35 UTC (rev 8656) @@ -64,6 +64,7 @@ import com.bigdata.btree.BTree; import com.bigdata.btree.IndexSegment; import com.bigdata.btree.view.FusedView; +import com.bigdata.cache.ConcurrentWeakValueCache; import com.bigdata.concurrent.FutureTaskMon; import com.bigdata.counters.CounterSet; import com.bigdata.counters.ICounterSetAccess; @@ -71,6 +72,7 @@ import com.bigdata.journal.IIndexManager; import com.bigdata.journal.Journal; import com.bigdata.rawstore.IRawStore; +import com.bigdata.rdf.internal.constraints.TrueBOp; import com.bigdata.rdf.sail.webapp.client.DefaultClientConnectionManagerFactory; import com.bigdata.resources.IndexManager; import com.bigdata.service.IBigdataFederation; @@ -535,7 +537,7 @@ /** * The currently executing queries. */ - final private ConcurrentHashMap<UUID/* queryId */, AbstractRunningQuery> runningQueries = new ConcurrentHashMap<UUID, AbstractRunningQuery>(); + private final ConcurrentHashMap<UUID/* queryId */, AbstractRunningQuery> runningQueries = new ConcurrentHashMap<UUID, AbstractRunningQuery>(); /** * LRU cache used to handle problems with asynchronous termination of @@ -554,7 +556,7 @@ * enough that we can not have a false cache miss on a system which is * heavily loaded by a bunch of light queries. */ - private LinkedHashMap<UUID, IHaltable<Void>> doneQueries = new LinkedHashMap<UUID,IHaltable<Void>>( + private final LinkedHashMap<UUID, IHaltable<Void>> doneQueries = new LinkedHashMap<UUID,IHaltable<Void>>( 16/* initialCapacity */, .75f/* loadFactor */, true/* accessOrder */) { private static final long serialVersionUID = 1L; @@ -568,6 +570,92 @@ }; /** + * A high concurrency cache operating as an LRU designed to close a data + * race between the asynchronous start of a submitted query or update + * operation and the explicit asynchronous CANCEL of that operation using + * its pre-assigned {@link UUID}. + * <p> + * When a CANCEL request is received, we probe both the + * {@link #runningQueries} and the {@link #doneQueries}. If no operation is + * associated with that request, then we probe the running UPDATE + * operations. Finally, if no such operation was discovered, then the + * {@link UUID} of the operation to be cancelled is entered into this + * collection. + * <p> + * Before a query starts, we consult the {@link #pendingCancelLRU}. If the + * {@link UUID} of the query is discovered, then the query is cancelled + * rather than run. + * <p> + * Note: The capacity of the backing hard reference queue is quite small. + * {@link UUID}s are only entered into this collection if a CANCEL request + * is asynchronously received either (a) before; or (b) long enough after a + * query or update is executed that is not not found in either the running + * queries map or the recently done queries map. + * + * TODO There are some cases that are not covered by this. First, we do not + * have {@link UUID}s for all REST API methods and thus they can not all be + * cancelled. If we allowed an HTTP header to specify the UUID of the + * request, then we could associate a UUID with all requests. The ongoing + * refactor to support clean interrupt of NSS requests (#753) and the + * ongoing refactor to support concurrent unisolated operations against the + * same journal (#566) will provide us with the mechanisms to identify all + * such operations so we can check their assigned UUIDs and cancel them when + * requested. + * + * @see <a href="http://trac.bigdata.com/ticket/899"> REST API Query + * Cancellation </a> + * @see <a href="http://trac.bigdata.com/ticket/753"> HA doLocalAbort() + * should interrupt NSS requests and AbstractTasks </a> + * @see <a href="http://trac.bigdata.com/ticket/566"> Concurrent unisolated + * operations against multiple KBs on the same Journal </a> + * @see #startEval(UUID, PipelineOp, Map, IChunkMessage) + */ + private final ConcurrentWeakValueCache<UUID, UUID> pendingCancelLRU = new ConcurrentWeakValueCache<>( + 50/* queueCapacity (SWAG, but see above) */); + + /** + * Add a query {@link UUID} to the LRU of query identifiers for which we + * have received a CANCEL request, but were unable to find a running QUERY, + * recently done query, or running UPDATE request. + * + * @param queryId + * The UUID of the operation to be cancelled. + * + * @see <a href="http://trac.bigdata.com/ticket/899"> REST API Query + * Cancellation </a> + */ + public void addPendingCancel(final UUID queryId) { + + if (queryId == null) + throw new IllegalArgumentException(); + + pendingCancelLRU.putIfAbsent(queryId, queryId); + + } + + /** + * Return <code>true</code> iff the {@link UUID} is the the collection of + * {@link UUID}s for which we have already received a CANCEL request. + * <p> + * Note: The {@link UUID} is removed from the pending cancel collection as a + * side-effect. + * + * @param queryId + * The {@link UUID} of the operation. + * + * @return <code>true</code> if that operation has already been marked for + * cancellation. + */ + public boolean pendingCancel(final UUID queryId) { + + if (queryId == null) + throw new IllegalArgumentException(); + + return pendingCancelLRU.remove(queryId) != null; + + } + + /** * A queue of {@link ChunkedRunningQuery}s having binding set chunks available for * consumption. * @@ -1695,6 +1783,22 @@ // if (c != null) // c.startCount.increment(); + if (pendingCancelLRU.containsKey(runningQuery.getQueryId())) { + /* + * The query was asynchronously scheduled for cancellation. + */ + + // Cancel the query. + runningQuery.cancel(true/* mayInterruptIfRunning */); + + // Remove from the CANCEL LRU. + pendingCancelLRU.remove(runningQuery.getQueryId()); + + // Return the query. It has already been cancelled. + return runningQuery; + + } + // notify query start runningQuery.startQuery(msg); Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFContext.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFContext.java 2014-09-19 12:26:56 UTC (rev 8655) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFContext.java 2014-09-22 14:54:35 UTC (rev 8656) @@ -75,6 +75,7 @@ import com.bigdata.BigdataStatics; import com.bigdata.bop.engine.IRunningQuery; import com.bigdata.bop.engine.QueryEngine; +import com.bigdata.bop.fed.QueryEngineFactory; import com.bigdata.counters.CAT; import com.bigdata.io.NullOutputStream; import com.bigdata.journal.IIndexManager; @@ -1015,6 +1016,30 @@ m_queries.put(queryId, r); m_queries2.put(queryId2, r); + /** + * Handle data races in CANCEL of an UPDATE operation whose + * cancellation was requested before it began to execute. + * + * @see <a href="http://trac.bigdata.com/ticket/899"> REST API Query + * Cancellation </a> + */ + { + + final QueryEngine queryEngine = QueryEngineFactory + .getQueryController(getIndexManager()); + + if (queryEngine.pendingCancel(queryId2)) { + + /* + * There is a pending CANCEL for this UPDATE request, so + * cancel it now. + */ + updateFuture.cancel(true/* mayInterruptIfRunning */); + + } + + } + return update; } Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java 2014-09-19 12:26:56 UTC (rev 8655) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java 2014-09-22 14:54:35 UTC (rev 8656) @@ -234,17 +234,11 @@ * * @throws IOException * + * @see <a href="http://trac.bigdata.com/ticket/899"> REST API Query + * Cancellation </a> + * * FIXME GROUP COMMIT: Review cancellation and leader fail * scenarios. - * - * FIXME CANCEL: A remote client can not act to cancel a request - * that is in the queue until it begins to execute. This is - * because the UUID is not assigned until the request begins to - * execute. This is true for both SPARQL QUERY and SPARQL UPDATE - * requests. We need to track the CANCEL requests on a LRU and - * apply them if we observe the query / update arriving after - * the CANCEL. See <a href="http://trac.bigdata.com/ticket/988" - * > REST API cancellation of queries</a> */ static void doCancelQuery(final HttpServletRequest req, final HttpServletResponse resp, final IIndexManager indexManager, @@ -276,6 +270,7 @@ if (!tryCancelQuery(queryEngine, queryId)) { if (!tryCancelUpdate(context, queryId)) { + queryEngine.addPendingCancel(queryId); if (log.isInfoEnabled()) { log.info("No such QUERY or UPDATE: " + queryId); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |