This list is closed, nobody may subscribe to it.
| Year | Jan | Feb | Mar | Apr | May | Jun | Jul | Aug | Sep | Oct | Nov | Dec |
|------|-----|-----|-----|-----|-----|-----|-----|-----|-----|-----|-----|-----|
| 2010 |     |     |     |     |     |     | 139 | 94  | 232 | 143 | 138 | 55  |
| 2011 | 127 | 90  | 101 | 74  | 148 | 241 | 169 | 121 | 157 | 199 | 281 | 75  |
| 2012 | 107 | 122 | 184 | 73  | 14  | 49  | 26  | 103 | 133 | 61  | 51  | 55  |
| 2013 | 59  | 72  | 99  | 62  | 92  | 19  | 31  | 138 | 47  | 83  | 95  | 111 |
| 2014 | 125 | 60  | 119 | 136 | 270 | 83  | 88  | 30  | 47  | 27  | 23  |     |
| 2015 |     |     |     |     |     |     |     |     | 3   |     |     |     |
| 2016 |     |     | 4   | 1   |     |     |     |     |     |     |     |     |
From: <tho...@us...> - 2013-08-07 13:25:46

Revision: 7251  http://bigdata.svn.sourceforge.net/bigdata/?rev=7251&view=rev
Author: thompsonbry
Date: 2013-08-07 13:25:39 +0000 (Wed, 07 Aug 2013)

Log Message:
-----------
Minor edits on FutureTaskMon (final attributes and simplification of the stack trace acquisition).

Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/concurrent/FutureTaskMon.java

Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/concurrent/FutureTaskMon.java
===================================================================
--- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/concurrent/FutureTaskMon.java	2013-08-06 14:25:52 UTC (rev 7250)
+++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/concurrent/FutureTaskMon.java	2013-08-07 13:25:39 UTC (rev 7251)
@@ -44,11 +44,11 @@
 
     private volatile boolean didStart = false;
 
-    public FutureTaskMon(Callable<T> callable) {
+    public FutureTaskMon(final Callable<T> callable) {
         super(callable);
     }
 
-    public FutureTaskMon(Runnable runnable, T result) {
+    public FutureTaskMon(final Runnable runnable, final T result) {
         super(runnable, result);
     }
 
@@ -76,14 +76,13 @@
 
         final boolean ret = super.cancel(mayInterruptIfRunning);
 
-        if (didStart && mayInterruptIfRunning && ret && log.isDebugEnabled()) {
-            try {
-                throw new RuntimeException("cancel call trace");
-            } catch (RuntimeException re) {
-                log.debug("May interrupt running task", re);
-            }
-        }
+        if (didStart && mayInterruptIfRunning && ret && log.isDebugEnabled()) {
+
+            log.debug("May have interrupted running task",
+                    new RuntimeException("Stack trace of cancel() invocation"));
+
+        }
+
         return ret;
     }
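The simplification above rests on a logging idiom worth spelling out: a Java Throwable captures the stack of the thread that constructs it, so an exception that is created but never thrown can be passed to a logger to record exactly who called cancel(). Here is a minimal standalone sketch of the idiom; the class, message strings, and wiring are invented for illustration (this is not bigdata code, and it assumes log4j on the classpath):

{{{
import org.apache.log4j.Logger;

// Minimal sketch, illustrative names only: logging a never-thrown exception
// records the stack trace of the caller without the throw/catch dance the
// old code used.
public class CancelTraceDemo {

    private static final Logger log = Logger.getLogger(CancelTraceDemo.class);

    public boolean cancel(final boolean mayInterruptIfRunning) {

        final boolean ret = true; // stand-in for super.cancel(...)

        if (ret && mayInterruptIfRunning && log.isDebugEnabled()) {
            // The RuntimeException is constructed only to capture the current
            // stack; it is handed to the logger and never thrown.
            log.debug("May have interrupted running task",
                    new RuntimeException("Stack trace of cancel() invocation"));
        }

        return ret;
    }

    public static void main(String[] args) {
        new CancelTraceDemo().cancel(true);
    }
}
}}}

The old code obtained the same trace by throwing and immediately catching a RuntimeException; constructing the exception directly is equivalent and cheaper.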
From: <tho...@us...> - 2013-08-06 14:25:58

Revision: 7250  http://bigdata.svn.sourceforge.net/bigdata/?rev=7250&view=rev
Author: thompsonbry
Date: 2013-08-06 14:25:52 +0000 (Tue, 06 Aug 2013)

Log Message:
-----------
javadoc on IBlockingBuffer pertaining to [1].

[1] https://sourceforge.net/apps/trac/bigdata/ticket/707 (BlockingBuffer.close() does not unblock threads)

Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/relation/accesspath/IBlockingBuffer.java

Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/relation/accesspath/IBlockingBuffer.java
===================================================================
--- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/relation/accesspath/IBlockingBuffer.java	2013-08-06 13:24:48 UTC (rev 7249)
+++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/relation/accesspath/IBlockingBuffer.java	2013-08-06 14:25:52 UTC (rev 7250)
@@ -29,6 +29,7 @@
 package com.bigdata.relation.accesspath;
 
 import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
 
 import com.bigdata.relation.IMutableRelation;
 import com.bigdata.relation.IRelation;
@@ -97,7 +98,12 @@
 
     /**
      * Set the {@link Future} for the source processing writing on the
-     * {@link IBlockingBuffer}.
+     * {@link IBlockingBuffer} (the producer).
+     * <p>
+     * Note: You should always wrap the task as a {@link FutureTask} and set the
+     * {@link Future} on the {@link IBlockingBuffer} before you start the
+     * consumer. This ensures that the producer will be cancelled if the
+     * consumer is interrupted.
      * 
      * @param future
      *            The {@link Future}.
@@ -106,7 +112,10 @@
      *             if the argument is <code>null</code>.
      * @throws IllegalStateException
      *             if the future has already been set.
-     * 
+     * 
+     * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/707">
+     *      BlockingBuffer.close() does not unblock threads </a>
+     * 
      * @todo There should be a generic type for this.
      */
     public void setFuture(Future future);
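The ordering this javadoc prescribes is the crux of ticket 707, so a self-contained sketch may help. Everything below is illustrative: the Buffer class is a stand-in of this note's own invention, not the IBlockingBuffer API. The point is purely the order of operations: wrap the producer in a FutureTask, publish that Future to the buffer, and only then hand the task to a worker thread, so a consumer that gives up always has a Future through which to cancel the producer.

{{{
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class ProducerWiringSketch {

    /** Stand-in for a blocking buffer that can cancel its producer. */
    static final class Buffer<T> {
        private volatile FutureTask<T> producerFuture;
        void setFuture(final FutureTask<T> f) {
            if (f == null) throw new IllegalArgumentException();
            if (producerFuture != null) throw new IllegalStateException();
            producerFuture = f;
        }
        void closeFromConsumer() {
            // The consumer gave up: interrupt the producer if it is running.
            if (producerFuture != null)
                producerFuture.cancel(true/* mayInterruptIfRunning */);
        }
    }

    public static void main(String[] args) {
        final ExecutorService exec = Executors.newCachedThreadPool();
        final Buffer<Void> buffer = new Buffer<>();

        final Callable<Void> producer = () -> null; // would write on buffer

        // (1) Wrap the computation as a FutureTask.
        final FutureTask<Void> ft = new FutureTask<>(producer);
        // (2) Set the Future on the buffer BEFORE starting anything.
        buffer.setFuture(ft);
        // (3) Assign a worker thread last.
        exec.submit(ft);

        buffer.closeFromConsumer();
        exec.shutdown();
    }
}
}}}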
From: <tho...@us...> - 2013-08-06 13:24:56

Revision: 7249  http://bigdata.svn.sourceforge.net/bigdata/?rev=7249&view=rev
Author: thompsonbry
Date: 2013-08-06 13:24:48 +0000 (Tue, 06 Aug 2013)

Log Message:
-----------
I was able to refactor AbstractMasterTask to remove the hacked pattern and properly set the Future before starting the subtask.

Old:
{{{
    /**
     * Submit the subtask to an {@link Executor}.
     * 
     * @param subtask
     *            The subtask.
     * 
     * @return The {@link Future}.
     */
    abstract protected Future<? extends AbstractSubtaskStats> submitSubtask(S subtask);
}}}

New:
{{{
    /**
     * Submit the subtask to an {@link Executor}.
     * 
     * @param subtask
     *            The {@link FutureTask} used to execute the subtask.
     * 
     * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/707">
     *      BlockingBuffer.close() does not unblock threads </a>
     */
    abstract protected void submitSubtask(
            FutureTask<? extends AbstractSubtaskStats> subtask);
}}}

The implementation of submitSubtask is straightforward and looks as follows (the actual code depends on which executor service is being used):

{{{
    @Override
    protected void submitSubtask(
            final FutureTask<? extends AbstractSubtaskStats> subtask) {

        getFederation().getExecutorService().submit(subtask);

    }
}}}

AbstractMasterTask now uses the correct pattern.

{{{
    // Wrap the computation as a FutureTask.
    @SuppressWarnings({ "unchecked", "rawtypes" })
    final FutureTask<? extends AbstractSubtaskStats> ft = new FutureTask(sink);

    // Set Future on the BlockingBuffer.
    out.setFuture(ft);

    // Assign a worker thread to the sink.
    submitSubtask(ft);
}}}

This passes the test suite for com.bigdata.service.

A similar problematic pattern was identified in ResourceBufferTask at line 398.

{{{
    public IAsynchronousClientTask call() throws Exception {

        /*
         * @todo This is being done explicitly because the task is not being
         * submitted against the client's IRemoteExecutor service directly,
         * but instead against the ExecutorService for its internal
         * Federation reference. It would be better to obtain the
         * non-proxied IRemoteExecutor and run against that. I think that
         * I fixed this before...
         */
        task.setFederation(getFederation());

        /**
         * Note: while this is not an IBlockingBuffer, it should use the
         * same pattern.
         * 
         * @see <a
         *      href="https://sourceforge.net/apps/trac/bigdata/ticket/707">
         *      BlockingBuffer.close() does not unblock threads </a>
         */
        final FutureTask ft = new FutureTask(task);

        task.setFuture(ft);

        getFederation().getExecutorService().submit(ft);

        return (IAsynchronousClientTask) ((JiniFederation) getFederation())
                .getProxy(task, true/* enableDGC */);

    }
}}}

Again, this change passes the com.bigdata.service test suite.

See https://sourceforge.net/apps/trac/bigdata/ticket/707 (BlockingBuffer.close() does not unblock threads)

Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/service/ndx/pipeline/AbstractMasterTask.java
branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/service/ndx/pipeline/IndexWriteTask.java
branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/service/ndx/pipeline/AbstractKeyRangeMasterTestCase.java
branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/service/ndx/pipeline/AbstractMasterTestCase.java
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/service/jini/master/ResourceBufferTask.java

Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/service/ndx/pipeline/AbstractMasterTask.java
===================================================================
--- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/service/ndx/pipeline/AbstractMasterTask.java	2013-08-05 19:14:10 UTC (rev 7248)
+++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/service/ndx/pipeline/AbstractMasterTask.java	2013-08-06 13:24:48 UTC (rev 7249)
@@ -35,6 +35,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
@@ -897,26 +898,25 @@
 //        if (oldval == null) {
 
             /**
-             * Hack pattern ensures that the Future is cancelled if we exit
-             * by any code path after the Future has been submitted for
-             * evaluation and before the Future has been set on the
-             * BlockingBuffer.
+             * Start subtask.
              * 
              * @see <a
              *      href="https://sourceforge.net/apps/trac/bigdata/ticket/707">
              *      BlockingBuffer.close() does not unblock threads </a>
              */
-            Future<? extends AbstractSubtaskStats> future = null;
-            try {
-                // assign a worker thread to the sink.
-                future = submitSubtask(sink);
-                // set Future (can now be cancelled)
-                out.setFuture(future);
-            } finally {
-                if (future != null && buffer.getFuture() == null) {
-                    // Future exists but not set on BlockingBuffer.
-                    future.cancel(true/* mayInterruptIfRunning */);
-                }
+            {
+
+                // Wrap the computation as a FutureTask.
+                @SuppressWarnings({ "unchecked", "rawtypes" })
+                final FutureTask<? extends AbstractSubtaskStats> ft = new FutureTask(
+                        sink);
+
+                // Set Future on the BlockingBuffer.
+                out.setFuture(ft);
+
+                // Assign a worker thread to the sink.
+                submitSubtask(ft);
+
             }
 
             stats.subtaskStartCount.incrementAndGet();
@@ -958,15 +958,26 @@
      */
     abstract protected S newSubtask(L locator, BlockingBuffer<E[]> out);
 
+//    /**
+//     * Submit the subtask to an {@link Executor}.
+//     * 
+//     * @param subtask
+//     *            The subtask.
+//     * 
+//     * @return The {@link Future}.
+//     */
+//    abstract protected Future<? extends AbstractSubtaskStats> submitSubtask(S subtask);
+
     /**
      * Submit the subtask to an {@link Executor}.
      * 
      * @param subtask
-     *            The subtask.
-     * 
-     * @return The {@link Future}.
+     *            The {@link FutureTask} used to execute the subtask.
+     * 
+     * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/707">
+     *      BlockingBuffer.close() does not unblock threads </a>
      */
-    abstract protected Future<? extends AbstractSubtaskStats> submitSubtask(S subtask);
+    abstract protected void submitSubtask(
+            FutureTask<? extends AbstractSubtaskStats> subtask);
 
     /**
      * Drains any {@link Future}s from {@link #finishedSubtaskQueue} which are done

Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/service/ndx/pipeline/IndexWriteTask.java
===================================================================
--- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/service/ndx/pipeline/IndexWriteTask.java	2013-08-05 19:14:10 UTC (rev 7248)
+++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/service/ndx/pipeline/IndexWriteTask.java	2013-08-06 13:24:48 UTC (rev 7249)
@@ -30,7 +30,7 @@
 
 import java.util.LinkedList;
 import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
 
@@ -288,13 +288,12 @@
 
     }
 
-    @SuppressWarnings("unchecked")
     @Override
-    protected Future<HS> submitSubtask(final S subtask) {
+    protected void submitSubtask(
+            final FutureTask<? extends AbstractSubtaskStats> subtask) {
 
-        return (Future<HS>) ndx.getFederation().getExecutorService().submit(
-                subtask);
-
+        ndx.getFederation().getExecutorService().submit(subtask);
+
     }
 
     /**

Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/service/ndx/pipeline/AbstractKeyRangeMasterTestCase.java
===================================================================
--- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/service/ndx/pipeline/AbstractKeyRangeMasterTestCase.java	2013-08-05 19:14:10 UTC (rev 7248)
+++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/service/ndx/pipeline/AbstractKeyRangeMasterTestCase.java	2013-08-06 13:24:48 UTC (rev 7249)
@@ -28,18 +28,16 @@
 
 package com.bigdata.service.ndx.pipeline;
 
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -507,10 +505,11 @@
         }
 
         @Override
-        protected Future<? extends AbstractSubtaskStats> submitSubtask(S subtask) {
+        protected void submitSubtask(
+                final FutureTask<? extends AbstractSubtaskStats> subtask) {
 
-            return executorService.submit(subtask);
-
+            executorService.submit(subtask);
+
         }
 
     }

Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/service/ndx/pipeline/AbstractMasterTestCase.java
===================================================================
--- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/service/ndx/pipeline/AbstractMasterTestCase.java	2013-08-05 19:14:10 UTC (rev 7248)
+++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/service/ndx/pipeline/AbstractMasterTestCase.java	2013-08-06 13:24:48 UTC (rev 7249)
@@ -34,7 +34,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
 
@@ -341,10 +341,11 @@
         }
 
         @Override
-        protected Future<? extends AbstractSubtaskStats> submitSubtask(S subtask) {
+        protected void submitSubtask(
+                final FutureTask<? extends AbstractSubtaskStats> subtask) {
 
-            return executorService.submit(subtask);
-
+            executorService.submit(subtask);
+
         }
 
     }

Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/service/jini/master/ResourceBufferTask.java
===================================================================
--- branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/service/jini/master/ResourceBufferTask.java	2013-08-05 19:14:10 UTC (rev 7248)
+++ branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/service/jini/master/ResourceBufferTask.java	2013-08-06 13:24:48 UTC (rev 7249)
@@ -36,6 +36,7 @@
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
 
@@ -47,6 +48,7 @@
 import com.bigdata.service.jini.JiniFederation;
 import com.bigdata.service.ndx.pipeline.AbstractPendingSetMasterTask;
 import com.bigdata.service.ndx.pipeline.AbstractSubtask;
+import com.bigdata.service.ndx.pipeline.AbstractSubtaskStats;
 
 /**
  * Task drains a {@link BlockingBuffer} containing resources (really, resource
@@ -404,11 +406,20 @@
              * I fixed this before...
              */
             task.setFederation(getFederation());
+
+            /**
+             * Note: while this is not an IBlockingBuffer, it should use the
+             * same pattern.
+             * 
+             * @see <a
+             *      href="https://sourceforge.net/apps/trac/bigdata/ticket/707">
+             *      BlockingBuffer.close() does not unblock threads </a>
+             */
+            final FutureTask ft = new FutureTask(task);
 
-            final Future future = getFederation().getExecutorService().submit(
-                    task);
+            task.setFuture(ft);
 
-            task.setFuture(future);
+            getFederation().getExecutorService().submit(ft);
 
             return (IAsynchronousClientTask) ((JiniFederation) getFederation())
                     .getProxy(task, true/* enableDGC */);
@@ -434,12 +445,11 @@
 
         }
 
-        @SuppressWarnings("unchecked")
         @Override
-        protected Future<HS> submitSubtask(final S subtask) {
+        protected void submitSubtask(
+                final FutureTask<? extends AbstractSubtaskStats> subtask) {
 
-            return (Future<HS>) getFederation().getExecutorService()
-                    .submit(subtask);
+            getFederation().getExecutorService().submit(subtask);
 
         }
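This refactoring works because a FutureTask is both a Runnable and a Future, and it is cancellable from the moment it is constructed, before any thread runs it. That is what lets the master set the Future on the BlockingBuffer first and submit it second, closing the window the old try/finally hack attempted to patch. A small standalone demonstration of that property (illustrative names, not project code):

{{{
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class FutureTaskOrderingDemo {

    public static void main(String[] args) {
        final ExecutorService exec = Executors.newSingleThreadExecutor();

        // Created in the "not yet running" state: it can be published
        // (e.g. handed to a BlockingBuffer) or cancelled right away.
        final FutureTask<String> ft = new FutureTask<>(() -> "done");

        // Cancelling before submission simply prevents it from ever running.
        ft.cancel(true/* mayInterruptIfRunning */);

        exec.submit(ft); // running a cancelled FutureTask is a no-op

        System.out.println(ft.isCancelled()); // prints: true

        exec.shutdown();
    }
}
}}}

Submitting the raw Callable instead would force submit() to happen first, leaving an interval in which no cancellable Future exists yet; that is the race the old "hack pattern" tried to cover.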
From: <mrp...@us...> - 2013-08-05 19:14:16

Revision: 7248  http://bigdata.svn.sourceforge.net/bigdata/?rev=7248&view=rev
Author: mrpersonick
Date: 2013-08-05 19:14:10 +0000 (Mon, 05 Aug 2013)

Log Message:
-----------
fixed ticket 693

Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/test/com/bigdata/rdf/sail/TestBigdataSailWithoutSids.java

Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/test/com/bigdata/rdf/sail/TestBigdataSailWithoutSids.java
===================================================================
--- branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/test/com/bigdata/rdf/sail/TestBigdataSailWithoutSids.java	2013-08-05 19:13:01 UTC (rev 7247)
+++ branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/test/com/bigdata/rdf/sail/TestBigdataSailWithoutSids.java	2013-08-05 19:14:10 UTC (rev 7248)
@@ -104,6 +104,7 @@
 
         suite.addTestSuite(com.bigdata.rdf.sail.TestTicket610.class);
         suite.addTestSuite(com.bigdata.rdf.sail.TestTicket669.class);
+        suite.addTestSuite(com.bigdata.rdf.sail.TestTicket693.class);
 
         return suite;
From: <mrp...@us...> - 2013-08-05 19:13:10

Revision: 7247  http://bigdata.svn.sourceforge.net/bigdata/?rev=7247&view=rev
Author: mrpersonick
Date: 2013-08-05 19:13:01 +0000 (Mon, 05 Aug 2013)

Log Message:
-----------

Added Paths:
-----------
branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/test/com/bigdata/rdf/sail/TestTicket693.java
branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/test/com/bigdata/rdf/sail/ticket693.txt

Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/test/com/bigdata/rdf/sail/TestTicket693.java
===================================================================
--- branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/test/com/bigdata/rdf/sail/TestTicket693.java	(rev 0)
+++ branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/test/com/bigdata/rdf/sail/TestTicket693.java	2013-08-05 19:13:01 UTC (rev 7247)
@@ -0,0 +1,323 @@
/**
Copyright (C) SYSTAP, LLC 2011. All rights reserved.

Contact:
    SYSTAP, LLC
    4501 Tower Road
    Greensboro, NC 27410
    lic...@bi...

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/

package com.bigdata.rdf.sail;

import java.util.Collection;
import java.util.LinkedList;
import java.util.Properties;

import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
import org.openrdf.model.impl.URIImpl;
import org.openrdf.query.BindingSet;
import org.openrdf.query.QueryLanguage;
import org.openrdf.query.TupleQueryResult;
import org.openrdf.query.impl.BindingImpl;
import org.openrdf.repository.RepositoryConnection;
import org.openrdf.repository.sail.SailTupleQuery;
import org.openrdf.rio.RDFFormat;

import com.bigdata.rdf.axioms.NoAxioms;
import com.bigdata.rdf.vocab.NoVocabulary;

/**
 * Unit test template for use in submission of bugs.
 * <p>
 * This test case will delegate to an underlying backing store. You can
 * specify this store via a JVM property as follows:
 * <code>-DtestClass=com.bigdata.rdf.sail.TestBigdataSailWithQuads</code>
 * <p>
 * There are three possible configurations for the testClass:
 * <ul>
 * <li>com.bigdata.rdf.sail.TestBigdataSailWithQuads (quads mode)</li>
 * <li>com.bigdata.rdf.sail.TestBigdataSailWithoutSids (triples mode)</li>
 * <li>com.bigdata.rdf.sail.TestBigdataSailWithSids (SIDs mode)</li>
 * </ul>
 * <p>
 * The default for triples and SIDs mode is for inference with truth maintenance
 * to be on. If you would like to turn off inference, make sure to do so in
 * {@link #getProperties()}.
 * 
 * @author <a href="mailto:mrp...@us...">Mike Personick</a>
 * @version $Id$
 */
public class TestTicket693 extends QuadsTestCase {

    protected static final Logger log = Logger.getLogger(TestTicket693.class);

    /**
     * Please set your database properties here, except for your journal file,
     * please DO NOT SPECIFY A JOURNAL FILE.
     */
    @Override
    public Properties getProperties() {

        Properties props = super.getProperties();

        /*
         * For example, here is a set of five properties that turns off
         * inference, truth maintenance, and the free text index.
         */
        props.setProperty(BigdataSail.Options.AXIOMS_CLASS, NoAxioms.class.getName());
        props.setProperty(BigdataSail.Options.VOCABULARY_CLASS, NoVocabulary.class.getName());
        props.setProperty(BigdataSail.Options.TRUTH_MAINTENANCE, "false");
        props.setProperty(BigdataSail.Options.JUSTIFY, "false");
        props.setProperty(BigdataSail.Options.TEXT_INDEX, "false");

        return props;

    }

    public TestTicket693() {
    }

    public TestTicket693(String arg0) {
        super(arg0);
    }

    public void testBug1() throws Exception {

        /*
         * The bigdata store, backed by a temporary journal file.
         */
        final BigdataSail bigdataSail = getSail();

        /*
         * Data file containing the data demonstrating your bug.
         */
        final String data = "property_paths.owl";
        final String baseURI = "";
        final RDFFormat format = RDFFormat.RDFXML;

//        final String update =
//            "INSERT DATA " +
//            "{ " +
//            "  <http://example.com/book1> a <http://example.com/Book> . " +
//            "  <http://example.com/book2> a <http://example.com/Book> . " +
//            "  <http://example.com/book3> a <http://example.com/Book> . " +
//            "}";
//
        /*
bigdata results:
[s=http://example.org/A;p=http://www.w3.org/1999/02/22-rdf-syntax-ns#type;o=http://www.w3.org/2002/07/owl#Class]
[s=http://example.org/B;p=http://www.w3.org/1999/02/22-rdf-syntax-ns#type;o=http://www.w3.org/2002/07/owl#Class]
[s=http://example.org/C;p=http://www.w3.org/1999/02/22-rdf-syntax-ns#type;o=http://www.w3.org/2002/07/owl#Class]
[s=http://example.org/D;p=http://www.w3.org/1999/02/22-rdf-syntax-ns#type;o=http://www.w3.org/2002/07/owl#Class]
[s=http://example.org/E;p=http://www.w3.org/1999/02/22-rdf-syntax-ns#type;o=http://www.w3.org/2002/07/owl#Class]
[s=http://example.org/B;p=http://www.w3.org/2000/01/rdf-schema#subClassOf;o=http://example.org/A]
[s=http://example.org/C;p=http://www.w3.org/2000/01/rdf-schema#subClassOf;o=http://example.org/B]
[s=http://example.org/D;p=http://www.w3.org/2000/01/rdf-schema#subClassOf;o=http://example.org/C]
[s=http://example.org/E;p=http://www.w3.org/2000/01/rdf-schema#subClassOf;o=http://example.org/D]
[s=http://www.semanticweb.org/ontologies/2013/5/untitled-ontology-287;p=http://www.w3.org/1999/02/22-rdf-syntax-ns#type;o=http://www.w3.org/2002/07/owl#Ontology]
10 results.
         */

        /*
         * Query(ies) demonstrating your bug.
         */
        final String withPath = IOUtils.toString(getClass().getResourceAsStream("ticket693.txt"));

        final String dumpStore = "select * where { ?s ?p ?o . }";

        try {

            bigdataSail.initialize();

            final BigdataSailRepository bigdataRepo = new BigdataSailRepository(bigdataSail);

            { // load the data into the bigdata store

                final RepositoryConnection cxn = bigdataRepo.getConnection();
                try {
                    cxn.setAutoCommit(false);
                    cxn.add(getClass().getResourceAsStream(data), baseURI, format);
//                    cxn.add(data);
                    cxn.commit();
                } finally {
                    cxn.close();
                }

            }

            {
                final Collection<BindingSet> answer = new LinkedList<BindingSet>();
                answer.add(createBindingSet(
                        new BindingImpl("sub", new URIImpl("http://example.org/B"))
                        ));

                final String query = "SELECT * WHERE { ?sub rdfs:subClassOf <http://example.org/A> . } ";

                if (log.isInfoEnabled()) {
                    log.info("running query:\n" + query);
                }

                /*
                 * Run the problem query using the bigdata store and then compare
                 * the answer.
                 */
                final RepositoryConnection cxn = bigdataRepo.getReadOnlyConnection();
                try {

                    final SailTupleQuery tupleQuery = (SailTupleQuery)
                            cxn.prepareTupleQuery(QueryLanguage.SPARQL, query);
                    tupleQuery.setIncludeInferred(false /* includeInferred */);

                    final TupleQueryResult result = tupleQuery.evaluate();
                    compare(result, answer);

                } finally {
                    cxn.close();
                }

            }

            {
                final Collection<BindingSet> answer = new LinkedList<BindingSet>();
                answer.add(createBindingSet(
                        new BindingImpl("sub", new URIImpl("http://example.org/A"))
                        ));
                answer.add(createBindingSet(
                        new BindingImpl("sub", new URIImpl("http://example.org/B"))
                        ));
                answer.add(createBindingSet(
                        new BindingImpl("sub", new URIImpl("http://example.org/C"))
                        ));
                answer.add(createBindingSet(
                        new BindingImpl("sub", new URIImpl("http://example.org/D"))
                        ));
                answer.add(createBindingSet(
                        new BindingImpl("sub", new URIImpl("http://example.org/E"))
                        ));

                final String query = "SELECT * WHERE { ?sub rdfs:subClassOf* <http://example.org/A> . } ";

                if (log.isInfoEnabled()) {
                    log.info("running query:\n" + query);
                }

                /*
                 * Run the problem query using the bigdata store and then compare
                 * the answer.
                 */
                final RepositoryConnection cxn = bigdataRepo.getReadOnlyConnection();
                try {

                    final SailTupleQuery tupleQuery = (SailTupleQuery)
                            cxn.prepareTupleQuery(QueryLanguage.SPARQL, query);
                    tupleQuery.setIncludeInferred(false /* includeInferred */);

                    final TupleQueryResult result = tupleQuery.evaluate();
                    compare(result, answer);

                } finally {
                    cxn.close();
                }

            }

            {
                final Collection<BindingSet> answer = new LinkedList<BindingSet>();
                answer.add(createBindingSet(
                        new BindingImpl("sub", new URIImpl("http://example.org/A"))
                        ));
                answer.add(createBindingSet(
                        new BindingImpl("sub", new URIImpl("http://example.org/B"))
                        ));

                final String query = "SELECT * WHERE { ?sub rdfs:subClassOf? <http://example.org/A> . } ";

                if (log.isInfoEnabled()) {
                    log.info("running query:\n" + query);
                }

                /*
                 * Run the problem query using the bigdata store and then compare
                 * the answer.
                 */
                final RepositoryConnection cxn = bigdataRepo.getReadOnlyConnection();
                try {

                    final SailTupleQuery tupleQuery = (SailTupleQuery)
                            cxn.prepareTupleQuery(QueryLanguage.SPARQL, query);
                    tupleQuery.setIncludeInferred(false /* includeInferred */);

                    final TupleQueryResult result = tupleQuery.evaluate();
                    compare(result, answer);

                } finally {
                    cxn.close();
                }

            }

            {
                final Collection<BindingSet> answer = new LinkedList<BindingSet>();
                answer.add(createBindingSet(
                        new BindingImpl("sub", new URIImpl("http://example.org/B"))
                        ));
                answer.add(createBindingSet(
                        new BindingImpl("sub", new URIImpl("http://example.org/C"))
                        ));
                answer.add(createBindingSet(
                        new BindingImpl("sub", new URIImpl("http://example.org/D"))
                        ));
                answer.add(createBindingSet(
                        new BindingImpl("sub", new URIImpl("http://example.org/E"))
                        ));

                final String query = "SELECT * WHERE { ?sub rdfs:subClassOf+ <http://example.org/A> . } ";

                if (log.isInfoEnabled()) {
                    log.info("running query:\n" + query);
                }

                /*
                 * Run the problem query using the bigdata store and then compare
                 * the answer.
                 */
                final RepositoryConnection cxn = bigdataRepo.getReadOnlyConnection();
                try {

                    final SailTupleQuery tupleQuery = (SailTupleQuery)
                            cxn.prepareTupleQuery(QueryLanguage.SPARQL, query);
                    tupleQuery.setIncludeInferred(false /* includeInferred */);

                    final TupleQueryResult result = tupleQuery.evaluate();
                    compare(result, answer);

                } finally {
                    cxn.close();
                }

            }

        } finally {

            bigdataSail.__tearDownUnitTest();

        }

    }

}

Property changes on: branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/test/com/bigdata/rdf/sail/TestTicket693.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/test/com/bigdata/rdf/sail/ticket693.txt
===================================================================
--- branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/test/com/bigdata/rdf/sail/ticket693.txt	(rev 0)
+++ branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/test/com/bigdata/rdf/sail/ticket693.txt	2013-08-05 19:13:01 UTC (rev 7247)
@@ -0,0 +1,6 @@
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
SELECT *
WHERE
{
?sub rdfs:subClassOf+ <http://example.org/A>
}
\ No newline at end of file

Property changes on: branches/BIGDATA_RELEASE_1_2_0/bigdata-sails/src/test/com/bigdata/rdf/sail/ticket693.txt
___________________________________________________________________
Added: svn:mime-type
   + text/plain
From: <mrp...@us...> - 2013-08-05 18:42:07

Revision: 7246  http://bigdata.svn.sourceforge.net/bigdata/?rev=7246&view=rev
Author: mrpersonick
Date: 2013-08-05 18:41:58 +0000 (Mon, 05 Aug 2013)

Log Message:
-----------
committing a possible fix for the property path cardinality problem

Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/paths/ArbitraryLengthPathOp.java

Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/paths/ArbitraryLengthPathOp.java
===================================================================
--- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/paths/ArbitraryLengthPathOp.java	2013-08-05 18:05:54 UTC (rev 7245)
+++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/paths/ArbitraryLengthPathOp.java	2013-08-05 18:41:58 UTC (rev 7246)
@@ -265,7 +265,8 @@
 
                 final IBindingSet[] chunk = sitr.next();
 
-                processChunk(chunk);
+                for (IBindingSet bs : chunk)
+                    processChunk(new IBindingSet[] { bs });
 
             }
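A plausible reading of this one-line fix: the arbitrary-length path operator evaluates and de-duplicates per invocation, so handing it a whole chunk of input solutions at once could collapse the multiplicity that distinct input solutions should each contribute; splitting the chunk gives every solution its own evaluation. The toy program below only illustrates the chunk-splitting mechanics, with invented names (String standing in for IBindingSet); the real fix is the two lines in the diff above.

{{{
import java.util.Arrays;

public class ChunkSplitDemo {

    // Stand-in for the path operator's per-chunk evaluation.
    static void processChunk(final String[] chunk) {
        System.out.println("processing " + Arrays.toString(chunk));
    }

    public static void main(String[] args) {
        final String[] chunk = { "solution1", "solution2", "solution2" };

        // Before: one call over the whole chunk.
        // processChunk(chunk);

        // After: one call per solution, so each input solution contributes
        // its own multiplicity even if evaluation de-duplicates per call.
        for (String bs : chunk)
            processChunk(new String[] { bs });
    }
}
}}}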
From: <mrp...@us...> - 2013-08-05 18:06:04

Revision: 7245  http://bigdata.svn.sourceforge.net/bigdata/?rev=7245&view=rev
Author: mrpersonick
Date: 2013-08-05 18:05:54 +0000 (Mon, 05 Aug 2013)

Log Message:
-----------
checking in Jeremy's fix for ticket 684

Added Paths:
-----------
branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/optimizers/StaticOptimizer.java

Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/optimizers/StaticOptimizer.java
===================================================================
--- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/optimizers/StaticOptimizer.java	(rev 0)
+++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/optimizers/StaticOptimizer.java	2013-08-05 18:05:54 UTC (rev 7245)
@@ -0,0 +1,858 @@
package com.bigdata.rdf.sparql.ast.optimizers;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

import com.bigdata.bop.IVariable;
import com.bigdata.bop.joinGraph.fast.DefaultEvaluationPlan2;
import com.bigdata.journal.ITx;
import com.bigdata.rdf.sparql.ast.IBindingProducerNode;
import com.bigdata.rdf.sparql.ast.IReorderableNode;
import com.bigdata.rdf.sparql.ast.QueryHints;
import com.bigdata.rdf.sparql.ast.QueryRoot;
import com.bigdata.rdf.sparql.ast.StaticAnalysis;
import com.bigdata.rdf.sparql.ast.eval.AST2BOpContext;
import com.bigdata.rdf.sparql.ast.optimizers.ASTStaticJoinOptimizer.Annotations;

/**
 * This is the old static optimizer code, taken directly from
 * {@link DefaultEvaluationPlan2}, but lined up with the AST API instead of the
 * Rule and IPredicate API.
 */
public final class StaticOptimizer {

    private final StaticAnalysis sa;

    private final IBindingProducerNode[] ancestry;

    private final Set<IVariable<?>> ancestryVars;

    private final List<IReorderableNode> nodes;

    private final int arity;

    /**
     * This is computed by the optimizer, and is a guess!
     */
    private final long cardinality;

    private static final long NO_SHARED_VARS = Long.MAX_VALUE - 3;

    /**
     * The computed evaluation order. The elements in this array are the order
     * in which each tail predicate will be evaluated. The index into the array
     * is the index of the tail predicate whose evaluation order you want. So
     * <code>[2,0,1]</code> says that the predicates will be evaluated in the
     * order tail[2], then tail[0], then tail[1].
     */
    private int[/* order */] order;

    public int[] getOrder() {

        if (order == null) {

            /*
             * This will happen if you try to use toString() during the ctor
             * before the order has been computed.
             */

            throw new IllegalStateException();

        }

        return order;

    }

    /**
     * Cache of the computed range counts for the predicates in the tail. The
     * elements of this array are initialized to -1L, which indicates that the
     * range count has NOT been computed. Range counts are computed on demand
     * and MAY be zero. Only an approximate range count is obtained. Such
     * approximate range counts are an upper bound on the #of elements that are
     * spanned by the access pattern. Therefore if the range count reports ZERO
     * (0L) it is a real zero and the access pattern does not match anything in
     * the data. The only other caveat is that the range counts are valid as of
     * the commit point on which the access pattern is reading. If you obtain
     * them for {@link ITx#READ_COMMITTED} or {@link ITx#UNISOLATED} views then
     * they could be invalidated by concurrent writers.
     */
    private long[/* tailIndex */] rangeCount;

    private Tail[/* tailIndex */] tail;

    /**
     * Keeps track of which tails have been used already and which still need to
     * be evaluated.
     */
    private boolean[/* tailIndex */] used;

    /**
     * See {@link Annotations#OPTIMISTIC}.
     */
    private final double optimistic;

    public StaticOptimizer(StaticOptimizer parent, List<IReorderableNode> nodes) {
        this(parent.sa, parent.ancestry, nodes, parent.optimistic);
    }

    StaticOptimizer(final QueryRoot queryRoot, final AST2BOpContext context,
            final IBindingProducerNode[] ancestry,
            final List<IReorderableNode> nodes, final double optimistic) {
        this(new StaticAnalysis(queryRoot, context), ancestry, nodes,
                optimistic);
    }

    private StaticOptimizer(final StaticAnalysis sa,
            final IBindingProducerNode[] ancestry,
            final List<IReorderableNode> nodes, final double optimistic) {

        if (ancestry == null)
            throw new IllegalArgumentException();

        if (nodes == null)
            throw new IllegalArgumentException();

        this.sa = sa;

        this.ancestry = ancestry;

        this.ancestryVars = new LinkedHashSet<IVariable<?>>();

        this.nodes = nodes;

        this.arity = nodes.size();

        if (ASTStaticJoinOptimizer.log.isDebugEnabled()) {
            ASTStaticJoinOptimizer.log.debug("arity: " + arity);
        }

        this.optimistic = optimistic;

        this.cardinality = calc();

        if (ASTStaticJoinOptimizer.log.isDebugEnabled()) {
            for (int i = 0; i < arity; i++) {
                ASTStaticJoinOptimizer.log.debug(order[i]);
            }
        }

    }

    /**
     * Computes and sets the evaluation order, and returns an estimated
     * cardinality.
     */
    private long calc() {

        if (order != null)
            throw new IllegalStateException(
                    "calc should only be called from the constructor");

        order = new int[arity];
        rangeCount = new long[arity];
        used = new boolean[arity];
        tail = new Tail[arity];

        // clear arrays.
        for (int i = 0; i < arity; i++) {
            order[i] = -1; // -1 is used to detect logic errors.
            rangeCount[i] = -1L; // -1L indicates no range count yet.
            used[i] = false; // not yet evaluated
        }

        if (arity == 0) {
            return 1l;
        }

        if (arity == 1) {
            order[0] = 0;
            return cardinality(0);
        }

        /*
         * Seems like the easiest way to handle the ancestry is the exact same
         * way we handle the "run first" statement patterns (text search), which
         * is that we collect up the variables that are bound and then give
         * preferential treatment to the predicates that can join on those
         * variables.
         */
        for (IBindingProducerNode join : ancestry) {
            if (ASTStaticJoinOptimizer.log.isDebugEnabled()) {
                ASTStaticJoinOptimizer.log
                        .debug("considering join node from ancestry: " + join);
            }
            sa.getDefinitelyProducedBindings(join, ancestryVars, true/* recursive */);
        }
        if (ASTStaticJoinOptimizer.log.isDebugEnabled()) {
            ASTStaticJoinOptimizer.log.debug("bindings from ancestry: "
                    + Arrays.toString(ancestryVars.toArray()));
        }

        /*
         * See if there is a best tail to run first. The "run first" query hint
         * gets priority, then the tails that share variables with the ancestry.
         */
        int preferredFirstTail = -1;

        for (int i = 0; i < arity; i++) {

            /*
             * We need the optimizer to play nice with the run first hint.
             * Choose the first "run first" tail we see.
             */
            if (nodes.get(i).getProperty(QueryHints.RUN_FIRST, false)) {
                preferredFirstTail = i;
                break;
            }
        }

        /*
         * If there was no "run first" query hint, then go to the ancestry.
         */
        if (preferredFirstTail == -1)
            preferredFirstTail = getNextTailThatSharesVarsWithAncestry();

        if (ASTStaticJoinOptimizer.log.isDebugEnabled()) {
            ASTStaticJoinOptimizer.log.debug("preferred first tail: "
                    + preferredFirstTail);
        }

        // special case if there are only two tails
        if (arity == 2) {
            if (ASTStaticJoinOptimizer.log.isDebugEnabled())
                ASTStaticJoinOptimizer.log.debug("two tails left");
            if (preferredFirstTail != -1) {
                order[0] = preferredFirstTail;
                order[1] = preferredFirstTail == 0 ? 1 : 0;
            } else {
                order[0] = cardinality(0) <= cardinality(1) ? 0 : 1;
                order[1] = cardinality(0) <= cardinality(1) ? 1 : 0;
            }
            return computeJoinCardinality(getTail(0), getTail(1));
        }

        /*
         * There will be (tails-1) joins, we just need to figure out what they
         * should be.
         */
        StaticOptimizer.Join join;
        if (preferredFirstTail == -1)
            join = getFirstJoin();
        else
            join = getFirstJoin(preferredFirstTail);

        long cardinality = join.cardinality;

        int t1 = ((Tail) join.getD1()).getTailIndex();
        int t2 = ((Tail) join.getD2()).getTailIndex();
        if (preferredFirstTail == -1) {
            order[0] = cardinality(t1) <= cardinality(t2) ? t1 : t2;
            order[1] = cardinality(t1) <= cardinality(t2) ? t2 : t1;
        } else {
            order[0] = t1;
            order[1] = t2;
        }
        used[order[0]] = true;
        used[order[1]] = true;
        for (int i = 2; i < arity; i++) {
            join = getNextJoin(join);
            order[i] = ((Tail) join.getD2()).getTailIndex();
            used[order[i]] = true;
        }
        return cardinality;
    }

    /**
     * Start by looking at every possible initial join. Take every tail and
     * match it with every other tail to find the lowest possible cardinality.
     * See
     * {@link #computeJoinCardinality(com.bigdata.bop.joinGraph.fast.DefaultEvaluationPlan2.IJoinDimension, com.bigdata.bop.joinGraph.fast.DefaultEvaluationPlan2.IJoinDimension)}
     * for more on this.
     */
    private StaticOptimizer.Join getFirstJoin() {
        if (ASTStaticJoinOptimizer.log.isDebugEnabled()) {
            ASTStaticJoinOptimizer.log.debug("evaluating first join");
        }
        long minJoinCardinality = Long.MAX_VALUE;
        long minTailCardinality = Long.MAX_VALUE;
        long minOtherTailCardinality = Long.MAX_VALUE;
        Tail minT1 = null;
        Tail minT2 = null;
        for (int i = 0; i < arity; i++) {
            // only check unused tails
            if (used[i]) {
                continue;
            }
            Tail t1 = getTail(i);
            long t1Cardinality = cardinality(i);
            for (int j = 0; j < arity; j++) {
                // check only non-same and unused tails
                if (i == j || used[j]) {
                    continue;
                }
                Tail t2 = getTail(j);
                long t2Cardinality = cardinality(j);
                long joinCardinality = computeJoinCardinality(t1, t2);
                long tailCardinality = Math.min(t1Cardinality, t2Cardinality);
                long otherTailCardinality = Math.max(t1Cardinality,
                        t2Cardinality);
                if (ASTStaticJoinOptimizer.log.isDebugEnabled())
                    ASTStaticJoinOptimizer.log.debug("evaluating " + i + " X "
                            + j + ": cardinality= " + joinCardinality);
                if (joinCardinality < minJoinCardinality) {
                    if (ASTStaticJoinOptimizer.log.isDebugEnabled())
                        ASTStaticJoinOptimizer.log.debug("found a new min: "
                                + joinCardinality);
                    minJoinCardinality = joinCardinality;
                    minTailCardinality = tailCardinality;
                    minOtherTailCardinality = otherTailCardinality;
                    minT1 = t1;
                    minT2 = t2;
                } else if (joinCardinality == minJoinCardinality) {
                    if (tailCardinality < minTailCardinality) {
                        if (ASTStaticJoinOptimizer.log.isDebugEnabled())
                            ASTStaticJoinOptimizer.log
                                    .debug("found a new min: "
                                            + joinCardinality);
                        minJoinCardinality = joinCardinality;
                        minTailCardinality = tailCardinality;
                        minOtherTailCardinality = otherTailCardinality;
                        minT1 = t1;
                        minT2 = t2;
                    } else if (tailCardinality == minTailCardinality) {
                        if (otherTailCardinality < minOtherTailCardinality) {
                            if (ASTStaticJoinOptimizer.log.isDebugEnabled())
                                ASTStaticJoinOptimizer.log
                                        .debug("found a new min: "
                                                + joinCardinality);
                            minJoinCardinality = joinCardinality;
                            minTailCardinality = tailCardinality;
                            minOtherTailCardinality = otherTailCardinality;
                            minT1 = t1;
                            minT2 = t2;
                        }
                    }
                }
            }
        }
        // the join variables is the union of the join dimensions' variables
        Set<String> vars = new HashSet<String>();
        vars.addAll(minT1.getVars());
        vars.addAll(minT2.getVars());
        return new Join(minT1, minT2, minJoinCardinality, vars);
    }

    private StaticOptimizer.Join getFirstJoin(final int preferredFirstTail) {
        if (ASTStaticJoinOptimizer.log.isDebugEnabled()) {
            ASTStaticJoinOptimizer.log.debug("evaluating first join");
        }

        long minJoinCardinality = Long.MAX_VALUE;
        long minOtherTailCardinality = Long.MAX_VALUE;
        Tail minT2 = null;
        final int i = preferredFirstTail;
        final Tail t1 = getTail(i);
        for (int j = 0; j < arity; j++) {
            // check only non-same and unused tails
            if (i == j || used[j]) {
                continue;
            }
            Tail t2 = getTail(j);
            long t2Cardinality = cardinality(j);
            long joinCardinality = computeJoinCardinality(t1, t2);
            if (ASTStaticJoinOptimizer.log.isDebugEnabled())
                ASTStaticJoinOptimizer.log.debug("evaluating " + i + " X " + j
                        + ": cardinality= " + joinCardinality);
            if (joinCardinality < minJoinCardinality) {
                if (ASTStaticJoinOptimizer.log.isDebugEnabled())
                    ASTStaticJoinOptimizer.log.debug("found a new min: "
                            + joinCardinality);
                minJoinCardinality = joinCardinality;
                minOtherTailCardinality = t2Cardinality;
                minT2 = t2;
            } else if (joinCardinality == minJoinCardinality) {
                if (t2Cardinality < minOtherTailCardinality) {
                    if (ASTStaticJoinOptimizer.log.isDebugEnabled())
                        ASTStaticJoinOptimizer.log.debug("found a new min: "
                                + joinCardinality);
                    minJoinCardinality = joinCardinality;
                    minOtherTailCardinality = t2Cardinality;
                    minT2 = t2;
                }
            }
        }

        // the join variables is the union of the join dimensions' variables
        Set<String> vars = new HashSet<String>();
        vars.addAll(t1.getVars());
        vars.addAll(minT2.getVars());
        return new Join(t1, minT2, minJoinCardinality, vars);
    }

    private Tail getTail(int tailIndex) {
        if (tail[tailIndex] == null) {
            tail[tailIndex] = new Tail(tailIndex, rangeCount(tailIndex),
                    getVars(tailIndex));
        }
        return tail[tailIndex];
    }

    /**
     * Similar to {@link #getFirstJoin()}, but we have one join dimension
     * already calculated.
     * 
     * @param d1
     *            the first join dimension
     * @return the new join with the lowest cardinality from the remaining tails
     */
    private StaticOptimizer.Join getNextJoin(IJoinDimension d1) {
        if (ASTStaticJoinOptimizer.log.isDebugEnabled()) {
            ASTStaticJoinOptimizer.log.debug("evaluating next join");
        }
        long minJoinCardinality = Long.MAX_VALUE;
        long minTailCardinality = Long.MAX_VALUE;
        Tail minTail = null;
        for (int i = 0; i < arity; i++) {
            // only check unused tails
            if (used[i]) {
                continue;
            }
            Tail tail = getTail(i);
            long tailCardinality = cardinality(i);
            long joinCardinality = computeJoinCardinality(d1, tail);
            if (ASTStaticJoinOptimizer.log.isDebugEnabled())
                ASTStaticJoinOptimizer.log.debug("evaluating "
                        + d1.toJoinString() + " X " + i + ": cardinality= "
                        + joinCardinality);
            if (joinCardinality < minJoinCardinality) {
                if (ASTStaticJoinOptimizer.log.isDebugEnabled())
                    ASTStaticJoinOptimizer.log.debug("found a new min: "
                            + joinCardinality);
                minJoinCardinality = joinCardinality;
                minTailCardinality = tailCardinality;
                minTail = tail;
            } else if (joinCardinality == minJoinCardinality) {
                if (tailCardinality < minTailCardinality) {
                    if (ASTStaticJoinOptimizer.log.isDebugEnabled())
                        ASTStaticJoinOptimizer.log.debug("found a new min: "
                                + joinCardinality);
                    minJoinCardinality = joinCardinality;
                    minTailCardinality = tailCardinality;
                    minTail = tail;
                }
            }
        }

        /*
         * If we are at the "no shared variables" tails, the first thing we do
         * is look to the ancestry for the next tail.
         */
        if (minJoinCardinality == NO_SHARED_VARS) {
            final int i = getNextTailThatSharesVarsWithAncestry();
            if (i >= 0) {
                final long tailCardinality = cardinality(i);
                minJoinCardinality = tailCardinality;
                minTail = getTail(i);
                if (ASTStaticJoinOptimizer.log.isDebugEnabled())
                    ASTStaticJoinOptimizer.log.debug("found a new min: "
                            + tailCardinality);
            }
        }

        /*
         * If we are still at the "no shared variables" state, then simply order
         * by range count and choose the min.
         */
        if (minJoinCardinality == NO_SHARED_VARS) {
            minJoinCardinality = Long.MAX_VALUE;
            for (int i = 0; i < arity; i++) {
                // only check unused tails
                if (used[i]) {
                    continue;
                }
                Tail tail = getTail(i);
                long tailCardinality = cardinality(i);
                if (tailCardinality < minJoinCardinality) {
                    if (ASTStaticJoinOptimizer.log.isDebugEnabled())
                        ASTStaticJoinOptimizer.log.debug("found a new min: "
                                + tailCardinality);
                    minJoinCardinality = tailCardinality;
                    minTail = tail;
                }
            }
        }

        // the join variables is the union of the join dimensions' variables
        Set<String> vars = new HashSet<String>();
        vars.addAll(d1.getVars());
        vars.addAll(minTail.getVars());
        return new Join(d1, minTail, minJoinCardinality, vars);
    }

    /**
     * Return the range count for the predicate, ignoring any bindings. The
     * range count for the tail predicate is cached the first time it is
     * requested and returned from the cache thereafter. The range counts are
     * requested using the "non-exact" range count query, so the range counts
     * are actually the upper bound. However, if the upper bound is ZERO (0)
     * then the range count really is ZERO (0).
     * 
     * @param tailIndex
     *            The index of the predicate in the tail of the rule.
     * 
     * @return The range count for that tail predicate.
     */
    public long rangeCount(final int tailIndex) {

        if (rangeCount[tailIndex] == -1L) {

            final long rangeCount = (long) nodes.get(tailIndex)
                    .getEstimatedCardinality(this);

            this.rangeCount[tailIndex] = rangeCount;

        }

        return rangeCount[tailIndex];

    }

    /**
     * Return the cardinality of a particular tail, which is the range count if
     * not optional and infinite if optional.
     */
    public long cardinality(final int tailIndex) {
        return rangeCount(tailIndex);
    }

    public String toString() {
        return Arrays.toString(getOrder());
    }

    /**
     * This is the secret sauce. There are three possibilities for computing the
     * join cardinality, which we are defining as the upper-bound for solutions
     * for a particular join. First, if there are no shared variables then the
     * cardinality will just be the simple product of the cardinality of each
     * join dimension. If there are shared variables but no unshared variables,
     * then the cardinality will be the minimum cardinality from the join
     * dimensions. If there are shared variables but also some unshared
     * variables, then the join cardinality will be the maximum cardinality from
     * each join dimension.
     * <p>
     * TODO: Any join involving an optional will have infinite cardinality, so
     * that optionals get placed at the end.
     * 
     * @param d1
     *            the first join dimension
     * @param d2
     *            the second join dimension
     * @return the join cardinality
     */
    protected long computeJoinCardinality(IJoinDimension d1, IJoinDimension d2) {
//        // two optionals is worse than one
//        if (d1.isOptional() && d2.isOptional()) {
//            return BOTH_OPTIONAL;
//        }
//        if (d1.isOptional() || d2.isOptional()) {
//            return ONE_OPTIONAL;
//        }
        final boolean sharedVars = hasSharedVars(d1, d2);
        final boolean unsharedVars = hasUnsharedVars(d1, d2);
        final long joinCardinality;
        if (sharedVars == false) {
            // no shared vars - take the sum
            // joinCardinality = d1.getCardinality() + d2.getCardinality();
            // different approach - give preference to shared variables
            joinCardinality = NO_SHARED_VARS;
        } else {
            if (unsharedVars == false) {
                // shared vars and no unshared vars - take the min
                joinCardinality = Math.min(d1.getCardinality(),
                        d2.getCardinality());
            } else {
                // shared vars and unshared vars - take the max
                /*
                 * This modification to the join planner results in
                 * significantly faster queries for the bsbm benchmark (3x - 5x
                 * overall). It takes a more optimistic perspective on the
                 * intersection of two statement patterns, predicting that this
                 * will constraint, rather than increase, the multiplicity of
                 * the solutions. However, this COULD lead to pathological cases
                 * where the resulting join plan is WORSE than it would have
                 * been otherwise. For example, this change produces a 3x to 5x
                 * improvement in the BSBM benchmark results. However, it has a
                 * negative effect on LUBM Q2.
                 * 
                 * Update: Ok so just to go into a little detail - yesterday's
                 * change means we choose the join ordering based on an
                 * optimistic view of the cardinality of any particular join. If
                 * you have two triple patterns that share variables but that
                 * also have unshared variables, then technically the maximum
                 * cardinality of the join is the maximum range count of the two
                 * tails. But often the true cardinality of the join is closer
                 * to the minimum range count than the maximum. So yesterday we
                 * started assigning an expected cardinality for the join of the
                 * minimum range count rather than the maximum. What this means
                 * is that a lot of the time when those joins move toward the
                 * front of the line the query will do a lot better, but
                 * occasionally (LUBM 2), the query will do much much worse
                 * (when the true cardinality is closer to the max range count).
                 * 
                 * Today we put in an extra tie-breaker condition. We already
                 * had one tie-breaker - if two joins have the same expected
                 * cardinality we chose the one with the lower minimum range
                 * count. But the new tie-breaker is that if two joins have the
                 * same expected cardinality and minimum range count, we now
                 * chose the one that has the minimum range count on the other
                 * tail (the minimum maximum if that makes sense).
                 * 
                 * 11/14/2011: The static join order optimizer should consider
                 * the swing in stakes when choosing between either the MIN or
                 * the MAX of the cardinality of two join dimensions in order to
                 * decide which join to schedule next. Historically it took the
                 * MAX, but there are counter examples to that decision such as
                 * LUBM Q2. Subsequently it was modified to take the MIN, but
                 * BSBM BI Q1 is a counter example for that.
                 * 
                 * Modify the static optimizer to consider the swing in stakes
                 * between the choice of MAX versus MIN. I believe that this
                 * boils down to something like "If an incorrect guess of MIN
                 * would cause us to suffer a very bad MAX, then choose based on
                 * the MAX to avoid paying that penalty."
                 */
                joinCardinality = (long) ((long) (optimistic * Math.min(
                        d1.getCardinality(), d2.getCardinality())) + ((1.0d - optimistic) * Math
                        .max(d1.getCardinality(), d2.getCardinality())));
            }
        }
        return joinCardinality;
    }

    /**
     * Get the named variables for a given tail. Is there a better way to do
     * this?
     * 
     * @param tail
     *            the tail
     * @return the named variables
     */
    protected Set<String> getVars(int tail) {
        final IReorderableNode node = nodes.get(tail);
        if (ASTStaticJoinOptimizer.log.isDebugEnabled()) {
            ASTStaticJoinOptimizer.log.debug(node);
        }

        final Set<IVariable<?>> vars = new LinkedHashSet<IVariable<?>>();
        sa.getDefinitelyProducedBindings(node, vars, false);

        final Set<String> varNames = new LinkedHashSet<String>();
        for (IVariable<?> v : vars)
            varNames.add(v.getName());

        return varNames;
    }

    /**
     * Look for shared variables.
     * 
     * @param d1
     *            the first join dimension
     * @param d2
     *            the second join dimension
     * @return true if there are shared variables, false otherwise
     */
    protected boolean hasSharedVars(IJoinDimension d1, IJoinDimension d2) {
        for (String var : d1.getVars()) {
            if (d2.getVars().contains(var)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Check to see if the specified tail shares any variables with the
     * ancestry.
     */
    protected boolean sharesVarsWithAncestry(final int tail) {

        final Set<IVariable<?>> tailVars = sa
                .getDefinitelyProducedBindings(nodes.get(tail),
                        new LinkedHashSet<IVariable<?>>(), true/* recursive */);

        return !Collections.disjoint(ancestryVars, tailVars);

    }

    /**
     * Get the next tail (unused, non-optional) that shares a var with the
     * ancestry. If there are many, choose the one with the lowest cardinality.
     * Return -1 if none are found.
     */
    protected int getNextTailThatSharesVarsWithAncestry() {

        int nextTail = -1;
        long minCardinality = Long.MAX_VALUE;

        // give preferential treatment to a tail that shares variables with the
        // ancestry. collect all of them up and then choose the one
        // that has the lowest cardinality
        for (int i = 0; i < arity; i++) {
            // only check unused tails
            if (used[i]) {
                continue;
            }

            if (ASTStaticJoinOptimizer.log.isDebugEnabled()) {
                ASTStaticJoinOptimizer.log.debug("considering tail: "
                        + nodes.get(i));
            }

            if (ASTStaticJoinOptimizer.log.isDebugEnabled()) {
                ASTStaticJoinOptimizer.log.debug("vars: "
                        + Arrays.toString(getVars(i).toArray()));
            }

            if (sharesVarsWithAncestry(i)) {
                /*
                 * We have a shared var with the ancestry.
                 */
                final long tailCardinality = cardinality(i);
                if (tailCardinality < minCardinality) {
                    nextTail = i;
                    minCardinality = tailCardinality;
                }
            }

        }

        return nextTail;

    }

    /**
     * Look for unshared variables.
     * 
     * @param d1
     *            the first join dimension
     * @param d2
     *            the second join dimension
     * @return true if there are unshared variables, false otherwise
     */
    protected boolean hasUnsharedVars(IJoinDimension d1, IJoinDimension d2) {
        for (String var : d1.getVars()) {
            if (d2.getVars().contains(var) == false) {
                return true;
            }
        }
        for (String var : d2.getVars()) {
            if (d1.getVars().contains(var) == false) {
                return true;
            }
        }
        return false;
    }

    /**
     * A join dimension can be either a tail, or a previous join. Either way we
     * need to know its cardinality, its variables, and its tails.
     */
    private interface IJoinDimension {
        long getCardinality();

        Set<String> getVars();

        String toJoinString();
//        boolean isOptional();
    }

    /**
     * A join implementation of a join dimension. The join can consist of two
     * tails, or one tail and another join. Theoretically it could be two joins
     * as well, which might be a future optimization worth thinking about.
     */
    private static class Join implements IJoinDimension {

        private final IJoinDimension d1, d2;
        private final long cardinality;
        private final Set<String> vars;

        public Join(IJoinDimension d1, IJoinDimension d2, long cardinality,
                Set<String> vars) {
            this.d1 = d1;
            this.d2 = d2;
            this.cardinality = cardinality;
            this.vars = vars;
        }

        public IJoinDimension getD1() {
            return d1;
        }

        public IJoinDimension getD2() {
            return d2;
        }

        public Set<String> getVars() {
            return vars;
        }

        public long getCardinality() {
            return cardinality;
        }

        public String toJoinString() {
            return d1.toJoinString() + " X " + d2.toJoinString();
        }

    }

    /**
     * A tail implementation of a join dimension.
     */
    private class Tail implements IJoinDimension {

        private final int tailIndex;
        private final long cardinality;
        private final Set<String> vars;

        public Tail(int tail, long cardinality, Set<String> vars) {
            this.tailIndex = tail;
            this.cardinality = cardinality;
            this.vars = vars;
        }

        public int getTailIndex() {
            return tailIndex;
        }

        public long getCardinality() {
            return cardinality;
        }

        public Set<String> getVars() {
            return vars;
        }

//        public boolean isOptional() {
//            return nodes.get(tail).isOptional();
//        }

        public String toJoinString() {
            return String.valueOf(tailIndex);
        }

    }

    public long getCardinality() {
        return cardinality;
    }

}
\ No newline at end of file

Property changes on: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/optimizers/StaticOptimizer.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain
From: <mrp...@us...> - 2013-08-05 17:22:51
Revision: 7244 http://bigdata.svn.sourceforge.net/bigdata/?rev=7244&view=rev Author: mrpersonick Date: 2013-08-05 17:22:38 +0000 (Mon, 05 Aug 2013) Log Message: ----------- checking in Jeremy's fix for ticket 684 Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/ArbitraryLengthPathNode.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/IReorderableNode.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/JoinGroupNode.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/StatementPatternNode.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/UnionNode.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/optimizers/ASTEmptyGroupOptimizer.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/optimizers/ASTStaticJoinOptimizer.java Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/ArbitraryLengthPathNode.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/ArbitraryLengthPathNode.java 2013-08-05 17:02:35 UTC (rev 7243) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/ArbitraryLengthPathNode.java 2013-08-05 17:22:38 UTC (rev 7244) @@ -10,6 +10,7 @@ import com.bigdata.bop.NV; import com.bigdata.rdf.sparql.ast.PathNode.PathMod; import com.bigdata.rdf.sparql.ast.eval.AST2BOpBase; +import com.bigdata.rdf.sparql.ast.optimizers.StaticOptimizer; /** * A special kind of AST node that represents the SPARQL 1.1 arbitrary length @@ -210,14 +211,14 @@ // @Override public boolean isReorderable() { - final long estCard = getEstimatedCardinality(); + final long estCard = getEstimatedCardinality(null); return estCard >= 0 && estCard < Long.MAX_VALUE; } // @Override - public long getEstimatedCardinality() { + public long getEstimatedCardinality(StaticOptimizer opt) { final JoinGroupNode group = subgroup(); Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/IReorderableNode.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/IReorderableNode.java 2013-08-05 17:02:35 UTC (rev 7243) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/IReorderableNode.java 2013-08-05 17:22:38 UTC (rev 7244) @@ -28,6 +28,7 @@ package com.bigdata.rdf.sparql.ast; import com.bigdata.bop.BOp; +import com.bigdata.rdf.sparql.ast.optimizers.StaticOptimizer; /** * Interface for things which can be re-ordered by the static join @@ -48,7 +49,8 @@ * Return the estimated cardinality - either the range count of a * statement pattern or some computed estimated cardinality for a join * group. 
+ * @param opt This optimizer can be used to help work out the estimate */ - long getEstimatedCardinality(); + long getEstimatedCardinality(StaticOptimizer opt); } Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/JoinGroupNode.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/JoinGroupNode.java 2013-08-05 17:02:35 UTC (rev 7243) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/JoinGroupNode.java 2013-08-05 17:22:38 UTC (rev 7244) @@ -1,5 +1,6 @@ package com.bigdata.rdf.sparql.ast; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -7,6 +8,7 @@ import com.bigdata.bop.BOp; import com.bigdata.bop.IVariable; import com.bigdata.rdf.internal.constraints.InBOp; +import com.bigdata.rdf.sparql.ast.eval.AST2BOpContext; import com.bigdata.rdf.sparql.ast.service.ServiceNode; /** @@ -411,6 +413,18 @@ } + public List<IReorderableNode> getReorderableChildren() { + final List<IReorderableNode> nodes = getChildren(IReorderableNode.class); + final Iterator<IReorderableNode> it = nodes.iterator(); + while (it.hasNext()) { + final IReorderableNode node = it.next(); + if (!node.isReorderable()) { + it.remove(); + } + } + return nodes; + } + /* * Note: I took this out and put a simpler version of toString(indent) into * the base class. The multiple passes over the AST when rendering it into Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/StatementPatternNode.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/StatementPatternNode.java 2013-08-05 17:02:35 UTC (rev 7243) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/StatementPatternNode.java 2013-08-05 17:22:38 UTC (rev 7244) @@ -19,6 +19,7 @@ import com.bigdata.rdf.sparql.ast.eval.AST2BOpUtility; import com.bigdata.rdf.sparql.ast.optimizers.ASTGraphGroupOptimizer; import com.bigdata.rdf.sparql.ast.optimizers.ASTSimpleOptionalOptimizer; +import com.bigdata.rdf.sparql.ast.optimizers.StaticOptimizer; import com.bigdata.rdf.spo.DistinctTermAdvancer; import com.bigdata.rdf.spo.ISPO; import com.bigdata.rdf.spo.SPOAccessPath; @@ -644,7 +645,7 @@ * @see com.bigdata.rdf.sparql.ast.IReorderableNode#getEstimatedCardinality() */ @Override - public long getEstimatedCardinality() { + public long getEstimatedCardinality(StaticOptimizer opt) { return getProperty(AST2BOpBase.Annotations.ESTIMATED_CARDINALITY, -1l); Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/UnionNode.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/UnionNode.java 2013-08-05 17:02:35 UTC (rev 7243) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/UnionNode.java 2013-08-05 17:22:38 UTC (rev 7244) @@ -5,6 +5,7 @@ import org.apache.log4j.Logger; import com.bigdata.bop.BOp; +import com.bigdata.rdf.sparql.ast.optimizers.StaticOptimizer; /** * A special kind of group {@link IGroupNode} that represents the sparql union @@ -12,7 +13,7 @@ * <p> * Note: This node only accepts {@link JoinGroupNode}s as children. 
*/ -public class UnionNode extends GraphPatternGroup<JoinGroupNode> { +public class UnionNode extends GraphPatternGroup<JoinGroupNode> implements IReorderableNode { /** * @@ -55,15 +56,6 @@ @Override public UnionNode addChild(final JoinGroupNode child) { - // can only add non-optional join groups as children to union - if (!(child instanceof JoinGroupNode)) { - - throw new IllegalArgumentException("UnionNode only permits " - + JoinGroupNode.class.getSimpleName() - + " children, but child=" + child); - - } - final JoinGroupNode group = (JoinGroupNode) child; // can only add non-optional join groups as children to union @@ -95,4 +87,29 @@ } + + @Override + public long getEstimatedCardinality(StaticOptimizer optimizer) { + long cardinality = 0; + for (JoinGroupNode child : this) { + StaticOptimizer opt = new StaticOptimizer(optimizer, child.getReorderableChildren()); + cardinality += opt.getCardinality(); + } + return cardinality; + } + + @Override + public boolean isReorderable() { + for (JoinGroupNode child : this) { + for (IGroupMemberNode grandchild : child) { + if (! (grandchild instanceof IReorderableNode)) + return false; + if (! ((IReorderableNode)grandchild).isReorderable()) + return false; + } + } + return true; + } + + } Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/optimizers/ASTEmptyGroupOptimizer.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/optimizers/ASTEmptyGroupOptimizer.java 2013-08-05 17:02:35 UTC (rev 7243) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/optimizers/ASTEmptyGroupOptimizer.java 2013-08-05 17:22:38 UTC (rev 7244) @@ -148,12 +148,6 @@ } - /** - * Eliminate a parent join group whose only child is another join group by - * lifting the child (it replaces the parent). 
- * - * @param op - */ private static void eliminateEmptyGroups(final QueryBase queryBase, final GraphPatternGroup<?> op) { Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/optimizers/ASTStaticJoinOptimizer.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/optimizers/ASTStaticJoinOptimizer.java 2013-08-05 17:02:35 UTC (rev 7243) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/optimizers/ASTStaticJoinOptimizer.java 2013-08-05 17:22:38 UTC (rev 7244) @@ -28,9 +28,6 @@ package com.bigdata.rdf.sparql.ast.optimizers; import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; @@ -44,7 +41,6 @@ import com.bigdata.bop.IConstant; import com.bigdata.bop.IVariable; import com.bigdata.bop.joinGraph.fast.DefaultEvaluationPlan2; -import com.bigdata.journal.ITx; import com.bigdata.rdf.internal.IV; import com.bigdata.rdf.internal.constraints.RangeBOp; import com.bigdata.rdf.sparql.ast.GraphPatternGroup; @@ -63,8 +59,8 @@ import com.bigdata.rdf.sparql.ast.QueryRoot; import com.bigdata.rdf.sparql.ast.RangeNode; import com.bigdata.rdf.sparql.ast.StatementPatternNode; -import com.bigdata.rdf.sparql.ast.StaticAnalysis; import com.bigdata.rdf.sparql.ast.TermNode; +import com.bigdata.rdf.sparql.ast.UnionNode; import com.bigdata.rdf.sparql.ast.eval.AST2BOpBase; import com.bigdata.rdf.sparql.ast.eval.AST2BOpContext; import com.bigdata.rdf.sparql.ast.eval.IEvaluationContext; @@ -88,7 +84,7 @@ */ public class ASTStaticJoinOptimizer implements IASTOptimizer { - private static final Logger log = Logger + static final Logger log = Logger .getLogger(ASTStaticJoinOptimizer.class); public interface Annotations extends AST2BOpBase.Annotations { @@ -269,295 +265,288 @@ } + + abstract private class GroupNodeOptimizer<T extends GraphPatternGroup<?>> { + final T op; + final AST2BOpContext ctx; + private final IBindingSet exogenousBindings; + final QueryRoot queryRoot; + public GroupNodeOptimizer(AST2BOpContext ctx, + IBindingSet exogenousBindings, QueryRoot queryRoot, + IBindingProducerNode[] ancestry, T op) { + this.op = op; + this.ctx = ctx; + this.exogenousBindings = exogenousBindings; + this.queryRoot = queryRoot; + } + + public void optimizex() { + optimizeThisLevel(); + optimizeRecursively(); + } + + private void optimizeRecursively() { + + /* + * Recursion, but only into group nodes (including within subqueries). + */ + for (int i = 0; i < op.arity(); i++) { + + final BOp child = op.get(i); + + if (child instanceof GraphPatternGroup<?>) { + + @SuppressWarnings("unchecked") + final GraphPatternGroup<IGroupMemberNode> childGroup = (GraphPatternGroup<IGroupMemberNode>) child; + + optimize(ctx, exogenousBindings, queryRoot, getAncestry(), childGroup); + + } else if (child instanceof QueryBase) { + + final QueryBase subquery = (QueryBase) child; + + @SuppressWarnings("unchecked") + final GraphPatternGroup<IGroupMemberNode> childGroup = (GraphPatternGroup<IGroupMemberNode>) subquery + .getWhereClause(); + + /* + * Only the projected variables are in scope in the subquery. 
+ */ + + final Set<IVariable<?>> projectedVars = subquery + .getProjectedVars(new LinkedHashSet<IVariable<?>>()); + + final IVariable<?>[] variablesToKeep = BOpUtility + .toArray(projectedVars.iterator()); + + final IBindingSet tmp = exogenousBindings == null ? null + : exogenousBindings.copy(variablesToKeep); + + optimize(ctx, tmp, queryRoot, getAncestry(), childGroup); + + } + + afterOptimizingChild(child); + } + + } + + abstract void afterOptimizingChild(BOp child); + + abstract IBindingProducerNode[] getAncestry(); + + abstract void optimizeThisLevel() ; + + } + + private class JoinGroupNodeOptimizer extends GroupNodeOptimizer<JoinGroupNode> { + + final List<IBindingProducerNode> ancestry; + public JoinGroupNodeOptimizer(AST2BOpContext ctx, + IBindingSet exogenousBindings, QueryRoot queryRoot, + IBindingProducerNode[] ancestry, JoinGroupNode joinGroup) { + super(ctx,exogenousBindings,queryRoot,ancestry,joinGroup); + this.ancestry = new LinkedList<IBindingProducerNode>(Arrays.asList(ancestry)); + /* + * Look for service calls and named subquery includes, since they + * will get run before the statement pattern nodes. Add them into + * the ancestry. + */ + addToAncestry(joinGroup.getServiceNodes(),"service node"); + addToAncestry(joinGroup.getNamedSubqueryIncludes(),"named subquery include"); + } + + private void addToAncestry(List<? extends IBindingProducerNode> moreAncestors, String dbgMessage) { + for (IBindingProducerNode ancestor : moreAncestors) { + if (log.isDebugEnabled()) { + log.debug("adding a "+dbgMessage+" to ancestry:" +ancestor); + } + ancestry.add(ancestor); + } + } + + @Override + void afterOptimizingChild(BOp child) { + + /* + * Update the ancestry for recursion. Only add the + * non-optional statement pattern nodes - the ones that we + * can count on to bind their variables. + */ + if (child instanceof IBindingProducerNode) { + if (child instanceof IJoinNode) { + IJoinNode ijn = (IJoinNode)child; + if (ijn.isOptional() || ijn.isMinus() ) { + return; + } + + } + ancestry.add((IBindingProducerNode)child); + + } + } + + @Override + IBindingProducerNode[] getAncestry() { + return ancestry.toArray(new IBindingProducerNode[ancestry.size()]); + } + + @Override + void optimizeThisLevel() { + + if (isStaticOptimizer(ctx, op)) { + + optimizeJoinGroup(ctx, queryRoot, getAncestry(), op); + } + } + + } + private class UnionNodeOptimizer extends GroupNodeOptimizer<UnionNode> { + + final IBindingProducerNode[] ancestry; + public UnionNodeOptimizer(AST2BOpContext ctx, + IBindingSet exogenousBindings, QueryRoot queryRoot, + IBindingProducerNode[] ancestry, UnionNode op) { + super(ctx,exogenousBindings,queryRoot,ancestry,op); + this.ancestry = ancestry; + } + + @Override + void afterOptimizingChild(BOp child) { + // nothing to do + } + + @Override + IBindingProducerNode[] getAncestry() { + return ancestry; + } + + @Override + void optimizeThisLevel() { + // don't. 
+ } + + } + + + private GroupNodeOptimizer<?> createGroupNodeOptimizer(final AST2BOpContext ctx, + final IBindingSet exogenousBindings, final QueryRoot queryRoot, + IBindingProducerNode[] ancestry, final GraphPatternGroup<?> op) { + if (op instanceof JoinGroupNode) { + return new JoinGroupNodeOptimizer(ctx,exogenousBindings,queryRoot,ancestry,(JoinGroupNode)op); + } else if (op instanceof UnionNode) { + return new UnionNodeOptimizer(ctx,exogenousBindings,queryRoot,ancestry,(UnionNode)op); + } else { + throw new IllegalArgumentException("Unexpected subclass of GraphPatternGroup"); + } + } + /** - * Eliminate a parent join group whose only child is another join group by - * lifting the child (it replaces the parent). * * @param ctx * @param exogenousBindings * The exogenous bindings -or- <code>null</code> iff there are * none. * @param queryRoot - * @param ancestry + * @param ancestry The nodes that are known to have already bound their variables. * @param op */ private void optimize(final AST2BOpContext ctx, final IBindingSet exogenousBindings, final QueryRoot queryRoot, IBindingProducerNode[] ancestry, final GraphPatternGroup<?> op) { - if (op instanceof JoinGroupNode) { - - final JoinGroupNode joinGroup = (JoinGroupNode) op; - - /* - * Look for service calls and named subquery includes, since they - * will get run before the statement pattern nodes. Add them into - * the ancestry. - */ - final List<ServiceNode> serviceNodes = joinGroup.getServiceNodes(); - - final List<NamedSubqueryInclude> namedSubqueryIncludes = - joinGroup.getNamedSubqueryIncludes(); - - if (serviceNodes.size() > 0 || namedSubqueryIncludes.size() > 0) { - - final List<IBindingProducerNode> tmp = new LinkedList<IBindingProducerNode>(); - - for (IBindingProducerNode ancestor : ancestry) { - - tmp.add(ancestor); - - } - - for (ServiceNode service : serviceNodes) { - - if (log.isDebugEnabled()) { - log.debug("adding a service node to ancestry:" + service); - } - - tmp.add(service); - - } - - for (NamedSubqueryInclude nsi : namedSubqueryIncludes) { - - if (log.isDebugEnabled()) { - log.debug("adding a named subquery include to ancestry:" + nsi); - } - - tmp.add(nsi); - - } - - ancestry = tmp.toArray(new IJoinNode[tmp.size()]); - - } - - /* - * First optimize this group. - */ -// @SuppressWarnings("rawtypes") -// final List<StatementPatternNode> spNodes = new LinkedList<StatementPatternNode>(); -// -// for (StatementPatternNode sp : joinGroup.getStatementPatterns()) { -// -// if(!sp.isSimpleOptional()) { -// -// // Only required statement patterns. -// spNodes.add(sp); -// -// } -// -// } + createGroupNodeOptimizer(ctx,exogenousBindings,queryRoot,ancestry,op).optimizex(); - if (isStaticOptimizer(ctx, joinGroup)) { + } - /* - * Let the optimizer handle the simple optionals too. - */ -// final List<StatementPatternNode> spNodes = joinGroup.getStatementPatterns(); - final List<IReorderableNode> nodes = joinGroup.getChildren(IReorderableNode.class); - - final Iterator<IReorderableNode> it = nodes.iterator(); - - while (it.hasNext()) { - - final IReorderableNode node = it.next(); - - // check each instance - if (!node.isReorderable()) { - - it.remove(); - - } - - } - - if (!nodes.isEmpty()) { - /* - * Now done by the ASTRangeCountOptimizer. - */ -// // Always attach the range counts. -// attachRangeCounts(ctx, spNodes, exogenousBindings); - - /* - * Find the "slots" where the reorderable nodes currently - * show up in the join group. 
We will later fill in these - * slots with the same nodes, but in a - * different (optimized) ordering. - */ - final int[] slots = new int[nodes.size()]; - { - int j = 0; - for (int i = 0; i < joinGroup.arity() && j < nodes.size(); i++) { - - if (joinGroup.get(i) == nodes.get(j)) { - - slots[j++] = i; - - } - - } - } + private void optimizeJoinGroup(final AST2BOpContext ctx, + final QueryRoot queryRoot, IBindingProducerNode[] ancestry, + final JoinGroupNode joinGroup) { + /* + * Let the optimizer handle the simple optionals too. + */ + final List<IReorderableNode> nodes = joinGroup.getReorderableChildren(); + + if (!nodes.isEmpty()) { -// final double optimistic = Double.parseDouble( -// joinGroup.getQueryHint(Annotations.OPTIMISTIC, -// Annotations.DEFAULT_OPTIMISTIC)); + /* + * Find the "slots" where the reorderable nodes currently + * show up in the join group. We will later fill in these + * slots with the same nodes, but in a + * different (optimized) ordering. + */ + final int[] slots = new int[nodes.size()]; + { + int j = 0; + for (int i = 0; i < joinGroup.arity() && j < nodes.size(); i++) { - final double optimistic = joinGroup.getProperty( - Annotations.OPTIMISTIC, - Annotations.DEFAULT_OPTIMISTIC); - - final List<IReorderableNode> required = - new LinkedList<IReorderableNode>(); - -// final List<IReorderableNode> optional = -// new LinkedList<IReorderableNode>(); - - IReorderableNode runLast = null; - - for (IReorderableNode sp : nodes) { - - if (runLast == null && sp.getProperty(QueryHints.RUN_LAST, false)) { + if (joinGroup.get(i) == nodes.get(j)) { - runLast = sp; - -// } else if (sp.isOptional()) { -// -// optional.add(sp); -// - } else { - - required.add(sp); - - } - - } - - /* - * Calculate the optimized join ordering for the required - * tails. - */ - final StaticOptimizer opt = new StaticOptimizer(queryRoot, - ctx, ancestry, required, optimistic); + slots[j++] = i; - final int[] order = opt.getOrder(); + } - /* - * Reorder the statement pattern nodes within the join - * group. - */ - int i = 0; - for (int j = 0; j < required.size(); j++) { + } + } - final IReorderableNode sp = required.get(order[j]); - joinGroup.setArg(slots[i++], sp); + final double optimistic = joinGroup.getProperty( + Annotations.OPTIMISTIC, + Annotations.DEFAULT_OPTIMISTIC); + + final List<IReorderableNode> required = + new LinkedList<IReorderableNode>(); + + + IReorderableNode runLast = null; + + for (IReorderableNode sp : nodes) { + + if (runLast == null && sp.getProperty(QueryHints.RUN_LAST, false)) { - } - - if (runLast != null) { - - joinGroup.setArg(slots[i++], runLast); - - } - -// if (runLast != null && !runLast.isOptional()) { -// -// joinGroup.setArg(slots[i++], runLast); -// -// } -// -// for (StatementPatternNode sp : optional) { -// -// joinGroup.setArg(slots[i++], sp); -// -// } -// -// if (runLast != null && runLast.isOptional()) { -// -// joinGroup.setArg(slots[i++], runLast); -// -// } - - } - - /* - * Update the ancestry for recursion. Only add the - * non-optional statement pattern nodes - the ones that we - * can count on to bind their variables. 
- */ - final List<IBindingProducerNode> tmp = new LinkedList<IBindingProducerNode>(); - - for (IBindingProducerNode ancestor : ancestry) { - - tmp.add(ancestor); - - } - - for(IReorderableNode node : nodes) { - -// if (!sp.isOptional()) { - - tmp.add(node); - -// } - - } - - if (tmp.size() > ancestry.length) { - - ancestry = tmp.toArray(new IBindingProducerNode[tmp.size()]); - - } - - } - - } - - /* - * Recursion, but only into group nodes (including within subqueries). - */ - for (int i = 0; i < op.arity(); i++) { + runLast = sp; + + } else { + + required.add(sp); + + } + + } + + /* + * Calculate the optimized join ordering for the required + * tails. + */ + final StaticOptimizer opt = new StaticOptimizer(queryRoot, + ctx, ancestry, required, optimistic); - final BOp child = op.get(i); + final int[] order = opt.getOrder(); - if (child instanceof GraphPatternGroup<?>) { + /* + * Reorder the statement pattern nodes within the join + * group. + */ + int i = 0; + for (int j = 0; j < required.size(); j++) { - @SuppressWarnings("unchecked") - final GraphPatternGroup<IGroupMemberNode> childGroup = (GraphPatternGroup<IGroupMemberNode>) child; + final IReorderableNode sp = required.get(order[j]); - optimize(ctx, exogenousBindings, queryRoot, ancestry, childGroup); - - } else if (child instanceof QueryBase) { + joinGroup.setArg(slots[i++], sp); - final QueryBase subquery = (QueryBase) child; + } + + if (runLast != null) { + + joinGroup.setArg(slots[i++], runLast); + + } + } + } - @SuppressWarnings("unchecked") - final GraphPatternGroup<IGroupMemberNode> childGroup = (GraphPatternGroup<IGroupMemberNode>) subquery - .getWhereClause(); - /* - * Only the projected variables are in scope in the subquery. - */ - - final Set<IVariable<?>> projectedVars = subquery - .getProjectedVars(new LinkedHashSet<IVariable<?>>()); - - final IVariable<?>[] variablesToKeep = BOpUtility - .toArray(projectedVars.iterator()); - - final IBindingSet tmp = exogenousBindings == null ? null - : exogenousBindings.copy(variablesToKeep); - - optimize(ctx, tmp, queryRoot, ancestry, childGroup); - - } - - } - - } + /** * Use the SPORelation from the database to grab the appropriate range @@ -659,951 +648,6 @@ } - /** - * This is the old static optimizer code, taken directly from - * {@link DefaultEvaluationPlan2}, but lined up with the AST API instead of - * the Rule and IPredicate API. - * - */ - private static final class StaticOptimizer { - - private final QueryRoot queryRoot; - - private final AST2BOpContext context; - - private final StaticAnalysis sa; - - private final IBindingProducerNode[] ancestry; - - private final Set<IVariable<?>> ancestryVars; - - private final List<IReorderableNode> nodes; - - private final int arity; - - private static final transient long BOTH_OPTIONAL = Long.MAX_VALUE-1; - - private static final transient long ONE_OPTIONAL = Long.MAX_VALUE-2; - - private static final transient long NO_SHARED_VARS = Long.MAX_VALUE-3; - - /** - * The computed evaluation order. The elements in this array are the order - * in which each tail predicate will be evaluated. The index into the array - * is the index of the tail predicate whose evaluation order you want. So - * <code>[2,0,1]</code> says that the predicates will be evaluated in the - * order tail[2], then tail[0], then tail[1]. - */ - private int[/* order */] order; - - public int[] getOrder() { - - if (order == null) { - - /* - * This will happen if you try to use toString() during the ctor - * before the order has been computed. 
- */ - - throw new IllegalStateException(); - - } -// calc(); - - return order; - - } - - /** - * Cache of the computed range counts for the predicates in the tail. The - * elements of this array are initialized to -1L, which indicates that the - * range count has NOT been computed. Range counts are computed on demand - * and MAY be zero. Only an approximate range count is obtained. Such - * approximate range counts are an upper bound on the #of elements that are - * spanned by the access pattern. Therefore if the range count reports ZERO - * (0L) it is a real zero and the access pattern does not match anything in - * the data. The only other caveat is that the range counts are valid as of - * the commit point on which the access pattern is reading. If you obtain - * them for {@link ITx#READ_COMMITTED} or {@link ITx#UNISOLATED} views then - * they could be invalidated by concurrent writers. - */ - private long[/*tailIndex*/] rangeCount; - - /** - * Keeps track of which tails have been used already and which still need - * to be evaluated. - */ - private transient boolean[/*tailIndex*/] used; - -// /** -// * <code>true</code> iff the rule was proven to have no solutions. -// */ -// private boolean empty = false; - - /** - * See {@link Annotations#OPTIMISTIC}. - */ - private final double optimistic; - -// public boolean isEmpty() { -// -// return empty; -// -// } - - /** - * Computes an evaluation plan for the rule. - * - * @param rangeCountFactory - * The range count factory. - * @param rule - * The rule. - */ - public StaticOptimizer(final QueryRoot queryRoot, - final AST2BOpContext context, - final IBindingProducerNode[] ancestry, - final List<IReorderableNode> nodes, - final double optimistic) { - - if (queryRoot == null) - throw new IllegalArgumentException(); - - if (context == null) - throw new IllegalArgumentException(); - - if (ancestry == null) - throw new IllegalArgumentException(); - - if (nodes == null) - throw new IllegalArgumentException(); - - this.queryRoot = queryRoot; - - this.context = context; - - this.sa = new StaticAnalysis(queryRoot, context); - - this.ancestry = ancestry; - - this.ancestryVars = new LinkedHashSet<IVariable<?>>(); - - this.nodes = nodes; - - this.arity = nodes.size(); - - if (log.isDebugEnabled()) { - log.debug("arity: " + arity); - } - - this.optimistic = optimistic; - - calc(); - - if (log.isDebugEnabled()) { - for (int i = 0; i < arity; i++) { - log.debug(order[i]); - } - } - - } - - /** - * Compute the evaluation order. - */ - private void calc() { - - if (order != null) - return; - - order = new int[arity]; - rangeCount = new long[arity]; - used = new boolean[arity]; - - // clear arrays. - for (int i = 0; i < arity; i++) { - order[i] = -1; // -1 is used to detect logic errors. - rangeCount[i] = -1L; // -1L indicates no range count yet. - used[i] = false; // not yet evaluated - } - - if (arity == 0) { - return; - } - - if (arity == 1) { - order[0] = 0; - return; - } - - /* - if (tailCount == 2) { - order[0] = cardinality(0) <= cardinality(1) ? 0 : 1; - order[1] = cardinality(0) <= cardinality(1) ? 1 : 0; - return; - } - */ - -// int startIndex = 0; - - /* - * DEAD CODE. See ASTSearchOptimizer, which lifts the full text - * search into a ServiceNode. This is captured by the ancestry - * code just below. 
- */ -// for (int i = 0; i < arity; i++) { -// final StatementPatternNode pred = spNodes.get(i); -//// final IAccessPathExpander expander = pred.getAccessPathExpander(); -// final TermNode p = pred.p(); -// if (p != null && p.isConstant() && p.getValue() != null && p.getValue().equals(BD.SEARCH)) { -// if (log.isDebugEnabled()) log.debug("found a run first, tail " + i); -//// final Iterator<IVariable<?>> it = BOpUtility.getArgumentVariables(pred); -//// while (it.hasNext()) { -//// runFirstVars.add(it.next()); -//// } -// final TermNode s = pred.s(); -// if (s != null && s.isVariable()) { -// runFirstVars.add((IVariable<?>) s.getValueExpression()); -// } -// order[startIndex++] = i; -// used[i] = true; -// } -// } - - /* - * Seems like the easiest way to handle the ancestry is the exact - * same way we handle the "run first" statement patterns (text search), - * which is that we collect up the variables that are bound and then - * give preferential treatment to the predicates that can join - * on those variables. - */ - for (IBindingProducerNode join : ancestry) { - if (log.isDebugEnabled()) { - log.debug("considering join node from ancestry: " + join); - } - sa.getDefinitelyProducedBindings(join, ancestryVars, false/* recursive */); - } - if (log.isDebugEnabled()) { - log.debug("bindings from ancestry: " + Arrays.toString(ancestryVars.toArray())); - } - - // LEFTOVER FROM THE OLD DAYS (see above) -// // if there are no more tails left after the expanders, we're done -// if (startIndex == arity) { -// return; -// } - - // if there is only one tail - if (arity == 1) { - if (log.isDebugEnabled()) log.debug("one tail left"); - for (int i = 0; i < arity; i++) { - // only check unused tails - if (used[i]) { - continue; - } - order[arity-1] = i; - used[i] = true; - return; - } - } - - /* - * See if there is a best tail to run first. The "run first" - * query hint gets priority, then the tails that share variables - * with the ancestry. - */ - int preferredFirstTail = -1; - - for (int i = 0; i < arity; i++) { - // only check unused tails - if (used[i]) { - continue; - } - - /* - * We need the optimizer to play nice with the run first hint. - * Choose the first "run first" tail we see. - */ - if (nodes.get(i).getProperty(QueryHints.RUN_FIRST, false)) { - preferredFirstTail = i; - break; - } - } - - /* - * If there was no "run first" query hint, then go to the ancestry. - */ - if (preferredFirstTail == -1) - preferredFirstTail = getNextTailThatSharesVarsWithAncestry(); - - if (log.isDebugEnabled()) { - log.debug("preferred first tail: " + preferredFirstTail); - } - - // special case if there are only two tails - if (arity == 2) { - if (log.isDebugEnabled()) log.debug("two tails left"); - int t1 = -1; - int t2 = -1; - for (int i = 0; i < arity; i++) { - // only check unused tails - if (used[i]) { - continue; - } - // find the two unused tail indexes - if (t1 == -1) { - t1 = i; - } else { - t2 = i; - break; - } - } - if (log.isDebugEnabled()) log.debug(t1 + ", " + t2); - if (preferredFirstTail != -1) { - order[arity-2] = preferredFirstTail; - order[arity-1] = preferredFirstTail == t1 ? t2 : t1; - } else { - order[arity-2] = cardinality(t1) <= cardinality(t2) ? t1 : t2; - order[arity-1] = cardinality(t1) <= cardinality(t2) ? t2 : t1; - } - return; - } - - /* - * There will be (tails-1) joins, we just need to figure out what - * they should be. 
- */ - Join join; - if (preferredFirstTail == -1) - join = getFirstJoin(); - else - join = getFirstJoin(preferredFirstTail); - - int t1 = ((Tail) join.getD1()).getTail(); - int t2 = ((Tail) join.getD2()).getTail(); - if (preferredFirstTail == -1) { - order[0] = cardinality(t1) <= cardinality(t2) ? t1 : t2; - order[1] = cardinality(t1) <= cardinality(t2) ? t2 : t1; - } else { - order[0] = t1; - order[1] = t2; - } - used[order[0]] = true; - used[order[1]] = true; - for (int i = 2; i < arity; i++) { - join = getNextJoin(join); - order[i] = ((Tail) join.getD2()).getTail(); - used[order[i]] = true; - } - - } - - /** - * Start by looking at every possible initial join. Take every tail and - * match it with every other tail to find the lowest possible cardinality. - * See {@link #computeJoinCardinality(com.bigdata.bop.joinGraph.fast.DefaultEvaluationPlan2.IJoinDimension, com.bigdata.bop.joinGraph.fast.DefaultEvaluationPlan2.IJoinDimension)} - * for more on this. - */ - private Join getFirstJoin() { - if (log.isDebugEnabled()) { - log.debug("evaluating first join"); - } - long minJoinCardinality = Long.MAX_VALUE; - long minTailCardinality = Long.MAX_VALUE; - long minOtherTailCardinality = Long.MAX_VALUE; - Tail minT1 = null; - Tail minT2 = null; - for (int i = 0; i < arity; i++) { - // only check unused tails - if (used[i]) { - continue; - } - Tail t1 = new Tail(i, rangeCount(i), getVars(i)); - long t1Cardinality = cardinality(i); - for (int j = 0; j < arity; j++) { - // check only non-same and unused tails - if (i == j || used[j]) { - continue; - } - Tail t2 = new Tail(j, rangeCount(j), getVars(j)); - long t2Cardinality = cardinality(j); - long joinCardinality = computeJoinCardinality(t1, t2); - long tailCardinality = Math.min(t1Cardinality, t2Cardinality); - long otherTailCardinality = Math.max(t1Cardinality, t2Cardinality); - if(log.isDebugEnabled()) log.debug("evaluating " + i + " X " + j + ": cardinality= " + joinCardinality); - if (joinCardinality < minJoinCardinality) { - if(log.isDebugEnabled()) log.debug("found a new min: " + joinCardinality); - minJoinCardinality = joinCardinality; - minTailCardinality = tailCardinality; - minOtherTailCardinality = otherTailCardinality; - minT1 = t1; - minT2 = t2; - } else if (joinCardinality == minJoinCardinality) { - if (tailCardinality < minTailCardinality) { - if(log.isDebugEnabled()) log.debug("found a new min: " + joinCardinality); - minJoinCardinality = joinCardinality; - minTailCardinality = tailCardinality; - minOtherTailCardinality = otherTailCardinality; - minT1 = t1; - minT2 = t2; - } else if (tailCardinality == minTailCardinality) { - if (otherTailCardinality < minOtherTailCardinality) { - if(log.isDebugEnabled()) log.debug("found a new min: " + joinCardinality); - minJoinCardinality = joinCardinality; - minTailCardinality = tailCardinality; - minOtherTailCardinality = otherTailCardinality; - minT1 = t1; - minT2 = t2; - } - } - } - } - } - // the join variables is the union of the join dimensions' variables - Set<String> vars = new HashSet<String>(); - vars.addAll(minT1.getVars()); - vars.addAll(minT2.getVars()); - return new Join(minT1, minT2, minJoinCardinality, vars); - } - - private Join getFirstJoin(final int preferredFirstTail) { - if (log.isDebugEnabled()) { - log.debug("evaluating first join"); - } - - long minJoinCardinality = Long.MAX_VALUE; - long minOtherTailCardinality = Long.MAX_VALUE; - Tail minT2 = null; - final int i = preferredFirstTail; - final Tail t1 = new Tail(i, rangeCount(i), getVars(i)); - for (int j = 0; j < arity; 
j++) { - // check only non-same and unused tails - if (i == j || used[j]) { - continue; - } - Tail t2 = new Tail(j, rangeCount(j), getVars(j)); - long t2Cardinality = cardinality(j); - long joinCardinality = computeJoinCardinality(t1, t2); - if(log.isDebugEnabled()) log.debug("evaluating " + i + " X " + j + ": cardinality= " + joinCardinality); - if (joinCardinality < minJoinCardinality) { - if(log.isDebugEnabled()) log.debug("found a new min: " + joinCardinality); - minJoinCardinality = joinCardinality; - minOtherTailCardinality = t2Cardinality; - minT2 = t2; - } else if (joinCardinality == minJoinCardinality) { - if (t2Cardinality < minOtherTailCardinality) { - if(log.isDebugEnabled()) log.debug("found a new min: " + joinCardinality); - minJoinCardinality = joinCardinality; - minOtherTailCardinality = t2Cardinality; - minT2 = t2; - } - } - } - - // the join variables is the union of the join dimensions' variables - Set<String> vars = new HashSet<String>(); - vars.addAll(t1.getVars()); - vars.addAll(minT2.getVars()); - return new Join(t1, minT2, minJoinCardinality, vars); - } - - /** - * Similar to {@link #getFirstJoin()}, but we have one join dimension - * already calculated. - * - * @param d1 - * the first join dimension - * @return - * the new join with the lowest cardinality from the remaining tails - */ - private Join getNextJoin(IJoinDimension d1) { - if (log.isDebugEnabled()) { - log.debug("evaluating next join"); - } - long minJoinCardinality = Long.MAX_VALUE; - long minTailCardinality = Long.MAX_VALUE; - Tail minTail = null; - for (int i = 0; i < arity; i++) { - // only check unused tails - if (used[i]) { - continue; - } - Tail tail = new Tail(i, rangeCount(i), getVars(i)); - long tailCardinality = cardinality(i); - long joinCardinality = computeJoinCardinality(d1, tail); - if(log.isDebugEnabled()) log.debug("evaluating " + d1.toJoinString() + " X " + i + ": cardinality= " + joinCardinality); - if (joinCardinality < minJoinCardinality) { - if(log.isDebugEnabled()) log.debug("found a new min: " + joinCardinality); - minJoinCardinality = joinCardinality; - minTailCardinality = tailCardinality; - minTail = tail; - } else if (joinCardinality == minJoinCardinality) { - if (tailCardinality < minTailCardinality) { - if(log.isDebugEnabled()) log.debug("found a new min: " + joinCardinality); - minJoinCardinality = joinCardinality; - minTailCardinality = tailCardinality; - minTail = tail; - } - } - } - - /* - * If we are at the "no shared variables" tails, the first thing - * we do is look to the ancestry for the next tail. - */ - if (minJoinCardinality == NO_SHARED_VARS) { - final int i = getNextTailThatSharesVarsWithAncestry(); - if (i >= 0) { - final long tailCardinality = cardinality(i); - minJoinCardinality = tailCardinality; - minTail = new Tail(i, rangeCount(i), getVars(i)); - if(log.isDebugEnabled()) log.debug("found a new min: " + tailCardinality); - } - } - - /* - * If we are still at the "no shared variables" state, then simply - * order by range count and choose the min. 
- */ - if (minJoinCardinality == NO_SHARED_VARS) { - minJoinCardinality = Long.MAX_VALUE; - for (int i = 0; i < arity; i++) { - // only check unused tails - if (used[i]) { - continue; - } - Tail tail = new Tail(i, rangeCount(i), getVars(i)); - long tailCardinality = cardinality(i); - if (tailCardinality < minJoinCardinality) { - if(log.isDebugEnabled()) log.debug("found a new min: " + tailCardinality); - minJoinCardinality = tailCardinality; - minTail = tail; - } - } - } - - // the join variables is the union of the join dimensions' variables - Set<String> vars = new HashSet<String>(); - vars.addAll(d1.getVars()); - vars.addAll(minTail.getVars()); - return new Join(d1, minTail, minJoinCardinality, vars); - } - - /** - * Return the range count for the predicate, ignoring any bindings. The - * range count for the tail predicate is cached the first time it is - * requested and returned from the cache thereafter. The range counts are - * requested using the "non-exact" range count query, so the range counts - * are actually the upper bound. However, if the upper bound is ZERO (0) - * then the range count really is ZERO (0). - * - * @param tailIndex - * The index of the predicate in the tail of the rule. - * - * @return The range count for that tail predicate. - */ - public long rangeCount(final int tailIndex) { - - if (rangeCount[tailIndex] == -1L) { - -// final IPredicate predicate = spNodes.getTail(tailIndex); -// -// final IAccessPathExpander expander = predicate.getAccessPathExpander(); -// -// if (expander != null && expander.runFirst()) { -// -// /* -// * Note: runFirst() essentially indicates that the cardinality -// * of the predicate in the data is to be ignored. Therefore we -// * do not request the actual range count and just return -1L as -// * a marker indicating that the range count is not available. -// */ -// -// return -1L; -// -// } -// -// final long rangeCount = rangeCountFactory -// .rangeCount(spNodes.getTail(tailIndex)); - -// final long rangeCount = (long) nodes.get(tailIndex).getProperty( -// Annotations.ESTIMATED_CARDINALITY, -1l); - - final long rangeCount = (long) nodes.get(tailIndex).getEstimatedCardinality(); - - this.rangeCount[tailIndex] = rangeCount; - - } - - return rangeCount[tailIndex]; - - } - - /** - * Return the cardinality of a particular tail, which is the range count - * if not optional and infinite if optional. - */ - public long cardinality(final int tailIndex) { -// IPredicate tail = spNodes.getTail(tailIndex); -// final StatementPatternNode tail = nodes.get(tailIndex); -// if (tail.isOptional()/* || tail instanceof IStarJoin */) { -// return Long.MAX_VALUE; -// } else { -// return rangeCount(tailIndex); -// } - return rangeCount(tailIndex); - } - - public String toString() { - return Arrays.toString(getOrder()); - } - - /** - * This is the secret sauce. There are three possibilities for computing - * the join cardinality, which we are defining as the upper-bound for - * solutions for a particular join. First, if there are no shared variables - * then the cardinality will just be the simple product of the cardinality of - * each join dimension. If there are shared variables but no unshared - * variables, then the cardinality will be the minimum cardinality from - * the join dimensions. If there are shared variables but also some - * unshared variables, then the join cardinality will be the maximum - * cardinality from each join dimension. 
- * <p> - * Any join involving an optional will have infinite cardinality, so that - * optionals get placed at the end. - * - * @param d1 - * the first join dimension - * @param d2 - * the second join dimension - * @return - * the join cardinality - */ - protected long computeJoinCardinality(IJoinDimension d1, IJoinDimension d2) { -// // two optionals is worse than one -// if (d1.isOptional() && d2.isOptional()) { -// return BOTH_OPTIONAL; -// } -// if (d1.isOptional() || d2.isOptional()) { -// return ONE_OPTIONAL; -// } - final boolean sharedVars = hasSharedVars(d1, d2); - final boolean unsharedVars = hasUnsharedVars(d1, d2); - final long joinCardinality; - if (sharedVars == false) { - // no shared vars - take the sum - // joinCardinality = d1.getCardinality() + d2.getCardinality(); - // different approach - give preference to shared variables - joinCardinality = NO_SHARED_VARS; - } else { - if (unsharedVars == false) { - // shared vars and no unshared vars - take the min - joinCardinality = - Math.min(d1.getCardinality(), d2.getCardinality()); - } else { - // shared vars and unshared vars - take the max - /* - * This modification to the join planner results in - * significantly faster queries for the bsbm benchmark (3x - - * 5x overall). It takes a more optimistic perspective on - * the intersection of two statement patterns, predicting - * that this will constraint, rather than increase, the - * multiplicity of the solutions. However, this COULD lead - * to pathological cases where the resulting join plan is - * WORSE than it would have been otherwise. For example, - * this change produces a 3x to 5x improvement in the BSBM - * benchmark results. However, it has a negative effect on - * LUBM Q2. - * - * Update: Ok so just to go into a little detail - - * yesterday's change means we choose the join ordering - * based on an optimistic view of the cardinality of any - * particular join. If you have two triple patterns that - * share variables but that also have unshared variables, - * then technically the maximum cardinality of the join is - * the maximum range count of the two tails. But often the - * true cardinality of the join is closer to the minimum - * range count than the maximum. So yesterday we started - * assigning an expected cardinality for the join of the - * minimum range count rather than the maximum. What this - * means is that a lot of the time when those joins move - * toward the front of the line the query will do a lot - * better, but occasionally (LUBM 2), the query will do much - * much worse (when the true cardinality is closer to the - * max range count). - * - * Today we put in an extra tie-breaker condition. We - * already had one tie-breaker - if two joins have the same - * expected cardinality we chose the one with the lower - * minimum range count. But the new tie-breaker is that if - * two joins have the same expected cardinality and minimum - * range count, we now chose the one that has the minimum - * range count on the other tail (the minimum maximum if - * that makes sense). - * - * 11/14/2011: - * The static join order optimizer should consider the swing - * in stakes when choosing between either the MIN or the MAX - * of the cardinality of two join dimensions in order to - * decide which join to schedule next. Historically it took - * the MAX, but there are counter examples to that decision - * such as LUBM Q2. Subsequently it was modified to take the - * MIN, but BSBM BI Q1 is a counter example for that. 
- * - * Modify the static optimizer to consider the swing in - * stakes between the choice of MAX versus MIN. I believe - * that this boils down to something like "If an incorrect - * guess of MIN would cause us to suffer a very bad MAX, - * then choose based on the MAX to avoid paying that - * penalty." - */ - joinCardinality = (long) ((long) - (optimistic * Math.min(d1.getCardinality(), d2.getCardinality())) + - ((1.0d-optimistic) * Math.max(d1.getCardinality(), d2.getCardinality()))); - } - } - return joinCardinality; - } - - /** - * Get the named variables for a given tail. Is there a better way to do - * this? - * - * @param tail - * the tail - * @return - * the named variables - */ - protected Set<String> getVars(int tail) { -// final Set<String> vars = new HashSet<String>(); -// IPredicate pred = spNodes.getTail(tail); -// for (int i = 0; i < pred.arity(); i++) { -// IVariableOrConstant term = pred.get(i); -// if (term.isVar()) { -// vars.add(term.getName()); -// } -// } - final IReorderableNode node = nodes.get(tail); - if (log.isDebugEnabled()) { - log.debug(node); - } -// for (int i = 0; i < sp.arity(); i++) { -// final TermNode term = (TermNode) sp.get(i); -// if (log.isDebugEnabled()) { -// log.debug(term); -// } -// if (term != null && term.isVariable()) { -// vars.add(term.getValueExpressio... [truncated message content] |
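One consequence of the ticket-684 change worth spelling out: once UnionNode implements IReorderableNode, the static optimizer can treat a UNION as a single reorderable tail whose estimated cardinality is the sum over its branches, each branch being estimated by a child StaticOptimizer over its reorderable children (see UnionNode.getEstimatedCardinality() in the diff above). A minimal toy restatement of that summing rule follows; the class name and the branch estimates are hypothetical.

{{{
/** Toy restatement of UnionNode#getEstimatedCardinality() (hypothetical demo). */
public class UnionCardinalityDemo {

    /**
     * A UNION yields the solutions of every branch, so the branch
     * estimates add; the sum is then comparable against the range
     * counts of ordinary statement patterns during join ordering.
     */
    static long unionCardinality(final long[] branchEstimates) {
        long sum = 0L;
        for (final long c : branchEstimates) {
            sum += c;
        }
        return sum;
    }

    public static void main(final String[] args) {
        // e.g. two branches whose child optimizers report 5000 and 250
        System.out.println(unionCardinality(new long[] { 5000L, 250L })); // 5250
    }
}
}}}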
From: <jer...@us...> - 2013-08-05 17:02:43
Revision: 7243 http://bigdata.svn.sourceforge.net/bigdata/?rev=7243&view=rev Author: jeremy_carroll Date: 2013-08-05 17:02:35 +0000 (Mon, 05 Aug 2013) Log Message: ----------- fixed 712 - SELECT ?x { OPTIONAL { FILTER (false) }}, including test cases Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/optimizers/ASTEmptyGroupOptimizer.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestOptionals.java Added Paths: ----------- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/non-matching-optional-01.rq branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/non-matching-optional-01.srx branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/non-matching-optional-01.ttl branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/non-matching-optional-02.rq Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/optimizers/ASTEmptyGroupOptimizer.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/optimizers/ASTEmptyGroupOptimizer.java 2013-08-05 15:01:56 UTC (rev 7242) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/optimizers/ASTEmptyGroupOptimizer.java 2013-08-05 17:02:35 UTC (rev 7243) @@ -234,14 +234,17 @@ } else if (arity == 1 && op instanceof JoinGroupNode && op.get(0) instanceof JoinGroupNode) { - + final JoinGroupNode parent = (JoinGroupNode) op; final JoinGroupNode child = (JoinGroupNode) op.get(0); - if (!parent.isMinus() && !child.isMinus()) { + if ( (!parent.isMinus() && !child.isMinus()) + // fix for trac 712 + && (parent.isOptional() || !child.isOptional()) + ) { - /* + /* * We can always merge two JoinGroupNodes into one, but we have * to make sure we get the optionality right. * @@ -253,6 +256,8 @@ 1. JoinGroup1 [optional=false] { JoinGroup2 [optional=false] { ... } } -> JoinGroup2 [optional=false] { ... } 2. JoinGroup1 [optional=true] { JoinGroup2 [optional=true] { ... } } -> JoinGroup2 [optional=true] { ... } 3. JoinGroup1 [optional=true] { JoinGroup2 [optional=false] { ... } } -> JoinGroup2 [optional=true] { ... } + +This case 4 appears to be misconceived: Jeremy Carroll. 4. JoinGroup1 [optional=false] { JoinGroup2 [optional=true] { ... } } -> JoinGroup2 [optional=true] { ... 
} */ Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestOptionals.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestOptionals.java 2013-08-05 15:01:56 UTC (rev 7242) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestOptionals.java 2013-08-05 17:02:35 UTC (rev 7243) @@ -128,8 +128,59 @@ } + /** + * Unit test for <a href="http://sourceforge.net/apps/trac/bigdata/ticket/712">trac 712</a> + * * <pre> + * select ?x + * where { + * OPTIONAL { + * FILTER ( false ) + * } + * } + * </pre> + */ + public void test_non_matching_optional_01() throws Exception { + + new TestHelper( + "non-matching-optional-01",// testURI + "non-matching-optional-01.rq", // queryURI + "non-matching-optional-01.ttl", // dataURI + "non-matching-optional-01.srx" // resultURI +// ,true// laxCardinality +// ,false // checkOrder + ).runTest(); + + } + + /** + * Unit test for <a href="http://sourceforge.net/apps/trac/bigdata/ticket/712">trac 712</a> + * + * <pre> + * select ?x + * where { + * BIND ( 1 as ?y ) + * OPTIONAL { + * FILTER ( false ) + * } + * } + * </pre> + */ + public void test_non_matching_optional_02() throws Exception { + + new TestHelper( + "non-matching-optional-02",// testURI + "non-matching-optional-02.rq", // queryURI + "non-matching-optional-01.ttl", // dataURI - same as before + "non-matching-optional-01.srx" // resultURI - same as before +// ,true// laxCardinality +// ,false // checkOrder + ).runTest(); + + } + /** + * <pre> * select * * where { * ?s rdf:type :Person . Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/non-matching-optional-01.rq =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/non-matching-optional-01.rq (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/non-matching-optional-01.rq 2013-08-05 17:02:35 UTC (rev 7243) @@ -0,0 +1,10 @@ +PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> +PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#> +PREFIX foaf: <http://xmlns.com/foaf/0.1/> + +select ?x +where { + OPTIONAL { + FILTER ( false ) + } +} Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/non-matching-optional-01.srx =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/non-matching-optional-01.srx (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/non-matching-optional-01.srx 2013-08-05 17:02:35 UTC (rev 7243) @@ -0,0 +1,12 @@ +<?xml version="1.0"?> +<sparql + xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" + xmlns:xs="http://www.w3.org/2001/XMLSchema#" + xmlns="http://www.w3.org/2005/sparql-results#" > + <head> + <variable name="x"/> + </head> + <results> + <result/> + </results> +</sparql> Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/non-matching-optional-01.ttl =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/non-matching-optional-01.ttl (rev 0) +++ 
branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/non-matching-optional-01.ttl 2013-08-05 17:02:35 UTC (rev 7243) @@ -0,0 +1,2 @@ + +# no data needed \ No newline at end of file Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/non-matching-optional-02.rq =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/non-matching-optional-02.rq (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/non-matching-optional-02.rq 2013-08-05 17:02:35 UTC (rev 7243) @@ -0,0 +1,13 @@ +PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> +PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#> +PREFIX foaf: <http://xmlns.com/foaf/0.1/> + +# use exactly the same test data (none) and test result (one empty binding) +# as test non-matching-optional-01 +select ?x +where { + BIND ( 1 as ?y ) + OPTIONAL { + FILTER ( false ) + } +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
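The one-line guard added in r7243 is easy to miss inside the diff. With an empty database, OPTIONAL { FILTER(false) } inside a required group still produces a single empty solution (the OPTIONAL merely fails to extend it), whereas lifting the optional child into its required parent (case 4 in the comment block) would produce zero solutions. The standalone method below mirrors the committed condition (parent.isOptional() || !child.isOptional()); the method name is invented for illustration.

{{{
/**
 * Illustration of the group-merge guard from r7243's change to
 * ASTEmptyGroupOptimizer (hypothetical standalone form).
 */
static boolean canLiftChildIntoParent(final boolean parentOptional,
        final boolean childOptional) {
    // Cases 1-3 of the comment block are safe merges:
    //   required{required}, optional{optional}, optional{required}.
    // Case 4, required{optional}, is rejected: it would change
    //   SELECT ?x { OPTIONAL { FILTER(false) } }
    // from one empty solution to zero solutions (trac 712).
    return parentOptional || !childOptional;
}
}}}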
From: <tho...@us...> - 2013-08-05 15:02:05
Revision: 7242 http://bigdata.svn.sourceforge.net/bigdata/?rev=7242&view=rev Author: thompsonbry Date: 2013-08-05 15:01:56 +0000 (Mon, 05 Aug 2013) Log Message: ----------- I have identified several cases where IBlockingBuffer.setFuture() was not being invoked until after the Future had already begun to execute. This could result in the producer (the Future is for the producer) not being cancelled if the consumer was closed before setFuture() was invoked. I have updated the code to use a FutureTask to wrap the computation, use setFuture(ft) for that FutureTask, and only then submit the FutureTask for evaluation. This ensures that the Future of the producer is set before the consumer starts to run. Changes were made to the following files: {{{ - AccessPath#asynchronousIterator() - AbstractClientIndexView#newWriteBuffer() - ClientIndexView#newWriteBuffer() - ClientIndexView#parallelRangeIterator() - ClientAsynchronousIterator#start() - MappedTaskMaster#newResourceBuffer() - DGExpander.InnerIterator#312 - TripleStoreUtility#notFoundInTarget() }}} In addition, the following unit tests had the bad pattern and were fixed: {{{ - TestBlockingBuffer - TestFileSystemScanner - TestMasterTask - TestMasterTaskWithTimeout - TestMasterTaskWithErrors - TestMasterTaskWithRedirect - TestMasterTaskWithSplits (note: this test is not in CI - never finished) }}} There were two places in the code where I could not apply this pattern because the consumer was started in a different lexical scope. For these two cases, I wrapped the code to ensure that the Future was cancelled if there was an exit by a code path that did not cause setFuture() to be invoked. ProgramTask#362 {{{ try { // run the task. future = queryTask.submit(); // set the future on the BlockingBuffer. buffer.setFuture(future); } finally { if (future != null && buffer.getFuture() == null) { // Future exists but not set on BlockingBuffer. future.cancel(true/* mayInterruptIfRunning */); } } }}} AbstractMasterTask#905: {{{ Future<? extends AbstractSubtaskStats> future = null; try { // assign a worker thread to the sink. future = submitSubtask(sink); // set Future (can now be cancelled) out.setFuture(future); } finally { if (future != null && buffer.getFuture() == null) { // Future exists but not set on BlockingBuffer.
future.cancel(true/* mayInterruptIfRunning */); } } }}} See https://sourceforge.net/apps/trac/bigdata/ticket/707 (BlockingBuffer.close() leaks threads) Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/relation/accesspath/AccessPath.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/relation/rule/eval/ProgramTask.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/service/ndx/AbstractScaleOutClientIndexView.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/service/ndx/ClientIndexView.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/service/ndx/pipeline/AbstractMasterTask.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/service/proxy/ClientAsynchronousIterator.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/relation/accesspath/TestBlockingBuffer.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/service/ndx/pipeline/TestFileSystemScanner.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/service/ndx/pipeline/TestMasterTask.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/service/ndx/pipeline/TestMasterTaskIdleTimeout.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/service/ndx/pipeline/TestMasterTaskWithErrors.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/service/ndx/pipeline/TestMasterTaskWithRedirect.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/service/ndx/pipeline/TestMasterTaskWithSplits.java branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/service/jini/master/MappedTaskMaster.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/DGExpander.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/store/TripleStoreUtility.java Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/relation/accesspath/AccessPath.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/relation/accesspath/AccessPath.java 2013-08-05 12:51:58 UTC (rev 7241) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/relation/accesspath/AccessPath.java 2013-08-05 15:01:56 UTC (rev 7242) @@ -31,7 +31,7 @@ import java.util.Iterator; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import java.util.concurrent.RejectedExecutionException; import org.apache.log4j.Logger; @@ -1182,14 +1182,21 @@ final BlockingBuffer<R[]> buffer = new BlockingBuffer<R[]>( chunkOfChunksCapacity); - final ExecutorService executorService = indexManager - .getExecutorService(); + /** + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/707"> + * BlockingBuffer.close() does not unblock threads </a> + */ - final Future<Void> future = executorService - .submit(new ChunkConsumerTask<R>(this, src, buffer)); + // Wrap computation as FutureTask. + final FutureTask<Void> ft = new FutureTask<Void>( + new ChunkConsumerTask<R>(this, src, buffer)); - buffer.setFuture(future); + // Set Future on BlockingBuffer *before* starting computation. + buffer.setFuture(ft); + // Start computation. 
+ indexManager.getExecutorService().submit(ft); + return new ChunkConsumerIterator<R>(buffer.iterator(), keyOrder); } Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/relation/rule/eval/ProgramTask.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/relation/rule/eval/ProgramTask.java 2013-08-05 12:51:58 UTC (rev 7241) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/relation/rule/eval/ProgramTask.java 2013-08-05 15:01:56 UTC (rev 7242) @@ -355,13 +355,25 @@ * @todo if the #of results is small and they are available with * little latency then return the results inline using a fully * buffered iterator. + * + * Note: hack pattern to ensure Future is cancelled if we exit by + * any code path before the future has been set on the BlockingBuffer. + * + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/707"> + * BlockingBuffer.close() does not unblock threads </a> */ + try { + // run the task. + future = queryTask.submit(); - // run the task. - future = queryTask.submit(); - - // set the future on the BlockingBuffer. - buffer.setFuture(future); + // set the future on the BlockingBuffer. + buffer.setFuture(future); + } finally { + if (future != null && buffer.getFuture() == null) { + // Future exists but not set on BlockingBuffer. + future.cancel(true/* mayInterruptIfRunning */); + } + } if (log.isDebugEnabled()) log.debug("Returning iterator reading on async query task"); Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/service/ndx/AbstractScaleOutClientIndexView.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/service/ndx/AbstractScaleOutClientIndexView.java 2013-08-05 12:51:58 UTC (rev 7241) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/service/ndx/AbstractScaleOutClientIndexView.java 2013-08-05 15:01:56 UTC (rev 7242) @@ -33,7 +33,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -75,16 +75,15 @@ import com.bigdata.mdi.PartitionLocator; import com.bigdata.relation.accesspath.BlockingBuffer; import com.bigdata.resources.StaleLocatorException; +import com.bigdata.service.AbstractClient; import com.bigdata.service.AbstractScaleOutFederation; +import com.bigdata.service.IBigdataClient; import com.bigdata.service.IBigdataClient.Options; -import com.bigdata.service.AbstractClient; -import com.bigdata.service.IBigdataClient; import com.bigdata.service.IBigdataFederation; import com.bigdata.service.IDataService; import com.bigdata.service.IMetadataService; import com.bigdata.service.Split; import com.bigdata.service.ndx.pipeline.IDuplicateRemover; -import com.bigdata.service.ndx.pipeline.IndexAsyncWriteStats; import com.bigdata.service.ndx.pipeline.IndexWriteTask; import cutthecrap.utils.striterators.IFilter; @@ -1272,11 +1271,21 @@ writeBuffer// ); - final Future<? extends IndexAsyncWriteStats> future = fed - .getExecutorService().submit(task); + /** + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/707"> + * BlockingBuffer.close() does not unblock threads </a> + */ - writeBuffer.setFuture(future); + // Wrap computation as FutureTask. 
+ @SuppressWarnings({ "unchecked", "rawtypes" }) + final FutureTask<?> ft = new FutureTask(task); + // Set Future on BlockingBuffer + writeBuffer.setFuture(ft); + + // Submit computation for evaluation. + fed.getExecutorService().submit(ft); + return task.getBuffer(); } Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/service/ndx/ClientIndexView.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/service/ndx/ClientIndexView.java 2013-08-05 12:51:58 UTC (rev 7241) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/service/ndx/ClientIndexView.java 2013-08-05 15:01:56 UTC (rev 7242) @@ -36,6 +36,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -91,7 +92,6 @@ import com.bigdata.service.IMetadataService; import com.bigdata.service.Split; import com.bigdata.service.ndx.pipeline.IDuplicateRemover; -import com.bigdata.service.ndx.pipeline.IndexAsyncWriteStats; import com.bigdata.service.ndx.pipeline.IndexWriteTask; import com.bigdata.striterator.ICloseableIterator; import com.bigdata.util.InnerCause; @@ -833,8 +833,20 @@ ts, isReadConsistentTx, fromKey, toKey, capacity, flags, filter, queryBuffer); - queryBuffer.setFuture(fed.getExecutorService().submit(task)); + /** + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/707"> + * BlockingBuffer.close() does not unblock threads </a> + */ + // Wrap computation as FutureTask. + final FutureTask<Void> ft = new FutureTask<Void>(task); + + // Set Future on BlockingBuffer. + queryBuffer.setFuture(ft); + + // Submit computation for evaluation. + fed.getExecutorService().submit(ft); + return new UnchunkedTupleIterator(queryBuffer.iterator()); } @@ -2228,11 +2240,21 @@ writeBuffer// ); - final Future<? extends IndexAsyncWriteStats> future = fed - .getExecutorService().submit(task); + /** + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/707"> + * BlockingBuffer.close() does not unblock threads </a> + */ - writeBuffer.setFuture(future); + // Wrap computation as FutureTask. + @SuppressWarnings({ "unchecked", "rawtypes" }) + final FutureTask<?> ft = new FutureTask(task); + // Set Future on BlockingBuffer. + writeBuffer.setFuture(ft); + + // Submit computation for evaluation. + fed.getExecutorService().submit(ft); + return task.getBuffer(); } Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/service/ndx/pipeline/AbstractMasterTask.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/service/ndx/pipeline/AbstractMasterTask.java 2013-08-05 12:51:58 UTC (rev 7241) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/service/ndx/pipeline/AbstractMasterTask.java 2013-08-05 15:01:56 UTC (rev 7242) @@ -896,11 +896,29 @@ // if (oldval == null) { - // assign a worker thread to the sink. - final Future<? extends AbstractSubtaskStats> future = submitSubtask(sink); + /** + * Hack pattern ensures that the Future is cancelled if we exit + * by any code path after the Future has been submitted for + * evaluation and before the Future has been set on the + * BlockingBuffer. 
+ * + * @see <a + * href="https://sourceforge.net/apps/trac/bigdata/ticket/707"> + * BlockingBuffer.close() does not unblock threads </a> + */ + Future<? extends AbstractSubtaskStats> future = null; + try { + // assign a worker thread to the sink. + future = submitSubtask(sink); + // set Future (can now be cancelled) + out.setFuture(future); + } finally { + if (future != null && buffer.getFuture() == null) { + // Future exists but not set on BlockingBuffer. + future.cancel(true/* mayInterruptIfRunning */); + } + } - out.setFuture(future); - stats.subtaskStartCount.incrementAndGet(); // } else { Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/service/proxy/ClientAsynchronousIterator.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/service/proxy/ClientAsynchronousIterator.java 2013-08-05 12:51:58 UTC (rev 7241) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/service/proxy/ClientAsynchronousIterator.java 2013-08-05 15:01:56 UTC (rev 7242) @@ -36,6 +36,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; @@ -180,19 +181,27 @@ // allocate local buffer. this.localBuffer = new BlockingBuffer<E>(capacity); - - // start reader. - this.future = executorService.submit(new ReaderTask()); + + /** + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/707"> + * BlockingBuffer.close() does not unblock threads </a> + */ + + // Wrap computation as FutureTask. + final FutureTask<Void> ft = new FutureTask<Void>(new ReaderTask()); /* * Set future on the local buffer so that we can interrupt it when the * client side of the iterator is closed. */ - this.localBuffer.setFuture(future); + this.localBuffer.setFuture(future = ft); // save reference to iterator draining the [localBuffer]. this.localIterator = localBuffer.iterator(); + // start reader. + executorService.submit(ft); + } /** Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/relation/accesspath/TestBlockingBuffer.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/relation/accesspath/TestBlockingBuffer.java 2013-08-05 12:51:58 UTC (rev 7241) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/relation/accesspath/TestBlockingBuffer.java 2013-08-05 15:01:56 UTC (rev 7242) @@ -33,6 +33,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -318,21 +319,26 @@ final ExecutorService service = Executors .newSingleThreadExecutor(DaemonThreadFactory .defaultThreadFactory()); - Future<?> f = null; + + FutureTask<Void> ft = null; try { - f = service.submit(new Producer()); - + // Wrap computation as FutureTask. + ft = new FutureTask<Void>(new Producer(), (Void) null/* result */); + /* * Set the Future on the BlockingBuffer. This is how it will notice * when the iterator is closed. */ - buffer.setFuture(f); + buffer.setFuture(ft); + // Submit computation for evaluation. + service.submit(ft); + Thread.sleep(200/*ms*/); // The producer should be blocked. 
- assertFalse(f.isDone()); + assertFalse(ft.isDone()); // Closed the buffer using the iterator. buffer.iterator().close(); @@ -340,7 +346,7 @@ // Verify producer was woken up. try { - f.get(1/* timeout */, TimeUnit.SECONDS); + ft.get(1/* timeout */, TimeUnit.SECONDS); } catch(CancellationException ex) { Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/service/ndx/pipeline/TestFileSystemScanner.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/service/ndx/pipeline/TestFileSystemScanner.java 2013-08-05 12:51:58 UTC (rev 7241) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/service/ndx/pipeline/TestFileSystemScanner.java 2013-08-05 15:01:56 UTC (rev 7242) @@ -34,7 +34,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import junit.framework.TestCase2; @@ -114,11 +114,15 @@ .defaultThreadFactory()); try { - final Future<Long> future = service.submit(new DrainBuffer()); + // Wrap computation as FutureTask. + final FutureTask<Long> ft = new FutureTask<Long>(new DrainBuffer()); // buffer will be abort()ed if task fails. - buffer.setFuture(future); - + buffer.setFuture(ft); + + // start computation + service.submit(ft); + final Long acceptCount = scanner.call(); if (log.isInfoEnabled()) @@ -128,7 +132,7 @@ buffer.close(); // compare the accept count with the drain task count. - assertEquals(acceptCount, future.get()); + assertEquals(acceptCount, ft.get()); } finally { Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/service/ndx/pipeline/TestMasterTask.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/service/ndx/pipeline/TestMasterTask.java 2013-08-05 12:51:58 UTC (rev 7241) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/service/ndx/pipeline/TestMasterTask.java 2013-08-05 15:01:56 UTC (rev 7242) @@ -29,8 +29,9 @@ package com.bigdata.service.ndx.pipeline; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import com.bigdata.btree.keys.KVO; import com.bigdata.relation.accesspath.BlockingBuffer; @@ -67,10 +68,15 @@ final M master = new M(masterStats, masterBuffer, executorService); - // start the consumer. - final Future<H> future = executorService.submit(master); - masterBuffer.setFuture(future); - + // Wrap computation as FutureTask. + final FutureTask<H> ft = new FutureTask<H>(master); + + // Set Future on BlockingBuffer. + masterBuffer.setFuture(ft); + + // Start the consumer. + executorService.submit(ft); + masterBuffer.close(); masterBuffer.getFuture().get(); @@ -100,10 +106,15 @@ final M master = new M(masterStats, masterBuffer, executorService); - // start the consumer. - final Future<H> future = executorService.submit(master); - masterBuffer.setFuture(future); + // Wrap computation as FutureTask. + final FutureTask<H> ft = new FutureTask<H>(master); + + // Set Future on BlockingBuffer. + masterBuffer.setFuture(ft); + // Start the consumer. 
+ executorService.submit(ft); + final KVO<O>[] a = new KVO[0]; masterBuffer.add(a); @@ -125,9 +136,10 @@ * * @throws InterruptedException * @throws ExecutionException + * @throws TimeoutException */ public void test_startWriteStop1() throws InterruptedException, - ExecutionException { + ExecutionException, TimeoutException { final H masterStats = new H(); @@ -136,10 +148,15 @@ final M master = new M(masterStats, masterBuffer, executorService); + // Wrap computation as FutureTask. + final FutureTask<H> ft = new FutureTask<H>(master); + + // Set the Future on the BlockingBuffer. + masterBuffer.setFuture(ft); + // start the consumer. - final Future<H> future = executorService.submit(master); - masterBuffer.setFuture(future); - + executorService.submit(ft); + final KVO<O>[] a = new KVO[] { new KVO<O>(new byte[]{1},new byte[]{2},null/*val*/), new KVO<O>(new byte[]{1},new byte[]{3},null/*val*/) @@ -149,7 +166,8 @@ masterBuffer.close(); - masterBuffer.getFuture().get(); + // Run with timeout (test fails if Future not done before timeout). + masterBuffer.getFuture().get(5L, TimeUnit.SECONDS); assertEquals("elementsIn", a.length, masterStats.elementsIn.get()); assertEquals("chunksIn", 1, masterStats.chunksIn.get()); @@ -181,9 +199,10 @@ * * @throws InterruptedException * @throws ExecutionException + * @throws TimeoutException */ public void test_startWriteStop2() throws InterruptedException, - ExecutionException { + ExecutionException, TimeoutException { doStartWriteStop2Test(); @@ -249,9 +268,10 @@ * assumption within them which is being violated. * * @throws InterruptedException + * @throws TimeoutException */ private void doStartWriteStop2Test() throws InterruptedException, - ExecutionException { + ExecutionException, TimeoutException { final BlockingBuffer<KVO<O>[]> masterBuffer = new BlockingBuffer<KVO<O>[]>( masterQueueCapacity); @@ -260,9 +280,14 @@ final M master = new M(masterStats, masterBuffer, executorService); - // start the consumer. - final Future<H> future = executorService.submit(master); - masterBuffer.setFuture(future); + // Wrap as FutureTask. + final FutureTask<H> ft = new FutureTask<H>(master); + + // Set Future on BlockingBuffer. + masterBuffer.setFuture(ft); + + // Start the consumer. + executorService.submit(ft); final KVO<O>[] a = new KVO[] { new KVO<O>(new byte[]{1},new byte[]{2},null/*val*/), @@ -274,7 +299,8 @@ masterBuffer.close(); - masterBuffer.getFuture().get(); + // test fails if not done before timeout. + masterBuffer.getFuture().get(5L, TimeUnit.SECONDS); assertEquals("elementsIn", a.length, masterStats.elementsIn.get()); assertEquals("chunksIn", 1, masterStats.chunksIn.get()); @@ -334,10 +360,15 @@ final M master = new M(masterStats, masterBuffer, executorService); - // start the consumer. - final Future<H> future = executorService.submit(master); - masterBuffer.setFuture(future); - + // Wrap computation as FutureTask. + final FutureTask<H> ft = new FutureTask<H>(master); + + // Set Future on BlockingBuffer. + masterBuffer.setFuture(ft); + + // Start the consumer. 
+ executorService.submit(ft); + { final KVO<O>[] a = new KVO[] { new KVO<O>(new byte[] { 1 }, new byte[] { 2 }, null/* val */), Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/service/ndx/pipeline/TestMasterTaskIdleTimeout.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/service/ndx/pipeline/TestMasterTaskIdleTimeout.java 2013-08-05 12:51:58 UTC (rev 7241) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/service/ndx/pipeline/TestMasterTaskIdleTimeout.java 2013-08-05 15:01:56 UTC (rev 7242) @@ -31,7 +31,7 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -41,8 +41,6 @@ import com.bigdata.btree.keys.KVO; import com.bigdata.relation.accesspath.BlockingBuffer; -import com.bigdata.service.ndx.pipeline.AbstractMasterTestCase.H; -import com.bigdata.service.ndx.pipeline.AbstractMasterTestCase.O; import com.bigdata.util.concurrent.DaemonThreadFactory; /** @@ -92,10 +90,15 @@ }; - // start the consumer. - final Future<H> future = executorService.submit(master); - masterBuffer.setFuture(future); - + // Wrap computation as FutureTask. + final FutureTask<H> ft = new FutureTask<H>(master); + + // Set Future on BlockingBuffer. + masterBuffer.setFuture(ft); + + // Start the consumer. + executorService.submit(ft); + /* * write a chunk on the buffer. this will cause an output buffer to be * created. @@ -245,10 +248,15 @@ }; - // start the consumer. - final Future<H> future = executorService.submit(master); - masterBuffer.setFuture(future); + // Wrap computation as FutureTask. + final FutureTask<H> ft = new FutureTask<H>(master); + // Set Future on BlockingBuffer + masterBuffer.setFuture(ft); + + // Start the consumer. + executorService.submit(ft); + /* * write a chunk on the buffer. this will cause a sink to be created. */ @@ -413,10 +421,15 @@ }; - // start the consumer. - final Future<H> future = executorService.submit(master); - masterBuffer.setFuture(future); - + // Wrap computation as FutureTask. + final FutureTask<H> ft = new FutureTask<H>(master); + + // Set Future on BlockingBuffer. + masterBuffer.setFuture(ft); + + // Start the consumer. + executorService.submit(ft); + // scheduled service used to write on the master. final ScheduledExecutorService scheduledExecutorService = Executors .newScheduledThreadPool(1, DaemonThreadFactory @@ -656,10 +669,15 @@ }; - // start the consumer. - final Future<H> future = executorService.submit(master); - masterBuffer.setFuture(future); + // Wrap computation as FutureTask. + final FutureTask<H> ft = new FutureTask<H>(master); + + // Set Future on BlockingBuffer. + masterBuffer.setFuture(ft); + // Start the consumer. + executorService.submit(ft); + // write a chunk on the master. { final KVO<O>[] a = new KVO[] { @@ -761,10 +779,15 @@ }; - // start the consumer. - final Future<H> future = executorService.submit(master); - masterBuffer.setFuture(future); + // Wrap computation as FutureTask. + final FutureTask<H> ft = new FutureTask<H>(master); + + // Set Future on BlockingBuffer. + masterBuffer.setFuture(ft); + // Start the consumer. + executorService.submit(ft); + // write a chunk on the master. 
{ final KVO<O>[] a = new KVO[] { Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/service/ndx/pipeline/TestMasterTaskWithErrors.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/service/ndx/pipeline/TestMasterTaskWithErrors.java 2013-08-05 12:51:58 UTC (rev 7241) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/service/ndx/pipeline/TestMasterTaskWithErrors.java 2013-08-05 15:01:56 UTC (rev 7242) @@ -29,12 +29,10 @@ package com.bigdata.service.ndx.pipeline; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import com.bigdata.btree.keys.KVO; import com.bigdata.relation.accesspath.BlockingBuffer; -import com.bigdata.service.ndx.pipeline.AbstractMasterTestCase.H; -import com.bigdata.service.ndx.pipeline.AbstractMasterTestCase.O; import com.bigdata.util.InnerCause; /** @@ -100,10 +98,15 @@ }; - // start the consumer. - final Future<H> future = executorService.submit(master); - masterBuffer.setFuture(future); - + // Wrap computation as FutureTask. + final FutureTask<H> ft = new FutureTask<H>(master); + + // Set Future on BlockingBuffer. + masterBuffer.setFuture(ft); + + // Start the consumer. + executorService.submit(ft); + final KVO<O>[] a = new KVO[] { new KVO<O>(new byte[]{1},new byte[]{2},null/*val*/), new KVO<O>(new byte[]{13},new byte[]{3},null/*val*/) Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/service/ndx/pipeline/TestMasterTaskWithRedirect.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/service/ndx/pipeline/TestMasterTaskWithRedirect.java 2013-08-05 12:51:58 UTC (rev 7241) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/service/ndx/pipeline/TestMasterTaskWithRedirect.java 2013-08-05 15:01:56 UTC (rev 7242) @@ -36,6 +36,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -45,9 +46,6 @@ import com.bigdata.btree.keys.KVO; import com.bigdata.btree.keys.KeyBuilder; import com.bigdata.relation.accesspath.BlockingBuffer; -import com.bigdata.service.ndx.pipeline.AbstractKeyRangeMasterTestCase.L; -import com.bigdata.service.ndx.pipeline.AbstractMasterTestCase.H; -import com.bigdata.service.ndx.pipeline.AbstractMasterTestCase.O; /** * Test ability to handle a redirect (subtask learns that the target service no @@ -142,10 +140,15 @@ }; - // start the consumer. - final Future<H> future = executorService.submit(master); - masterBuffer.setFuture(future); - + // Wrap computation as FutureTask. + final FutureTask<H> ft = new FutureTask<H>(master); + + // Set Future on BlockingBuffer. + masterBuffer.setFuture(ft); + + // Start the consumer. + executorService.submit(ft); + final KVO<O>[] a = new KVO[] { new KVO<O>(new byte[]{1},new byte[]{2},null/*val*/), new KVO<O>(new byte[]{13},new byte[]{3},null/*val*/) @@ -321,10 +324,15 @@ }; - // start the consumer. - final Future<H> future = executorService.submit(master); - masterBuffer.setFuture(future); + // Wrap computation as FutureTask. + final FutureTask<H> ft = new FutureTask<H>(master); + // Set Future on BlockingBuffer. + masterBuffer.setFuture(ft); + + // Start the consumer. 
+ executorService.submit(ft); + // write on L(1) and L(14). { final KVO<O>[] a = new KVO[] { @@ -730,10 +738,15 @@ redirecter.init(initialLocatorCount); - // start the consumer. - final Future<H> future = executorService.submit(master); - masterBuffer.setFuture(future); + // Wrap computation as FutureTask. + final FutureTask<H> ft = new FutureTask<H>(master); + + // Set Future on BlockingBuffer. + masterBuffer.setFuture(ft); + // Start the consumer. + executorService.submit(ft); + // start writing data. final List<Future> producerFutures = new LinkedList<Future>(); for (int i = 0; i < nproducers; i++) { Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/service/ndx/pipeline/TestMasterTaskWithSplits.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/service/ndx/pipeline/TestMasterTaskWithSplits.java 2013-08-05 12:51:58 UTC (rev 7241) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/service/ndx/pipeline/TestMasterTaskWithSplits.java 2013-08-05 15:01:56 UTC (rev 7242) @@ -40,13 +40,12 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import javax.management.openmbean.OpenDataException; - import com.bigdata.btree.keys.KVO; import com.bigdata.btree.keys.KeyBuilder; import com.bigdata.btree.keys.TestKeyBuilder; @@ -844,21 +843,38 @@ */ final RedirectTask redirecter = new RedirectTask(master, schedule); - // start the consumer. - final Future<H> future = executorService.submit(master); - masterBuffer.setFuture(future); + // Start the master. + { + // Wrap computation as FutureTask. + final FutureTask<H> ft = new FutureTask<H>(master); - // start writing data. - final List<Future> producerFutures = new LinkedList<Future>(); + // Set Future on BlockingBuffer. + masterBuffer.setFuture(ft); + + // Start the consumer. + executorService.submit(ft); + } + + // Setup producers. + final List<FutureTask<Void>> producerFutures = new LinkedList<FutureTask<Void>>(); + for (int i = 0; i < nproducers; i++) { - producerFutures.add(executorService.submit(new ProducerTask( + // Wrap computation as FutureTask. + producerFutures.add(new FutureTask<Void>(new ProducerTask( masterBuffer))); } + // Start writing data. + for (FutureTask<Void> ft : producerFutures) { + + executorService.submit(ft); + + } + // start redirects. - final Future redirecterFuture = executorService.submit(redirecter); + final Future<Void> redirecterFuture = executorService.submit(redirecter); try { @@ -880,7 +896,7 @@ } // check producers. - for (Future f : producerFutures) { + for (Future<Void> f : producerFutures) { if (f.isDone()) { break; } @@ -905,7 +921,7 @@ redirecterFuture.get(); // await termination and check producer futures for errors. 
- for (Future f : producerFutures) { + for (Future<Void> f : producerFutures) { f.get(); Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/service/jini/master/MappedTaskMaster.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/service/jini/master/MappedTaskMaster.java 2013-08-05 12:51:58 UTC (rev 7241) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/service/jini/master/MappedTaskMaster.java 2013-08-05 15:01:56 UTC (rev 7242) @@ -32,6 +32,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; @@ -406,11 +407,21 @@ }; - final Future<? extends ResourceBufferStatistics> future = getFederation() - .getExecutorService().submit(task); + /** + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/707"> + * BlockingBuffer.close() does not unblock threads </a> + */ + + // Wrap computation as FutureTask. + @SuppressWarnings({ "unchecked", "rawtypes" }) + final FutureTask<?> ft = new FutureTask(task); - resourceBuffer.setFuture(future); + // Set Future on BlockingBuffer. + resourceBuffer.setFuture(ft); + // Submit FutureTask for computation. + getFederation().getExecutorService().submit(ft); + /* * Attach to the counters reported by the client to the LBS. */ Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/DGExpander.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/DGExpander.java 2013-08-05 12:51:58 UTC (rev 7241) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/DGExpander.java 2013-08-05 15:01:56 UTC (rev 7242) @@ -16,7 +16,6 @@ import com.bigdata.bop.ap.Predicate; import com.bigdata.btree.BTree; import com.bigdata.btree.IIndex; -import com.bigdata.btree.ITupleIterator; import com.bigdata.counters.CAT; import com.bigdata.rdf.internal.IV; import com.bigdata.rdf.spo.ISPO; @@ -310,7 +309,7 @@ this.buffer = new BlockingBuffer<ISPO>(sourceAccessPath .getChunkCapacity()); - Future<Void> future = null; + FutureTask<Void> future = null; try { /* @@ -325,16 +324,21 @@ * will be passed along to the iterator) and to close the * buffer (the iterator will notice that the buffer has been * closed as well as that the cause was set on the buffer). + * + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/707"> + * BlockingBuffer.close() does not unblock threads </a> */ - // run the task. - future = sourceAccessPath.getIndexManager() - .getExecutorService().submit( - newRunIteratorsTask(buffer)); + // Wrap task as FutureTask. + future = new FutureTask<Void>(newRunIteratorsTask(buffer)); // set the future on the BlockingBuffer. buffer.setFuture(future); + // submit task for execution. + sourceAccessPath.getIndexManager().getExecutorService() + .submit(future); + /* * The outer access path will impose the "DISTINCT SPO" * constraint. 
Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/store/TripleStoreUtility.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/store/TripleStoreUtility.java 2013-08-05 12:51:58 UTC (rev 7241) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/store/TripleStoreUtility.java 2013-08-05 15:01:56 UTC (rev 7242) @@ -32,6 +32,7 @@ import java.util.Properties; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.FutureTask; import org.apache.log4j.Logger; import org.openrdf.model.Statement; @@ -42,8 +43,8 @@ import com.bigdata.rdf.axioms.NoAxioms; import com.bigdata.rdf.internal.IV; import com.bigdata.rdf.model.BigdataStatement; +import com.bigdata.rdf.rio.AbstractStatementBuffer.StatementBuffer2; import com.bigdata.rdf.rio.StatementBuffer; -import com.bigdata.rdf.rio.AbstractStatementBuffer.StatementBuffer2; import com.bigdata.rdf.rules.BackchainAccessPath; import com.bigdata.rdf.spo.ISPO; import com.bigdata.rdf.spo.SPO; @@ -305,48 +306,61 @@ * Run task. The task consumes externalized statements from [expected] * and writes statements not found in [actual] onto the blocking buffer. */ - buffer.setFuture(actual.getExecutorService().submit( - new Callable<Void>() { + final Callable<Void> myTask = new Callable<Void>() { - public Void call() throws Exception { + public Void call() throws Exception { - try { + try { - while (itr2.hasNext()) { + while (itr2.hasNext()) { - // a statement from the source db. - final BigdataStatement stmt = itr2.next(); + // a statement from the source db. + final BigdataStatement stmt = itr2.next(); - // if (log.isInfoEnabled()) log.info("Source: " - // + stmt); + // if (log.isInfoEnabled()) log.info("Source: " + // + stmt); - // add to the buffer. - sb.add(stmt); + // add to the buffer. + sb.add(stmt); - } + } - } finally { + } finally { - itr2.close(); + itr2.close(); - } + } - /* - * Flush everything in the StatementBuffer so that it - * shows up in the BlockingBuffer's iterator(). - */ + /* + * Flush everything in the StatementBuffer so that it + * shows up in the BlockingBuffer's iterator(). + */ - final long nnotFound = sb.flush(); + final long nnotFound = sb.flush(); - if (log.isInfoEnabled()) - log.info("Flushed: #notFound=" + nnotFound); + if (log.isInfoEnabled()) + log.info("Flushed: #notFound=" + nnotFound); - return null; + return null; - } + } - })); + }; + /** + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/707"> + * BlockingBuffer.close() does not unblock threads </a> + */ + + // Wrap computation as FutureTask. + final FutureTask<Void> ft = new FutureTask<Void>(myTask); + + // Set Future on BlockingBuffer. + buffer.setFuture(ft); + + // Submit computation for evaluation. + actual.getExecutorService().submit(ft); + /* * Return iterator reading "not found" statements from the blocking * buffer. This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
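For reference, the corrected idiom in this commit reduces to three steps: wrap the producer as a FutureTask, hand that FutureTask to BlockingBuffer.setFuture(), and only then submit it for execution. What follows is a minimal, self-contained sketch of that pattern, not code from the commit itself; the Producer callable and the single-thread executor are hypothetical stand-ins, and it assumes the bigdata BlockingBuffer class is on the classpath:

{{{
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

import com.bigdata.relation.accesspath.BlockingBuffer;

public class SetFutureBeforeSubmitSketch {

    public static void main(final String[] args) {

        final ExecutorService executor = Executors.newSingleThreadExecutor();

        // Buffer shared by the producer (writer) and the consumer (iterator).
        final BlockingBuffer<String[]> buffer = new BlockingBuffer<String[]>(
                100/* capacity */);

        // Hypothetical producer standing in for ChunkConsumerTask, the
        // IndexWriteTask, etc.
        final Callable<Void> producer = new Callable<Void>() {
            public Void call() throws Exception {
                buffer.add(new String[] { "a", "b" });
                buffer.close(); // producer is done.
                return null;
            }
        };

        // 1. Wrap the computation as a FutureTask.
        final FutureTask<Void> ft = new FutureTask<Void>(producer);

        // 2. Set the Future on the BlockingBuffer *before* the task can run,
        //    so closing the consumer's iterator can always cancel the
        //    producer.
        buffer.setFuture(ft);

        // 3. Only now submit the FutureTask for evaluation.
        executor.submit(ft);

        // The consumer reads via buffer.iterator(). Closing that iterator
        // (e.g., if the consumer gives up early) cancels [ft] because the
        // Future was already set on the buffer before the task started.
        buffer.iterator().close();

        executor.shutdown();
    }
}
}}}

The ordering is what closes the race: if the task were submitted first, the consumer could close the buffer in the window before setFuture() ran, leaving the producer with nothing to cancel it.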
From: <mar...@us...> - 2013-08-05 12:52:06
Revision: 7241 http://bigdata.svn.sourceforge.net/bigdata/?rev=7241&view=rev Author: martyncutcher Date: 2013-08-05 12:51:58 +0000 (Mon, 05 Aug 2013) Log Message: ----------- Move QuorumTokenTransitions into quorum package amend HAJournalServer.enterRunState to check for token change if RunState has not changed. Modified Paths: -------------- branches/READ_CACHE2/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/READ_CACHE2/bigdata/src/test/com/bigdata/quorum/TestSingletonQuorumSemantics.java branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java Added Paths: ----------- branches/READ_CACHE2/bigdata/src/java/com/bigdata/quorum/QuorumTokenTransitions.java Modified: branches/READ_CACHE2/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/READ_CACHE2/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-08-02 12:39:24 UTC (rev 7240) +++ branches/READ_CACHE2/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-08-05 12:51:58 UTC (rev 7241) @@ -152,6 +152,7 @@ import com.bigdata.quorum.QuorumActor; import com.bigdata.quorum.QuorumException; import com.bigdata.quorum.QuorumMember; +import com.bigdata.quorum.QuorumTokenTransitions; import com.bigdata.rawstore.IAllocationContext; import com.bigdata.rawstore.IAllocationManagerStore; import com.bigdata.rawstore.IPSOutputStream; @@ -5337,188 +5338,11 @@ } - /** - * Wraps the token/join transitions in a testable manner. - * - * Both enables JUnit testing and also provides context to represent - * better abstractions on the quorum/service states. - */ - static public class QuorumTokenTransitions { - - final long currentQuorumToken; - final long newQuorumToken; - final long currentHaReady; - - final boolean didBreak; - final boolean didMeet; - final boolean didJoinMetQuorum; - final boolean didLeaveMetQuorum; - - final boolean wasMet; - final boolean isMet; - final boolean isJoined; - final boolean wasJoined; - - public QuorumTokenTransitions(final long currentQuorumToken, - final long newQuorumToken, final QuorumService<HAGlue> service, - final long haReady) { - - this(currentQuorumToken, newQuorumToken, service != null - && service.isJoinedMember(newQuorumToken), haReady); - - } - - public QuorumTokenTransitions(final long currentQuorumToken, - final long newQuorumToken, final boolean joined, - final long haReady) { - - this.currentHaReady = haReady; - this.currentQuorumToken = currentQuorumToken; - this.newQuorumToken = newQuorumToken; - - isJoined = joined; - wasJoined = haReady != Quorum.NO_QUORUM; - wasMet = currentQuorumToken != Quorum.NO_QUORUM; - isMet = newQuorumToken != Quorum.NO_QUORUM; - - // Both quorum token and haReadyToken agree with newValue. - final boolean noTokenChange = currentQuorumToken == newQuorumToken - && currentQuorumToken == currentHaReady; - - /* - * TODO: more understanding required as to the effect of this clause - */ - if (noTokenChange && isJoined) { - didBreak = false; - didMeet = false; - didJoinMetQuorum = didJoin(); - didLeaveMetQuorum = false; - } else if (isBreak()) { - didBreak = true; // quorum break. - didMeet = false; - didJoinMetQuorum = false; - didLeaveMetQuorum = wasJoined; // if service was joined with met quorum, then it just left the met quorum. - } else if (isMeet()) { - - /* - * Quorum meet. - * - * We must wait for the lock to update the token. - */ - - didBreak = false; - didMeet = true; // quorum meet. 
- didJoinMetQuorum = false; - didLeaveMetQuorum = false; - - } else if (didJoin()) { - - /* - * This service is joining a quorum that is already met. - */ - - didBreak = false; - didMeet = false; - didJoinMetQuorum = true; // service joined with met quorum. - didLeaveMetQuorum = false; - - } else if (didLeaveMet()) { - - /* - * This service is leaving a quorum that is already met (but - * this is not a quorum break since the new token is not - * NO_QUORUM). - */ - - didBreak = false; - didMeet = false; - didJoinMetQuorum = false; - didLeaveMetQuorum = true; // service left met quorum. quorum - // still met. - - } else { - didBreak = false; - didMeet = false; - didJoinMetQuorum = false; - didLeaveMetQuorum = false; - - // throw new AssertionError("Bad state from: oldToken=" + currentQuorumToken + ", newToken=" + newQuorumToken + ", haReady=" + haReady + ", isJoined=" + isJoined); - } - - checkStates(); - } - - // TODO Document rationale for each assertion. - private void checkStates() { - if (wasJoined && wasMet && currentHaReady > currentQuorumToken) - throw new AssertionError("haReady greater than current token"); - - if (wasMet && isMet && newQuorumToken < currentQuorumToken) { - throw new AssertionError("next token less than current token"); - } - - if (wasMet && isMet && newQuorumToken != currentQuorumToken) { - throw new AssertionError("New quorum token without quorum break first"); - } - - if (didMeet && didJoinMetQuorum) { - /* - * It is not possible to both join with an existing quorum and - * to be one of the services that caused a quorum to meet. These - * conditions are exclusive. - */ - throw new AssertionError("didMeet && didJoinMetQuorum"); - } - - /** - * This is a bit odd, it is okay, but probably didLeaveMetQuorum will - * only be true iff isJoined - */ -// if (didBreak && didLeaveMetQuorum) { -// throw new AssertionError("didBreak && didLeaveMetQuorum"); -// } -// if (didLeaveMetQuorum && !isJoined) { // TODO Why not valid? -// throw new AssertionError("didLeaveMetQuorum && !isJoined"); -// } - } - - /* - * Methods for making decisions in the ctor. They are NOT for public - * use. There are simple public fields that report out the decisions - * make using these methods. - */ - -// public final boolean noChange() { -// return currentQuorumToken == newQuorumToken; -// } - - private final boolean isBreak() { - return wasMet && !isMet; - } - - private final boolean isMeet() { - return isMet & !wasMet; - } - - private final boolean didJoin() { - return isMet && isJoined && !wasJoined; - } - -// private final boolean didLeave() { -// return isBreak() && wasJoined; -// } - - private final boolean didLeaveMet() { - return isMet && wasJoined && !isJoined; - } - - public final String showState() { - return ""; // TODO showState(). - } - } protected void setQuorumToken(final long newValue) { + log.warn("current: " + quorumToken + ", new: " + newValue); + // Protect for potential NPE if (quorum == null) return; Added: branches/READ_CACHE2/bigdata/src/java/com/bigdata/quorum/QuorumTokenTransitions.java =================================================================== --- branches/READ_CACHE2/bigdata/src/java/com/bigdata/quorum/QuorumTokenTransitions.java (rev 0) +++ branches/READ_CACHE2/bigdata/src/java/com/bigdata/quorum/QuorumTokenTransitions.java 2013-08-05 12:51:58 UTC (rev 7241) @@ -0,0 +1,185 @@ +package com.bigdata.quorum; + +import com.bigdata.ha.HAGlue; +import com.bigdata.ha.QuorumService; + +/** + * Wraps the token/join transitions in a testable manner. 
+ * + * Both enables JUnit testing and also provides context to represent + * better abstractions on the quorum/service states. + */ +public class QuorumTokenTransitions { + + final long currentQuorumToken; + final long newQuorumToken; + final long currentHaReady; + + public final boolean didBreak; + public final boolean didMeet; + public final boolean didJoinMetQuorum; + public final boolean didLeaveMetQuorum; + + final boolean wasMet; + final boolean isMet; + final boolean isJoined; + final boolean wasJoined; + + public QuorumTokenTransitions(final long currentQuorumToken, + final long newQuorumToken, final QuorumService<HAGlue> service, + final long haReady) { + + this(currentQuorumToken, newQuorumToken, service != null + && service.isJoinedMember(newQuorumToken), haReady); + + } + + public QuorumTokenTransitions(final long currentQuorumToken, + final long newQuorumToken, final boolean joined, + final long haReady) { + + this.currentHaReady = haReady; + this.currentQuorumToken = currentQuorumToken; + this.newQuorumToken = newQuorumToken; + + isJoined = joined; + wasJoined = haReady != Quorum.NO_QUORUM; + wasMet = currentQuorumToken != Quorum.NO_QUORUM; + isMet = newQuorumToken != Quorum.NO_QUORUM; + + // Both quorum token and haReadyToken agree with newValue. + final boolean noTokenChange = currentQuorumToken == newQuorumToken + && currentQuorumToken == currentHaReady; + + /* + * TODO: more understanding required as to the effect of this clause + */ + if (noTokenChange && isJoined) { + didBreak = false; + didMeet = false; + didJoinMetQuorum = didJoin(); + didLeaveMetQuorum = false; + } else if (isBreak()) { + didBreak = true; // quorum break. + didMeet = false; + didJoinMetQuorum = false; + didLeaveMetQuorum = wasJoined; // if service was joined with met quorum, then it just left the met quorum. + } else if (isMeet()) { + + /* + * Quorum meet. + * + * We must wait for the lock to update the token. + */ + + didBreak = false; + didMeet = true; // quorum meet. + didJoinMetQuorum = false; + didLeaveMetQuorum = false; + + } else if (didJoin()) { + + /* + * This service is joining a quorum that is already met. + */ + + didBreak = false; + didMeet = false; + didJoinMetQuorum = true; // service joined with met quorum. + didLeaveMetQuorum = false; + + } else if (didLeaveMet()) { + + /* + * This service is leaving a quorum that is already met (but + * this is not a quorum break since the new token is not + * NO_QUORUM). + */ + + didBreak = false; + didMeet = false; + didJoinMetQuorum = false; + didLeaveMetQuorum = true; // service left met quorum. quorum + // still met. + + } else { + didBreak = false; + didMeet = false; + didJoinMetQuorum = false; + didLeaveMetQuorum = false; + + // throw new AssertionError("Bad state from: oldToken=" + currentQuorumToken + ", newToken=" + newQuorumToken + ", haReady=" + haReady + ", isJoined=" + isJoined); + } + + checkStates(); + } + + // TODO Document rationale for each assertion. 
+ private void checkStates() { + if (wasJoined && wasMet && currentHaReady > currentQuorumToken) + throw new AssertionError("haReady greater than current token"); + + if (wasMet && isMet && newQuorumToken < currentQuorumToken) { + throw new AssertionError("next token less than current token"); + } + + if (wasMet && isMet && newQuorumToken != currentQuorumToken) { + throw new AssertionError("New quorum token without quorum break first, current: " + currentQuorumToken + ", new: " + newQuorumToken); + } + + if (didMeet && didJoinMetQuorum) { + /* + * It is not possible to both join with an existing quorum and + * to be one of the services that caused a quorum to meet. These + * conditions are exclusive. + */ + throw new AssertionError("didMeet && didJoinMetQuorum"); + } + + /** + * This is a bit odd, it is okay, but probably didLeaveMetQuorum will + * only be true iff isJoined + */ +// if (didBreak && didLeaveMetQuorum) { +// throw new AssertionError("didBreak && didLeaveMetQuorum"); +// } +// if (didLeaveMetQuorum && !isJoined) { // TODO Why not valid? +// throw new AssertionError("didLeaveMetQuorum && !isJoined"); +// } + } + + /* + * Methods for making decisions in the ctor. They are NOT for public + * use. There are simple public fields that report out the decisions + * make using these methods. + */ + +// public final boolean noChange() { +// return currentQuorumToken == newQuorumToken; +// } + + private final boolean isBreak() { + return wasMet && !isMet; + } + + private final boolean isMeet() { + return isMet & !wasMet; + } + + private final boolean didJoin() { + return isMet && isJoined && !wasJoined; + } + +// private final boolean didLeave() { +// return isBreak() && wasJoined; +// } + + private final boolean didLeaveMet() { + return isMet && wasJoined && !isJoined; + } + + public final String showState() { + return ""; // TODO showState(). 
+ } +} + Modified: branches/READ_CACHE2/bigdata/src/test/com/bigdata/quorum/TestSingletonQuorumSemantics.java =================================================================== --- branches/READ_CACHE2/bigdata/src/test/com/bigdata/quorum/TestSingletonQuorumSemantics.java 2013-08-02 12:39:24 UTC (rev 7240) +++ branches/READ_CACHE2/bigdata/src/test/com/bigdata/quorum/TestSingletonQuorumSemantics.java 2013-08-05 12:51:58 UTC (rev 7241) @@ -34,8 +34,6 @@ import junit.framework.AssertionFailedError; -import com.bigdata.journal.AbstractJournal; -import com.bigdata.journal.AbstractJournal.QuorumTokenTransitions; import com.bigdata.quorum.MockQuorumFixture.MockQuorumMember; /** @@ -686,36 +684,36 @@ final long[] tst = tokens[i]; // token combinations with isJoined == true - final AbstractJournal.QuorumTokenTransitions qtjoined = new AbstractJournal.QuorumTokenTransitions(tst[0]/*current token*/, tst[1]/*new token*/, true/*isJoined*/, tst[2]/*haReady*/); + final QuorumTokenTransitions qtjoined = new QuorumTokenTransitions(tst[0]/*current token*/, tst[1]/*new token*/, true/*isJoined*/, tst[2]/*haReady*/); // token combinations with isJoined == false - final AbstractJournal.QuorumTokenTransitions qtnotjoined = new AbstractJournal.QuorumTokenTransitions(tst[0]/*current token*/, tst[1]/*new token*/, false/*isJoined*/, tst[2]/*haReady*/); + final QuorumTokenTransitions qtnotjoined = new QuorumTokenTransitions(tst[0]/*current token*/, tst[1]/*new token*/, false/*isJoined*/, tst[2]/*haReady*/); } // test invalid scenarios to confirm AssertionErrors are thrown try { - new AbstractJournal.QuorumTokenTransitions(1/*current token*/, 2/*new token*/, true/*isJoined*/, 1/*haReady*/); + new QuorumTokenTransitions(1/*current token*/, 2/*new token*/, true/*isJoined*/, 1/*haReady*/); fail("Expected assertion error, cannot progress quorum token without break"); } catch (AssertionError ae) { // expected; } try { - new AbstractJournal.QuorumTokenTransitions(2/*current token*/, 1/*new token*/, true/*isJoined*/, 1/*haReady*/); + new QuorumTokenTransitions(2/*current token*/, 1/*new token*/, true/*isJoined*/, 1/*haReady*/); fail("Expected assertion error, new valid < current valid"); } catch (AssertionError ae) { // expected; } try { - new AbstractJournal.QuorumTokenTransitions(1/*current token*/, 1/*new token*/, true/*isJoined*/, 2/*haReady*/); + new QuorumTokenTransitions(1/*current token*/, 1/*new token*/, true/*isJoined*/, 2/*haReady*/); fail("Expected assertion error, haReady > newToken"); } catch (AssertionError ae) { // expected; } try { - new AbstractJournal.QuorumTokenTransitions(1/*current token*/, 2/*new token*/, true/*isJoined*/, 2/*haReady*/); + new QuorumTokenTransitions(1/*current token*/, 2/*new token*/, true/*isJoined*/, 2/*haReady*/); fail("Expected assertion error, haReady > currentToken"); } catch (AssertionError ae) { // expected; Modified: branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java =================================================================== --- branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2013-08-02 12:39:24 UTC (rev 7240) +++ branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2013-08-05 12:51:58 UTC (rev 7241) @@ -1319,15 +1319,32 @@ throw new IllegalArgumentException(); synchronized (runStateRef) { - + + + /* + * This check appears to cause some transitions to be lost. + * + * TODO: It would seem that a more precise check is required. 
+ */ if (runStateTask.runState .equals(lastSubmittedRunStateRef.get())) { + + /* + * FIXME: Checking if the token has changed fixes some test + * scenarios but breaks others. + */ + if (journal.getQuorumToken() == journal.getQuorum().token()) { + haLog.warn("Will not reenter active run state: " + + runStateTask.runState + + ", currentToken: " + journal.getQuorumToken() + + ", newToken: " + journal.getQuorum().token() + ); + + return null; + } else { + haLog.warn("Re-entering current state since token has changed: " + runStateTask.runState); + } - haLog.warn("Will not reenter active run state: " - + runStateTask.runState); - - return null; - } final FutureTask<Void> ft = new FutureTaskMon<Void>( This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
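Since the class is now public in com.bigdata.quorum with public decision fields, the token bookkeeping can be exercised directly. The following is a small illustrative use under the constructor and fields shown in the diff above, not code from this commit; the token values are arbitrary, and the asserts require running with -ea:

{{{
import com.bigdata.quorum.Quorum;
import com.bigdata.quorum.QuorumTokenTransitions;

public class QuorumTokenTransitionsExample {

    public static void main(final String[] args) {

        // Token moves from NO_QUORUM to 1 while this service is joined:
        // a quorum meet, not a join with an already-met quorum.
        final QuorumTokenTransitions meet = new QuorumTokenTransitions(
                Quorum.NO_QUORUM/* currentToken */, 1L/* newToken */,
                true/* isJoined */, Quorum.NO_QUORUM/* haReady */);

        assert meet.didMeet && !meet.didJoinMetQuorum;

        // Quorum already met at token 1 and this service joins late
        // (haReady still NO_QUORUM): it joined a met quorum.
        final QuorumTokenTransitions join = new QuorumTokenTransitions(
                1L/* currentToken */, 1L/* newToken */,
                true/* isJoined */, Quorum.NO_QUORUM/* haReady */);

        assert join.didJoinMetQuorum && !join.didMeet;

        // Token goes from 1 to NO_QUORUM for a service that was joined
        // (haReady == 1): a quorum break, and the service left the met
        // quorum.
        final QuorumTokenTransitions brk = new QuorumTokenTransitions(
                1L/* currentToken */, Quorum.NO_QUORUM/* newToken */,
                false/* isJoined */, 1L/* haReady */);

        assert brk.didBreak && brk.didLeaveMetQuorum;
    }
}
}}}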
From: <tho...@us...> - 2013-08-02 12:39:41
Revision: 7240 http://bigdata.svn.sourceforge.net/bigdata/?rev=7240&view=rev Author: thompsonbry Date: 2013-08-02 12:39:24 +0000 (Fri, 02 Aug 2013) Log Message: ----------- Backport of fix for [1] to the 1.2.x maintenance branch. Note: This fix was applied to the READ_CACHE branch in r7203. [1] https://sourceforge.net/apps/trac/bigdata/ticket/646 (River not compatible with newer 1.6.0 and 1.7.0 JVMs) Revision Links: -------------- http://bigdata.svn.sourceforge.net/bigdata/?rev=7203&view=rev Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/browser.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/checkconfigurationfile.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/checkser.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/classdep.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/classserver.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/computedigest.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/computehttpmdcodebase.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/destroy.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/envcheck.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/extra.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/fiddler.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/group.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/jarwrapper.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/jini-core.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/jini-ext.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/jsk-debug-policy.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/jsk-lib.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/jsk-platform.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/jsk-resources.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/mahalo.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/mercury.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/norm.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/outrigger-snaplogstore.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/outrigger.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/phoenix-group.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/phoenix-init.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/phoenix.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/preferredlistgen.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/reggie.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/serviceui.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/sharedvm.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/start.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/sun-util.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/tools.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib-dl/browser-dl.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib-dl/fiddler-dl.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib-dl/group-dl.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib-dl/jsk-dl.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib-dl/mahalo-dl.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib-dl/mercury-dl.jar branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib-dl/norm-dl.jar 
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib-dl/outrigger-dl.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib-dl/phoenix-dl.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib-dl/reggie-dl.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib-dl/sdm-dl.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib-ext/jsk-policy.jar

Modified Paths (binary files differ):
-------------------------------------
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/browser.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/checkconfigurationfile.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/checkser.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/classdep.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/classserver.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/computedigest.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/computehttpmdcodebase.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/destroy.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/envcheck.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/extra.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/fiddler.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/group.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/jarwrapper.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/jini-core.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/jini-ext.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/jsk-debug-policy.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/jsk-lib.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/jsk-platform.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/jsk-resources.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/mahalo.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/mercury.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/norm.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/outrigger-snaplogstore.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/outrigger.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/phoenix-group.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/phoenix-init.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/phoenix.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/preferredlistgen.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/reggie.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/serviceui.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/sharedvm.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/start.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/sun-util.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib/tools.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib-dl/browser-dl.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib-dl/fiddler-dl.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib-dl/group-dl.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib-dl/jsk-dl.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib-dl/mahalo-dl.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib-dl/mercury-dl.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib-dl/norm-dl.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib-dl/outrigger-dl.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib-dl/phoenix-dl.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib-dl/reggie-dl.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib-dl/sdm-dl.jar
branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/lib/jini/lib-ext/jsk-policy.jar
From: <tho...@us...> - 2013-08-01 14:36:07
Revision: 7239 http://bigdata.svn.sourceforge.net/bigdata/?rev=7239&view=rev Author: thompsonbry Date: 2013-08-01 14:36:01 +0000 (Thu, 01 Aug 2013) Log Message: ----------- javadoc on the test case (a TODO) Modified Paths: -------------- branches/READ_CACHE2/bigdata/src/test/com/bigdata/quorum/TestSingletonQuorumSemantics.java Modified: branches/READ_CACHE2/bigdata/src/test/com/bigdata/quorum/TestSingletonQuorumSemantics.java =================================================================== --- branches/READ_CACHE2/bigdata/src/test/com/bigdata/quorum/TestSingletonQuorumSemantics.java 2013-08-01 14:27:20 UTC (rev 7238) +++ branches/READ_CACHE2/bigdata/src/test/com/bigdata/quorum/TestSingletonQuorumSemantics.java 2013-08-01 14:36:01 UTC (rev 7239) @@ -35,6 +35,7 @@ import junit.framework.AssertionFailedError; import com.bigdata.journal.AbstractJournal; +import com.bigdata.journal.AbstractJournal.QuorumTokenTransitions; import com.bigdata.quorum.MockQuorumFixture.MockQuorumMember; /** @@ -653,6 +654,11 @@ * * Clearly didBreak/didMeet and didLeave/didJoin are exclusive (or should be), * but what about didMeet and didLeave or didBreak and didJoin? + * + * TODO This test only verifies the expected presence or absence of an + * assertion error thrown by the {@link QuorumTokenTransitions} class. + * It should do actual post-condition checks on the public fields of + * that class. */ public void testQuorumTransitions() { // combinations are possible current/new/haReady - each with a true/false isJoined
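The post-condition checks that this TODO describes might look roughly like the sketch below. This is not code from the repository: it assumes the QuorumTokenTransitions fields shown in r7237 are visible to the test class, and it uses -1 for Quorum.NO_QUORUM, as the existing test data does.

    // Sketch only: one post-condition check per transition (hypothetical test).
    public void testQuorumMeet_postConditions() {

        // Quorum meet: not met (-1) -> met (0), service joined, was not HAReady.
        final AbstractJournal.QuorumTokenTransitions t =
                new AbstractJournal.QuorumTokenTransitions(
                        -1/* currentToken */, 0/* newToken */,
                        true/* isJoined */, -1/* haReady */);

        assertTrue(t.didMeet);
        assertFalse(t.didBreak);
        assertFalse(t.didJoinMetQuorum);
        assertFalse(t.didLeaveMetQuorum);

    }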
From: <tho...@us...> - 2013-08-01 14:27:52
Revision: 7238 http://bigdata.svn.sourceforge.net/bigdata/?rev=7238&view=rev Author: thompsonbry Date: 2013-08-01 14:27:20 +0000 (Thu, 01 Aug 2013) Log Message: ----------- Some edit/cleanup with Martyn talking through setQuorumToken(). See [1] [1] https://sourceforge.net/apps/trac/bigdata/ticket/695 (HAJournalServer reports "follower" but is in SeekConsensus and is not participating in commits) Modified Paths: -------------- branches/READ_CACHE2/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java Modified: branches/READ_CACHE2/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/READ_CACHE2/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-07-31 14:57:49 UTC (rev 7237) +++ branches/READ_CACHE2/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-08-01 14:27:20 UTC (rev 7238) @@ -5344,10 +5344,11 @@ * better abstractions on the quorum/service states. */ static public class QuorumTokenTransitions { - final long currentQuorumToken; - final long newQuorumToken; - final long currentHaReady; - + + final long currentQuorumToken; + final long newQuorumToken; + final long currentHaReady; + final boolean didBreak; final boolean didMeet; final boolean didJoinMetQuorum; @@ -5358,11 +5359,18 @@ final boolean isJoined; final boolean wasJoined; - public QuorumTokenTransitions(final long currentQuorumToken, final long newQuorumToken, QuorumService<HAGlue> service, final long haReady) { - this(currentQuorumToken, newQuorumToken, service != null && service.isJoinedMember(newQuorumToken), haReady); + public QuorumTokenTransitions(final long currentQuorumToken, + final long newQuorumToken, final QuorumService<HAGlue> service, + final long haReady) { + + this(currentQuorumToken, newQuorumToken, service != null + && service.isJoinedMember(newQuorumToken), haReady); + } - public QuorumTokenTransitions(final long currentQuorumToken, final long newQuorumToken, final boolean joined, final long haReady) { + public QuorumTokenTransitions(final long currentQuorumToken, + final long newQuorumToken, final boolean joined, + final long haReady) { this.currentHaReady = haReady; this.currentQuorumToken = currentQuorumToken; @@ -5373,6 +5381,7 @@ wasMet = currentQuorumToken != Quorum.NO_QUORUM; isMet = newQuorumToken != Quorum.NO_QUORUM; + // Both quorum token and haReadyToken agree with newValue. final boolean noTokenChange = currentQuorumToken == newQuorumToken && currentQuorumToken == currentHaReady; @@ -5439,6 +5448,7 @@ checkStates(); } + // TODO Document rationale for each assertion. private void checkStates() { if (wasJoined && wasMet && currentHaReady > currentQuorumToken) throw new AssertionError("haReady greater than current token"); @@ -5447,14 +5457,19 @@ throw new AssertionError("next token less than current token"); } - if (didMeet && didJoinMetQuorum) { - throw new AssertionError("didMeet && didJoinMetQuorum"); - } - if (wasMet && isMet && newQuorumToken != currentQuorumToken) { throw new AssertionError("New quorum token without quorum break first"); } + if (didMeet && didJoinMetQuorum) { + /* + * It is not possible to both join with an existing quorum and + * to be one of the services that caused a quorum to meet. These + * conditions are exclusive. 
+ */ + throw new AssertionError("didMeet && didJoinMetQuorum"); + } + /** * This is a bit odd, it is okay, but probably didLeaveMetQuorum will * only be true iff isJoined @@ -5462,221 +5477,241 @@ // if (didBreak && didLeaveMetQuorum) { // throw new AssertionError("didBreak && didLeaveMetQuorum"); // } -// if (didLeaveMetQuorum && !isJoined) { +// if (didLeaveMetQuorum && !isJoined) { // TODO Why not valid? // throw new AssertionError("didLeaveMetQuorum && !isJoined"); // } } - public final boolean noChange() { - return currentQuorumToken == newQuorumToken; - } + /* + * Methods for making decisions in the ctor. They are NOT for public + * use. There are simple public fields that report out the decisions + * make using these methods. + */ - public final boolean isBreak() { +// public final boolean noChange() { +// return currentQuorumToken == newQuorumToken; +// } + + private final boolean isBreak() { return wasMet && !isMet; } - public final boolean isMeet() { + private final boolean isMeet() { return isMet & !wasMet; } - public final boolean didJoin() { + private final boolean didJoin() { return isMet && isJoined && !wasJoined; } - public final boolean didLeave() { - return isBreak() && wasJoined; - } +// private final boolean didLeave() { +// return isBreak() && wasJoined; +// } - public final boolean didLeaveMet() { + private final boolean didLeaveMet() { return isMet && wasJoined && !isJoined; } public final String showState() { - return ""; - + return ""; // TODO showState(). } - } + } + protected void setQuorumToken(final long newValue) { - /* - * Protect for potential NPE - */ + // Protect for potential NPE if (quorum == null) - return; + return; // This quorum member. final QuorumService<HAGlue> localService = quorum.getClient(); - final QuorumTokenTransitions transitionState = new QuorumTokenTransitions(quorumToken, newValue, localService, haReadyToken); + // Figure out the state transitions involved. + final QuorumTokenTransitions transitionState = new QuorumTokenTransitions( + quorumToken, newValue, localService, haReadyToken); - /* - * The token is [volatile]. Save it's state on entry. Figure out if this - * is a quorum meet or a quorum break. - */ - { - /* - * TODO: remove this code once the refactoring with QuorumTokenTransitions is stable, right - * now it is used to sanity check the new code. - */ - final long oldValue = quorumToken; - final long oldReady = haReadyToken; - final HAStatusEnum oldStatus = haStatus; +// /* +// * The token is [volatile]. Save it's state on entry. Figure out if this +// * is a quorum meet or a quorum break. +// */ +// { +// /* +// * TODO: remove this code once the refactoring with QuorumTokenTransitions is stable, right +// * now it is used to sanity check the new code. +// */ +// final long oldValue = quorumToken; +// final long oldReady = haReadyToken; +// final HAStatusEnum oldStatus = haStatus; +// +// if (haLog.isInfoEnabled()) +// haLog.info("oldValue=" + oldValue + ", newToken=" + newValue +// + ", oldReady=" + oldReady); +// +// /* +// * Note that previously the noTokenChange condition was a short +// * circuit exit. +// * +// * This has been removed so we must account for this condition to +// * determine correct transition +// */ +// final boolean noTokenChange = oldValue == newValue +// && oldValue == oldReady; +// // if (oldValue == newValue && oldValue == oldReady) { +// // log.warn("NO TOKEN CHANGE"); +// // // No change. 
+// // return; +// // +// // } +// +// final boolean didBreak; +// final boolean didMeet; +// final boolean didJoinMetQuorum; +// final boolean didLeaveMetQuorum; +// final boolean isJoined = localService != null +// && localService.isJoinedMember(newValue); +// +// final boolean wasJoined = oldReady != Quorum.NO_QUORUM; +// +// /** +// * Adding set of initial conditions to account for noTokenChange +// * +// * TODO: should there be more than one initial condition? +// */ +// if (noTokenChange && isJoined) { +// didBreak = false; // quorum break. +// didMeet = false; +// didJoinMetQuorum = false; // true; +// didLeaveMetQuorum = false; // haReadyToken != Quorum.NO_QUORUM; +// // // if service was joined with met +// // quorum, then it just left the met +// // quorum. +// } else if (newValue == Quorum.NO_QUORUM +// && oldValue != Quorum.NO_QUORUM) { +// +// /* +// * Quorum break. +// * +// * Immediately invalidate the token. Do not wait for a lock. +// */ +// +// this.quorumToken = newValue; +// +// didBreak = true; // quorum break. +// didMeet = false; +// didJoinMetQuorum = false; +// didLeaveMetQuorum = wasJoined; // if service was joined with met +// // quorum, then it just left the +// // met quorum. +// +// } else if (newValue != Quorum.NO_QUORUM +// && oldValue == Quorum.NO_QUORUM) { +// +// /* +// * Quorum meet. +// * +// * We must wait for the lock to update the token. +// */ +// +// didBreak = false; +// didMeet = true; // quorum meet. +// didJoinMetQuorum = false; +// didLeaveMetQuorum = false; +// +// } else if (newValue != Quorum.NO_QUORUM // quorum exists +// && oldReady == Quorum.NO_QUORUM // service was not joined +// // with met quorum. +// && isJoined // service is now joined with met quorum. +// ) { +// +// /* +// * This service is joining a quorum that is already met. +// */ +// +// didBreak = false; +// didMeet = false; +// didJoinMetQuorum = true; // service joined with met quorum. +// didLeaveMetQuorum = false; +// +// } else if (newValue != Quorum.NO_QUORUM // quorum exists +// && wasJoined // service was joined with met quorum +// && !isJoined // service is no longer joined with met quorum. +// ) { +// +// /* +// * This service is leaving a quorum that is already met (but +// * this is not a quorum break since the new token is not +// * NO_QUORUM). +// */ +// +// didBreak = false; +// didMeet = false; +// didJoinMetQuorum = false; +// didLeaveMetQuorum = true; // service left met quorum. quorum +// // still met. +// +// } else { +// +// // /* +// // * No change in state. 
+// // */ +// // +// // log.warn("No change"// +// // + ": qorumToken(" + oldValue + " => " + newValue + ")"// +// // + ", haReadyToken(" + haReadyToken + ")"// +// // ); +// +// didBreak = false; +// didMeet = false; +// didJoinMetQuorum = false; +// didLeaveMetQuorum = false; +// +// return; +// +// } +// +// log.warn("didBreak: " + didBreak +// + ", didMeet: " + didMeet +// + ", didJoinMetQuorum: " + didJoinMetQuorum +// + ", didLeaveMetQuorum: " + didLeaveMetQuorum +// + ", isJoined: " + isJoined +// + ", wasJoined: " + wasJoined +// ); +// +// assert didBreak == transitionState.didBreak; +// assert didMeet == transitionState.didMeet; +// assert didJoinMetQuorum == transitionState.didJoinMetQuorum; +// assert didLeaveMetQuorum == transitionState.didLeaveMetQuorum; +// assert isJoined == transitionState.isJoined; +// assert wasJoined == transitionState.wasJoined; +// } - if (haLog.isInfoEnabled()) - haLog.info("oldValue=" + oldValue + ", newToken=" + newValue - + ", oldReady=" + oldReady); + if (transitionState.didBreak) { + /* + * If the quorum broke then set the token immediately without + * waiting for the lock. + * + * Note: This is a volatile write. We want the break to become + * visible as soon as possible in order to fail things which examine + * the token. + * + * TODO Why not clear the haReadyToken and haStatus here as well? + * However, those changes are not going to be noticed by threads + * awaiting a state change until we do signalAll() and that requires + * the lock. + * + * TODO Why not clear the haReadyToken and haStatus on a leave using + * a volatile write? Again, threads blocked in awaitHAReady() would + * not notice until we actually take the lock and do signalAll(). + */ + this.quorumToken = Quorum.NO_QUORUM; + } - /* - * Note that previously the noTokenChange condition was a short - * circuit exit. - * - * This has been removed so we must account for this condition to - * determine correct transition - */ - final boolean noTokenChange = oldValue == newValue - && oldValue == oldReady; - // if (oldValue == newValue && oldValue == oldReady) { - // log.warn("NO TOKEN CHANGE"); - // // No change. - // return; - // - // } - - final boolean didBreak; - final boolean didMeet; - final boolean didJoinMetQuorum; - final boolean didLeaveMetQuorum; - final boolean isJoined = localService != null - && localService.isJoinedMember(newValue); - - final boolean wasJoined = oldReady != Quorum.NO_QUORUM; - - /** - * Adding set of initial conditions to account for noTokenChange - * - * TODO: should there be more than one initial condition? - */ - if (noTokenChange && isJoined) { - didBreak = false; // quorum break. - didMeet = false; - didJoinMetQuorum = false; // true; - didLeaveMetQuorum = false; // haReadyToken != Quorum.NO_QUORUM; - // // if service was joined with met - // quorum, then it just left the met - // quorum. - } else if (newValue == Quorum.NO_QUORUM - && oldValue != Quorum.NO_QUORUM) { - - /* - * Quorum break. - * - * Immediately invalidate the token. Do not wait for a lock. - */ - - this.quorumToken = newValue; - - didBreak = true; // quorum break. - didMeet = false; - didJoinMetQuorum = false; - didLeaveMetQuorum = wasJoined; // if service was joined with met - // quorum, then it just left the - // met quorum. - - } else if (newValue != Quorum.NO_QUORUM - && oldValue == Quorum.NO_QUORUM) { - - /* - * Quorum meet. - * - * We must wait for the lock to update the token. - */ - - didBreak = false; - didMeet = true; // quorum meet. 
- didJoinMetQuorum = false; - didLeaveMetQuorum = false; - - } else if (newValue != Quorum.NO_QUORUM // quorum exists - && oldReady == Quorum.NO_QUORUM // service was not joined - // with met quorum. - && isJoined // service is now joined with met quorum. - ) { - - /* - * This service is joining a quorum that is already met. - */ - - didBreak = false; - didMeet = false; - didJoinMetQuorum = true; // service joined with met quorum. - didLeaveMetQuorum = false; - - } else if (newValue != Quorum.NO_QUORUM // quorum exists - && wasJoined // service was joined with met quorum - && !isJoined // service is no longer joined with met quorum. - ) { - - /* - * This service is leaving a quorum that is already met (but - * this is not a quorum break since the new token is not - * NO_QUORUM). - */ - - didBreak = false; - didMeet = false; - didJoinMetQuorum = false; - didLeaveMetQuorum = true; // service left met quorum. quorum - // still met. - - } else { - - // /* - // * No change in state. - // */ - // - // log.warn("No change"// - // + ": qorumToken(" + oldValue + " => " + newValue + ")"// - // + ", haReadyToken(" + haReadyToken + ")"// - // ); - - didBreak = false; - didMeet = false; - didJoinMetQuorum = false; - didLeaveMetQuorum = false; - - return; - - } - - log.warn("didBreak: " + didBreak - + ", didMeet: " + didMeet - + ", didJoinMetQuorum: " + didJoinMetQuorum - + ", didLeaveMetQuorum: " + didLeaveMetQuorum - + ", isJoined: " + isJoined - + ", wasJoined: " + wasJoined - ); - - assert didBreak == transitionState.didBreak; - assert didMeet == transitionState.didMeet; - assert didJoinMetQuorum == transitionState.didJoinMetQuorum; - assert didLeaveMetQuorum == transitionState.didLeaveMetQuorum; - assert isJoined == transitionState.isJoined; - assert wasJoined == transitionState.wasJoined; - } - /* * Both a meet and a break require an exclusive write lock. */ final boolean isLeader; final boolean isFollower; final long localCommitCounter; - - /* - * If the quorum broke then set the token immediately without waiting for the lock. - * TODO: explain why this is necessary/what would go wrong if we waited for the lock? - */ - if (transitionState.didBreak) - this.quorumToken = newValue; /* * TODO: Is this lock synchronization a problem? With token update delayed on a lock could a second thread @@ -5709,15 +5744,13 @@ * It may well be that this is the source of the stochastic failures we see. */ - /* - * didLeaveMetQuorum is only ever set when didBreak is also true, - * so test separately - */ if (transitionState.didLeaveMetQuorum) { - /* - * Is it okay to set this token prior to the abort methods? - */ + /* + * The service was joined with a met quorum. + * + * TODO Is it okay to set this token prior to the abort methods? + */ quorumToken = newValue; // volatile write. localCommitCounter = -1; @@ -5757,14 +5790,20 @@ haReadyCondition.signalAll(); // signal ALL. } else if (transitionState.didBreak) { - /* - * As we were not joined at the break there is nothing to do save for - * updating the token - */ - quorumToken = newValue; // volatile write. + + /* + * Note: [didLeaveMetQuorum] was handled above. So, this else if + * only applies to a service that observes a quorum break but + * which was not joined with the met quorum. As we were not + * joined at the break there is nothing to do save for updating + * the token. + */ + quorumToken = Quorum.NO_QUORUM; // volatile write. 
+ localCommitCounter = -1; isLeader = isFollower = false; + } else if (transitionState.didMeet || transitionState.didJoinMetQuorum) { /** Modified: branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java =================================================================== --- branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2013-07-31 14:57:49 UTC (rev 7237) +++ branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2013-08-01 14:27:20 UTC (rev 7238) @@ -1710,16 +1710,6 @@ if (log.isInfoEnabled()) log.info("Current Token: " + journal.getHAReady() + ", new: " + getQuorum().token()); - /* - * We must ensure that the token is reset to generate the required - * additional events. - * - * Processing the events at this point, after serviceLeave, is not sufficient. - * - * TODO It is possible that we could avoid the clear/set pattern with more - * state analysis in setQuorumToken and this should be investigated. - */ - // journal.setQuorumToken(Quorum.NO_QUORUM); journal.setQuorumToken(getQuorum().token()); /**
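Read together with r7237 below, the net effect of this change is easier to see in outline. The following is a sketch of the control flow that remains in setQuorumToken() once the legacy inline logic is retired; it is not the verbatim code, and the comments paraphrase the branches shown in the diff above.

    // Sketch of the surviving branch structure (names from r7237/r7238).
    final QuorumTokenTransitions t = new QuorumTokenTransitions(
            quorumToken, newValue, localService, haReadyToken);

    if (t.didBreak) {
        // Volatile write: invalidate the token immediately, before the lock.
        this.quorumToken = Quorum.NO_QUORUM;
    }

    lock.lock(); // both a meet and a break require the exclusive write lock.
    try {
        if (t.didLeaveMetQuorum) {
            // Service left a met quorum: local abort, clear the haReadyToken.
        } else if (t.didBreak) {
            // Break observed by a non-joined service: just update the token.
        } else if (t.didMeet || t.didJoinMetQuorum) {
            // Update the token; a follower also installs the root blocks.
        } else {
            throw new AssertionError("VOID setToken");
        }
    } finally {
        lock.unlock();
    }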
From: <mar...@us...> - 2013-07-31 14:58:01
Revision: 7237 http://bigdata.svn.sourceforge.net/bigdata/?rev=7237&view=rev Author: martyncutcher Date: 2013-07-31 14:57:49 +0000 (Wed, 31 Jul 2013) Log Message: ----------- A utility AbstractJournal.QuorumTokenTransitions class provides an abstraction to AbstractJournal.setQuorumToken of the transitions implicit in the old/new quorum tokens and whether the service was and/or is joined. At present the legacy code in AbstractJournal.setQuorumToken is still active but the results are not used save to compare with the new class results. The new class also facilitates simple junit testing and a new test has been added to the TestSingletonQuorumSemantics suite. Modified Paths: -------------- branches/READ_CACHE2/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/READ_CACHE2/bigdata/src/test/com/bigdata/quorum/TestSingletonQuorumSemantics.java branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java Modified: branches/READ_CACHE2/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/READ_CACHE2/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-07-31 12:46:18 UTC (rev 7236) +++ branches/READ_CACHE2/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-07-31 14:57:49 UTC (rev 7237) @@ -5337,34 +5337,167 @@ } - protected void setQuorumToken(final long newValue) { - - /* - * The token is [volatile]. Save it's state on entry. Figure out if this - * is a quorum meet or a quorum break. - */ + /** + * Wraps the token/join transitions in a testable manner. + * + * Both enables JUnit testing and also provides context to represent + * better abstractions on the quorum/service states. + */ + static public class QuorumTokenTransitions { + final long currentQuorumToken; + final long newQuorumToken; + final long currentHaReady; + + final boolean didBreak; + final boolean didMeet; + final boolean didJoinMetQuorum; + final boolean didLeaveMetQuorum; - final long oldValue = quorumToken; - final long oldReady = haReadyToken; - final HAStatusEnum oldStatus = haStatus; + final boolean wasMet; + final boolean isMet; + final boolean isJoined; + final boolean wasJoined; + + public QuorumTokenTransitions(final long currentQuorumToken, final long newQuorumToken, QuorumService<HAGlue> service, final long haReady) { + this(currentQuorumToken, newQuorumToken, service != null && service.isJoinedMember(newQuorumToken), haReady); + } + + public QuorumTokenTransitions(final long currentQuorumToken, final long newQuorumToken, final boolean joined, final long haReady) { + + this.currentHaReady = haReady; + this.currentQuorumToken = currentQuorumToken; + this.newQuorumToken = newQuorumToken; + + isJoined = joined; + wasJoined = haReady != Quorum.NO_QUORUM; + wasMet = currentQuorumToken != Quorum.NO_QUORUM; + isMet = newQuorumToken != Quorum.NO_QUORUM; + + final boolean noTokenChange = currentQuorumToken == newQuorumToken + && currentQuorumToken == currentHaReady; - if (haLog.isInfoEnabled()) - haLog.info("oldValue=" + oldValue + ", newToken=" + newValue); + /* + * TODO: more understanding required as to the effect of this clause + */ + if (noTokenChange && isJoined) { + didBreak = false; + didMeet = false; + didJoinMetQuorum = didJoin(); + didLeaveMetQuorum = false; + } else if (isBreak()) { + didBreak = true; // quorum break.
+ didMeet = false; + didJoinMetQuorum = false; + didLeaveMetQuorum = wasJoined; // if service was joined with met quorum, then it just left the met quorum. + } else if (isMeet()) { - /* - * Note that previously the noTokenChange condition was a short circuit exit. - * - * This has been removed so we must account for this condition to determine - * correct transition - */ - final boolean noTokenChange = oldValue == newValue && oldValue == oldReady; -// if (oldValue == newValue && oldValue == oldReady) { -// log.warn("NO TOKEN CHANGE"); -// // No change. -// return; -// -// } + /* + * Quorum meet. + * + * We must wait for the lock to update the token. + */ + + didBreak = false; + didMeet = true; // quorum meet. + didJoinMetQuorum = false; + didLeaveMetQuorum = false; + + } else if (didJoin()) { + + /* + * This service is joining a quorum that is already met. + */ + + didBreak = false; + didMeet = false; + didJoinMetQuorum = true; // service joined with met quorum. + didLeaveMetQuorum = false; + + } else if (didLeaveMet()) { + + /* + * This service is leaving a quorum that is already met (but + * this is not a quorum break since the new token is not + * NO_QUORUM). + */ + + didBreak = false; + didMeet = false; + didJoinMetQuorum = false; + didLeaveMetQuorum = true; // service left met quorum. quorum + // still met. + + } else { + didBreak = false; + didMeet = false; + didJoinMetQuorum = false; + didLeaveMetQuorum = false; + + // throw new AssertionError("Bad state from: oldToken=" + currentQuorumToken + ", newToken=" + newQuorumToken + ", haReady=" + haReady + ", isJoined=" + isJoined); + } + + checkStates(); + } + private void checkStates() { + if (wasJoined && wasMet && currentHaReady > currentQuorumToken) + throw new AssertionError("haReady greater than current token"); + + if (wasMet && isMet && newQuorumToken < currentQuorumToken) { + throw new AssertionError("next token less than current token"); + } + + if (didMeet && didJoinMetQuorum) { + throw new AssertionError("didMeet && didJoinMetQuorum"); + } + + if (wasMet && isMet && newQuorumToken != currentQuorumToken) { + throw new AssertionError("New quorum token without quorum break first"); + } + + /** + * This is a bit odd, it is okay, but probably didLeaveMetQuorum will + * only be true iff isJoined + */ +// if (didBreak && didLeaveMetQuorum) { +// throw new AssertionError("didBreak && didLeaveMetQuorum"); +// } +// if (didLeaveMetQuorum && !isJoined) { +// throw new AssertionError("didLeaveMetQuorum && !isJoined"); +// } + } + + public final boolean noChange() { + return currentQuorumToken == newQuorumToken; + } + + public final boolean isBreak() { + return wasMet && !isMet; + } + + public final boolean isMeet() { + return isMet & !wasMet; + } + + public final boolean didJoin() { + return isMet && isJoined && !wasJoined; + } + + public final boolean didLeave() { + return isBreak() && wasJoined; + } + + public final boolean didLeaveMet() { + return isMet && wasJoined && !isJoined; + } + + public final String showState() { + return ""; + + } + } + protected void setQuorumToken(final long newValue) { + /* * Protect for potential NPE */ @@ -5374,114 +5507,222 @@ // This quorum member. 
final QuorumService<HAGlue> localService = quorum.getClient(); - final boolean didBreak; - final boolean didMeet; - final boolean didJoinMetQuorum; - final boolean didLeaveMetQuorum; - final boolean isJoined = localService != null && localService.isJoinedMember(newValue); + final QuorumTokenTransitions transitionState = new QuorumTokenTransitions(quorumToken, newValue, localService, haReadyToken); - /** - * Adding set of initial conditions to account for noTokenChange - * - * TODO: should there be more than one initial condition? + /* + * The token is [volatile]. Save it's state on entry. Figure out if this + * is a quorum meet or a quorum break. */ - if (noTokenChange && isJoined) { - didBreak = false; // quorum break. - didMeet = false; - didJoinMetQuorum = true; - didLeaveMetQuorum = haReadyToken != Quorum.NO_QUORUM; // if service was joined with met quorum, then it just left the met quorum. - } else if (newValue == Quorum.NO_QUORUM && oldValue != Quorum.NO_QUORUM) { + { + /* + * TODO: remove this code once the refactoring with QuorumTokenTransitions is stable, right + * now it is used to sanity check the new code. + */ + final long oldValue = quorumToken; + final long oldReady = haReadyToken; + final HAStatusEnum oldStatus = haStatus; - /* - * Quorum break. - * - * Immediately invalidate the token. Do not wait for a lock. - */ - - this.quorumToken = newValue; + if (haLog.isInfoEnabled()) + haLog.info("oldValue=" + oldValue + ", newToken=" + newValue + + ", oldReady=" + oldReady); - didBreak = true; // quorum break. - didMeet = false; - didJoinMetQuorum = false; - didLeaveMetQuorum = haReadyToken != Quorum.NO_QUORUM; // if service was joined with met quorum, then it just left the met quorum. + /* + * Note that previously the noTokenChange condition was a short + * circuit exit. + * + * This has been removed so we must account for this condition to + * determine correct transition + */ + final boolean noTokenChange = oldValue == newValue + && oldValue == oldReady; + // if (oldValue == newValue && oldValue == oldReady) { + // log.warn("NO TOKEN CHANGE"); + // // No change. + // return; + // + // } - } else if (newValue != Quorum.NO_QUORUM && oldValue == Quorum.NO_QUORUM) { + final boolean didBreak; + final boolean didMeet; + final boolean didJoinMetQuorum; + final boolean didLeaveMetQuorum; + final boolean isJoined = localService != null + && localService.isJoinedMember(newValue); + + final boolean wasJoined = oldReady != Quorum.NO_QUORUM; - /* - * Quorum meet. - * - * We must wait for the lock to update the token. - */ - - didBreak = false; - didMeet = true; // quorum meet. - didJoinMetQuorum = false; - didLeaveMetQuorum = false; + /** + * Adding set of initial conditions to account for noTokenChange + * + * TODO: should there be more than one initial condition? + */ + if (noTokenChange && isJoined) { + didBreak = false; // quorum break. + didMeet = false; + didJoinMetQuorum = false; // true; + didLeaveMetQuorum = false; // haReadyToken != Quorum.NO_QUORUM; + // // if service was joined with met + // quorum, then it just left the met + // quorum. + } else if (newValue == Quorum.NO_QUORUM + && oldValue != Quorum.NO_QUORUM) { - } else if (newValue != Quorum.NO_QUORUM // quorum exists - && oldReady == Quorum.NO_QUORUM // service was not joined with met quorum. - && isJoined // service is now joined with met quorum. - ) { + /* + * Quorum break. + * + * Immediately invalidate the token. Do not wait for a lock. + */ - /* - * This service is joining a quorum that is already met. 
- */ - - didBreak = false; - didMeet = false; - didJoinMetQuorum = true; // service joined with met quorum. - didLeaveMetQuorum = false; - - } else if (newValue != Quorum.NO_QUORUM // quorum exists - && oldReady != Quorum.NO_QUORUM // service was joined with met quorum - && !isJoined // service is no longer joined with met quorum. - ) { + this.quorumToken = newValue; - /* - * This service is leaving a quorum that is already met (but this is - * not a quorum break since the new token is not NO_QUORUM). - */ - - didBreak = false; - didMeet = false; - didJoinMetQuorum = false; - didLeaveMetQuorum = true; // service left met quorum. quorum still met. + didBreak = true; // quorum break. + didMeet = false; + didJoinMetQuorum = false; + didLeaveMetQuorum = wasJoined; // if service was joined with met + // quorum, then it just left the + // met quorum. - } else { + } else if (newValue != Quorum.NO_QUORUM + && oldValue == Quorum.NO_QUORUM) { -// /* -// * No change in state. -// */ -// -// log.warn("No change"// -// + ": qorumToken(" + oldValue + " => " + newValue + ")"// -// + ", haReadyToken(" + haReadyToken + ")"// -// ); - - didBreak = false; - didMeet = false; - didJoinMetQuorum = false; - didLeaveMetQuorum = false; + /* + * Quorum meet. + * + * We must wait for the lock to update the token. + */ - return; - - } + didBreak = false; + didMeet = true; // quorum meet. + didJoinMetQuorum = false; + didLeaveMetQuorum = false; + } else if (newValue != Quorum.NO_QUORUM // quorum exists + && oldReady == Quorum.NO_QUORUM // service was not joined + // with met quorum. + && isJoined // service is now joined with met quorum. + ) { + + /* + * This service is joining a quorum that is already met. + */ + + didBreak = false; + didMeet = false; + didJoinMetQuorum = true; // service joined with met quorum. + didLeaveMetQuorum = false; + + } else if (newValue != Quorum.NO_QUORUM // quorum exists + && wasJoined // service was joined with met quorum + && !isJoined // service is no longer joined with met quorum. + ) { + + /* + * This service is leaving a quorum that is already met (but + * this is not a quorum break since the new token is not + * NO_QUORUM). + */ + + didBreak = false; + didMeet = false; + didJoinMetQuorum = false; + didLeaveMetQuorum = true; // service left met quorum. quorum + // still met. + + } else { + + // /* + // * No change in state. + // */ + // + // log.warn("No change"// + // + ": qorumToken(" + oldValue + " => " + newValue + ")"// + // + ", haReadyToken(" + haReadyToken + ")"// + // ); + + didBreak = false; + didMeet = false; + didJoinMetQuorum = false; + didLeaveMetQuorum = false; + + return; + + } + + log.warn("didBreak: " + didBreak + + ", didMeet: " + didMeet + + ", didJoinMetQuorum: " + didJoinMetQuorum + + ", didLeaveMetQuorum: " + didLeaveMetQuorum + + ", isJoined: " + isJoined + + ", wasJoined: " + wasJoined + ); + + assert didBreak == transitionState.didBreak; + assert didMeet == transitionState.didMeet; + assert didJoinMetQuorum == transitionState.didJoinMetQuorum; + assert didLeaveMetQuorum == transitionState.didLeaveMetQuorum; + assert isJoined == transitionState.isJoined; + assert wasJoined == transitionState.wasJoined; + } + /* * Both a meet and a break require an exclusive write lock. */ final boolean isLeader; final boolean isFollower; final long localCommitCounter; + + /* + * If the quorum broke then set the token immediately without waiting for the lock. + * TODO: explain why this is necessary/what would go wrong if we waited for the lock? 
+ */ + if (transitionState.didBreak) + this.quorumToken = newValue; + /* + * TODO: Is this lock synchronization a problem? With token update delayed on a lock could a second thread + * process a new token based on incorrect state since the first thread has not updated the token? + * For example: NO_TOKEN -> valid token -> NO_TOKEN + */ final WriteLock lock = _fieldReadWriteLock.writeLock(); lock.lock(); try { + + /* + * The following condition tests are slightly confusing, it is not clear that they + * represent all real states. + * + * Essentially + * didBreak - abort + * didLeaveMetQuorum - abort + * didJoinMetQuorum - follower gets rootBlocks + * didMeet - just sets token + * + * Are there other valid states? + * + * If a void call is made - no token change, no leave or join - then this should + * result in an assertion error. + * + * FIXME: It is possible that these are being thrown and the quorum is then re-forming + * but we need to be able to trap these errors (if they occur) to understand the circumstances. + * It may well be that this is the source of the stochastic failures we see. + */ - if (didBreak || didLeaveMetQuorum) { + /* + * didLeaveMetQuorum is only ever set when didBreak is also true, + * so test separately + */ + if (transitionState.didLeaveMetQuorum) { + /* + * Is it okay to set this token prior to the abort methods? + */ + quorumToken = newValue; // volatile write. + + localCommitCounter = -1; + isLeader = isFollower = false; + /* * We also need to discard any active read/write tx since there * is no longer a quorum and a read/write tx was running on the @@ -5509,18 +5750,28 @@ * we have to wait until we observe that to cast a new vote. */ - localCommitCounter = -1; - isLeader = isFollower = false; - - quorumToken = newValue; // volatile write. - haReadyToken = Quorum.NO_QUORUM; // volatile write. haStatus = HAStatusEnum.NotReady; // volatile write. haReadyCondition.signalAll(); // signal ALL. - } else if (didMeet || didJoinMetQuorum) { + } else if (transitionState.didBreak) { + /* + * As we were not joined at the break there is nothing to do save for + * updating the token + */ + quorumToken = newValue; // volatile write. + + localCommitCounter = -1; + isLeader = isFollower = false; + } else if (transitionState.didMeet || transitionState.didJoinMetQuorum) { + + /** + * TODO: This state is a bit confused, is it possible that both conditions + * are true? Could we already be joined at the time we learn that the quorum is + * met? If not then it might make more sense to test separately + */ final long tmp; @@ -5641,9 +5892,9 @@ if (log.isInfoEnabled()) log.info("Calling localAbort if NOT didJoinMetQuorum: " - + didJoinMetQuorum); + + transitionState.didJoinMetQuorum); - if (!didJoinMetQuorum) { + if (!transitionState.didJoinMetQuorum) { doLocalAbort(); @@ -5655,7 +5906,7 @@ } else { - throw new AssertionError(); + throw new AssertionError("VOID setToken"); } @@ -5666,16 +5917,7 @@ } if (haLog.isInfoEnabled()) - haLog.info("qorumToken(" + oldValue + " => " + newValue - + "), haReadyToken(" + oldReady + " => " + haReadyToken // - + "), haStatus(" + oldStatus + " => " + haStatus // - + "), isLeader=" + isLeader// - + ", isFollower=" + isFollower// - + ", didMeet=" + didMeet// - + ", didBreak=" + didBreak// - + ", didJoin=" + didJoinMetQuorum// - + ", didLeave=" + didLeaveMetQuorum// - ); + haLog.info(transitionState.showState()); if (isLeader || isFollower) { @@ -5705,27 +5947,27 @@ * * @return the quorum token for which the service became HA ready. 
*/ - final public long awaitHAReady() throws InterruptedException, - AsynchronousQuorumCloseException, QuorumException { - final WriteLock lock = _fieldReadWriteLock.writeLock(); - lock.lock(); - try { - long t = Quorum.NO_QUORUM; - while (((t = haReadyToken) != Quorum.NO_QUORUM) - && getQuorum().getClient() != null) { - haReadyCondition.await(); - } - final QuorumService<?> client = getQuorum().getClient(); - if (client == null) - throw new AsynchronousQuorumCloseException(); - if (!client.isJoinedMember(t)) { - throw new QuorumException(); - } - return t; - } finally { - lock.unlock(); - } - } +// final public long awaitHAReady() throws InterruptedException, +// AsynchronousQuorumCloseException, QuorumException { +// final WriteLock lock = _fieldReadWriteLock.writeLock(); +// lock.lock(); +// try { +// long t = Quorum.NO_QUORUM; +// while (((t = haReadyToken) == Quorum.NO_QUORUM) +// && getQuorum().getClient() != null) { +// haReadyCondition.await(); +// } +// final QuorumService<?> client = getQuorum().getClient(); +// if (client == null) +// throw new AsynchronousQuorumCloseException(); +// if (!client.isJoinedMember(t)) { +// throw new QuorumException(); +// } +// return t; +// } finally { +// lock.unlock(); +// } +// } /** * Await the service being ready to partitipate in an HA quorum. The Modified: branches/READ_CACHE2/bigdata/src/test/com/bigdata/quorum/TestSingletonQuorumSemantics.java =================================================================== --- branches/READ_CACHE2/bigdata/src/test/com/bigdata/quorum/TestSingletonQuorumSemantics.java 2013-07-31 12:46:18 UTC (rev 7236) +++ branches/READ_CACHE2/bigdata/src/test/com/bigdata/quorum/TestSingletonQuorumSemantics.java 2013-07-31 14:57:49 UTC (rev 7237) @@ -34,6 +34,7 @@ import junit.framework.AssertionFailedError; +import com.bigdata.journal.AbstractJournal; import com.bigdata.quorum.MockQuorumFixture.MockQuorumMember; /** @@ -634,4 +635,85 @@ } + /** + * Check all valid quorum transitions with: + * currentToken, + * nextToken, + * isJoined state + * haReady token. + * + * Compare tokens to check Quorum state. + * Compare tokens with haReady and isJoined to check service state. + * + * Possible outcomes are: + * quorum didBreak + * quorum didMeet + * service didLeave + * service didJoin + * + * Clearly didBreak/didMeet and didLeave/didJoin are exclusive (or should be), + * but what about didMeet and didLeave or didBreak and didJoin? + */ + public void testQuorumTransitions() { + // combinations are possible current/new/haReady - each with a true/false isJoined + final long tokens[][] = new long[][] { + // current, new, haReady + new long[] {-1, -1, -1}, // wasn't met, not met, wasn't joined + new long[] {-1, 0, -1}, // met + new long[] {0, 0, -1}, // remains met + new long[] {0, -1, -1}, // break + + new long[] {-1, -1, 0}, // notmet, wasJoined + new long[] {-1, 0, 0}, // met, wasJoined + new long[] {0, 0, 0}, // remains met, wasJoined + new long[] {0, -1, 0}, // break, wasJoined + + // Are these scenarios plausible only as a result of locking problem + // or possibly Zookeeper bounce? 
+// new long[] {0, 2, -1}, // token bumped more than one +// new long[] {0, 2, 0}, // token bumped more than one +// new long[] {0, 2, 1}, // token bumped more than one +// new long[] {0, 2, 2}, // token bumped more than one + }; + + for (int i = 0; i < tokens.length; i++) { + final long[] tst = tokens[i]; + + // token combinations with isJoined == true + final AbstractJournal.QuorumTokenTransitions qtjoined = new AbstractJournal.QuorumTokenTransitions(tst[0]/*current token*/, tst[1]/*new token*/, true/*isJoined*/, tst[2]/*haReady*/); + + // token combinations with isJoined == false + final AbstractJournal.QuorumTokenTransitions qtnotjoined = new AbstractJournal.QuorumTokenTransitions(tst[0]/*current token*/, tst[1]/*new token*/, false/*isJoined*/, tst[2]/*haReady*/); + } + + // test invalid scenarios to confirm AssertionErrors are thrown + try { + new AbstractJournal.QuorumTokenTransitions(1/*current token*/, 2/*new token*/, true/*isJoined*/, 1/*haReady*/); + fail("Expected assertion error, cannot progress quorum token without break"); + } catch (AssertionError ae) { + // expected; + } + + try { + new AbstractJournal.QuorumTokenTransitions(2/*current token*/, 1/*new token*/, true/*isJoined*/, 1/*haReady*/); + fail("Expected assertion error, new valid < current valid"); + } catch (AssertionError ae) { + // expected; + } + + try { + new AbstractJournal.QuorumTokenTransitions(1/*current token*/, 1/*new token*/, true/*isJoined*/, 2/*haReady*/); + fail("Expected assertion error, haReady > newToken"); + } catch (AssertionError ae) { + // expected; + } + + try { + new AbstractJournal.QuorumTokenTransitions(1/*current token*/, 2/*new token*/, true/*isJoined*/, 2/*haReady*/); + fail("Expected assertion error, haReady > currentToken"); + } catch (AssertionError ae) { + // expected; + } + } + } Modified: branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java =================================================================== --- branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java 2013-07-31 12:46:18 UTC (rev 7236) +++ branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java 2013-07-31 14:57:49 UTC (rev 7237) @@ -587,12 +587,12 @@ doStartAB_C_MultiTransactionResync(50, 5); } - public void _testStressStartAB_C_MultiTransactionResync_500_0() + public void _testStressStartAB_C_MultiTransactionResync_5tx_then_50ms_delay() throws Exception { for (int i = 0; i < 40; i++) { try { - doStartAB_C_MultiTransactionResync(500, 0); + doStartAB_C_MultiTransactionResync(50, 5); } catch (Throwable t) { fail("Fail after " + (i + 1) + " trials : " + t, t); } finally { Modified: branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java =================================================================== --- branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java 2013-07-31 12:46:18 UTC (rev 7236) +++ branches/READ_CACHE2/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java 2013-07-31 14:57:49 UTC (rev 7237) @@ -488,7 +488,7 @@ * fails the commit if there is a single failure, even though the quourm * might have a consensus around the new commit point.) * - * TODO Consider leader failure scenariors in this test suite, not just + * TODO Consider leader failure scenarios in this test suite, not just * scenarios where B fails. 
Probably we should also cover failures of C (the * 2nd follower). We should also cover scenariors where the quorum is barely * met and a single failure causes a rejected commit (local decision) or * failed commit (consensus under zookeeper). @@ -515,24 +515,6 @@ // Simple transaction. simpleTransaction(); -// try { -// // Simple transaction. -// simpleTransaction(); -// fail("Expecting failed transaction"); -// } catch(HttpException ex) { -// if (!ex.getMessage().contains( -// SpuriousTestException.class.getName())) { -// /* -// * Wrong inner cause. -// * -// * Note: The stack trace of the local exception does not include -// * the remote stack trace. The cause is formatted into the HTTP -// * response body. -// */ -// fail("Expecting " + ClocksNotSynchronizedException.class, t); -// } -// } - // Verify quorum is unchanged. assertEquals(token, quorum.token());
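One more worked row of the truth table may help when reading testQuorumTransitions() above. The values follow the conventions already used in that test (-1 is Quorum.NO_QUORUM); the assertions are a sketch and assume the class fields are accessible to the caller.

    // A service joins a quorum that is already met: the token is unchanged
    // (0 -> 0), the service reports isJoined, but haReady is still -1
    // because the service had not been joined before.
    final AbstractJournal.QuorumTokenTransitions t =
            new AbstractJournal.QuorumTokenTransitions(
                    0/* currentToken */, 0/* newToken */,
                    true/* isJoined */, -1/* haReady */);

    // Expected outcome: didJoinMetQuorum, and nothing else.
    assertTrue(t.didJoinMetQuorum);
    assertFalse(t.didBreak);
    assertFalse(t.didMeet);
    assertFalse(t.didLeaveMetQuorum);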
From: <tho...@us...> - 2013-07-31 12:46:25
Revision: 7236 http://bigdata.svn.sourceforge.net/bigdata/?rev=7236&view=rev Author: thompsonbry Date: 2013-07-31 12:46:18 +0000 (Wed, 31 Jul 2013) Log Message: ----------- The failure to wake the producer when you call buffer.iterator().close() is because you are not calling buffer.setFuture() with the Future of the producer in your test. I have committed a test which does this and shows that the producer is correctly awoken (the test observes a CancellationException for the Future). Per the ticket, please verify under a debugger that the iterator() and/or buffer were closed for your stack traces. I suspect that they were not closed and that the traces exist because the query was not correctly terminated. It is also possible that setFuture() was not called on the buffer or perhaps that a race condition existed for setting the Future on the buffer. However, I need more information to diagnose this further. See https://sourceforge.net/apps/trac/bigdata/ticket/707 (BlockingBuffer.close() does not unblock threads) Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/relation/accesspath/TestBlockingBuffer.java Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/relation/accesspath/TestBlockingBuffer.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/relation/accesspath/TestBlockingBuffer.java 2013-07-31 11:56:25 UTC (rev 7235) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/relation/accesspath/TestBlockingBuffer.java 2013-07-31 12:46:18 UTC (rev 7236) @@ -28,6 +28,7 @@ package com.bigdata.relation.accesspath; +import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -267,7 +268,111 @@ } + /** + * Test that a thread blocked on add() will be unblocked by + * {@link BlockingBuffer#close()}. For this test, there is no consumer and + * there is a single producer. We just fill up the buffer and when it is + * full, we verify that the producer is blocked. We then use the object + * returned by {@link BlockingBuffer#iterator()} to + * {@link IAsynchronousIterator#close()} to close the buffer and verify that + * the producer was woken up. + * + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/707" > + * BlockingBuffer.close() does not unblock threads </a> + */ + public void test_blockingBuffer_close_noConsumer_closedThroughIterator() throws Exception { + + // object added to the queue by the producer threads. + final Object e0 = new Object(); + + // the buffer + final BlockingBuffer<Object> buffer = new BlockingBuffer<Object>(2/* capacity */); + + // Set to the first cause for the producer. + final AtomicReference<Throwable> producerCause = new AtomicReference<Throwable>(); + + // the producer - just drops an object onto the buffer. + final class Producer implements Runnable { + @Override + public void run() { + try { + buffer.add(e0); + } catch(Throwable t) { + // set the first cause. + if (producerCause.compareAndSet(null/* expect */, t)) { + log.warn("First producer cause: " + t); + } + // ensure that producer does not re-run by throwing out + // cause.
+ + final ExecutorService service = Executors + .newSingleThreadExecutor(DaemonThreadFactory + .defaultThreadFactory()); + Future<?> f = null; + try { + + f = service.submit(new Producer()); + + /* + * Set the Future on the BlockingBuffer. This is how it will notice + * when the iterator is closed. + */ + buffer.setFuture(f); + + Thread.sleep(200/*ms*/); + + // The producer should be blocked. + assertFalse(f.isDone()); + + // Closed the buffer using the iterator. + buffer.iterator().close(); + + // Verify producer was woken up. + try { + + f.get(1/* timeout */, TimeUnit.SECONDS); + + } catch(CancellationException ex) { + + if(log.isInfoEnabled()) + log.info("Ignoring expected exception: "+ex); + + } +// } catch (ExecutionException ex) { +// +// if (!InnerCause.isInnerCause(ex, BufferClosedException.class)) { +// +// /* +// * Should have thrown a BufferClosedException. +// */ +// +// fail("Unexpected cause: " + ex, ex); +// +// } +// +// } + + } finally { + + service.shutdownNow(); + + service.awaitTermination(1/* timeout */, TimeUnit.SECONDS); + + } + + } + + /** * Test that threads blocked on add() will be unblocked by * {@link BlockingBuffer#close()}. A pool of producer threads run. A * (smaller) pool of (slower) consumer thread(s) also runs. The This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-07-31 11:56:32
Revision: 7235 http://bigdata.svn.sourceforge.net/bigdata/?rev=7235&view=rev Author: thompsonbry Date: 2013-07-31 11:56:25 +0000 (Wed, 31 Jul 2013) Log Message: ----------- Added two unit tests and an optional stress test to verify that BlockingBuffer.close() correctly wakes up a producer that is blocked on add(). @see https://sourceforge.net/apps/trac/bigdata/ticket/707 (BlockingBuffer.close() does not unblock threads) Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/relation/accesspath/TestBlockingBuffer.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/relation/accesspath/TestBlockingBufferWithChunks.java Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/relation/accesspath/TestBlockingBuffer.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/relation/accesspath/TestBlockingBuffer.java 2013-07-31 11:37:41 UTC (rev 7234) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/relation/accesspath/TestBlockingBuffer.java 2013-07-31 11:56:25 UTC (rev 7235) @@ -32,11 +32,15 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import junit.framework.TestCase2; +import com.bigdata.util.InnerCause; import com.bigdata.util.concurrent.DaemonThreadFactory; /** @@ -126,7 +130,7 @@ assertEquals("elementCount", 2L, buffer.getElementsAddedCount()); // future of task writing a 3rd element on the buffer. - final Future producerFuture = service.submit(new Runnable() { + final Future<?> producerFuture = service.submit(new Runnable() { public void run() { /* @@ -144,7 +148,7 @@ }); // future of task draining the buffer. - final Future consumerFuture = service.submit(new Runnable() { + final Future<?> consumerFuture = service.submit(new Runnable() { public void run() { assertEquals(e0, itr.next()); @@ -173,4 +177,309 @@ } + /** + * Test that a thread blocked on add() will be unblocked by + * {@link BlockingBuffer#close()}. For this test, there is no consumer and + * there is a single producer. We just fill up the buffer and when it is + * full, we verify that the producer is blocked. We then use + * {@link BlockingBuffer#close()} to close the buffer and verify that the + * producer was woken up. + * + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/707" > + * BlockingBuffer.close() does not unblock threads </a> + */ + public void test_blockingBuffer_close_noConsumer() throws Exception { + + // object added to the queue by the producer threads. + final Object e0 = new Object(); + + // the buffer + final BlockingBuffer<Object> buffer = new BlockingBuffer<Object>(2/* capacity */); + + // Set to the first cause for the producer. + final AtomicReference<Throwable> producerCause = new AtomicReference<Throwable>(); + + // the producer - just drops an object onto the buffer. + final class Producer implements Runnable { + @Override + public void run() { + try { + buffer.add(e0); + } catch(Throwable t) { + // set the first cause. + if (producerCause.compareAndSet(null/* expect */, t)) { + log.warn("First producer cause: " + t); + } + // ensure that producer does not re-run by throwing out + // cause. 
+ throw new RuntimeException(t); + } + } + } + + final Producer p = new Producer(); + + p.run(); // should not block. + p.run(); // should not block. + + final ExecutorService service = Executors + .newSingleThreadExecutor(DaemonThreadFactory + .defaultThreadFactory()); + Future<?> f = null; + try { + + f = service.submit(new Producer()); + + Thread.sleep(200/*ms*/); + + // The producer should be blocked. + assertFalse(f.isDone()); + + // Closed the buffer. + buffer.close(); + + // Verify producer was woken up. + try { + + f.get(1/* timeout */, TimeUnit.SECONDS); + + } catch (ExecutionException ex) { + + if (!InnerCause.isInnerCause(ex, BufferClosedException.class)) { + + /* + * Should have thrown a BufferClosedException. + */ + + fail("Unexpected cause: " + ex, ex); + + } + + } + + } finally { + + service.shutdownNow(); + + service.awaitTermination(1/* timeout */, TimeUnit.SECONDS); + + } + + } + + /** + * Test that threads blocked on add() will be unblocked by + * {@link BlockingBuffer#close()}. A pool of producer threads run. A + * (smaller) pool of (slower) consumer thread(s) also runs. The + * {@link BlockingBuffer} is closed once the producer threads begin to + * block. We then verify that all producer threads are appropriately + * notified of the asynchronous cancellation of their request. + * + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/707" > + * BlockingBuffer.close() does not unblock threads </a> + */ + public void test_blockingBuffer_close() throws InterruptedException, + ExecutionException, TimeoutException { + + // capacity of the target buffer. + final int BUFFER_CAPACITY = 1000; + + // max #of concurrent consumer/producer threads. + final int POOL_SIZE = 10; + + // producer rate. + final long PRODUCER_RATE_MILLIS = 1; + + // consumer rate (must be slower than the producer!) + final long CONSUMER_RATE_MILLIS = 2; + + assertTrue(CONSUMER_RATE_MILLIS > PRODUCER_RATE_MILLIS); + + // object added to the queue by the producer threads. + final Object e0 = new Object(); + + // the buffer + final BlockingBuffer<Object> buffer = new BlockingBuffer<Object>(BUFFER_CAPACITY); + + // the iterator draining that buffer. + final IAsynchronousIterator<Object> itr = buffer.iterator(); + + // Set to the first cause for the producer. + final AtomicReference<Throwable> producerCause = new AtomicReference<Throwable>(); + + // Set to the first cause for the consumer. + final AtomicReference<Throwable> consumerCause = new AtomicReference<Throwable>(); + + // the producer - just drops an object onto the buffer. + final class Producer implements Runnable { + @Override + public void run() { + try { + buffer.add(e0); + } catch(Throwable t) { + // set the first cause. + if (producerCause.compareAndSet(null/* expect */, t)) { + log.warn("First producer cause: " + t); + } + // ensure that producer does not re-run by throwing out + // cause. + throw new RuntimeException(t); + } + } + } + + // the consumer - drains an object from the buffer iff available. + final class Consumer implements Runnable { + @Override + public void run() { + try { + if (itr.hasNext()) { + itr.next(); + } + } catch (Throwable t) { + // set the first cause. + if (consumerCause.compareAndSet(null/* expect */, t)) { + log.warn("First consumer cause: " + t); + } + // ensure that consumer does not re-run by throwing out + // cause. + throw new RuntimeException(t); + } + } + } + + /* + * Setup scheduled thread pool. One thread will be the producer. The + * other is the consumer. 
We will schedule producer tasks at a higher + * rate than consumer tasks. Eventually the blocking buffer will block + * in the producer. Once we notice this condition, we will close() the + * blocking buffer and verify that the producer thread was appropriately + * notified. + */ + final ScheduledExecutorService service = Executors + .newScheduledThreadPool(POOL_SIZE, + DaemonThreadFactory.defaultThreadFactory()); + + final ScheduledFuture<?> producerFuture = service.scheduleAtFixedRate( + new Producer(), 0/* initialDelay */, PRODUCER_RATE_MILLIS, + TimeUnit.MILLISECONDS); + + final ScheduledFuture<?> consumerFuture = service.scheduleAtFixedRate( + new Consumer(), 0/* initialDelay */, CONSUMER_RATE_MILLIS, + TimeUnit.MILLISECONDS); + + try { + + // Wait for the buffer to become full. + while (true) { +// FIXME Variant where the consumer is terminated before we test the buffer for being full (or simply never run) - this test might even exist already. + if (buffer.size() == BUFFER_CAPACITY) { + + // log out message. includes buffer stats. + log.warn("Buffer is full: " + buffer.toString()); + + buffer.close(); + + break; + + } + + Thread.sleep(PRODUCER_RATE_MILLIS/* ms */); + + } + + // Wait for the consumer to drain the buffer. + while (!buffer.isEmpty()) { + + // fail if consumer is done. should run until cancelled. + assertFalse(consumerFuture.isDone()); + + Thread.sleep(CONSUMER_RATE_MILLIS/* ms */); + + } + + log.info("Buffer has been drained."); + + /* + * The producer should no longer be running. It should have failed + * when the blocking buffer was closed. Once failed, it should not + * have been re-scheduled for execution. + */ + try { + + // wait just a bit to be safe, but should be done. + producerFuture.get(50/* timeout */, TimeUnit.MILLISECONDS); + + } catch (TimeoutException ex) { + + fail("Producer is still running (blocked): " + ex, ex); + + } catch (ExecutionException ex) { + + if (InnerCause.isInnerCause(ex, BufferClosedException.class)) { + // This is an acceptable first cause for the producer. + return; + } + + // otherwise throw out the 1st cause for the producer. + throw ex; + + } + + /* + * The consumer should still be running. It will not do anything + * since there is nothing to be drained anymore. + */ + assertFalse(consumerFuture.isDone()); + + } catch(Throwable t) { + + /* + * Ensure worker threads are cancelled if the test fails. + */ + + consumerFuture.cancel(true/* mayInterruptIfRunning */); + + producerFuture.cancel(true/* mayInterruptIfRunning */); + + fail("Test failure: rootCause=" + t, t); + + } finally { + + service.shutdownNow(); + + if (!consumerFuture.isDone() || !producerFuture.isDone()) { + /* + * Note: If the service does not terminate after a timeout then + * we have a hung thread on add(). + */ + service.awaitTermination(5/* timeout */, TimeUnit.SECONDS); + } + + } + + } + + /** + * Stress test version of {@link #test_blockingBuffer_close()}. 
+ */ + public void _testStress_blockingBuffer_close() throws InterruptedException, + ExecutionException, TimeoutException { + + for (int i = 0; i < 100; i++) { + + try { + + test_blockingBuffer_close(); + + } catch (Throwable t) { + + fail("Failed @ i=" + i + " : " + t, t); + + } + + } + + } + } Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/relation/accesspath/TestBlockingBufferWithChunks.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/relation/accesspath/TestBlockingBufferWithChunks.java 2013-07-31 11:37:41 UTC (rev 7234) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/test/com/bigdata/relation/accesspath/TestBlockingBufferWithChunks.java 2013-07-31 11:56:25 UTC (rev 7235) @@ -156,7 +156,7 @@ final AtomicBoolean proceedFlag = new AtomicBoolean(false); // future of task writing a 3rd element on the buffer. - final Future producerFuture = service.submit(new Callable<Void>() { + final Future<?> producerFuture = service.submit(new Callable<Void>() { public Void call() throws Exception { lock.lockInterruptibly(); @@ -184,7 +184,7 @@ }); // future of task draining the buffer. - final Future consumerFuture = service.submit(new Callable<Void>() { + final Future<?> consumerFuture = service.submit(new Callable<Void>() { public Void call() throws Exception { try { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
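The pattern these new tests exercise reduces to a small sketch. The following is illustrative only, not code from the commit: it reuses the BlockingBuffer, BufferClosedException, and InnerCause types that appear in the diff above, assumes an enclosing test method declaring throws Exception, and uses a sleep as a crude way to let the producer block (the same heuristic the committed test uses).

{{{
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Fill the buffer so the next add() blocks.
final BlockingBuffer<Object> buffer = new BlockingBuffer<Object>(1/* capacity */);
buffer.add(new Object());

final ExecutorService service = Executors.newSingleThreadExecutor();
final Future<?> producer = service.submit(new Runnable() {
    public void run() {
        buffer.add(new Object()); // blocks here until close().
    }
});
try {
    Thread.sleep(200/* ms */); // let the producer block on add().
    buffer.close(); // per ticket 707, this must wake the producer.
    try {
        producer.get(1/* timeout */, TimeUnit.SECONDS);
    } catch (ExecutionException ex) {
        // Expected: the blocked add() fails with BufferClosedException.
        if (!InnerCause.isInnerCause(ex, BufferClosedException.class))
            throw ex;
    }
} finally {
    service.shutdownNow();
}
}}}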
From: <tho...@us...> - 2013-07-31 11:37:48
Revision: 7234 http://bigdata.svn.sourceforge.net/bigdata/?rev=7234&view=rev Author: thompsonbry Date: 2013-07-31 11:37:41 +0000 (Wed, 31 Jul 2013) Log Message: ----------- Bug fix for [1] Rather, I would say that putIfAbsent() is simply not checking to see whether the QueryEngine has been shutdown. {{{ protected AbstractRunningQuery putIfAbsent(final UUID queryId, final AbstractRunningQuery runningQuery) { if (queryId == null) throw new IllegalArgumentException(); if (runningQuery == null) throw new IllegalArgumentException(); // First, check [runningQueries] w/o acquiring a lock. { final AbstractRunningQuery tmp = runningQueries.get(queryId); if (tmp != null) { // Found existing query. return tmp; } } /* * A lock is used to address a race condition here with the concurrent * registration and halt of a query. */ lock.lock(); try { // Test for a recently terminated query. final Future<Void> doneQueryFuture = doneQueries.get(queryId); if (doneQueryFuture != null) { // Throw out an appropriate exception for a halted query. handleDoneQuery(queryId, doneQueryFuture); // Should never get here. throw new AssertionError(); } // Test again for an active query while holding the lock. final AbstractRunningQuery tmp = runningQueries.putIfAbsent(queryId, runningQuery); if (tmp != null) { // Another thread won the race. return tmp; } // Query was newly registered. <== CHANGE IS APPLIED HERE return runningQuery; } finally { lock.unlock(); } } }}} The proposed change is below. The position of this change in the method is marked above. {{{ /* * Query was newly registered. */ try { // Verify QueryEngine is running. assertRunning(); } catch (IllegalStateException ex) { /** * The query engine either is not initialized or was shutdown * concurrent with adding the new query to the running query * table. We yank the query out of the running query table in * order to have no net effect and then throw out the exception * indicating that the QueryEngine has been shutdown. * * @see <a * href="https://sourceforge.net/apps/trac/bigdata/ticket/705"> * Race condition in QueryEngine.putIfAbsent() </a> */ runningQueries.remove(queryId, runningQuery); throw ex; } }}} The BOP, QueryEngine, and TestBigdataSailWithQuads test suites are unchanged by this modification. [1] https://sourceforge.net/apps/trac/bigdata/ticket/705 (Race condition in QueryEngine.putIfAbsent()) Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2013-07-31 10:20:11 UTC (rev 7233) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2013-07-31 11:37:41 UTC (rev 7234) @@ -886,7 +886,11 @@ shutdown = true; - // stop the query engine. + /* + * Stop the QueryEngineTask: this is the task that accepts chunks that + * are available for evaluation and assigns them to the + * AbstractRunningQuery. + */ final Future<?> f = engineFuture.get(); if (f != null) { if (log.isInfoEnabled()) @@ -894,7 +898,7 @@ f.cancel(true/* mayInterruptIfRunning */); } - // stop the service on which we ran the query engine. + // stop the service on which we ran the QueryEngineTask. 
final ExecutorService s = engineService.get(); if (s != null) { if (log.isInfoEnabled()) @@ -1425,8 +1429,8 @@ * a safety check against UUID collisions which might be non-random. */ throw new RuntimeException("Query exists with that UUID: uuid=" - + runningQuery.getQueryId()); - + + runningQuery.getQueryId()); + } // final String tag = query.getProperty(QueryHints.TAG, @@ -1521,7 +1525,34 @@ } - // Query was newly registered. + /* + * Query was newly registered. + */ + try { + + // Verify QueryEngine is running. + assertRunning(); + + } catch (IllegalStateException ex) { + + /** + * The query engine either is not initialized or was shutdown + * concurrent with adding the new query to the running query + * table. We yank the query out of the running query table in + * order to have no net effect and then throw out the exception + * indicating that the QueryEngine has been shutdown. + * + * @see <a + * href="https://sourceforge.net/apps/trac/bigdata/ticket/705"> + * Race condition in QueryEngine.putIfAbsent() </a> + */ + + runningQueries.remove(queryId, runningQuery); + + throw ex; + + } + return runningQuery; } finally { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
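Stripped of the QueryEngine specifics, the fix above is an insert-then-validate idiom for racing a registration against shutdown. A lock-free distillation is sketched below; the names (running, shutdown, register()) are hypothetical, and the real method additionally holds a lock and consults the doneQueries table, as shown in the log message.

{{{
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

final ConcurrentHashMap<UUID, Object> running = new ConcurrentHashMap<UUID, Object>();
final AtomicBoolean shutdown = new AtomicBoolean(false);

Object register(final UUID id, final Object query) {
    final Object tmp = running.putIfAbsent(id, query);
    if (tmp != null)
        return tmp; // another thread won the race: return its entry.
    if (shutdown.get()) {
        /*
         * Shutdown raced with the registration. remove(key, value) yanks
         * out exactly the entry we added (never a competing one), so the
         * failed call has no net effect before we report the shutdown.
         */
        running.remove(id, query);
        throw new IllegalStateException("Not running.");
    }
    return query; // newly registered while still running.
}
}}}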
From: <tho...@us...> - 2013-07-31 10:20:18
Revision: 7233 http://bigdata.svn.sourceforge.net/bigdata/?rev=7233&view=rev Author: thompsonbry Date: 2013-07-31 10:20:11 +0000 (Wed, 31 Jul 2013) Log Message: ----------- Bug fix for [1] [1] https://sourceforge.net/apps/trac/bigdata/ticket/706 (MultiSourceSequentialCloseableIterator.nextSource() can throw NPE) Commit is against the 1.2.x maintenance branch. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/relation/accesspath/MultiSourceSequentialCloseableIterator.java Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/relation/accesspath/MultiSourceSequentialCloseableIterator.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/relation/accesspath/MultiSourceSequentialCloseableIterator.java 2013-07-25 07:25:23 UTC (rev 7232) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/relation/accesspath/MultiSourceSequentialCloseableIterator.java 2013-07-31 10:20:11 UTC (rev 7233) @@ -131,19 +131,29 @@ // current is known to be [null]. lock.lock(); try { - /* Close iterator which has been consumed. + /** + * Close iterator which has been consumed. * + * @see <a + * href="https://sourceforge.net/apps/trac/bigdata/ticket/706" + * > MultiSourceSequentialCloseableIterator.nextSource() can + * throw NPE </a> */ - if (log.isInfoEnabled()) - log.info("Closing source: " + current); - current.close(); + ICloseableIterator<E> t = this.current; + { + if (t != null) { + if (log.isInfoEnabled()) + log.info("Closing source: " + t); + t.close(); + } + } // remove the head of the queue (non-blocking) - while ((current = sources.poll()) != null) { - if (current.hasNext()) { - return current; + while ((t = current = sources.poll()) != null) { + if (t.hasNext()) { + return t; } else { // Note: should already be closed since exhausted. - current.close(); + t.close(); } } // no more sources with data, close while holding lock. This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
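The bug class here is a double read of a volatile field: current could be non-null at the log statement and null by the time close() was invoked, because a concurrent thread can clear it at any point. Reading the field once into a final local makes the null check and the dereference operate on the same reference. The idiom, reduced to its essentials (field and type names follow the diff; the enclosing method is illustrative):

{{{
private volatile ICloseableIterator<E> current;

private void closeCurrent() {
    // Single volatile read: [t] cannot become null afterwards,
    // even if a concurrent thread clears [current].
    final ICloseableIterator<E> t = current;
    if (t != null) {
        t.close();
    }
}
}}}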
From: <mar...@us...> - 2013-07-25 07:25:39
Revision: 7232 http://bigdata.svn.sourceforge.net/bigdata/?rev=7232&view=rev Author: martyncutcher Date: 2013-07-25 07:25:23 +0000 (Thu, 25 Jul 2013) Log Message: ----------- Remove need for double setQuorumToken in ErrorTask by removing short-circuit test for no token change. This required additional condition testing in setQuorumToken to avoid regression in other test scenarios. Modified Paths: -------------- branches/READ_CACHE2/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java Modified: branches/READ_CACHE2/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/READ_CACHE2/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-07-18 09:11:49 UTC (rev 7231) +++ branches/READ_CACHE2/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-07-25 07:25:23 UTC (rev 7232) @@ -5351,12 +5351,25 @@ if (haLog.isInfoEnabled()) haLog.info("oldValue=" + oldValue + ", newToken=" + newValue); - if (oldValue == newValue && oldValue == haReadyToken) { - - // No change. - return; - - } + /* + * Note that previously the noTokenChange condition was a short circuit exit. + * + * This has been removed so we must account for this condition to determine + * correct transition + */ + final boolean noTokenChange = oldValue == newValue && oldValue == oldReady; +// if (oldValue == newValue && oldValue == oldReady) { +// log.warn("NO TOKEN CHANGE"); +// // No change. +// return; +// +// } + + /* + * Protect for potential NPE + */ + if (quorum == null) + return; // This quorum member. final QuorumService<HAGlue> localService = quorum.getClient(); @@ -5365,8 +5378,19 @@ final boolean didMeet; final boolean didJoinMetQuorum; final boolean didLeaveMetQuorum; + final boolean isJoined = localService != null && localService.isJoinedMember(newValue); - if (newValue == Quorum.NO_QUORUM && oldValue != Quorum.NO_QUORUM) { + /** + * Adding set of initial conditions to account for noTokenChange + * + * TODO: should there be more than one initial condition? + */ + if (noTokenChange && isJoined) { + didBreak = false; // quorum break. + didMeet = false; + didJoinMetQuorum = true; + didLeaveMetQuorum = haReadyToken != Quorum.NO_QUORUM; // if service was joined with met quorum, then it just left the met quorum. + } else if (newValue == Quorum.NO_QUORUM && oldValue != Quorum.NO_QUORUM) { /* * Quorum break. @@ -5395,9 +5419,8 @@ didLeaveMetQuorum = false; } else if (newValue != Quorum.NO_QUORUM // quorum exists - && haReadyToken == Quorum.NO_QUORUM // service was not joined with met quorum. - && localService != null // - && localService.isJoinedMember(newValue) // service is now joined with met quorum. + && oldReady == Quorum.NO_QUORUM // service was not joined with met quorum. + && isJoined // service is now joined with met quorum. ) { /* @@ -5410,9 +5433,8 @@ didLeaveMetQuorum = false; } else if (newValue != Quorum.NO_QUORUM // quorum exists - && haReadyToken != Quorum.NO_QUORUM // service was joined with met quorum - && localService != null // - && !localService.isJoinedMember(newValue) // service is no longer joined with met quorum. + && oldReady != Quorum.NO_QUORUM // service was joined with met quorum + && !isJoined // service is no longer joined with met quorum. 
) { /* Modified: branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java =================================================================== --- branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2013-07-18 09:11:49 UTC (rev 7231) +++ branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2013-07-25 07:25:23 UTC (rev 7232) @@ -1711,20 +1711,25 @@ log.info("Current Token: " + journal.getHAReady() + ", new: " + getQuorum().token()); /* - * FIXME Why is this first clearing the quorum token and then - * setting it? Document or revert this change. + * We must ensure that the token is reset to generate the required + * additional events. + * + * Processing the events at this point, after serviceLeave, is not sufficient. + * + * TODO It is possible that we could avoid the clear/set pattern with more + * state analysis in setQuorumToken and this should be investigated. */ - journal.setQuorumToken(Quorum.NO_QUORUM); + // journal.setQuorumToken(Quorum.NO_QUORUM); journal.setQuorumToken(getQuorum().token()); -// assert journal.getHAReady() == Quorum.NO_QUORUM; - - /** * Dispatch Events before entering SeekConsensus! Otherwise - * the events triggered by the serviceLeave() will not be - * handled until we enter SeekConsensus, and then they will - * just kick us out of SeekConsensus again. + * the events triggered by the serviceLeave() and setQuorumToken + * will not be handled until we enter SeekConsensus, and then + * when they are received SeekConsensus will fail. + * + * The intention of this action is to ensure that when SeekConsensus is + * entered the service is in a "clean" state. */ processEvents(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
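The classification that replaces the old short-circuit can be summarized as follows. This is a simplified sketch, not the full method: setQuorumToken() actually derives four flags (didBreak, didMeet, didJoinMetQuorum, didLeaveMetQuorum) and handles further cases, but the variables (oldValue, newValue, oldReady, isJoined) are those in the diff above.

{{{
final boolean noTokenChange = oldValue == newValue && oldValue == oldReady;

if (noTokenChange && isJoined) {
    // Token unchanged and this service is joined: previously a
    // short-circuit return; now classified so the ErrorTask's
    // clear/set of the token still generates the required events.
} else if (newValue == Quorum.NO_QUORUM && oldValue != Quorum.NO_QUORUM) {
    // Quorum break.
} else if (newValue != Quorum.NO_QUORUM
        && oldReady == Quorum.NO_QUORUM && isJoined) {
    // This service has just joined a met quorum.
} else if (newValue != Quorum.NO_QUORUM
        && oldReady != Quorum.NO_QUORUM && !isJoined) {
    // This service has just left the met quorum.
}
}}}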
From: <tho...@us...> - 2013-07-18 09:11:56
Revision: 7231 http://bigdata.svn.sourceforge.net/bigdata/?rev=7231&view=rev Author: thompsonbry Date: 2013-07-18 09:11:49 +0000 (Thu, 18 Jul 2013) Log Message: ----------- Javadoc and some cleanup in AbstractQuorum and HAJournalServer. This completes my review of the current change set for [1]. [1] https://sourceforge.net/apps/trac/bigdata/ticket/695 Modified Paths: -------------- branches/READ_CACHE2/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java Modified: branches/READ_CACHE2/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/READ_CACHE2/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-07-18 08:54:10 UTC (rev 7230) +++ branches/READ_CACHE2/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2013-07-18 09:11:49 UTC (rev 7231) @@ -5477,9 +5477,7 @@ /* * Local abort (no quorum, so 2-phase abort not required). * - * FIXME HA : local abort on quorum break -or- service leave? - * - * FIXME HA : Abort the unisolated connection? + * FIXME HA : Abort the unisolated connection? (esp for group commit). */ doLocalAbort(); @@ -5594,7 +5592,7 @@ if (!installedRBs) { - /* + /** * If we install the RBs, then a local abort was already * done. Otherwise we need to do one now (this covers the * case when setQuorumToken() is called on the leader as @@ -5602,16 +5600,33 @@ * or is a follower, but the leader is not at * commitCounter==0L, etc. * - * If didJoinMetQuorum hen we MUST be leaving Resync, so should NOT - * need to complete a localAbort. BUT what should this imply - * about installedRBs? + * If didJoinMetQuorum==true, then we MUST be leaving the + * Resync run state in the HAJournalServer, so should NOT + * need to complete a localAbort. + * + * TODO We should still review this point. If we do not + * delete a committed HALog, then why is doLocalAbort() a + * problem here? Ah. It is because doLocalAbort() is hooked + * by the HAJournalServer and will trigger a serviceLeave() + * and a transition to the error state. + * + * @see <a + * href="https://sourceforge.net/apps/trac/bigdata/ticket/695"> + * HAJournalServer reports "follower" but is in + * SeekConsensus and is not participating in + * commits</a> */ - - if (log.isInfoEnabled()) - log.info("Calling localAbort if NOT didJoinMetQuorum: " + didJoinMetQuorum); - if (!didJoinMetQuorum) - doLocalAbort(); + if (log.isInfoEnabled()) + log.info("Calling localAbort if NOT didJoinMetQuorum: " + + didJoinMetQuorum); + + if (!didJoinMetQuorum) { + + doLocalAbort(); + + } + } haReadyCondition.signalAll(); // signal ALL. @@ -6793,20 +6808,30 @@ } finally { if (!vote.get()) { - /* - * Throw away our local write set. - */ - // doLocalAbort(); // enterErrorState will do this - - /* - * Exit the service - */ - // quorum.getActor().serviceLeave(); // enterErrorState will do this - - /* +// /* +// * Throw away our local write set. +// */ +// // doLocalAbort(); // enterErrorState will do this +// +// /* +// * Exit the service +// */ +// // quorum.getActor().serviceLeave(); // enterErrorState will do this +// + /** * Since the service refuses the commit, we want it to * enter an error state and then figure out whether it * needs to resynchronize with the quorum. + * <p> + * Note: Entering the error state will cause the local + * abort and serviceLeave() actions to be taken, which + * is why then have been commented out above. 
+ * + * @see <a + * href="https://sourceforge.net/apps/trac/bigdata/ticket/695"> + * HAJournalServer reports "follower" but is in + * SeekConsensus and is not participating in + * commits</a> */ quorum.getClient().enterErrorState(); } Modified: branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java =================================================================== --- branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2013-07-18 08:54:10 UTC (rev 7230) +++ branches/READ_CACHE2/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2013-07-18 09:11:49 UTC (rev 7231) @@ -1641,6 +1641,10 @@ /** * Handle an error condition on the service. + * + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/695"> + * HAJournalServer reports "follower" but is in SeekConsensus and + * is not participating in commits</a> */ private class ErrorTask extends RunStateCallable<Void> { @@ -1704,33 +1708,40 @@ * explicitly above, that can be do invocations each time we pass through here! */ if (log.isInfoEnabled()) - log.info("Current Token: " + journal.getHAReady() + ", new: " + getQuorum().token()); + log.info("Current Token: " + journal.getHAReady() + ", new: " + getQuorum().token()); + + /* + * FIXME Why is this first clearing the quorum token and then + * setting it? Document or revert this change. + */ journal.setQuorumToken(Quorum.NO_QUORUM); journal.setQuorumToken(getQuorum().token()); - // journal.setQuorumToken(Quorum.NO_QUORUM); // assert journal.getHAReady() == Quorum.NO_QUORUM; /** - * dispatch Events before entering SeekConsensus! + * Dispatch Events before entering SeekConsensus! Otherwise + * the events triggered by the serviceLeave() will not be + * handled until we enter SeekConsensus, and then they will + * just kick us out of SeekConsensus again. */ processEvents(); - /* - * Note: We can spin here to give the service an opportunity to - * handle any backlog of events that trigger a transition into - * the ERROR state. This might not be strictly necessary, and we - * do not want to spin too long. - */ +// /* +// * Note: We can spin here to give the service an opportunity to +// * handle any backlog of events that trigger a transition into +// * the ERROR state. This might not be strictly necessary, and we +// * do not want to spin too long. +// */ +// +// final long sleepMillis = 0; // 2000; // TODO CONFIG? +// +// log.warn("Sleeping " + sleepMillis + "ms to let events quiesce."); +// +// if (sleepMillis > 0) +// Thread.sleep(sleepMillis); - final long sleepMillis = 0; // 2000; // TODO CONFIG? - - log.warn("Sleeping " + sleepMillis + "ms to let events quiesce."); - - if (sleepMillis > 0) - Thread.sleep(sleepMillis); - // Seek consensus. enterRunState(new SeekConsensusTask()); @@ -2416,8 +2427,16 @@ // Until joined with the met quorum. while (!getQuorum().getMember().isJoinedMember(token)) { - assert journal.getHAReady() == Quorum.NO_QUORUM; + // This service should not be joined yet (HAReady==-1). + final long haReady = journal.getHAReady(); + if (haReady != Quorum.NO_QUORUM) { + + throw new AssertionError( + "HAReady: Expecting NO_QUOURM, not " + haReady); + + } + // The current commit point on the local store. final long commitCounter = journal.getRootBlockView() .getCommitCounter(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
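One small change in the HAJournalServer diff deserves a note: the bare assert on the HAReady token was replaced with an explicit check, presumably because Java assertions are skipped unless the JVM runs with -ea. The explicit form is enforced unconditionally and reports the observed value in the failure message:

{{{
// Before: silently skipped when assertions are disabled.
// assert journal.getHAReady() == Quorum.NO_QUORUM;

// After: always enforced, and the error carries the offending value.
final long haReady = journal.getHAReady();
if (haReady != Quorum.NO_QUORUM) {
    throw new AssertionError("HAReady: Expecting NO_QUORUM, not " + haReady);
}
}}}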
From: <tho...@us...> - 2013-07-18 08:54:31
Revision: 7230 http://bigdata.svn.sourceforge.net/bigdata/?rev=7230&view=rev Author: thompsonbry Date: 2013-07-18 08:54:10 +0000 (Thu, 18 Jul 2013) Log Message: ----------- Removed unused log file. Please accept this change on your end. I have made it more than once. Modified Paths: -------------- branches/READ_CACHE2/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java Modified: branches/READ_CACHE2/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java =================================================================== --- branches/READ_CACHE2/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java 2013-07-18 08:31:04 UTC (rev 7229) +++ branches/READ_CACHE2/bigdata/src/java/com/bigdata/io/writecache/BufferedWrite.java 2013-07-18 08:54:10 UTC (rev 7230) @@ -29,15 +29,12 @@ import java.nio.channels.FileChannel; import java.util.concurrent.atomic.AtomicReference; -import org.apache.log4j.Logger; - import com.bigdata.counters.CAT; import com.bigdata.counters.CounterSet; import com.bigdata.io.DirectBufferPool; import com.bigdata.io.FileChannelUtility; import com.bigdata.io.IBufferAccess; import com.bigdata.io.IReopenChannel; -import com.bigdata.rwstore.RWStore; /** * The BufferedWrite merges/elides sorted scattered writes to minimize IO @@ -56,8 +53,6 @@ */ public class BufferedWrite { - protected static final Logger log = Logger.getLogger(WriteCache.class); - /** * Used to determine the size of the allocation slot onto which a record is * being written. This is used to pad the size of the IO out to the size of This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-07-18 08:31:11
Revision: 7229 http://bigdata.svn.sourceforge.net/bigdata/?rev=7229&view=rev Author: thompsonbry Date: 2013-07-18 08:31:04 +0000 (Thu, 18 Jul 2013) Log Message: ----------- Javadoc and cross linked to the ticket. @see https://sourceforge.net/apps/trac/bigdata/ticket/695 Modified Paths: -------------- branches/READ_CACHE2/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java branches/READ_CACHE2/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java Modified: branches/READ_CACHE2/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java =================================================================== --- branches/READ_CACHE2/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-07-18 08:30:42 UTC (rev 7228) +++ branches/READ_CACHE2/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-07-18 08:31:04 UTC (rev 7229) @@ -2070,7 +2070,12 @@ } /** - * Called from ErrorTask to ensure that events are processed before entering SeekConsensus + * Called from ErrorTask in HAJournalServer to ensure that events are + * processed before entering SeekConsensus. + * + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/695"> + * HAJournalServer reports "follower" but is in SeekConsensus and is not + * participating in commits</a> */ public void processEvents() { this.lock.lock(); Modified: branches/READ_CACHE2/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java =================================================================== --- branches/READ_CACHE2/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java 2013-07-18 08:30:42 UTC (rev 7228) +++ branches/READ_CACHE2/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java 2013-07-18 08:31:04 UTC (rev 7229) @@ -374,11 +374,17 @@ } /** - * Called from ErrorTask to process the event queue before - * moving to SeekConsensus. + * Called from ErrorTask in HAJournalServer to ensure that events are + * processed before entering SeekConsensus. + * + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/695"> + * HAJournalServer reports "follower" but is in SeekConsensus and is not + * participating in commits</a> */ protected void processEvents() { - pipelineImpl.processEvents(); + + pipelineImpl.processEvents(); + } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2013-07-18 08:30:48
Revision: 7228 http://bigdata.svn.sourceforge.net/bigdata/?rev=7228&view=rev Author: thompsonbry Date: 2013-07-18 08:30:42 +0000 (Thu, 18 Jul 2013) Log Message: ----------- javdoc on IHALogWriter. I lifted out the isCommitted() invocation in remove() so it is just called once. This should be semantically identical. I just feel more comfortable knowing that the logged out value is identical to the value considered in the if() statement. @see https://sourceforge.net/apps/trac/bigdata/ticket/695 Modified Paths: -------------- branches/READ_CACHE2/bigdata/src/java/com/bigdata/ha/halog/HALogWriter.java branches/READ_CACHE2/bigdata/src/java/com/bigdata/ha/halog/IHALogWriter.java Modified: branches/READ_CACHE2/bigdata/src/java/com/bigdata/ha/halog/HALogWriter.java =================================================================== --- branches/READ_CACHE2/bigdata/src/java/com/bigdata/ha/halog/HALogWriter.java 2013-07-18 08:13:32 UTC (rev 7227) +++ branches/READ_CACHE2/bigdata/src/java/com/bigdata/ha/halog/HALogWriter.java 2013-07-18 08:30:42 UTC (rev 7228) @@ -603,11 +603,15 @@ } - /** - * On various error conditions we may need to remove the log - * - * @throws IOException - */ + /** + * On various error conditions we may need to remove the log + * + * @throws IOException + * + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/695"> + * HAJournalServer reports "follower" but is in SeekConsensus and is + * not participating in commits\xA7</a> + */ private void remove() throws IOException { final Lock lock = m_stateLock.writeLock(); @@ -620,15 +624,18 @@ * Conditional remove iff file is open. Will not remove * something that has been closed. */ + final boolean isCommitted = m_state.isCommitted(); + if (haLog.isInfoEnabled()) - haLog.info("Will close: " + m_state.m_haLogFile + ", committed: " + m_state.isCommitted()); + haLog.info("Will close: " + m_state.m_haLogFile + ", committed: " + isCommitted); m_state.forceCloseAll(); - if (m_state.isCommitted()) return; // Do not remove a sealed HALog file! + if (isCommitted) return; // Do not remove a sealed HALog file! if (haLog.isInfoEnabled()) haLog.info("Will remove: " + m_state.m_haLogFile, new RuntimeException()); + if (m_state.m_haLogFile.exists() && !m_state.m_haLogFile.delete()) { /* @@ -653,9 +660,7 @@ } - /** - * Disable (and remove) the current log file if one is open. - */ + @Override public void disableHALog() throws IOException { if (haLog.isInfoEnabled()) Modified: branches/READ_CACHE2/bigdata/src/java/com/bigdata/ha/halog/IHALogWriter.java =================================================================== --- branches/READ_CACHE2/bigdata/src/java/com/bigdata/ha/halog/IHALogWriter.java 2013-07-18 08:13:32 UTC (rev 7227) +++ branches/READ_CACHE2/bigdata/src/java/com/bigdata/ha/halog/IHALogWriter.java 2013-07-18 08:30:42 UTC (rev 7228) @@ -88,7 +88,13 @@ public void closeHALog(IRootBlockView rootBlock) throws IOException; /** - * Disable (and remove) the current log file if one is open. + * Disable (and remove) the current log file if one is open (an HALog file + * which has been committed by applying its closing root block is NOT + * removed). + * + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/695"> + * HAJournalServer reports "follower" but is in SeekConsensus and is + * not participating in commits</a> */ public void disableHALog() throws IOException; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
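The point about lifting isCommitted() out is a general defensive idiom: when one value both gets logged and drives a branch, capture it in a final local so the two uses cannot observe different states between calls. The essence of the change (names taken from the diff):

{{{
final boolean isCommitted = m_state.isCommitted(); // read exactly once.

if (haLog.isInfoEnabled())
    haLog.info("Will close: " + m_state.m_haLogFile + ", committed: "
            + isCommitted);

m_state.forceCloseAll();

if (isCommitted)
    return; // Never remove a sealed (committed) HALog file.
}}}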
From: <tho...@us...> - 2013-07-18 08:13:55
Revision: 7227 http://bigdata.svn.sourceforge.net/bigdata/?rev=7227&view=rev Author: thompsonbry Date: 2013-07-18 08:13:32 +0000 (Thu, 18 Jul 2013) Log Message: ----------- Javadoc clarifications in QuorumStateChangeListener and reviewer the implementation in QuorumWatcherBase against the javadoc for mutual consistency. Add link to the ticket to AbstractQuorum. See https://sourceforge.net/apps/trac/bigdata/ticket/695 Modified Paths: -------------- branches/READ_CACHE2/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java branches/READ_CACHE2/bigdata/src/java/com/bigdata/quorum/QuorumStateChangeListener.java Modified: branches/READ_CACHE2/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java =================================================================== --- branches/READ_CACHE2/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java 2013-07-15 16:46:00 UTC (rev 7226) +++ branches/READ_CACHE2/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java 2013-07-18 08:13:32 UTC (rev 7227) @@ -3208,7 +3208,16 @@ if (client != null) { // Notify all quorum members that a service left. try { - // PREVIOUSLY called client.serviceLeave() unconditionally + /** + * PREVIOUSLY called client.serviceLeave() + * unconditionally + * + * @see <a + * href="https://sourceforge.net/apps/trac/bigdata/ticket/695"> + * HAJournalServer reports "follower" but is in + * SeekConsensus and is not participating in + * commits </a> + */ final UUID clientId = client.getServiceId(); if (serviceId.equals(clientId)) client.serviceLeave(); Modified: branches/READ_CACHE2/bigdata/src/java/com/bigdata/quorum/QuorumStateChangeListener.java =================================================================== --- branches/READ_CACHE2/bigdata/src/java/com/bigdata/quorum/QuorumStateChangeListener.java 2013-07-15 16:46:00 UTC (rev 7226) +++ branches/READ_CACHE2/bigdata/src/java/com/bigdata/quorum/QuorumStateChangeListener.java 2013-07-18 08:13:32 UTC (rev 7227) @@ -61,6 +61,10 @@ * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id: QuorumStateChangeListener.java 4069 2011-01-09 20:58:02Z * thompsonbry $ + * + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/695"> + * HAJournalServer reports "follower" but is in SeekConsensus and is not + * participating in commits (javadoc clarifications)</a> */ public interface QuorumStateChangeListener { @@ -75,7 +79,7 @@ void memberRemove(); /** - * Invoked when the service is added to the write pipeline. The service + * Invoked when this service is added to the write pipeline. The service * always enters at the of the pipeline. */ void pipelineAdd(); @@ -86,16 +90,17 @@ void pipelineRemove(); /** - * Invoked when a service already in the pipeline becomes the first service - * in the write pipeline because all previous services in the pipeline order - * have been removed from the pipeline. + * Invoked for this service when the service is already in the pipeline and + * this service becomes the first service in the write pipeline because all + * previous services in the pipeline order have been removed from the + * pipeline (failover into the leader position). */ void pipelineElectedLeader(); /** - * Invoked when the downstream service in the write pipeline has changed. - * Services always enter at the end of the write pipeline, but may be - * removed at any position in the write pipeline. + * Invoked for this service when the downstream service in the write + * pipeline has changed. 
Services always enter at the end of the write + * pipeline, but may be removed at any position in the write pipeline. * * @param oldDownstreamId * The {@link UUID} of the service which <em>was</em> downstream @@ -106,13 +111,13 @@ * from this service in the write pipeline and <code>null</code> * iff this service <em>is</em> the last service in the pipeline. */ - void pipelineChange(UUID oldDownStreamId, UUID newDownStreamId); + void pipelineChange(final UUID oldDownStreamId, final UUID newDownStreamId); /** - * Invoked when the upstream service in the write pipeline has been removed. - * This hook provides an opportunity for the service to close out its - * connection with the old upstream service and to prepare to establish a - * new connection with the new downstream service. + * Invoked for this service when the upstream service in the write pipeline + * has been removed. This hook provides an opportunity for this service to + * close out its connection with the old upstream service and to prepare to + * establish a new connection with the new downstream service. */ void pipelineUpstreamChange(); @@ -157,6 +162,8 @@ /** * Invoked when the consensus is lost. Services do not withdraw their cast * votes until a quorum breaks and a new consensus needs to be established. + * This message is sent to each member service regardless of whether or not + * they participated in the consensus. * * @see #consensus(long) */ @@ -185,8 +192,10 @@ /** * Invoked when a quorum meets. The state of the met quorum can be queried * using the <i>token</i>. Quorum members can use this to decide whether - * they are the leader (using {@link #isLeader(long)} or joined as a - * follower (using {@link #isFollower(long)}). + * they are the leader (using {@link #isLeader(long)}, joined as a + * follower (using {@link #isFollower(long)}), or do not participate + * in the quorum (this message is sent to all quorum members, so this + * service might not be part of the met qourum). * <p> * The following pre-conditions will be satisfied before this message is * sent to the {@link QuorumMember}: @@ -231,7 +240,8 @@ * current root block; and (b) casting a vote for their current commit time. * Once a consensus is reached on the current commit time, services will be * joined in the vote order, a new leader will be elected, and the quorum - * will meet again. + * will meet again. This message is sent to all member services, regardless + * of whether they were joined with the met quorum. */ void quorumBreak(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
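Given the clarified contract (quorumMeet() and quorumBreak() are delivered to every member service, whether or not it participates in the met quorum), a receiver must classify its own role before acting. A hypothetical handler sketch follows: member stands in for the QuorumMember, isLeader(long) and isFollower(long) are the methods the javadoc references, and onQuorumMeet() is an illustrative name, not bigdata API.

{{{
void onQuorumMeet(final long token) {
    if (member.isLeader(token)) {
        // This service was elected leader for [token].
    } else if (member.isFollower(token)) {
        // This service is joined with the met quorum as a follower.
    } else {
        // A member service that is not part of the met quorum still
        // receives the message and must ignore it safely.
    }
}
}}}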