From: <tho...@us...> - 2013-11-11 21:48:22
|
Revision: 7530 http://bigdata.svn.sourceforge.net/bigdata/?rev=7530&view=rev Author: thompsonbry Date: 2013-11-11 21:48:16 +0000 (Mon, 11 Nov 2013) Log Message: ----------- doABCMultiLoadFollowerReads2() was failing to cancel each of the submitted futures. It looks like it was simply not refactored correctly when creating it from another test. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java 2013-11-11 18:32:53 UTC (rev 7529) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java 2013-11-11 21:48:16 UTC (rev 7530) @@ -2382,151 +2382,117 @@ } /** - * Similar to multitransaction but rather than a number of updates following a load it is simply a number of loads - * followed by queries on the folowers that are checkd for consistency. + * Similar to multitransaction but rather than a number of updates following + * a load it is simply a number of loads followed by queries on the folowers + * that are checked for consistency. * * @param loads - * @param transactionDelay - * @throws Exception + * The number of LOAD operations to perform. + * @param largeLoad + * If true, the load a large file. */ protected void doABCMultiLoadFollowerReads2(final int nTransactions, final boolean largeLoad) throws Exception { -// try { + // Start all services. + final ABC services = new ABC(true/* sequential */); - // Start all services. - final ABC services = new ABC(true/* sequential */); + // Wait for a quorum meet. + final long token = quorum.awaitQuorum(awaitQuorumTimeout, + TimeUnit.MILLISECONDS); - // Wait for a quorum meet. - final long token = quorum.awaitQuorum(awaitQuorumTimeout, - TimeUnit.MILLISECONDS); + assertEquals(token, awaitFullyMetQuorum()); - assertEquals(token, awaitFullyMetQuorum()); + final HAGlue leader = quorum.getClient().getLeader(token); - final HAGlue leader = quorum.getClient().getLeader(token); + // Verify assumption in this test. + assertEquals(leader, services.serverA); - // Verify assumption in this test. - assertEquals(leader, services.serverA); + // Wait until all services are "HA" ready. + leader.awaitHAReady(awaitQuorumTimeout, TimeUnit.MILLISECONDS); + services.serverB + .awaitHAReady(awaitQuorumTimeout, TimeUnit.MILLISECONDS); + services.serverC + .awaitHAReady(awaitQuorumTimeout, TimeUnit.MILLISECONDS); - // Wait until all services are "HA" ready. - leader.awaitHAReady(awaitQuorumTimeout, TimeUnit.MILLISECONDS); - services.serverB.awaitHAReady(awaitQuorumTimeout, TimeUnit.MILLISECONDS); - services.serverC.awaitHAReady(awaitQuorumTimeout, TimeUnit.MILLISECONDS); + /* + * Now create a Callable for the final followes to repeatedly query + * against the then current commit point. The task returns the #of + * queries that were executed. The task will run until we stop issuing + * UPDATE requests. + */ + class QueryTask implements Callable<Long> { - /* - * Now create a Callable for the final followes to repeatedly query - * against the then current commit point. The task returns the #of - * queries that were executed. The task will run until we stop - * issuing UPDATE requests. - */ - class QueryTask implements Callable<Long> { - - /** The service to query. */ - private final HAGlue haGlue; - -// /** -// * The SPARQL end point for that service. -// */ -// final RemoteRepository remoteRepo; -// -// /** -// * Format for timestamps that may be used to correlate with the -// * HA log messages. -// */ -// final SimpleDateFormat df = new SimpleDateFormat("hh:mm:ss,SSS"); + /** The service to query. */ + private final HAGlue haGlue; - /** - * @param haGlue - * The service to query. - * - * @throws IOException - */ - public QueryTask(final HAGlue haGlue) throws IOException { - -// this.haGlue = haGlue; - - /* - * Run query against one of the followers. - * - * 6537 queries for 2000 transactions (leader) - * - * 10109 queries for 2000 transactions (follower) - */ -// remoteRepo = getRemoteRepository(haGlue); - this.haGlue = haGlue; + public QueryTask(final HAGlue haGlue) throws IOException { - } + this.haGlue = haGlue; - public Long call() throws Exception { - - return getCountStar(haGlue); - -// final String query = "SELECT (COUNT(*) AS ?count) WHERE { ?s ?p ?o }"; -// -// // Verify quorum is still valid. -// quorum.assertQuorum(token); -// -// // Run query. -// final TupleQueryResult result = remoteRepo -// .prepareTupleQuery(query).evaluate(); -// -// final BindingSet bs = result.next(); -// -// // done. -// final Value v = bs.getBinding("count").getValue(); -// -// return (long) ((org.openrdf.model.Literal) v).intValue(); - } - - }; + } - final FutureTask<Long> queryTaskFuture = new FutureTask<Long>( - new QueryTask(services.serverC)); + @Override + public Long call() throws Exception { - /* - * Sequentially run repeated loads and after each load submit queries on all services, - * checking for consistency. - */ + return getCountStar(haGlue); + } + + } // class QueryTask + + /* + * Sequentially run repeated loads and after each load submit queries on + * all services, checking for consistency. + */ + + for (int t = 0; t < nTransactions; t++) { + + // Create tasks, but do not execute yet. + final FutureTask<Void> loadTaskFuture = new FutureTask<Void>( + new LargeLoadTask(token, largeLoad/* reallyLargeLoad */)); + final FutureTask<Long> qAFuture = new FutureTask<Long>( + new QueryTask(services.serverA)); + final FutureTask<Long> qBFuture = new FutureTask<Long>( + new QueryTask(services.serverB)); + final FutureTask<Long> qCFuture = new FutureTask<Long>( + new QueryTask(services.serverC)); + try { - for (int t = 0 ; t < nTransactions; t++) { - final FutureTask<Void> loadTaskFuture = new FutureTask<Void>(new LargeLoadTask(token, largeLoad/* reallyLargeLoad */)); - executorService.submit(loadTaskFuture); - loadTaskFuture.get(); // wait on load! - final FutureTask<Long> qAFuture = new FutureTask<Long>(new QueryTask(services.serverA)); - final FutureTask<Long> qBFuture = new FutureTask<Long>(new QueryTask(services.serverB)); - final FutureTask<Long> qCFuture = new FutureTask<Long>(new QueryTask(services.serverC)); - - executorService.submit(qAFuture); - executorService.submit(qBFuture); - executorService.submit(qCFuture); - - if (log.isInfoEnabled()) - log.info("StatementsA: " + qAFuture.get() - + ", StatementsB: " + qBFuture.get() - + ", StatementsC: " + qCFuture.get() - ); - - assertTrue(qAFuture.get().equals(qBFuture.get())); - assertTrue(qAFuture.get().equals(qCFuture.get())); - } + // Execute LOAD. + executorService.submit(loadTaskFuture); + loadTaskFuture.get(); // wait on load! + // Execute query tasks. + executorService.submit(qAFuture); + executorService.submit(qBFuture); + executorService.submit(qCFuture); + + if (log.isInfoEnabled()) + log.info("StatementsA: " + qAFuture.get() + + ", StatementsB: " + qBFuture.get() + + ", StatementsC: " + qCFuture.get()); + + assertEquals(qAFuture.get(), qBFuture.get()); + assertEquals(qAFuture.get(), qCFuture.get()); + } finally { - - queryTaskFuture.cancel(true/* mayInterruptIfRunning */); + // Ensure all tasks are cancelled. + loadTaskFuture.cancel(true/* mayInterruptIfRunning */); + qAFuture.cancel(true/* mayInterruptIfRunning */); + qBFuture.cancel(true/* mayInterruptIfRunning */); + qCFuture.cancel(true/* mayInterruptIfRunning */); + } - // Finally cehck for binary compatibility - assertDigestsEquals(new HAGlue[] { services.serverA, services.serverB, services.serverC }); + } -// } finally { -// -// destroyAll(); -// -// } - + // Finally check for binary compatibility + assertDigestsEquals(new HAGlue[] { services.serverA, services.serverB, + services.serverC }); + } /** This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |